注意事項
科大訊飛語音轉寫 API 文檔鏈接: https://www.xfyun.cn/doc/asr/lfasr/API.html.
科大訊飛語音轉寫Python3的demo下載鏈接:http://xfyun-doc.ufile.ucloud.com.cn/1564736425808301/weblfasr_python3_demo.zip
上一篇寫了用百度智能云進行音頻文件轉寫的博客,但是那個效果啊,有點慘不忍睹,至少我的識別結果是這樣。然后轉而使用了一下科大訊飛的,想著科大訊飛專門做語音相關的這一塊,應該會好些。語音轉寫的Python3的demo代碼確實很不錯,函數接口很簡潔,本文代碼都是這個demo里面的。識別準確率還是可以的,而且不需要像百度那樣整點才開始識別,很快就返回了識別結果。
如果你的錄音是不止一個人,而是像電話錄音那種,想把轉寫結果中不同人說的話的分離出來,請按照下面這樣添加預處理參數(demo中默認是沒有添加這兒最后兩個參數的,不添加的話,默認是不進行角色分離的):
這樣的話,轉寫結果的speaker的值就不全是0了,而是根據不同的人對轉寫結果進行分離:
操作系統:Windows
Python:3.6
可用時長: 免費用戶時長5小時,且用且珍惜。
音頻屬性: 采樣率16k或8k、位長8bits或16bits、單聲道&多聲道
音頻格式: wav/flac/opus/m4a/mp3
音頻大小: 不超過500M
音頻時長: 不超過5小時,建議5分鐘以上
語言種類: 中文普通話、英文
轉寫結果保存時長 30天。(同一通錄音不需要重新上傳識別,如果你已經上傳識別過了,之后只需要使用api.get_result_request(taskid)的方式即可再次獲取識別結果,taskid是你第一次上傳錄音時給你分配的任務ID,避免重復上傳浪費可用時長)
APP_ID, SECRET_KEY的獲取
訊飛的好像不需要API_KEY,開放授權的方式和其他大廠的類似:
1、頁面右上方“控制臺”點擊進入,登錄訊飛賬號(沒有就注冊一個),進入訊飛開放平臺。
2、左側導航欄上方,依次選擇 語音識別->語音轉寫->離線語音轉寫識別。
3、服務申請。點擊“創建應用”,“接口選擇”已默認勾選完成,如無其他需求,無需勾選,完成其他資料后,點擊最下方“立即創建”按鈕。自己可以手動領取5小時免費試用體驗包。
4、應用成功則頁面顯示“創建完畢”,點擊”返回應用列表”, 查看新創建應用詳情,在服務接口認證信息窗口就可以看到返回的AppID,SecretKey。
話不多說,直接上代碼了
# -*- coding: utf-8 -*-
#
# author: yanmeng2
#
# 非實時轉寫調用demo
import
base64
import
hashlib
import
hmac
import
json
import
os
import
time
import
requests
lfasr_host
=
'http://raasr.xfyun.cn/api'
# 請求的接口名
api_prepare
=
'/prepare'
api_upload
=
'/upload'
api_merge
=
'/merge'
api_get_progress
=
'/getProgress'
api_get_result
=
'/getResult'
# 文件分片大小10M
file_piece_sice
=
10485760
# ——————————————————轉寫可配置參數————————————————
# 參數可在官網界面(https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看,根據需求可自行在gene_params方法里添加修改
# 轉寫類型
lfasr_type
=
0
# 是否開啟分詞
has_participle
=
'false'
has_seperate
=
'true'
# 多候選詞個數
max_alternatives
=
0
# 子用戶標識
suid
=
''
class
SliceIdGenerator
:
"""slice id生成器"""
def
__init__
(
self
)
:
self
.
__ch
=
'aaaaaaaaa`'
def
getNextSliceId
(
self
)
:
ch
=
self
.
__ch
j
=
len
(
ch
)
-
1
while
j
>=
0
:
cj
=
ch
[
j
]
if
cj
!=
'z'
:
ch
=
ch
[
:
j
]
+
chr
(
ord
(
cj
)
+
1
)
+
ch
[
j
+
1
:
]
break
else
:
ch
=
ch
[
:
j
]
+
'a'
+
ch
[
j
+
1
:
]
j
=
j
-
1
self
.
__ch
=
ch
return
self
.
__ch
class
RequestApi
(
object
)
:
def
__init__
(
self
,
appid
,
secret_key
,
upload_file_path
)
:
self
.
appid
=
appid
self
.
secret_key
=
secret_key
self
.
upload_file_path
=
upload_file_path
# 根據不同的apiname生成不同的參數,本示例中未使用全部參數您可在官網(https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看后選擇適合業務場景的進行更換
def
gene_params
(
self
,
apiname
,
taskid
=
None
,
slice_id
=
None
)
:
appid
=
self
.
appid
secret_key
=
self
.
secret_key
upload_file_path
=
self
.
upload_file_path
ts
=
str
(
int
(
time
.
time
(
)
)
)
m2
=
hashlib
.
md5
(
)
m2
.
update
(
(
appid
+
ts
)
.
encode
(
'utf-8'
)
)
md5
=
m2
.
hexdigest
(
)
md5
=
bytes
(
md5
,
encoding
=
'utf-8'
)
# 以secret_key為key, 上面的md5為msg, 使用hashlib.sha1加密結果為signa
signa
=
hmac
.
new
(
secret_key
.
encode
(
'utf-8'
)
,
md5
,
hashlib
.
sha1
)
.
digest
(
)
signa
=
base64
.
b64encode
(
signa
)
signa
=
str
(
signa
,
'utf-8'
)
file_len
=
os
.
path
.
getsize
(
upload_file_path
)
file_name
=
os
.
path
.
basename
(
upload_file_path
)
param_dict
=
{
}
if
apiname
==
api_prepare
:
# slice_num是指分片數量,如果您使用的音頻都是較短音頻也可以不分片,直接將slice_num指定為1即可
slice_num
=
int
(
file_len
/
file_piece_sice
)
+
(
0
if
(
file_len
%
file_piece_sice
==
0
)
else
1
)
param_dict
[
'app_id'
]
=
appid
param_dict
[
'signa'
]
=
signa
param_dict
[
'ts'
]
=
ts
param_dict
[
'file_len'
]
=
str
(
file_len
)
param_dict
[
'file_name'
]
=
file_name
param_dict
[
'slice_num'
]
=
str
(
slice_num
)
elif
apiname
==
api_upload
:
param_dict
[
'app_id'
]
=
appid
param_dict
[
'signa'
]
=
signa
param_dict
[
'ts'
]
=
ts
param_dict
[
'task_id'
]
=
taskid
param_dict
[
'slice_id'
]
=
slice_id
elif
apiname
==
api_merge
:
param_dict
[
'app_id'
]
=
appid
param_dict
[
'signa'
]
=
signa
param_dict
[
'ts'
]
=
ts
param_dict
[
'task_id'
]
=
taskid
param_dict
[
'file_name'
]
=
file_name
elif
apiname
==
api_get_progress
or
apiname
==
api_get_result
:
param_dict
[
'app_id'
]
=
appid
param_dict
[
'signa'
]
=
signa
param_dict
[
'ts'
]
=
ts
param_dict
[
'task_id'
]
=
taskid
return
param_dict
# 請求和結果解析,結果中各個字段的含義可參考:https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html
def
gene_request
(
self
,
apiname
,
data
,
files
=
None
,
headers
=
None
)
:
response
=
requests
.
post
(
lfasr_host
+
apiname
,
data
=
data
,
files
=
files
,
headers
=
headers
)
result
=
json
.
loads
(
response
.
text
)
if
result
[
"ok"
]
==
0
:
print
(
"{} success:"
.
format
(
apiname
)
+
str
(
result
)
)
return
result
else
:
print
(
"{} error:"
.
format
(
apiname
)
+
str
(
result
)
)
exit
(
0
)
return
result
# 預處理
def
prepare_request
(
self
)
:
return
self
.
gene_request
(
apiname
=
api_prepare
,
data
=
self
.
gene_params
(
api_prepare
)
)
# 上傳
def
upload_request
(
self
,
taskid
,
upload_file_path
)
:
file_object
=
open
(
upload_file_path
,
'rb'
)
try
:
index
=
1
sig
=
SliceIdGenerator
(
)
while
True
:
content
=
file_object
.
read
(
file_piece_sice
)
if
not
content
or
len
(
content
)
==
0
:
break
files
=
{
"filename"
:
self
.
gene_params
(
api_upload
)
.
get
(
"slice_id"
)
,
"content"
:
content
}
response
=
self
.
gene_request
(
api_upload
,
data
=
self
.
gene_params
(
api_upload
,
taskid
=
taskid
,
slice_id
=
sig
.
getNextSliceId
(
)
)
,
files
=
files
)
if
response
.
get
(
'ok'
)
!=
0
:
# 上傳分片失敗
print
(
'upload slice fail, response: '
+
str
(
response
)
)
return
False
print
(
'upload slice '
+
str
(
index
)
+
' success'
)
index
+=
1
finally
:
'file index:'
+
str
(
file_object
.
tell
(
)
)
file_object
.
close
(
)
return
True
# 合并
def
merge_request
(
self
,
taskid
)
:
return
self
.
gene_request
(
api_merge
,
data
=
self
.
gene_params
(
api_merge
,
taskid
=
taskid
)
)
# 獲取進度
def
get_progress_request
(
self
,
taskid
)
:
return
self
.
gene_request
(
api_get_progress
,
data
=
self
.
gene_params
(
api_get_progress
,
taskid
=
taskid
)
)
# 獲取結果
def
get_result_request
(
self
,
taskid
)
:
return
self
.
gene_request
(
api_get_result
,
data
=
self
.
gene_params
(
api_get_result
,
taskid
=
taskid
)
)
def
all_api_request
(
self
)
:
# 1. 預處理
pre_result
=
self
.
prepare_request
(
)
taskid
=
pre_result
[
"data"
]
# 2 . 分片上傳
self
.
upload_request
(
taskid
=
taskid
,
upload_file_path
=
self
.
upload_file_path
)
# 3 . 文件合并
self
.
merge_request
(
taskid
=
taskid
)
# 4 . 獲取任務進度
while
True
:
# 每隔20秒獲取一次任務進度
progress
=
self
.
get_progress_request
(
taskid
)
progress_dic
=
progress
if
progress_dic
[
'err_no'
]
!=
0
and
progress_dic
[
'err_no'
]
!=
26605
:
print
(
'task error: '
+
progress_dic
[
'failed'
]
)
return
else
:
data
=
progress_dic
[
'data'
]
task_status
=
json
.
loads
(
data
)
if
task_status
[
'status'
]
==
9
:
print
(
'task '
+
taskid
+
' finished'
)
break
print
(
'The task '
+
taskid
+
' is in processing, task status: '
+
str
(
data
)
)
# 每次獲取進度間隔20S
time
.
sleep
(
20
)
# 5 . 獲取結果
self
.
get_result_request
(
taskid
=
taskid
)
# 注意:如果出現requests模塊報錯:"NoneType" object has no attribute 'read', 請嘗試將requests模塊更新到2.20.0或以上版本(本demo測試版本為2.20.0)
# 輸入訊飛開放平臺的appid,secret_key和待轉寫的文件路徑
if
__name__
==
'__main__'
:
APP_ID
=
"***"
SECRET_KEY
=
"****"
file_path
=
r
"***.wav"
api
=
RequestApi
(
appid
=
APP_ID
,
secret_key
=
SECRET_KEY
,
upload_file_path
=
file_path
)
api
.
all_api_request
(
)
當然,你可以根據自己的需求對demo進行改進,比如你想并發識別錄音,你可以添加多線程執行的函數,為了獲取taskid方便,我在class的初始化里邊添加了self.taskid = “None”,并在預處理結果返回之后重新對taskid賦值。
def
thread_func
(
wav_file_path
,
txt_file_path
)
:
# 線程函數,方便并發識別錄音
doc
=
open
(
txt_file_path
,
'w'
,
encoding
=
'utf-8'
)
# doc.close()
api
=
RequestApi
(
appid
=
APP_ID
,
secret_key
=
SECRET_KEY
,
upload_file_path
=
wav_file_path
)
api
.
all_api_request
(
)
# demo中這個函數是完整過程執行,但我把提取結果的模塊提出來了
print
(
'taskid is: '
+
api
.
taskid
,
file
=
doc
)
result
=
api
.
get_result_request
(
api
.
taskid
)
result
=
eval
(
result
[
'data'
]
)
# print(result)
for
x
in
result
:
print
(
x
)
print
(
x
,
file
=
doc
)
doc
.
close
(
)
#主函數寫成類似這種
if
__name__
==
'__main__'
:
APP_ID
=
"***"
SECRET_KEY
=
"***"
file_read_path
=
r
"D:\MyProject\Python\Voice_SDK\20190820\\"
file_save_path
=
r
"D:\MyProject\Python\Voice_SDK\20190820_xunfei\\"
for
file
in
file_list
:
#多并發批量執行
wav_file_path
=
file_read_path
+
file
+
".wav"
txt_file_path
=
file_save_path
+
file
+
".txt"
t
=
threading
.
Thread
(
target
=
thread_func
,
args
=
(
wav_file_path
,
txt_file_path
)
)
t
.
start
(
)
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
