diff --git a/iat.py b/iat.py new file mode 100644 index 0000000..bfd8d81 --- /dev/null +++ b/iat.py @@ -0,0 +1,266 @@ +# -*- coding:utf-8 -*- +# 使用讯飞 语音听写(流式版) 将文件夹内音频文件转换为文本,存入json中 +# +# author: Randall +# +# cffi==1.12.3 +# gevent==1.4.0 +# greenlet==0.4.15 +# pycparser==2.19 +# six==1.12.0 +# websocket==0.2.1 +# websocket-client==0.56.0 +# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # +import websocket +import datetime +import hashlib +import base64 +import hmac +import json +from urllib.parse import urlencode +import ssl +from wsgiref.handlers import format_date_time +from datetime import datetime +from time import mktime +import _thread as thread +import os +import time + +STATUS_FIRST_FRAME = 0 # 第一帧的标识 +STATUS_CONTINUE_FRAME = 1 # 中间帧标识 +STATUS_LAST_FRAME = 2 # 最后一帧的标识 + +wsParam = None +results = '' +running = False + + +class Ws_Param(object): + # 初始化 + def __init__(self, APPID, APIKey, APISecret, AudioFile): + self.APPID = APPID + self.APIKey = APIKey + self.APISecret = APISecret + self.AudioFile = AudioFile + + # 公共参数(common) + self.CommonArgs = {"app_id": self.APPID} + # 业务参数(business),更多个性化参数可在官网查看 + self.BusinessArgs = {"domain": "iat", "language": "zh_cn", "accent": "mandarin", "vinfo":1,"vad_eos":10000} + + # 生成url + def create_url(self): + url = 'wss://ws-api.xfyun.cn/v2/iat' + # 生成RFC1123格式的时间戳 + now = datetime.now() + date = format_date_time(mktime(now.timetuple())) + + # 拼接字符串 + signature_origin = "host: " + "ws-api.xfyun.cn" + "\n" + signature_origin += "date: " + date + "\n" + signature_origin += "GET " + "/v2/iat " + "HTTP/1.1" + # 进行hmac-sha256进行加密 + signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'), + digestmod=hashlib.sha256).digest() + signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8') + + authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % ( + self.APIKey, "hmac-sha256", "host date request-line", signature_sha) + authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8') + # 将请求的鉴权参数组合为字典 + v = { + "authorization": authorization, + "date": date, + "host": "ws-api.xfyun.cn" + } + # 拼接鉴权参数,生成url + url = url + '?' + urlencode(v) + # print("date: ",date) + # print("v: ",v) + # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致 + # print('websocket url :', url) + return url + + +def is_audio_file(file_path): + audio_extensions = ['.wav', '.mp3', '.m4a', '.flac', '.pcm', '.aac', '.opus', '.ogg', '.amr', '.speex', '.lyb', '.ac3', '.ape', '.m4r', '.mp4', '.acc', '.wma'] + return any(file_path.lower().endswith(ext) for ext in audio_extensions) + + +# 收到websocket消息的处理 +def on_message(ws, message): + try: + code = json.loads(message)["code"] + sid = json.loads(message)["sid"] + if code != 0: + errMsg = json.loads(message)["message"] + print("sid:%s call error:%s code is:%s" % (sid, errMsg, code)) + + else: + data = json.loads(message)["data"]["result"]["ws"] + # print(json.loads(message)) + result = "" + for i in data: + for w in i["cw"]: + result += w["w"] + print("sid:%s call success!,data is:%s" % (sid, json.dumps(data, ensure_ascii=False))) + global results + results += result + except Exception as e: + print("receive msg,but parse exception:", e) + + + +# 收到websocket错误的处理 +def on_error(ws, error): + print("### websocket error:", error) + global running + global results + running = False + results += '*#本次转写失败#*,error:' + str(error) + + + +# 收到websocket关闭的处理 +def on_close(ws,a,b): + print("### websocket closed ###") + global running + running = False + + +# 收到websocket连接建立的处理 +def on_open(ws): + def run(*args): + frameSize = 8000 # 每一帧的音频大小 + intervel = 0.04 # 发送音频间隔(单位:s) + status = STATUS_FIRST_FRAME # 音频的状态信息,标识音频是第一帧,还是中间帧、最后一帧 + + with open(wsParam.AudioFile, "rb") as fp: + while True: + buf = fp.read(frameSize) + # 文件结束 + if not buf: + status = STATUS_LAST_FRAME + # 第一帧处理 + # 发送第一帧音频,带business 参数 + # appid 必须带上,只需第一帧发送 + if status == STATUS_FIRST_FRAME: + + d = {"common": wsParam.CommonArgs, + "business": wsParam.BusinessArgs, + "data": {"status": 0, "format": "audio/L16;rate=16000", + "audio": str(base64.b64encode(buf), 'utf-8'), + "encoding": "raw"}} + d = json.dumps(d) + ws.send(d) + status = STATUS_CONTINUE_FRAME + # 中间帧处理 + elif status == STATUS_CONTINUE_FRAME: + d = {"data": {"status": 1, "format": "audio/L16;rate=16000", + "audio": str(base64.b64encode(buf), 'utf-8'), + "encoding": "raw"}} + ws.send(json.dumps(d)) + # 最后一帧处理 + elif status == STATUS_LAST_FRAME: + d = {"data": {"status": 2, "format": "audio/L16;rate=16000", + "audio": str(base64.b64encode(buf), 'utf-8'), + "encoding": "raw"}} + ws.send(json.dumps(d)) + time.sleep(1) + break + # 模拟音频采样间隔 + time.sleep(intervel) + ws.close() + + thread.start_new_thread(run, ()) + +def add_to_json_file(file_name, key, value): + if not os.path.exists(file_name): + with open(file_name, 'w') as f: + json.dump({}, f) + + with open(file_name, 'r') as f: + data = json.load(f) + + data[key] = value + + with open(file_name, 'w') as f: + json.dump(data, f, indent=4) + +def judge_weather_in_json(file_name, key): + if not os.path.exists(file_name): + return False + + with open(file_name, 'r') as f: + data = json.load(f) + + return key in data + + +def main(): + folder_path = input("请输入文件夹路径: ") + keyList = [ + { + 'appid': 'xxxxxxxx', + 'api_secret':'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', + 'api_key': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' + }, + { + 'appid': 'xxxxxxxx', + 'api_secret':'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', + 'api_key': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' + }, + ] + + if not os.path.isdir(folder_path): + print("无效的文件夹路径") + return + + file_name_results = {} + full_path_results = {} + + times = 0 + + for root, _, files in os.walk(folder_path): + for file in files: + file_path = os.path.join(root, file) + if is_audio_file(file_path): + if judge_weather_in_json('full_path_results.json', file_path): + continue + global wsParam + global running + running = True + print("### websocket started ###") + appid = keyList[times % len(keyList)]['appid'] + api_secret = keyList[times % len(keyList)]['api_secret'] + api_key = keyList[times % len(keyList)]['api_key'] + times += 1 + wsParam = Ws_Param(APPID=appid, APISecret=api_secret, + APIKey=api_key, + AudioFile=file_path) + websocket.enableTrace(False) + wsUrl = wsParam.create_url() + ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close) + ws.on_open = on_open + ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}) + + while running: + time.sleep(1) + + global results + file_name_results[file] = results + full_path_results[file_path] = results + add_to_json_file('file_name_results.json', file, results) + add_to_json_file('full_path_results.json', file_path, results) + results = '' + + + print("转写结果:") + for file, transcription in file_name_results.items(): + print(f"{file}: {transcription}") + + print("转写结果已保存到 file_name_results.json 和 full_path_results.json") + + +if __name__ == "__main__": + main() \ No newline at end of file