voice2text/main.py

147 lines
5.4 KiB
Python
Raw Normal View History

2024-11-20 10:53:13 +08:00
# 使用讯飞 语音转写 接口将文件夹内所有内容转为文本
2024-11-20 11:12:26 +08:00
# 使用轮训apikey防止达到qps出现错误
2024-11-20 10:57:53 +08:00
2024-11-20 00:32:18 +08:00
import os
import json
import requests
import hashlib
import hmac
import base64
import time
def is_audio_file(file_path):
audio_extensions = ['.wav', '.mp3', '.m4a', '.flac', '.pcm', '.aac', '.opus', '.ogg', '.amr', '.speex', '.lyb', '.ac3', '.ape', '.m4r', '.mp4', '.acc', '.wma']
return any(file_path.lower().endswith(ext) for ext in audio_extensions)
def generate_signa(appid, secret_key):
ts = str(int(time.time()))
base_string = appid + ts
md5_base_string = hashlib.md5(base_string.encode('utf-8')).hexdigest()
hmac_sha1 = hmac.new(secret_key.encode('utf-8'), md5_base_string.encode('utf-8'), hashlib.sha1).digest()
signa = base64.b64encode(hmac_sha1).decode('utf-8')
return signa, ts
def transcribe_audio(file_path, appid, secret_key):
upload_url = "https://raasr.xfyun.cn/v2/api/upload"
get_result_url = "https://raasr.xfyun.cn/v2/api/getResult"
def upload_file(file_size):
signa, ts = generate_signa(appid, secret_key)
file_name = os.path.basename(file_path)
duration = 200
params = {
'duration': duration,
'signa': signa,
'fileName': file_name,
'fileSize': file_size,
'appId': appid,
'ts': ts
}
headers = {
'Content-Type': 'application/json; charset=UTF-8',
'Chunked': 'false'
}
with open(file_path, 'rb') as audio_file:
files = {'file': audio_file}
response = requests.post(upload_url, params=params, headers=headers, files=files)
return response
file_size = os.path.getsize(file_path)
response = upload_file(file_size)
if response.status_code == 200:
response_data = response.json()
if response_data.get('code') == '100003':
correct_file_size = int(response_data['descInfo'].split(':')[-1])
response = upload_file(correct_file_size)
response_data = response.json()
if 'orderId' in response_data.get('content', {}):
task_id = response_data['content']['orderId']
else:
print(f"Error uploading {file_path}: {response_data}")
return None
# 解析结果
signa, ts = generate_signa(appid, secret_key)
while True:
result_response = requests.get(get_result_url, params={'appId': appid, 'signa': signa, 'ts': ts, 'orderId': task_id})
if result_response.status_code == 200:
result_data = result_response.json()
if result_data.get('code') == '000000':
order_result_str = result_data['content']['orderResult']
if order_result_str and order_result_str!="":
order_result_dict = json.loads(order_result_str)
results = ''
for lattice in order_result_dict['lattice']:
json_1best = json.loads(lattice['json_1best'])
# print(json_1best)
for rt in json_1best['st']['rt']:
for ws in rt['ws']:
for cw in ws['cw']:
# print(cw['w'])
results += cw['w']
return results
else:
time.sleep(5)
elif result_data.get('code') == '000001':
time.sleep(10)
else:
print(f"Error getting result for {file_path}: {result_data}")
return None
else:
print(f"Error getting result for {file_path}: {result_response.text}")
return None
else:
print(f"Error uploading {file_path}: {response.text}")
return None
def main():
folder_path = input("请输入文件夹路径: ")
keyList = [
{
2024-11-20 10:57:53 +08:00
'appid': 'xxxxx',
'secret_key':'xxxxxxxxxxxxxxxxxxxxxxxx'
},
{
'appid': 'xxxxx',
'secret_key':'xxxxxxxxxxxxxxxxxxxxxxxx'
2024-11-20 00:32:18 +08:00
},
]
if not os.path.isdir(folder_path):
print("无效的文件夹路径")
return
times=0
file_name_results = {}
full_path_results = {}
for root, _, files in os.walk(folder_path):
for file in files:
file_path = os.path.join(root, file)
if is_audio_file(file_path):
transcription = transcribe_audio(file_path, keyList[times%len(keyList)]['appid'], keyList[times%len(keyList)]['secret_key'])
times+=1
if transcription:
file_name_results[file] = transcription
full_path_results[file_path] = transcription
print("转写结果:")
for file, transcription in file_name_results.items():
print(f"{file}: {transcription}")
with open('file_name_results.json', 'w', encoding='utf-8') as f:
json.dump(file_name_results, f, ensure_ascii=False, indent=4)
with open('full_path_results.json', 'w', encoding='utf-8') as f:
json.dump(full_path_results, f, ensure_ascii=False, indent=4)
print("转写结果已保存到 file_name_results.json 和 full_path_results.json")
if __name__ == "__main__":
main()