# 使用讯飞 语音转写 接口将文件夹内所有内容转为文本 # 使用轮训apikey,防止达到qps出现错误 import os import json import requests import hashlib import hmac import base64 import time def is_audio_file(file_path): audio_extensions = ['.wav', '.mp3', '.m4a', '.flac', '.pcm', '.aac', '.opus', '.ogg', '.amr', '.speex', '.lyb', '.ac3', '.ape', '.m4r', '.mp4', '.acc', '.wma'] return any(file_path.lower().endswith(ext) for ext in audio_extensions) def generate_signa(appid, secret_key): ts = str(int(time.time())) base_string = appid + ts md5_base_string = hashlib.md5(base_string.encode('utf-8')).hexdigest() hmac_sha1 = hmac.new(secret_key.encode('utf-8'), md5_base_string.encode('utf-8'), hashlib.sha1).digest() signa = base64.b64encode(hmac_sha1).decode('utf-8') return signa, ts def transcribe_audio(file_path, appid, secret_key): upload_url = "https://raasr.xfyun.cn/v2/api/upload" get_result_url = "https://raasr.xfyun.cn/v2/api/getResult" def upload_file(file_size): signa, ts = generate_signa(appid, secret_key) file_name = os.path.basename(file_path) duration = 200 params = { 'duration': duration, 'signa': signa, 'fileName': file_name, 'fileSize': file_size, 'appId': appid, 'ts': ts } headers = { 'Content-Type': 'application/json; charset=UTF-8', 'Chunked': 'false' } with open(file_path, 'rb') as audio_file: files = {'file': audio_file} response = requests.post(upload_url, params=params, headers=headers, files=files) return response file_size = os.path.getsize(file_path) response = upload_file(file_size) if response.status_code == 200: response_data = response.json() if response_data.get('code') == '100003': correct_file_size = int(response_data['descInfo'].split(':')[-1]) response = upload_file(correct_file_size) response_data = response.json() if 'orderId' in response_data.get('content', {}): task_id = response_data['content']['orderId'] else: print(f"Error uploading {file_path}: {response_data}") return None # 解析结果 signa, ts = generate_signa(appid, secret_key) while True: result_response = requests.get(get_result_url, params={'appId': appid, 'signa': signa, 'ts': ts, 'orderId': task_id}) if result_response.status_code == 200: result_data = result_response.json() if result_data.get('code') == '000000': order_result_str = result_data['content']['orderResult'] if order_result_str and order_result_str!="": order_result_dict = json.loads(order_result_str) results = '' for lattice in order_result_dict['lattice']: json_1best = json.loads(lattice['json_1best']) # print(json_1best) for rt in json_1best['st']['rt']: for ws in rt['ws']: for cw in ws['cw']: # print(cw['w']) results += cw['w'] return results else: time.sleep(5) elif result_data.get('code') == '000001': time.sleep(10) else: print(f"Error getting result for {file_path}: {result_data}") return None else: print(f"Error getting result for {file_path}: {result_response.text}") return None else: print(f"Error uploading {file_path}: {response.text}") return None def main(): folder_path = input("请输入文件夹路径: ") keyList = [ { 'appid': 'xxxxx', 'secret_key':'xxxxxxxxxxxxxxxxxxxxxxxx' }, { 'appid': 'xxxxx', 'secret_key':'xxxxxxxxxxxxxxxxxxxxxxxx' }, ] if not os.path.isdir(folder_path): print("无效的文件夹路径") return times=0 file_name_results = {} full_path_results = {} for root, _, files in os.walk(folder_path): for file in files: file_path = os.path.join(root, file) if is_audio_file(file_path): transcription = transcribe_audio(file_path, keyList[times%len(keyList)]['appid'], keyList[times%len(keyList)]['secret_key']) times+=1 if transcription: file_name_results[file] = transcription full_path_results[file_path] = transcription print("转写结果:") for file, transcription in file_name_results.items(): print(f"{file}: {transcription}") with open('file_name_results.json', 'w', encoding='utf-8') as f: json.dump(file_name_results, f, ensure_ascii=False, indent=4) with open('full_path_results.json', 'w', encoding='utf-8') as f: json.dump(full_path_results, f, ensure_ascii=False, indent=4) print("转写结果已保存到 file_name_results.json 和 full_path_results.json") if __name__ == "__main__": main()