更新 main.py
This commit is contained in:
parent
2c18f65094
commit
218b0812d8
140
main.py
140
main.py
@ -0,0 +1,140 @@
|
||||
import os
|
||||
import json
|
||||
import requests
|
||||
import hashlib
|
||||
import hmac
|
||||
import base64
|
||||
import time
|
||||
|
||||
def is_audio_file(file_path):
|
||||
audio_extensions = ['.wav', '.mp3', '.m4a', '.flac', '.pcm', '.aac', '.opus', '.ogg', '.amr', '.speex', '.lyb', '.ac3', '.ape', '.m4r', '.mp4', '.acc', '.wma']
|
||||
return any(file_path.lower().endswith(ext) for ext in audio_extensions)
|
||||
|
||||
def generate_signa(appid, secret_key):
|
||||
ts = str(int(time.time()))
|
||||
base_string = appid + ts
|
||||
md5_base_string = hashlib.md5(base_string.encode('utf-8')).hexdigest()
|
||||
hmac_sha1 = hmac.new(secret_key.encode('utf-8'), md5_base_string.encode('utf-8'), hashlib.sha1).digest()
|
||||
signa = base64.b64encode(hmac_sha1).decode('utf-8')
|
||||
return signa, ts
|
||||
|
||||
def transcribe_audio(file_path, appid, secret_key):
|
||||
upload_url = "https://raasr.xfyun.cn/v2/api/upload"
|
||||
get_result_url = "https://raasr.xfyun.cn/v2/api/getResult"
|
||||
|
||||
def upload_file(file_size):
|
||||
signa, ts = generate_signa(appid, secret_key)
|
||||
file_name = os.path.basename(file_path)
|
||||
duration = 200
|
||||
|
||||
params = {
|
||||
'duration': duration,
|
||||
'signa': signa,
|
||||
'fileName': file_name,
|
||||
'fileSize': file_size,
|
||||
'appId': appid,
|
||||
'ts': ts
|
||||
}
|
||||
|
||||
headers = {
|
||||
'Content-Type': 'application/json; charset=UTF-8',
|
||||
'Chunked': 'false'
|
||||
}
|
||||
|
||||
with open(file_path, 'rb') as audio_file:
|
||||
files = {'file': audio_file}
|
||||
response = requests.post(upload_url, params=params, headers=headers, files=files)
|
||||
return response
|
||||
|
||||
file_size = os.path.getsize(file_path)
|
||||
response = upload_file(file_size)
|
||||
|
||||
if response.status_code == 200:
|
||||
response_data = response.json()
|
||||
if response_data.get('code') == '100003':
|
||||
correct_file_size = int(response_data['descInfo'].split(':')[-1])
|
||||
response = upload_file(correct_file_size)
|
||||
response_data = response.json()
|
||||
|
||||
if 'orderId' in response_data.get('content', {}):
|
||||
task_id = response_data['content']['orderId']
|
||||
else:
|
||||
print(f"Error uploading {file_path}: {response_data}")
|
||||
return None
|
||||
|
||||
# 解析结果
|
||||
signa, ts = generate_signa(appid, secret_key)
|
||||
while True:
|
||||
result_response = requests.get(get_result_url, params={'appId': appid, 'signa': signa, 'ts': ts, 'orderId': task_id})
|
||||
if result_response.status_code == 200:
|
||||
result_data = result_response.json()
|
||||
if result_data.get('code') == '000000':
|
||||
order_result_str = result_data['content']['orderResult']
|
||||
if order_result_str and order_result_str!="":
|
||||
order_result_dict = json.loads(order_result_str)
|
||||
results = ''
|
||||
for lattice in order_result_dict['lattice']:
|
||||
json_1best = json.loads(lattice['json_1best'])
|
||||
# print(json_1best)
|
||||
for rt in json_1best['st']['rt']:
|
||||
for ws in rt['ws']:
|
||||
for cw in ws['cw']:
|
||||
# print(cw['w'])
|
||||
results += cw['w']
|
||||
return results
|
||||
else:
|
||||
time.sleep(5)
|
||||
elif result_data.get('code') == '000001':
|
||||
time.sleep(10)
|
||||
else:
|
||||
print(f"Error getting result for {file_path}: {result_data}")
|
||||
return None
|
||||
else:
|
||||
print(f"Error getting result for {file_path}: {result_response.text}")
|
||||
return None
|
||||
else:
|
||||
print(f"Error uploading {file_path}: {response.text}")
|
||||
return None
|
||||
|
||||
def main():
|
||||
folder_path = input("请输入文件夹路径: ")
|
||||
keyList = [
|
||||
{
|
||||
'appid': '345xxxxxxx344',
|
||||
'secret_key':'xxxxxxxxxx8y5dshu93xxxxxxxxx'
|
||||
},
|
||||
]
|
||||
|
||||
if not os.path.isdir(folder_path):
|
||||
print("无效的文件夹路径")
|
||||
return
|
||||
|
||||
times=0
|
||||
|
||||
file_name_results = {}
|
||||
full_path_results = {}
|
||||
|
||||
for root, _, files in os.walk(folder_path):
|
||||
for file in files:
|
||||
file_path = os.path.join(root, file)
|
||||
if is_audio_file(file_path):
|
||||
transcription = transcribe_audio(file_path, keyList[times%len(keyList)]['appid'], keyList[times%len(keyList)]['secret_key'])
|
||||
times+=1
|
||||
if transcription:
|
||||
file_name_results[file] = transcription
|
||||
full_path_results[file_path] = transcription
|
||||
|
||||
print("转写结果:")
|
||||
for file, transcription in file_name_results.items():
|
||||
print(f"{file}: {transcription}")
|
||||
|
||||
with open('file_name_results.json', 'w', encoding='utf-8') as f:
|
||||
json.dump(file_name_results, f, ensure_ascii=False, indent=4)
|
||||
|
||||
with open('full_path_results.json', 'w', encoding='utf-8') as f:
|
||||
json.dump(full_path_results, f, ensure_ascii=False, indent=4)
|
||||
|
||||
print("转写结果已保存到 file_name_results.json 和 full_path_results.json")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in New Issue
Block a user