# -*- coding:utf-8 -*-
#
#   Transcribe every audio file in a folder to text with the iFLYTEK
#   streaming speech dictation (IAT) WebSocket API and store the results
#   in JSON files.
#
#   author: Randall
#
#  cffi==1.12.3
#  gevent==1.4.0
#  greenlet==0.4.15
#  pycparser==2.19
#  six==1.12.0
#  websocket==0.2.1
#  websocket-client==0.56.0
#
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
import websocket
import datetime
import hashlib
import base64
import hmac
import json
from urllib.parse import urlencode
import ssl
from wsgiref.handlers import format_date_time
from datetime import datetime  # NOTE: rebinds ``datetime`` to the class
from time import mktime
import _thread as thread
import os
import time

STATUS_FIRST_FRAME = 0  # marks the first audio frame
STATUS_CONTINUE_FRAME = 1  # marks an intermediate audio frame
STATUS_LAST_FRAME = 2  # marks the last audio frame

# File-name suffixes (lower case) that are treated as audio files.
AUDIO_EXTENSIONS = (
    '.wav', '.mp3', '.m4a', '.flac', '.pcm', '.aac', '.opus', '.ogg',
    '.amr', '.speex', '.lyb', '.ac3', '.ape', '.m4r', '.mp4', '.acc',
    '.wma',
)

# Module-level state shared between main() and the websocket callbacks.
wsParam = None   # Ws_Param for the file currently being transcribed
results = ''     # text accumulated for the current file
running = False  # True while a transcription session is in progress


