2024-11-20 10:53:13 +08:00
|
|
|
# 使用讯飞 语音转写 接口将文件夹内所有内容转为文本
|
2024-11-20 10:57:53 +08:00
|
|
|
|
2024-11-20 00:32:18 +08:00
|
|
|
import os
|
|
|
|
import json
|
|
|
|
import requests
|
|
|
|
import hashlib
|
|
|
|
import hmac
|
|
|
|
import base64
|
|
|
|
import time
|
|
|
|
|
|
|
|
def is_audio_file(file_path):
|
|
|
|
audio_extensions = ['.wav', '.mp3', '.m4a', '.flac', '.pcm', '.aac', '.opus', '.ogg', '.amr', '.speex', '.lyb', '.ac3', '.ape', '.m4r', '.mp4', '.acc', '.wma']
|
|
|
|
return any(file_path.lower().endswith(ext) for ext in audio_extensions)
|
|
|
|
|
|
|
|
def generate_signa(appid, secret_key):
|
|
|
|
ts = str(int(time.time()))
|
|
|
|
base_string = appid + ts
|
|
|
|
md5_base_string = hashlib.md5(base_string.encode('utf-8')).hexdigest()
|
|
|
|
hmac_sha1 = hmac.new(secret_key.encode('utf-8'), md5_base_string.encode('utf-8'), hashlib.sha1).digest()
|
|
|
|
signa = base64.b64encode(hmac_sha1).decode('utf-8')
|
|
|
|
return signa, ts
|
|
|
|
|
|
|
|
def transcribe_audio(file_path, appid, secret_key):
|
|
|
|
upload_url = "https://raasr.xfyun.cn/v2/api/upload"
|
|
|
|
get_result_url = "https://raasr.xfyun.cn/v2/api/getResult"
|
|
|
|
|
|
|
|
def upload_file(file_size):
|
|
|
|
signa, ts = generate_signa(appid, secret_key)
|
|
|
|
file_name = os.path.basename(file_path)
|
|
|
|
duration = 200
|
|
|
|
|
|
|
|
params = {
|
|
|
|
'duration': duration,
|
|
|
|
'signa': signa,
|
|
|
|
'fileName': file_name,
|
|
|
|
'fileSize': file_size,
|
|
|
|
'appId': appid,
|
|
|
|
'ts': ts
|
|
|
|
}
|
|
|
|
|
|
|
|
headers = {
|
|
|
|
'Content-Type': 'application/json; charset=UTF-8',
|
|
|
|
'Chunked': 'false'
|
|
|
|
}
|
|
|
|
|
|
|
|
with open(file_path, 'rb') as audio_file:
|
|
|
|
files = {'file': audio_file}
|
|
|
|
response = requests.post(upload_url, params=params, headers=headers, files=files)
|
|
|
|
return response
|
|
|
|
|
|
|
|
file_size = os.path.getsize(file_path)
|
|
|
|
response = upload_file(file_size)
|
|
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
|
response_data = response.json()
|
|
|
|
if response_data.get('code') == '100003':
|
|
|
|
correct_file_size = int(response_data['descInfo'].split(':')[-1])
|
|
|
|
response = upload_file(correct_file_size)
|
|
|
|
response_data = response.json()
|
|
|
|
|
|
|
|
if 'orderId' in response_data.get('content', {}):
|
|
|
|
task_id = response_data['content']['orderId']
|
|
|
|
else:
|
|
|
|
print(f"Error uploading {file_path}: {response_data}")
|
|
|
|
return None
|
|
|
|
|
|
|
|
# 解析结果
|
|
|
|
signa, ts = generate_signa(appid, secret_key)
|
|
|
|
while True:
|
|
|
|
result_response = requests.get(get_result_url, params={'appId': appid, 'signa': signa, 'ts': ts, 'orderId': task_id})
|
|
|
|
if result_response.status_code == 200:
|
|
|
|
result_data = result_response.json()
|
|
|
|
if result_data.get('code') == '000000':
|
|
|
|
order_result_str = result_data['content']['orderResult']
|
|
|
|
if order_result_str and order_result_str!="":
|
|
|
|
order_result_dict = json.loads(order_result_str)
|
|
|
|
results = ''
|
|
|
|
for lattice in order_result_dict['lattice']:
|
|
|
|
json_1best = json.loads(lattice['json_1best'])
|
|
|
|
# print(json_1best)
|
|
|
|
for rt in json_1best['st']['rt']:
|
|
|
|
for ws in rt['ws']:
|
|
|
|
for cw in ws['cw']:
|
|
|
|
# print(cw['w'])
|
|
|
|
results += cw['w']
|
|
|
|
return results
|
|
|
|
else:
|
|
|
|
time.sleep(5)
|
|
|
|
elif result_data.get('code') == '000001':
|
|
|
|
time.sleep(10)
|
|
|
|
else:
|
|
|
|
print(f"Error getting result for {file_path}: {result_data}")
|
|
|
|
return None
|
|
|
|
else:
|
|
|
|
print(f"Error getting result for {file_path}: {result_response.text}")
|
|
|
|
return None
|
|
|
|
else:
|
|
|
|
print(f"Error uploading {file_path}: {response.text}")
|
|
|
|
return None
|
|
|
|
|
|
|
|
def main():
|
|
|
|
folder_path = input("请输入文件夹路径: ")
|
|
|
|
keyList = [
|
|
|
|
{
|
2024-11-20 10:57:53 +08:00
|
|
|
'appid': 'xxxxx',
|
|
|
|
'secret_key':'xxxxxxxxxxxxxxxxxxxxxxxx'
|
|
|
|
},
|
|
|
|
{
|
|
|
|
'appid': 'xxxxx',
|
|
|
|
'secret_key':'xxxxxxxxxxxxxxxxxxxxxxxx'
|
2024-11-20 00:32:18 +08:00
|
|
|
},
|
|
|
|
]
|
|
|
|
|
|
|
|
if not os.path.isdir(folder_path):
|
|
|
|
print("无效的文件夹路径")
|
|
|
|
return
|
|
|
|
|
|
|
|
times=0
|
|
|
|
|
|
|
|
file_name_results = {}
|
|
|
|
full_path_results = {}
|
|
|
|
|
|
|
|
for root, _, files in os.walk(folder_path):
|
|
|
|
for file in files:
|
|
|
|
file_path = os.path.join(root, file)
|
|
|
|
if is_audio_file(file_path):
|
|
|
|
transcription = transcribe_audio(file_path, keyList[times%len(keyList)]['appid'], keyList[times%len(keyList)]['secret_key'])
|
|
|
|
times+=1
|
|
|
|
if transcription:
|
|
|
|
file_name_results[file] = transcription
|
|
|
|
full_path_results[file_path] = transcription
|
|
|
|
|
|
|
|
print("转写结果:")
|
|
|
|
for file, transcription in file_name_results.items():
|
|
|
|
print(f"{file}: {transcription}")
|
|
|
|
|
|
|
|
with open('file_name_results.json', 'w', encoding='utf-8') as f:
|
|
|
|
json.dump(file_name_results, f, ensure_ascii=False, indent=4)
|
|
|
|
|
|
|
|
with open('full_path_results.json', 'w', encoding='utf-8') as f:
|
|
|
|
json.dump(full_path_results, f, ensure_ascii=False, indent=4)
|
|
|
|
|
|
|
|
print("转写结果已保存到 file_name_results.json 和 full_path_results.json")
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
main()
|