快速部署SenseVoice语音识别服务:REST API调用详解与实战
本文介绍了如何在星图GPU平台上自动化部署sensevoice-small-语音识别-onnx模型(带量化后)镜像,快速搭建多语言语音识别服务。该服务提供简洁的REST API接口,可轻松实现音频转文字功能,典型应用场景包括将会议录音、视频音频等内容自动转换为文本字幕,提升内容处理效率。
快速部署SenseVoice语音识别服务:REST API调用详解与实战
你是不是遇到过这样的场景?手头有一段会议录音需要整理成文字,或者有一段外语视频需要生成字幕,又或者想给自己的应用加上语音转文字的功能。传统方法要么需要手动听写,效率低下;要么需要调用昂贵的云端API,成本高昂。今天,我要分享一个可以部署在自己服务器上的解决方案——SenseVoice语音识别服务。
这个基于ONNX量化的多语言语音识别模型,不仅支持中文、英语、日语、韩语和粤语,还能自动检测超过50种语言。更棒的是,它提供了简单易用的REST API接口,让你能像调用普通Web服务一样,轻松实现语音转文字功能。接下来,我就带你从零开始,一步步部署这个服务,并详细讲解如何通过API进行调用。
1. 环境准备与一键部署
在开始调用API之前,我们需要先把服务跑起来。SenseVoice镜像已经帮我们做好了大部分准备工作,部署过程非常简单。
1.1 系统要求与依赖检查
首先确认你的环境满足基本要求:
- Python版本:建议使用Python 3.8或更高版本
- 内存要求:至少2GB可用内存(模型本身约230MB)
- 磁盘空间:预留500MB空间用于模型和依赖
- 网络连接:首次运行需要下载模型文件(如果本地没有缓存)
如果你使用的是提供的镜像,这些依赖通常已经预装好了。可以通过以下命令快速检查:
# 检查Python版本
python3 --version
# 检查关键依赖
pip list | grep -E "funasr-onnx|gradio|fastapi"
1.2 快速启动服务
部署SenseVoice服务只需要几条简单的命令。服务提供了两种启动方式:一种是带Web界面的完整服务,另一种是纯API服务。
方式一:启动完整服务(推荐)
这是最常用的方式,同时启动Web界面和API服务:
# 进入项目目录(如果不在该目录下)
cd /path/to/sensevoice-service
# 安装依赖(如果尚未安装)
pip install funasr-onnx gradio fastapi uvicorn soundfile jieba
# 启动服务
python3 app.py --host 0.0.0.0 --port 7860
启动成功后,你会看到类似下面的输出:
Running on local URL: http://0.0.0.0:7860
Running on public URL: https://xxxx.gradio.live
方式二:仅启动API服务
如果你只需要API功能,不需要Web界面,可以使用这个简化版本:
# 创建一个简单的API服务文件 api_server.py
cat > api_server.py << 'EOF'
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import JSONResponse
from funasr_onnx import SenseVoiceSmall
import os
app = FastAPI()
# 初始化模型
model_path = "/root/ai-models/danieldong/sensevoice-small-onnx-quant"
model = SenseVoiceSmall(model_path, batch_size=10, quantize=True)
@app.post("/api/transcribe")
async def transcribe_audio(
file: UploadFile = File(...),
language: str = Form("auto"),
use_itn: bool = Form(True)
):
# 保存上传的文件
temp_path = f"/tmp/{file.filename}"
with open(temp_path, "wb") as f:
content = await file.read()
f.write(content)
# 调用模型进行识别
result = model([temp_path], language=language, use_itn=use_itn)
# 清理临时文件
os.remove(temp_path)
return JSONResponse({
"text": result[0]["text"],
"language": result[0].get("language", "auto"),
"timestamp": result[0].get("timestamp", [])
})
@app.get("/health")
async def health_check():
return {"status": "healthy"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)
EOF
# 运行API服务
python3 api_server.py
1.3 验证服务状态
服务启动后,可以通过以下几种方式验证是否正常运行:
# 方法1:健康检查接口
curl http://localhost:7860/health
# 方法2:查看API文档
# 在浏览器中打开:http://localhost:7860/docs
# 方法3:测试Web界面
# 在浏览器中打开:http://localhost:7860
如果一切正常,健康检查接口会返回 {"status": "healthy"},API文档页面会显示所有可用的接口,Web界面则可以让你上传音频文件进行测试。
2. REST API接口详解
SenseVoice服务提供了简洁而强大的REST API接口,让我们可以轻松集成到各种应用中。下面我来详细讲解每个接口的使用方法。
2.1 核心接口:音频转写
这是最常用的接口,用于将音频文件转换为文字。
接口信息:
- URL:
POST /api/transcribe - Content-Type:
multipart/form-data - 响应格式: JSON
请求参数:
| 参数名 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
file |
File | 是 | - | 音频文件,支持wav、mp3、m4a、flac等格式 |
language |
String | 否 | "auto" | 语言代码,支持:auto(自动检测)、zh(中文)、en(英语)、yue(粤语)、ja(日语)、ko(韩语) |
use_itn |
Boolean | 否 | true | 是否启用逆文本正则化,将"三点"转为"3点"等 |
响应格式:
{
"text": "识别出的文字内容",
"language": "检测到的语言代码",
"timestamp": [
[开始时间, 结束时间, "文字片段"],
...
]
}
2.2 使用curl进行测试
让我们用实际的例子来演示如何使用这个接口。首先准备一个测试音频文件,或者使用现有的音频文件。
示例1:基本转写
# 上传音频文件进行转写
curl -X POST "http://localhost:7860/api/transcribe" \
-F "file=@test_audio.wav" \
-F "language=auto" \
-F "use_itn=true"
示例2:指定语言
如果你知道音频的语言,可以指定语言代码来提高识别准确率:
# 指定中文识别
curl -X POST "http://localhost:7860/api/transcribe" \
-F "file=@chinese_audio.mp3" \
-F "language=zh" \
-F "use_itn=true"
# 指定英语识别
curl -X POST "http://localhost:7860/api/transcribe" \
-F "file=@english_audio.m4a" \
-F "language=en" \
-F "use_itn=true"
示例3:禁用逆文本正则化
有些场景下,你可能需要原始的文字输出:
# 禁用ITN,获取原始识别结果
curl -X POST "http://localhost:7860/api/transcribe" \
-F "file=@audio.wav" \
-F "language=auto" \
-F "use_itn=false"
2.3 批量处理接口
虽然官方接口没有直接提供批量处理,但我们可以通过简单的脚本实现批量音频转写:
import requests
import os
import json
def batch_transcribe(audio_folder, api_url="http://localhost:7860/api/transcribe"):
"""批量转写文件夹中的所有音频文件"""
results = {}
# 支持的音频格式
audio_extensions = ['.wav', '.mp3', '.m4a', '.flac', '.ogg']
for filename in os.listdir(audio_folder):
if any(filename.lower().endswith(ext) for ext in audio_extensions):
file_path = os.path.join(audio_folder, filename)
print(f"正在处理: {filename}")
try:
with open(file_path, 'rb') as f:
files = {'file': (filename, f, 'audio/wav')}
data = {'language': 'auto', 'use_itn': True}
response = requests.post(api_url, files=files, data=data)
if response.status_code == 200:
result = response.json()
results[filename] = {
'text': result['text'],
'language': result['language']
}
print(f"✓ 完成: {filename}")
else:
print(f"✗ 失败: {filename} - {response.text}")
except Exception as e:
print(f"✗ 错误: {filename} - {str(e)}")
return results
# 使用示例
if __name__ == "__main__":
# 转写指定文件夹中的所有音频
results = batch_transcribe("/path/to/audio/files")
# 保存结果到JSON文件
with open("transcription_results.json", "w", encoding="utf-8") as f:
json.dump(results, f, ensure_ascii=False, indent=2)
print(f"批量处理完成,共处理 {len(results)} 个文件")
3. Python客户端调用实战
除了直接调用REST API,我们还可以使用Python编写更复杂的客户端程序。下面我提供几个实用的示例。
3.1 基础Python客户端
这是一个完整的Python客户端类,封装了所有常用的API调用:
import requests
import json
from typing import Optional, Dict, Any
import os
class SenseVoiceClient:
"""SenseVoice语音识别客户端"""
def __init__(self, base_url: str = "http://localhost:7860"):
"""
初始化客户端
Args:
base_url: SenseVoice服务地址
"""
self.base_url = base_url.rstrip('/')
self.transcribe_url = f"{self.base_url}/api/transcribe"
self.health_url = f"{self.base_url}/health"
def check_health(self) -> bool:
"""检查服务是否健康"""
try:
response = requests.get(self.health_url, timeout=5)
return response.status_code == 200 and response.json().get("status") == "healthy"
except:
return False
def transcribe_file(
self,
file_path: str,
language: str = "auto",
use_itn: bool = True
) -> Dict[str, Any]:
"""
转写音频文件
Args:
file_path: 音频文件路径
language: 语言代码
use_itn: 是否启用逆文本正则化
Returns:
识别结果字典
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件不存在: {file_path}")
with open(file_path, 'rb') as f:
files = {'file': (os.path.basename(file_path), f)}
data = {'language': language, 'use_itn': str(use_itn).lower()}
response = requests.post(self.transcribe_url, files=files, data=data)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"转写失败: {response.status_code} - {response.text}")
def transcribe_bytes(
self,
audio_bytes: bytes,
filename: str = "audio.wav",
language: str = "auto",
use_itn: bool = True
) -> Dict[str, Any]:
"""
转写音频字节数据
Args:
audio_bytes: 音频字节数据
filename: 文件名(用于Content-Disposition)
language: 语言代码
use_itn: 是否启用逆文本正则化
Returns:
识别结果字典
"""
files = {'file': (filename, audio_bytes)}
data = {'language': language, 'use_itn': str(use_itn).lower()}
response = requests.post(self.transcribe_url, files=files, data=data)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"转写失败: {response.status_code} - {response.text}")
def get_transcription_text(self, file_path: str, language: str = "auto") -> str:
"""
快速获取转写文本(简化版)
Args:
file_path: 音频文件路径
language: 语言代码
Returns:
转写文本
"""
result = self.transcribe_file(file_path, language=language)
return result.get("text", "")
# 使用示例
if __name__ == "__main__":
# 创建客户端
client = SenseVoiceClient()
# 检查服务状态
if client.check_health():
print("✓ 服务状态正常")
else:
print("✗ 服务不可用")
exit(1)
# 转写单个文件
try:
result = client.transcribe_file("meeting_recording.wav", language="zh")
print("转写结果:")
print(f"文本: {result['text']}")
print(f"语言: {result['language']}")
# 如果有时间戳信息
if 'timestamp' in result:
print("时间戳:")
for start, end, text in result['timestamp']:
print(f" [{start:.2f}s - {end:.2f}s]: {text}")
except Exception as e:
print(f"转写失败: {e}")
3.2 实时音频流处理
对于需要实时处理的场景,我们可以结合音频录制库实现实时转写:
import pyaudio
import wave
import threading
import queue
import time
from sensevoice_client import SenseVoiceClient
class RealTimeTranscriber:
"""实时音频转写器"""
def __init__(self, client: SenseVoiceClient, chunk_duration: int = 5):
"""
初始化实时转写器
Args:
client: SenseVoice客户端
chunk_duration: 每个音频块的时长(秒)
"""
self.client = client
self.chunk_duration = chunk_duration
self.audio_queue = queue.Queue()
self.is_recording = False
# 音频参数
self.format = pyaudio.paInt16
self.channels = 1
self.rate = 16000 # 16kHz采样率
def start_recording(self):
"""开始录制音频"""
self.is_recording = True
self.recording_thread = threading.Thread(target=self._record_audio)
self.recording_thread.start()
# 启动处理线程
self.processing_thread = threading.Thread(target=self._process_audio)
self.processing_thread.start()
print("开始录制音频...")
def stop_recording(self):
"""停止录制音频"""
self.is_recording = False
self.recording_thread.join()
self.processing_thread.join()
print("停止录制")
def _record_audio(self):
"""录制音频线程"""
p = pyaudio.PyAudio()
stream = p.open(
format=self.format,
channels=self.channels,
rate=self.rate,
input=True,
frames_per_buffer=int(self.rate * 0.1) # 100ms的缓冲区
)
frames = []
start_time = time.time()
while self.is_recording:
data = stream.read(int(self.rate * 0.1)) # 读取100ms数据
frames.append(data)
# 每chunk_duration秒发送一次音频数据
if time.time() - start_time >= self.chunk_duration:
if frames:
# 将音频数据保存为WAV格式
audio_data = b''.join(frames)
self.audio_queue.put(audio_data)
frames = []
start_time = time.time()
stream.stop_stream()
stream.close()
p.terminate()
def _process_audio(self):
"""处理音频队列线程"""
while self.is_recording or not self.audio_queue.empty():
try:
# 从队列获取音频数据(最多等待1秒)
audio_data = self.audio_queue.get(timeout=1)
# 转写音频
result = self.client.transcribe_bytes(
audio_data,
filename="chunk.wav",
language="auto"
)
print(f"[{time.strftime('%H:%M:%S')}] {result['text']}")
except queue.Empty:
continue
except Exception as e:
print(f"处理音频时出错: {e}")
# 使用示例
if __name__ == "__main__":
# 创建客户端
client = SenseVoiceClient()
# 创建实时转写器
transcriber = RealTimeTranscriber(client, chunk_duration=3)
try:
# 开始实时转写
transcriber.start_recording()
# 录制30秒
time.sleep(30)
finally:
# 停止录制
transcriber.stop_recording()
3.3 集成到Web应用
如果你正在开发Web应用,可以这样集成SenseVoice服务:
from flask import Flask, request, jsonify, render_template
import requests
import os
from werkzeug.utils import secure_filename
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB限制
# SenseVoice服务配置
SENSEVOICE_API = "http://localhost:7860/api/transcribe"
@app.route('/')
def index():
"""首页"""
return render_template('index.html')
@app.route('/upload', methods=['POST'])
def upload_audio():
"""上传并转写音频文件"""
if 'audio' not in request.files:
return jsonify({'error': '没有上传文件'}), 400
file = request.files['audio']
if file.filename == '':
return jsonify({'error': '没有选择文件'}), 400
# 保存上传的文件
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
try:
# 调用SenseVoice API
with open(filepath, 'rb') as f:
files = {'file': (filename, f)}
data = {
'language': request.form.get('language', 'auto'),
'use_itn': request.form.get('use_itn', 'true')
}
response = requests.post(SENSEVOICE_API, files=files, data=data)
if response.status_code == 200:
result = response.json()
# 返回转写结果
return jsonify({
'success': True,
'text': result['text'],
'language': result.get('language', 'auto'),
'filename': filename
})
else:
return jsonify({
'error': f'转写失败: {response.status_code}',
'details': response.text
}), 500
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
# 清理临时文件
if os.path.exists(filepath):
os.remove(filepath)
@app.route('/batch_upload', methods=['POST'])
def batch_upload():
"""批量上传音频文件"""
if 'audio_files' not in request.files:
return jsonify({'error': '没有上传文件'}), 400
files = request.files.getlist('audio_files')
language = request.form.get('language', 'auto')
results = []
for file in files:
if file.filename:
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
try:
with open(filepath, 'rb') as f:
files_data = {'file': (filename, f)}
data = {'language': language, 'use_itn': 'true'}
response = requests.post(SENSEVOICE_API, files=files_data, data=data)
if response.status_code == 200:
result = response.json()
results.append({
'filename': filename,
'text': result['text'],
'language': result.get('language', 'auto')
})
else:
results.append({
'filename': filename,
'error': f'转写失败: {response.status_code}'
})
except Exception as e:
results.append({
'filename': filename,
'error': str(e)
})
finally:
if os.path.exists(filepath):
os.remove(filepath)
return jsonify({
'success': True,
'total': len(results),
'results': results
})
if __name__ == '__main__':
# 创建上传目录
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
# 启动Flask应用
app.run(host='0.0.0.0', port=5000, debug=True)
对应的HTML模板(templates/index.html):
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>SenseVoice语音转写</title>
<style>
body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }
.container { background: #f5f5f5; padding: 20px; border-radius: 10px; }
h1 { color: #333; }
.upload-section { margin: 20px 0; }
.result-section { margin-top: 30px; }
textarea { width: 100%; height: 150px; margin: 10px 0; padding: 10px; }
button { background: #007bff; color: white; border: none; padding: 10px 20px; cursor: pointer; }
button:hover { background: #0056b3; }
.loading { display: none; color: #666; }
</style>
</head>
<body>
<div class="container">
<h1>SenseVoice语音转写服务</h1>
<div class="upload-section">
<h3>上传音频文件</h3>
<form id="uploadForm">
<input type="file" id="audioFile" accept="audio/*" required>
<br><br>
<label>语言设置:</label>
<select id="language">
<option value="auto">自动检测</option>
<option value="zh">中文</option>
<option value="en">英语</option>
<option value="yue">粤语</option>
<option value="ja">日语</option>
<option value="ko">韩语</option>
</select>
<br><br>
<button type="submit">开始转写</button>
<div id="loading" class="loading">处理中,请稍候...</div>
</form>
</div>
<div class="result-section">
<h3>转写结果</h3>
<textarea id="resultText" readonly placeholder="转写结果将显示在这里..."></textarea>
<div id="resultInfo"></div>
</div>
</div>
<script>
document.getElementById('uploadForm').addEventListener('submit', async function(e) {
e.preventDefault();
const fileInput = document.getElementById('audioFile');
const language = document.getElementById('language').value;
const resultText = document.getElementById('resultText');
const resultInfo = document.getElementById('resultInfo');
const loading = document.getElementById('loading');
if (!fileInput.files[0]) {
alert('请选择音频文件');
return;
}
const formData = new FormData();
formData.append('audio', fileInput.files[0]);
formData.append('language', language);
formData.append('use_itn', 'true');
// 显示加载状态
loading.style.display = 'block';
resultText.value = '';
resultInfo.innerHTML = '';
try {
const response = await fetch('/upload', {
method: 'POST',
body: formData
});
const data = await response.json();
if (data.success) {
resultText.value = data.text;
resultInfo.innerHTML = `
<p>语言: ${data.language}</p>
<p>文件名: ${data.filename}</p>
`;
} else {
resultText.value = `错误: ${data.error}`;
}
} catch (error) {
resultText.value = `请求失败: ${error.message}`;
} finally {
loading.style.display = 'none';
}
});
</script>
</body>
</html>
4. 实际应用场景与最佳实践
现在我们已经掌握了如何部署和调用SenseVoice服务,接下来看看在实际项目中如何应用,以及一些最佳实践建议。
4.1 常见应用场景
场景一:会议记录自动化
class MeetingTranscriber:
"""会议记录自动化工具"""
def __init__(self, client):
self.client = client
def process_meeting_recording(self, audio_path, output_format="txt"):
"""
处理会议录音
Args:
audio_path: 音频文件路径
output_format: 输出格式,支持txt、srt、json
Returns:
转换后的文本
"""
# 转写音频
result = self.client.transcribe_file(audio_path, language="zh")
# 根据格式输出
if output_format == "txt":
return self._format_as_txt(result)
elif output_format == "srt":
return self._format_as_srt(result)
elif output_format == "json":
return self._format_as_json(result)
else:
return result['text']
def _format_as_txt(self, result):
"""格式化为纯文本"""
text = result['text']
# 添加简单的分段(每50个字符换行)
lines = []
for i in range(0, len(text), 50):
lines.append(text[i:i+50])
return "\n".join(lines)
def _format_as_srt(self, result):
"""格式化为SRT字幕格式"""
if 'timestamp' not in result:
return result['text']
srt_content = []
for i, (start, end, text_segment) in enumerate(result['timestamp'], 1):
# 转换时间格式
start_time = self._seconds_to_srt_time(start)
end_time = self._seconds_to_srt_time(end)
srt_content.append(f"{i}")
srt_content.append(f"{start_time} --> {end_time}")
srt_content.append(text_segment)
srt_content.append("") # 空行分隔
return "\n".join(srt_content)
def _seconds_to_srt_time(self, seconds):
"""秒数转换为SRT时间格式"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
millis = int((seconds - int(seconds)) * 1000)
return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
def _format_as_json(self, result):
"""格式化为JSON"""
import json
return json.dumps(result, ensure_ascii=False, indent=2)
# 使用示例
client = SenseVoiceClient()
transcriber = MeetingTranscriber(client)
# 处理会议录音
meeting_text = transcriber.process_meeting_recording(
"meeting_20240520.wav",
output_format="srt"
)
# 保存为字幕文件
with open("meeting_20240520.srt", "w", encoding="utf-8") as f:
f.write(meeting_text)
场景二:多语言视频字幕生成
import subprocess
import os
class VideoSubtitleGenerator:
"""视频字幕生成器"""
def __init__(self, client):
self.client = client
def generate_subtitles(self, video_path, language="auto"):
"""
为视频生成字幕
Args:
video_path: 视频文件路径
language: 音频语言
Returns:
字幕文件路径
"""
# 提取音频
audio_path = self._extract_audio(video_path)
try:
# 转写音频
result = self.client.transcribe_file(audio_path, language=language)
# 生成SRT字幕
if 'timestamp' in result:
srt_content = self._create_srt_from_timestamps(result['timestamp'])
else:
# 如果没有时间戳,生成简单的字幕
srt_content = self._create_simple_srt(result['text'])
# 保存字幕文件
base_name = os.path.splitext(video_path)[0]
srt_path = f"{base_name}.srt"
with open(srt_path, "w", encoding="utf-8") as f:
f.write(srt_content)
return srt_path
finally:
# 清理临时音频文件
if os.path.exists(audio_path):
os.remove(audio_path)
def _extract_audio(self, video_path):
"""使用ffmpeg提取音频"""
base_name = os.path.splitext(video_path)[0]
audio_path = f"{base_name}_temp.wav"
# 提取音频为WAV格式
cmd = [
"ffmpeg", "-i", video_path,
"-vn", "-acodec", "pcm_s16le",
"-ar", "16000", "-ac", "1",
"-y", audio_path
]
try:
subprocess.run(cmd, check=True, capture_output=True)
return audio_path
except subprocess.CalledProcessError as e:
raise Exception(f"音频提取失败: {e.stderr.decode()}")
def _create_srt_from_timestamps(self, timestamps):
"""根据时间戳生成SRT字幕"""
srt_lines = []
for i, (start, end, text) in enumerate(timestamps, 1):
# 格式化时间
start_str = self._format_time(start)
end_str = self._format_time(end)
srt_lines.append(str(i))
srt_lines.append(f"{start_str} --> {end_str}")
srt_lines.append(text)
srt_lines.append("") # 空行
return "\n".join(srt_lines)
def _create_simple_srt(self, text, chunk_duration=5):
"""为没有时间戳的文本生成简单字幕"""
# 将文本按标点分割
import re
sentences = re.split(r'[。!?.!?]', text)
sentences = [s.strip() for s in sentences if s.strip()]
srt_lines = []
current_time = 0
for i, sentence in enumerate(sentences, 1):
start_time = current_time
end_time = start_time + chunk_duration
current_time = end_time
start_str = self._format_time(start_time)
end_str = self._format_time(end_time)
srt_lines.append(str(i))
srt_lines.append(f"{start_str} --> {end_str}")
srt_lines.append(sentence)
srt_lines.append("")
return "\n".join(srt_lines)
def _format_time(self, seconds):
"""格式化时间为SRT格式"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
millis = int((seconds - int(seconds)) * 1000)
return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
# 使用示例
client = SenseVoiceClient()
generator = VideoSubtitleGenerator(client)
# 为视频生成字幕
srt_file = generator.generate_subtitles(
"presentation.mp4",
language="en" # 指定英语
)
print(f"字幕已生成: {srt_file}")
4.2 性能优化建议
建议一:批量处理优化
import concurrent.futures
from typing import List
class BatchProcessor:
"""批量处理器"""
def __init__(self, client, max_workers=4):
self.client = client
self.max_workers = max_workers
def process_batch(self, file_paths: List[str], language="auto") -> List[dict]:
"""
批量处理音频文件
Args:
file_paths: 音频文件路径列表
language: 语言代码
Returns:
处理结果列表
"""
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 创建任务
future_to_file = {
executor.submit(self._process_single, file_path, language): file_path
for file_path in file_paths
}
# 收集结果
for future in concurrent.futures.as_completed(future_to_file):
file_path = future_to_file[future]
try:
result = future.result()
results.append({
'file': file_path,
'success': True,
'result': result
})
except Exception as e:
results.append({
'file': file_path,
'success': False,
'error': str(e)
})
return results
def _process_single(self, file_path: str, language: str) -> dict:
"""处理单个文件"""
return self.client.transcribe_file(file_path, language=language)
# 使用示例
client = SenseVoiceClient()
processor = BatchProcessor(client, max_workers=3)
# 批量处理文件
audio_files = ["audio1.wav", "audio2.mp3", "audio3.m4a"]
results = processor.process_batch(audio_files, language="auto")
# 输出结果
for result in results:
if result['success']:
print(f"{result['file']}: {result['result']['text'][:50]}...")
else:
print(f"{result['file']}: 失败 - {result['error']}")
建议二:缓存优化
import hashlib
import json
import os
from datetime import datetime, timedelta
class CachedTranscriber:
"""带缓存的转写器"""
def __init__(self, client, cache_dir=".sensevoice_cache", cache_days=7):
self.client = client
self.cache_dir = cache_dir
self.cache_days = cache_days
# 创建缓存目录
os.makedirs(cache_dir, exist_ok=True)
def transcribe_with_cache(self, file_path, language="auto", use_itn=True):
"""
带缓存的转写
Args:
file_path: 音频文件路径
language: 语言代码
use_itn: 是否启用逆文本正则化
Returns:
转写结果
"""
# 生成缓存键
cache_key = self._generate_cache_key(file_path, language, use_itn)
cache_file = os.path.join(self.cache_dir, f"{cache_key}.json")
# 检查缓存是否存在且未过期
if os.path.exists(cache_file):
cache_age = datetime.now() - datetime.fromtimestamp(os.path.getmtime(cache_file))
if cache_age.days < self.cache_days:
with open(cache_file, 'r', encoding='utf-8') as f:
print(f"使用缓存: {file_path}")
return json.load(f)
# 调用API转写
print(f"调用API: {file_path}")
result = self.client.transcribe_file(file_path, language=language, use_itn=use_itn)
# 保存到缓存
with open(cache_file, 'w', encoding='utf-8') as f:
json.dump(result, f, ensure_ascii=False, indent=2)
return result
def _generate_cache_key(self, file_path, language, use_itn):
"""生成缓存键"""
# 使用文件内容哈希作为基础
with open(file_path, 'rb') as f:
file_hash = hashlib.md5(f.read()).hexdigest()
# 结合参数生成唯一键
params = f"{language}_{use_itn}"
return f"{file_hash}_{hashlib.md5(params.encode()).hexdigest()[:8]}"
def clear_old_cache(self):
"""清理过期缓存"""
now = datetime.now()
removed_count = 0
for filename in os.listdir(self.cache_dir):
filepath = os.path.join(self.cache_dir, filename)
if filename.endswith('.json'):
file_age = now - datetime.fromtimestamp(os.path.getmtime(filepath))
if file_age.days >= self.cache_days:
os.remove(filepath)
removed_count += 1
print(f"清理了 {removed_count} 个过期缓存文件")
# 使用示例
client = SenseVoiceClient()
cached_transcriber = CachedTranscriber(client, cache_days=30)
# 第一次调用会调用API
result1 = cached_transcriber.transcribe_with_cache("audio.wav", language="zh")
# 第二次调用相同文件会使用缓存
result2 = cached_transcriber.transcribe_with_cache("audio.wav", language="zh")
# 定期清理过期缓存
cached_transcriber.clear_old_cache()
4.3 错误处理与监控
import logging
from typing import Optional, Callable
import time
class RobustTranscriber:
"""健壮的转写器,带重试和监控"""
def __init__(self, client, max_retries=3, retry_delay=1):
self.client = client
self.max_retries = max_retries
self.retry_delay = retry_delay
self.logger = self._setup_logger()
def _setup_logger(self):
"""设置日志"""
logger = logging.getLogger("SenseVoiceTranscriber")
logger.setLevel(logging.INFO)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# 文件处理器
file_handler = logging.FileHandler("transcriber.log")
file_handler.setLevel(logging.INFO)
# 格式化
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger.addHandler(file_handler)
return logger
def transcribe_with_retry(
self,
file_path: str,
language: str = "auto",
use_itn: bool = True,
progress_callback: Optional[Callable] = None
) -> dict:
"""
带重试机制的转写
Args:
file_path: 音频文件路径
language: 语言代码
use_itn: 是否启用逆文本正则化
progress_callback: 进度回调函数
Returns:
转写结果
"""
last_error = None
for attempt in range(self.max_retries):
try:
if progress_callback:
progress_callback(f"第 {attempt + 1} 次尝试...")
self.logger.info(f"尝试转写 {file_path} (尝试 {attempt + 1}/{self.max_retries})")
# 调用转写
result = self.client.transcribe_file(file_path, language=language, use_itn=use_itn)
self.logger.info(f"成功转写 {file_path}: {len(result.get('text', ''))} 字符")
if progress_callback:
progress_callback("转写完成")
return result
except Exception as e:
last_error = e
self.logger.error(f"转写失败 (尝试 {attempt + 1}): {str(e)}")
if attempt < self.max_retries - 1:
# 等待后重试
wait_time = self.retry_delay * (2 ** attempt) # 指数退避
self.logger.info(f"等待 {wait_time} 秒后重试...")
time.sleep(wait_time)
# 所有重试都失败
error_msg = f"转写失败,已重试 {self.max_retries} 次: {str(last_error)}"
self.logger.error(error_msg)
raise Exception(error_msg)
def monitor_service(self, check_interval=60):
"""监控服务状态"""
import threading
def monitor():
while True:
try:
is_healthy = self.client.check_health()
status = "健康" if is_healthy else "异常"
self.logger.info(f"服务状态检查: {status}")
if not is_healthy:
self.logger.warning("服务异常,可能需要重启")
except Exception as e:
self.logger.error(f"服务监控错误: {str(e)}")
time.sleep(check_interval)
# 启动监控线程
monitor_thread = threading.Thread(target=monitor, daemon=True)
monitor_thread.start()
self.logger.info("服务监控已启动")
return monitor_thread
# 使用示例
if __name__ == "__main__":
client = SenseVoiceClient()
robust_transcriber = RobustTranscriber(client, max_retries=3)
# 启动服务监控
robust_transcriber.monitor_service(check_interval=300) # 每5分钟检查一次
# 定义进度回调
def on_progress(message):
print(f"进度: {message}")
# 带重试和进度监控的转写
try:
result = robust_transcriber.transcribe_with_retry(
"important_audio.wav",
language="zh",
progress_callback=on_progress
)
print(f"转写成功: {result['text'][:100]}...")
except Exception as e:
print(f"转写失败: {e}")
5. 总结
通过本文的详细介绍,相信你已经掌握了SenseVoice语音识别服务的完整部署和调用方法。让我们回顾一下关键要点:
5.1 核心收获
部署简单快捷:SenseVoice服务提供了开箱即用的解决方案,只需几条命令就能启动完整的语音识别服务。无论是使用预构建的镜像,还是自己从零部署,整个过程都非常简单。
API设计友好:REST API接口设计简洁明了,支持多种音频格式和语言选项。通过简单的HTTP请求就能实现语音转文字功能,非常适合集成到各种应用中。
多语言支持强大:不仅支持中文、英语、日语、韩语、粤语等主要语言,还能自动检测超过50种语言,满足国际化应用的需求。
性能表现优秀:基于ONNX量化技术,模型体积小、推理速度快,10秒音频仅需约70毫秒处理时间,适合实时应用场景。
5.2 实用建议
在实际使用中,我有几个建议:
-
选择合适的部署方式:如果只是测试或小规模使用,可以直接使用提供的镜像。如果需要大规模部署,考虑使用Docker容器化部署,便于管理和扩展。
-
合理利用缓存:对于重复处理的音频文件,实现缓存机制可以显著提升响应速度,减少API调用。
-
注意错误处理:网络请求可能失败,服务可能暂时不可用。实现重试机制和优雅降级策略,确保应用的稳定性。
-
监控服务状态:在生产环境中,定期检查服务健康状态,设置告警机制,及时发现并解决问题。
-
考虑扩展性:如果预计有大量并发请求,可以考虑部署多个服务实例,使用负载均衡器分发请求。
5.3 扩展思考
SenseVoice服务不仅是一个语音转文字工具,它还可以作为更复杂应用的基础组件。你可以考虑:
-
结合大语言模型:将转写结果输入到LLM中,实现会议纪要自动总结、要点提取、行动项识别等功能。
-
实时字幕系统:结合WebRTC技术,构建实时的视频会议字幕系统。
-
语音数据分析:对大量语音数据进行批量处理和分析,挖掘有价值的信息。
-
智能客服系统:作为客服对话的语音输入模块,实现自动化的客户服务。
语音识别技术正在快速发展,像SenseVoice这样的开源解决方案让高质量语音转文字服务变得更加普及和易用。无论你是开发者、研究者还是普通用户,现在都可以轻松地将语音识别能力集成到自己的项目中。
希望本文能帮助你快速上手SenseVoice服务,在实际项目中发挥它的价值。如果在使用过程中遇到任何问题,或者有新的使用场景想要分享,欢迎交流讨论。
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。
更多推荐

所有评论(0)