class Ws_Param(object):
    """Credentials and request parameters for one dictation session."""

    def __init__(self, APPID, APIKey, APISecret, AudioFile):
        """Store the credentials and the path of the audio file to send."""
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.AudioFile = AudioFile

        # Common parameters ("common" section of the first frame).
        self.CommonArgs = {"app_id": self.APPID}
        # Business parameters ("business" section of the first frame);
        # further options are listed on the vendor's website.
        self.BusinessArgs = {"domain": "iat", "language": "zh_cn",
                             "accent": "mandarin", "vinfo": 1,
                             "vad_eos": 10000}

    def create_url(self):
        """Return the websocket URL including the signed auth parameters."""
        url = 'wss://ws-api.xfyun.cn/v2/iat'
        # Timestamp in RFC 1123 format.
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))

        # String to sign: host, date and request line.
        signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + "/v2/iat " + "HTTP/1.1"

        # HMAC-SHA256 over the string, keyed with the API secret.
        signature_sha = hmac.new(self.APISecret.encode('utf-8'),
                                 signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()
        signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')

        authorization_origin = (
            "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\""
            % (self.APIKey, "hmac-sha256", "host date request-line",
               signature_sha)
        )
        authorization = base64.b64encode(
            authorization_origin.encode('utf-8')).decode(encoding='utf-8')

        # Collect the auth parameters and append them as the query string.
        v = {
            "authorization": authorization,
            "date": date,
            "host": "ws-api.xfyun.cn",
        }
        return url + '?' + urlencode(v)


def is_audio_file(file_path):
    """Return True if *file_path* ends with a known audio extension.

    The comparison is case-insensitive.
    """
    return file_path.lower().endswith(AUDIO_EXTENSIONS)


def on_message(ws, message):
    """Handle a websocket message: append recognised text to ``results``.

    A message with a non-zero ``code`` is reported and otherwise ignored.
    Any parsing failure is printed rather than raised, so one malformed
    message does not abort the session.
    """
    global results
    try:
        # Parse once; the original decoded the same payload repeatedly.
        payload = json.loads(message)
        code = payload["code"]
        sid = payload["sid"]
        if code != 0:
            errMsg = payload["message"]
            print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
        else:
            data = payload["data"]["result"]["ws"]
            result = "".join(w["w"] for i in data for w in i["cw"])
            print("sid:%s call success!,data is:%s"
                  % (sid, json.dumps(data, ensure_ascii=False)))
            results += result
    except Exception as e:
        print("receive msg,but parse exception:", e)


def on_error(ws, error):
    """Handle a websocket error: mark the session finished and record it.

    A failure marker containing the error text is appended to ``results``
    so that it ends up in the output for the affected file.
    """
    print("### websocket error:", error)
    global running
    global results
    running = False
    results += '*#本次转写失败#*,error:' + str(error)


def on_close(ws, a, b):
    """Handle websocket closure: mark the session finished.

    The two extra positional arguments passed by the library are unused.
    """
    print("### websocket closed ###")
    global running
    running = False


def on_open(ws):
    """Handle connection establishment: stream the audio file in a thread."""

    def run(*args):
        frameSize = 8000  # bytes of audio per frame
        intervel = 0.04  # delay between frames, in seconds
        status = STATUS_FIRST_FRAME  # first / intermediate / last frame

        with open(wsParam.AudioFile, "rb") as fp:
            while True:
                buf = fp.read(frameSize)
                # End of file: whatever is sent next is the last frame.
                # NOTE(review): for an empty file this skips the first
                # frame, so "common"/"business" are never sent - confirm
                # whether the service accepts that.
                if not buf:
                    status = STATUS_LAST_FRAME

                d = {"data": {"status": status,
                              "format": "audio/L16;rate=16000",
                              "audio": str(base64.b64encode(buf), 'utf-8'),
                              "encoding": "raw"}}

                if status == STATUS_FIRST_FRAME:
                    # Only the first frame carries the common (app id) and
                    # business parameters.
                    d = {"common": wsParam.CommonArgs,
                         "business": wsParam.BusinessArgs,
                         "data": d["data"]}
                    ws.send(json.dumps(d))
                    status = STATUS_CONTINUE_FRAME
                elif status == STATUS_CONTINUE_FRAME:
                    ws.send(json.dumps(d))
                elif status == STATUS_LAST_FRAME:
                    ws.send(json.dumps(d))
                    # NOTE(review): fixed 1 s wait before closing; results
                    # arriving later than that would be lost - confirm.
                    time.sleep(1)
                    break
                # Simulate the audio sampling interval.
                time.sleep(intervel)
        ws.close()

    thread.start_new_thread(run, ())


def add_to_json_file(file_name, key, value):
    """Set ``key`` to ``value`` in the JSON object stored in *file_name*.

    The file is created if it does not exist; otherwise its current
    content is loaded, updated and written back with an indent of 4.
    """
    if os.path.exists(file_name):
        with open(file_name, 'r', encoding='utf-8') as f:
            data = json.load(f)
    else:
        data = {}
    data[key] = value
    with open(file_name, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=4)


def judge_weather_in_json(file_name, key):
    """Return True if *file_name* exists and its JSON object has *key*."""
    if not os.path.exists(file_name):
        return False
    with open(file_name, 'r', encoding='utf-8') as f:
        data = json.load(f)
    return key in data


def main():
    """Transcribe every audio file below a user-supplied folder.

    Files already present in ``full_path_results.json`` are skipped.
    Each result is written to ``file_name_results.json`` (keyed by file
    name) and ``full_path_results.json`` (keyed by path) as soon as the
    file has been processed.
    """
    global wsParam
    global running
    global results

    folder_path = input("请输入文件夹路径: ")
    # Credential sets; they are used in turn, one per transcribed file.
    keyList = [
        {
            'appid': 'xxxxxxxx',
            'api_secret': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
            'api_key': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
        },
        {
            'appid': 'xxxxxxxx',
            'api_secret': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
            'api_key': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
        },
    ]

    if not os.path.isdir(folder_path):
        print("无效的文件夹路径")
        return

    file_name_results = {}
    times = 0
    for root, _, files in os.walk(folder_path):
        for file in files:
            file_path = os.path.join(root, file)
            if not is_audio_file(file_path):
                continue
            # Skip files transcribed by an earlier run.
            if judge_weather_in_json('full_path_results.json', file_path):
                continue

            running = True
            print("### websocket started ###")
            credentials = keyList[times % len(keyList)]
            times += 1

            wsParam = Ws_Param(APPID=credentials['appid'],
                               APISecret=credentials['api_secret'],
                               APIKey=credentials['api_key'],
                               AudioFile=file_path)
            websocket.enableTrace(False)
            wsUrl = wsParam.create_url()
            ws = websocket.WebSocketApp(wsUrl, on_message=on_message,
                                        on_error=on_error, on_close=on_close)
            ws.on_open = on_open
            # NOTE(security): CERT_NONE disables TLS certificate
            # verification; kept as in the original, review before use on
            # untrusted networks.
            ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
            # run_forever() blocks until the connection has ended, so no
            # polling is needed. Clear the flag explicitly so the script
            # cannot hang if neither on_close nor on_error fired.
            running = False

            file_name_results[file] = results
            add_to_json_file('file_name_results.json', file, results)
            add_to_json_file('full_path_results.json', file_path, results)
            results = ''

    print("转写结果:")
    for file, transcription in file_name_results.items():
        print(f"{file}: {transcription}")
    print("转写结果已保存到 file_name_results.json 和 full_path_results.json")


if __name__ == "__main__":
    main()