147 lines
5.4 KiB
Python
147 lines
5.4 KiB
Python
# Transcribe every audio file in a folder to text using the iFlytek speech transcription API.
|
||
# Rotate through several API keys (round-robin) so that no single key hits its QPS limit.
|
||
|
||
import os
|
||
import json
|
||
import requests
|
||
import hashlib
|
||
import hmac
|
||
import base64
|
||
import time
|
||
|
||
def is_audio_file(file_path):
    """Return True when *file_path* ends with one of the accepted extensions.

    The check is case-insensitive and looks only at the end of the path
    string; the file itself is never opened.

    Args:
        file_path: Path or bare file name, as ``str`` or ``os.PathLike``.

    Returns:
        bool: True if the name ends with a listed extension, else False.
    """
    # A tuple lets str.endswith test every suffix in a single call.
    audio_extensions = (
        '.wav', '.mp3', '.m4a', '.flac', '.pcm', '.aac', '.opus', '.ogg',
        '.amr', '.speex', '.lyb', '.ac3', '.ape', '.m4r', '.mp4', '.acc',
        '.wma',
    )
    # os.fspath leaves str unchanged and unwraps pathlib-style objects.
    return os.fspath(file_path).lower().endswith(audio_extensions)
|
||
|
||
def generate_signa(appid, secret_key, ts=None):
    """Build the request signature for the transcription API.

    The signature is ``base64(HMAC-SHA1(key=secret_key, msg=md5hex(appid + ts)))``.

    Args:
        appid: Application id; concatenated with the timestamp before hashing.
        secret_key: Secret used as the HMAC key.
        ts: Optional timestamp (integer seconds or string). Defaults to the
            current Unix time; pass a fixed value to get a reproducible
            signature.

    Returns:
        tuple[str, str]: ``(signa, ts)`` - the base64 signature and the
        timestamp string that was signed.
    """
    if ts is None:
        ts = int(time.time())
    ts = str(ts)

    md5_hex = hashlib.md5((appid + ts).encode('utf-8')).hexdigest()
    digest = hmac.new(
        secret_key.encode('utf-8'),
        md5_hex.encode('utf-8'),
        hashlib.sha1,
    ).digest()
    return base64.b64encode(digest).decode('utf-8'), ts
|
||
|
||
def _decode_json(response):
    """Return the JSON object in *response*, or None if the body is not a JSON object."""
    try:
        payload = response.json()
    except ValueError:
        return None
    return payload if isinstance(payload, dict) else None


def _extract_transcript(order_result_str):
    """Concatenate every recognized word found in an ``orderResult`` JSON string."""
    order_result = json.loads(order_result_str)
    words = []
    for lattice in order_result['lattice']:
        # Each lattice entry stores its 1-best hypothesis as a nested JSON string.
        best = json.loads(lattice['json_1best'])
        for rt in best['st']['rt']:
            for ws in rt['ws']:
                words.extend(cw['w'] for cw in ws['cw'])
    return ''.join(words)


