From 218b0812d80913d7a2c17007f9ac0473ec8267bc Mon Sep 17 00:00:00 2001 From: Randall Date: Wed, 20 Nov 2024 00:32:18 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=20main.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 140 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 140 insertions(+) diff --git a/main.py b/main.py index e69de29..4bde45b 100644 --- a/main.py +++ b/main.py @@ -0,0 +1,140 @@ +import os +import json +import requests +import hashlib +import hmac +import base64 +import time + +def is_audio_file(file_path): + audio_extensions = ['.wav', '.mp3', '.m4a', '.flac', '.pcm', '.aac', '.opus', '.ogg', '.amr', '.speex', '.lyb', '.ac3', '.ape', '.m4r', '.mp4', '.acc', '.wma'] + return any(file_path.lower().endswith(ext) for ext in audio_extensions) + +def generate_signa(appid, secret_key): + ts = str(int(time.time())) + base_string = appid + ts + md5_base_string = hashlib.md5(base_string.encode('utf-8')).hexdigest() + hmac_sha1 = hmac.new(secret_key.encode('utf-8'), md5_base_string.encode('utf-8'), hashlib.sha1).digest() + signa = base64.b64encode(hmac_sha1).decode('utf-8') + return signa, ts + +def transcribe_audio(file_path, appid, secret_key): + upload_url = "https://raasr.xfyun.cn/v2/api/upload" + get_result_url = "https://raasr.xfyun.cn/v2/api/getResult" + + def upload_file(file_size): + signa, ts = generate_signa(appid, secret_key) + file_name = os.path.basename(file_path) + duration = 200 + + params = { + 'duration': duration, + 'signa': signa, + 'fileName': file_name, + 'fileSize': file_size, + 'appId': appid, + 'ts': ts + } + + headers = { + 'Content-Type': 'application/json; charset=UTF-8', + 'Chunked': 'false' + } + + with open(file_path, 'rb') as audio_file: + files = {'file': audio_file} + response = requests.post(upload_url, params=params, headers=headers, files=files) + return response + + file_size = os.path.getsize(file_path) + response = upload_file(file_size) + + if response.status_code == 200: + response_data = response.json() + if response_data.get('code') == '100003': + correct_file_size = int(response_data['descInfo'].split(':')[-1]) + response = upload_file(correct_file_size) + response_data = response.json() + + if 'orderId' in response_data.get('content', {}): + task_id = response_data['content']['orderId'] + else: + print(f"Error uploading {file_path}: {response_data}") + return None + + # 解析结果 + signa, ts = generate_signa(appid, secret_key) + while True: + result_response = requests.get(get_result_url, params={'appId': appid, 'signa': signa, 'ts': ts, 'orderId': task_id}) + if result_response.status_code == 200: + result_data = result_response.json() + if result_data.get('code') == '000000': + order_result_str = result_data['content']['orderResult'] + if order_result_str and order_result_str!="": + order_result_dict = json.loads(order_result_str) + results = '' + for lattice in order_result_dict['lattice']: + json_1best = json.loads(lattice['json_1best']) + # print(json_1best) + for rt in json_1best['st']['rt']: + for ws in rt['ws']: + for cw in ws['cw']: + # print(cw['w']) + results += cw['w'] + return results + else: + time.sleep(5) + elif result_data.get('code') == '000001': + time.sleep(10) + else: + print(f"Error getting result for {file_path}: {result_data}") + return None + else: + print(f"Error getting result for {file_path}: {result_response.text}") + return None + else: + print(f"Error uploading {file_path}: {response.text}") + return None + +def main(): + folder_path = input("请输入文件夹路径: ") + keyList = [ + { + 'appid': '345xxxxxxx344', + 'secret_key':'xxxxxxxxxx8y5dshu93xxxxxxxxx' + }, + ] + + if not os.path.isdir(folder_path): + print("无效的文件夹路径") + return + + times=0 + + file_name_results = {} + full_path_results = {} + + for root, _, files in os.walk(folder_path): + for file in files: + file_path = os.path.join(root, file) + if is_audio_file(file_path): + transcription = transcribe_audio(file_path, keyList[times%len(keyList)]['appid'], keyList[times%len(keyList)]['secret_key']) + times+=1 + if transcription: + file_name_results[file] = transcription + full_path_results[file_path] = transcription + + print("转写结果:") + for file, transcription in file_name_results.items(): + print(f"{file}: {transcription}") + + with open('file_name_results.json', 'w', encoding='utf-8') as f: + json.dump(file_name_results, f, ensure_ascii=False, indent=4) + + with open('full_path_results.json', 'w', encoding='utf-8') as f: + json.dump(full_path_results, f, ensure_ascii=False, indent=4) + + print("转写结果已保存到 file_name_results.json 和 full_path_results.json") + +if __name__ == "__main__": + main() \ No newline at end of file