def transcribe_audio(file_path, appid, secret_key):
    """Upload one audio file to the transcription service and return its text.

    The file is uploaded, then the result endpoint is polled until it returns
    a non-empty ``orderResult``, reports a failure, or a request fails.

    Args:
        file_path: Path of the audio file to upload.
        appid: Application id sent with every request.
        secret_key: Secret paired with ``appid``; used to sign requests.

    Returns:
        The recognized text as one string, or ``None`` when the upload, the
        polling, or the parsing of the result fails (the reason is printed).
    """
    upload_url = "https://raasr.xfyun.cn/v2/api/upload"
    get_result_url = "https://raasr.xfyun.cn/v2/api/getResult"
    # (connect, read) timeouts so a stalled connection cannot hang the batch.
    upload_timeout = (10, 300)
    poll_timeout = (10, 60)

    def upload_file(file_size):
        """POST the file, declaring *file_size* as its size, and return the response."""
        signa, ts = generate_signa(appid, secret_key)
        params = {
            # NOTE(review): placeholder duration, not derived from the audio -
            # confirm the API does not validate it.
            'duration': 200,
            'signa': signa,
            'fileName': os.path.basename(file_path),
            'fileSize': file_size,
            'appId': appid,
            'ts': ts,
        }
        headers = {
            'Content-Type': 'application/json; charset=UTF-8',
            'Chunked': 'false',
        }
        # NOTE(review): the body is multipart-encoded while the header says
        # JSON; this presumably causes the size mismatch (code 100003) that
        # the caller retries on - confirm against the official demo.
        with open(file_path, 'rb') as audio_file:
            return requests.post(
                upload_url,
                params=params,
                headers=headers,
                files={'file': audio_file},
                timeout=upload_timeout,
            )

    try:
        response = upload_file(os.path.getsize(file_path))
        response_data = _decode_json(response) if response.status_code == 200 else None
        if response_data is not None and response_data.get('code') == '100003':
            # The server rejected the declared size; retry once with the size
            # it reports. Assumes descInfo ends with ':<expected size>'.
            correct_file_size = int(str(response_data.get('descInfo')).split(':')[-1])
            response = upload_file(correct_file_size)
            response_data = _decode_json(response) if response.status_code == 200 else None
    except (OSError, ValueError, requests.RequestException) as exc:
        print(f"Error uploading {file_path}: {exc}")
        return None

    if response_data is None:
        print(f"Error uploading {file_path}: {response.text}")
        return None

    # 'content' may be null in an error reply, so never index it blindly.
    upload_content = response_data.get('content')
    task_id = upload_content.get('orderId') if isinstance(upload_content, dict) else None
    if not task_id:
        print(f"Error uploading {file_path}: {response_data}")
        return None

    # Poll for the result.
    while True:
        # Re-sign every poll so a long-running order never sends a stale timestamp.
        signa, ts = generate_signa(appid, secret_key)
        try:
            result_response = requests.get(
                get_result_url,
                params={'appId': appid, 'signa': signa, 'ts': ts, 'orderId': task_id},
                timeout=poll_timeout,
            )
        except requests.RequestException as exc:
            print(f"Error getting result for {file_path}: {exc}")
            return None

        result_data = _decode_json(result_response) if result_response.status_code == 200 else None
        if result_data is None:
            print(f"Error getting result for {file_path}: {result_response.text}")
            return None

        code = result_data.get('code')
        if code == '000001':
            time.sleep(10)
            continue
        if code != '000000':
            print(f"Error getting result for {file_path}: {result_data}")
            return None

        content = result_data.get('content')
        if not isinstance(content, dict):
            content = {}

        order_result_str = content.get('orderResult')
        if order_result_str:
            try:
                return _extract_transcript(order_result_str)
            except (KeyError, TypeError, ValueError) as exc:
                print(f"Error getting result for {file_path}: malformed result ({exc!r})")
                return None

        # Per the iFlytek getResult docs, orderInfo.status -1 marks a failed
        # order; stop instead of polling forever for a result that never comes.
        order_info = content.get('orderInfo')
        if isinstance(order_info, dict) and order_info.get('status') == -1:
            print(f"Error getting result for {file_path}: {result_data}")
            return None

        # Order accepted but not finished yet.
        time.sleep(5)
|
||
|
||
def main():
    """Transcribe every audio file under a user-supplied folder and save the text.

    Prompts for a folder, walks it recursively, and passes each file accepted
    by ``is_audio_file`` to ``transcribe_audio`` while rotating through the
    configured credentials. Results are printed and written to
    ``file_name_results.json`` (keyed by file name) and
    ``full_path_results.json`` (keyed by full path) in the current directory.
    A file whose transcription raises is reported and skipped so the results
    already collected are still saved.
    """
    folder_path = input("请输入文件夹路径: ")
    # Credential pairs used round-robin, one per transcription request.
    key_list = [
        {
            'appid': 'xxxxx',
            'secret_key': 'xxxxxxxxxxxxxxxxxxxxxxxx',
        },
        {
            'appid': 'xxxxx',
            'secret_key': 'xxxxxxxxxxxxxxxxxxxxxxxx',
        },
    ]

    if not os.path.isdir(folder_path):
        print("无效的文件夹路径")
        return

    attempts = 0

    # Keyed by bare file name: same-named files in different folders overwrite
    # each other here, while full_path_results keeps every file.
    file_name_results = {}
    full_path_results = {}

    for root, _, files in os.walk(folder_path):
        for file in files:
            file_path = os.path.join(root, file)
            if not is_audio_file(file_path):
                continue

            credentials = key_list[attempts % len(key_list)]
            attempts += 1
            try:
                transcription = transcribe_audio(
                    file_path, credentials['appid'], credentials['secret_key']
                )
            except Exception as exc:
                # Top-level boundary: one bad file must not discard the batch.
                print(f"Error transcribing {file_path}: {exc}")
                continue

            if transcription:
                file_name_results[file] = transcription
                full_path_results[file_path] = transcription

    print("转写结果:")
    for file, transcription in file_name_results.items():
        print(f"{file}: {transcription}")

    with open('file_name_results.json', 'w', encoding='utf-8') as f:
        json.dump(file_name_results, f, ensure_ascii=False, indent=4)

    with open('full_path_results.json', 'w', encoding='utf-8') as f:
        json.dump(full_path_results, f, ensure_ascii=False, indent=4)

    print("转写结果已保存到 file_name_results.json 和 full_path_results.json")
|
||
|
||
# Run the batch transcription only when executed as a script, not on import.
if __name__ == "__main__":
    main()