快速部署SenseVoice语音识别服务:REST API调用详解与实战

你是不是遇到过这样的场景?手头有一段会议录音需要整理成文字,或者有一段外语视频需要生成字幕,又或者想给自己的应用加上语音转文字的功能。传统方法要么需要手动听写,效率低下;要么需要调用昂贵的云端API,成本高昂。今天,我要分享一个可以部署在自己服务器上的解决方案——SenseVoice语音识别服务。

这个基于ONNX量化的多语言语音识别模型,不仅支持中文、英语、日语、韩语和粤语,还能自动检测超过50种语言。更棒的是,它提供了简单易用的REST API接口,让你能像调用普通Web服务一样,轻松实现语音转文字功能。接下来,我就带你从零开始,一步步部署这个服务,并详细讲解如何通过API进行调用。

1. 环境准备与一键部署

在开始调用API之前,我们需要先把服务跑起来。SenseVoice镜像已经帮我们做好了大部分准备工作,部署过程非常简单。

1.1 系统要求与依赖检查

首先确认你的环境满足基本要求:

  • Python版本:建议使用Python 3.8或更高版本
  • 内存要求:至少2GB可用内存(模型本身约230MB)
  • 磁盘空间:预留500MB空间用于模型和依赖
  • 网络连接:首次运行需要下载模型文件(如果本地没有缓存)

如果你使用的是提供的镜像,这些依赖通常已经预装好了。可以通过以下命令快速检查:

# 检查Python版本
python3 --version

# 检查关键依赖
pip list | grep -E "funasr-onnx|gradio|fastapi"

1.2 快速启动服务

部署SenseVoice服务只需要几条简单的命令。服务提供了两种启动方式:一种是带Web界面的完整服务,另一种是纯API服务。

方式一:启动完整服务(推荐)

这是最常用的方式,同时启动Web界面和API服务:

# 进入项目目录(如果不在该目录下)
cd /path/to/sensevoice-service

# 安装依赖(如果尚未安装)
pip install funasr-onnx gradio fastapi uvicorn soundfile jieba

# 启动服务
python3 app.py --host 0.0.0.0 --port 7860

启动成功后,你会看到类似下面的输出:

Running on local URL:  http://0.0.0.0:7860
Running on public URL: https://xxxx.gradio.live

方式二:仅启动API服务

如果你只需要API功能,不需要Web界面,可以使用这个简化版本:

# 创建一个简单的API服务文件 api_server.py
cat > api_server.py << 'EOF'
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import JSONResponse
from funasr_onnx import SenseVoiceSmall
import os

app = FastAPI()

# 初始化模型
model_path = "/root/ai-models/danieldong/sensevoice-small-onnx-quant"
model = SenseVoiceSmall(model_path, batch_size=10, quantize=True)

@app.post("/api/transcribe")
async def transcribe_audio(
    file: UploadFile = File(...),
    language: str = Form("auto"),
    use_itn: bool = Form(True)
):
    # 保存上传的文件
    temp_path = f"/tmp/{file.filename}"
    with open(temp_path, "wb") as f:
        content = await file.read()
        f.write(content)
    
    # 调用模型进行识别
    result = model([temp_path], language=language, use_itn=use_itn)
    
    # 清理临时文件
    os.remove(temp_path)
    
    return JSONResponse({
        "text": result[0]["text"],
        "language": result[0].get("language", "auto"),
        "timestamp": result[0].get("timestamp", [])
    })

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
EOF

# 运行API服务
python3 api_server.py

1.3 验证服务状态

服务启动后,可以通过以下几种方式验证是否正常运行:

# 方法1:健康检查接口
curl http://localhost:7860/health

# 方法2:查看API文档
# 在浏览器中打开:http://localhost:7860/docs

# 方法3:测试Web界面
# 在浏览器中打开:http://localhost:7860

如果一切正常,健康检查接口会返回 {"status": "healthy"},API文档页面会显示所有可用的接口,Web界面则可以让你上传音频文件进行测试。

2. REST API接口详解

SenseVoice服务提供了简洁而强大的REST API接口,让我们可以轻松集成到各种应用中。下面我来详细讲解每个接口的使用方法。

2.1 核心接口:音频转写

这是最常用的接口,用于将音频文件转换为文字。

接口信息:

  • URL: POST /api/transcribe
  • Content-Type: multipart/form-data
  • 响应格式: JSON

请求参数:

参数名 类型 必填 默认值 说明
file File - 音频文件,支持wav、mp3、m4a、flac等格式
language String "auto" 语言代码,支持:auto(自动检测)、zh(中文)、en(英语)、yue(粤语)、ja(日语)、ko(韩语)
use_itn Boolean true 是否启用逆文本正则化,将"三点"转为"3点"等

响应格式:

{
  "text": "识别出的文字内容",
  "language": "检测到的语言代码",
  "timestamp": [
    [开始时间, 结束时间, "文字片段"],
    ...
  ]
}

2.2 使用curl进行测试

让我们用实际的例子来演示如何使用这个接口。首先准备一个测试音频文件,或者使用现有的音频文件。

示例1:基本转写

# 上传音频文件进行转写
curl -X POST "http://localhost:7860/api/transcribe" \
  -F "file=@test_audio.wav" \
  -F "language=auto" \
  -F "use_itn=true"

示例2:指定语言

如果你知道音频的语言,可以指定语言代码来提高识别准确率:

# 指定中文识别
curl -X POST "http://localhost:7860/api/transcribe" \
  -F "file=@chinese_audio.mp3" \
  -F "language=zh" \
  -F "use_itn=true"

# 指定英语识别
curl -X POST "http://localhost:7860/api/transcribe" \
  -F "file=@english_audio.m4a" \
  -F "language=en" \
  -F "use_itn=true"

示例3:禁用逆文本正则化

有些场景下,你可能需要原始的文字输出:

# 禁用ITN,获取原始识别结果
curl -X POST "http://localhost:7860/api/transcribe" \
  -F "file=@audio.wav" \
  -F "language=auto" \
  -F "use_itn=false"

2.3 批量处理接口

虽然官方接口没有直接提供批量处理,但我们可以通过简单的脚本实现批量音频转写:

import requests
import os
import json

def batch_transcribe(audio_folder, api_url="http://localhost:7860/api/transcribe"):
    """批量转写文件夹中的所有音频文件"""
    results = {}
    
    # 支持的音频格式
    audio_extensions = ['.wav', '.mp3', '.m4a', '.flac', '.ogg']
    
    for filename in os.listdir(audio_folder):
        if any(filename.lower().endswith(ext) for ext in audio_extensions):
            file_path = os.path.join(audio_folder, filename)
            
            print(f"正在处理: {filename}")
            
            try:
                with open(file_path, 'rb') as f:
                    files = {'file': (filename, f, 'audio/wav')}
                    data = {'language': 'auto', 'use_itn': True}
                    
                    response = requests.post(api_url, files=files, data=data)
                    
                    if response.status_code == 200:
                        result = response.json()
                        results[filename] = {
                            'text': result['text'],
                            'language': result['language']
                        }
                        print(f"✓ 完成: {filename}")
                    else:
                        print(f"✗ 失败: {filename} - {response.text}")
                        
            except Exception as e:
                print(f"✗ 错误: {filename} - {str(e)}")
    
    return results

# 使用示例
if __name__ == "__main__":
    # 转写指定文件夹中的所有音频
    results = batch_transcribe("/path/to/audio/files")
    
    # 保存结果到JSON文件
    with open("transcription_results.json", "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    
    print(f"批量处理完成,共处理 {len(results)} 个文件")

3. Python客户端调用实战

除了直接调用REST API,我们还可以使用Python编写更复杂的客户端程序。下面我提供几个实用的示例。

3.1 基础Python客户端

这是一个完整的Python客户端类,封装了所有常用的API调用:

import requests
import json
from typing import Optional, Dict, Any
import os

class SenseVoiceClient:
    """SenseVoice语音识别客户端"""
    
    def __init__(self, base_url: str = "http://localhost:7860"):
        """
        初始化客户端
        
        Args:
            base_url: SenseVoice服务地址
        """
        self.base_url = base_url.rstrip('/')
        self.transcribe_url = f"{self.base_url}/api/transcribe"
        self.health_url = f"{self.base_url}/health"
    
    def check_health(self) -> bool:
        """检查服务是否健康"""
        try:
            response = requests.get(self.health_url, timeout=5)
            return response.status_code == 200 and response.json().get("status") == "healthy"
        except:
            return False
    
    def transcribe_file(
        self,
        file_path: str,
        language: str = "auto",
        use_itn: bool = True
    ) -> Dict[str, Any]:
        """
        转写音频文件
        
        Args:
            file_path: 音频文件路径
            language: 语言代码
            use_itn: 是否启用逆文本正则化
            
        Returns:
            识别结果字典
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"文件不存在: {file_path}")
        
        with open(file_path, 'rb') as f:
            files = {'file': (os.path.basename(file_path), f)}
            data = {'language': language, 'use_itn': str(use_itn).lower()}
            
            response = requests.post(self.transcribe_url, files=files, data=data)
            
            if response.status_code == 200:
                return response.json()
            else:
                raise Exception(f"转写失败: {response.status_code} - {response.text}")
    
    def transcribe_bytes(
        self,
        audio_bytes: bytes,
        filename: str = "audio.wav",
        language: str = "auto",
        use_itn: bool = True
    ) -> Dict[str, Any]:
        """
        转写音频字节数据
        
        Args:
            audio_bytes: 音频字节数据
            filename: 文件名(用于Content-Disposition)
            language: 语言代码
            use_itn: 是否启用逆文本正则化
            
        Returns:
            识别结果字典
        """
        files = {'file': (filename, audio_bytes)}
        data = {'language': language, 'use_itn': str(use_itn).lower()}
        
        response = requests.post(self.transcribe_url, files=files, data=data)
        
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"转写失败: {response.status_code} - {response.text}")
    
    def get_transcription_text(self, file_path: str, language: str = "auto") -> str:
        """
        快速获取转写文本(简化版)
        
        Args:
            file_path: 音频文件路径
            language: 语言代码
            
        Returns:
            转写文本
        """
        result = self.transcribe_file(file_path, language=language)
        return result.get("text", "")

# 使用示例
if __name__ == "__main__":
    # 创建客户端
    client = SenseVoiceClient()
    
    # 检查服务状态
    if client.check_health():
        print("✓ 服务状态正常")
    else:
        print("✗ 服务不可用")
        exit(1)
    
    # 转写单个文件
    try:
        result = client.transcribe_file("meeting_recording.wav", language="zh")
        print("转写结果:")
        print(f"文本: {result['text']}")
        print(f"语言: {result['language']}")
        
        # 如果有时间戳信息
        if 'timestamp' in result:
            print("时间戳:")
            for start, end, text in result['timestamp']:
                print(f"  [{start:.2f}s - {end:.2f}s]: {text}")
                
    except Exception as e:
        print(f"转写失败: {e}")

3.2 实时音频流处理

对于需要实时处理的场景,我们可以结合音频录制库实现实时转写:

import pyaudio
import wave
import threading
import queue
import time
from sensevoice_client import SenseVoiceClient

class RealTimeTranscriber:
    """实时音频转写器"""
    
    def __init__(self, client: SenseVoiceClient, chunk_duration: int = 5):
        """
        初始化实时转写器
        
        Args:
            client: SenseVoice客户端
            chunk_duration: 每个音频块的时长(秒)
        """
        self.client = client
        self.chunk_duration = chunk_duration
        self.audio_queue = queue.Queue()
        self.is_recording = False
        
        # 音频参数
        self.format = pyaudio.paInt16
        self.channels = 1
        self.rate = 16000  # 16kHz采样率
        
    def start_recording(self):
        """开始录制音频"""
        self.is_recording = True
        self.recording_thread = threading.Thread(target=self._record_audio)
        self.recording_thread.start()
        
        # 启动处理线程
        self.processing_thread = threading.Thread(target=self._process_audio)
        self.processing_thread.start()
        
        print("开始录制音频...")
    
    def stop_recording(self):
        """停止录制音频"""
        self.is_recording = False
        self.recording_thread.join()
        self.processing_thread.join()
        print("停止录制")
    
    def _record_audio(self):
        """录制音频线程"""
        p = pyaudio.PyAudio()
        stream = p.open(
            format=self.format,
            channels=self.channels,
            rate=self.rate,
            input=True,
            frames_per_buffer=int(self.rate * 0.1)  # 100ms的缓冲区
        )
        
        frames = []
        start_time = time.time()
        
        while self.is_recording:
            data = stream.read(int(self.rate * 0.1))  # 读取100ms数据
            frames.append(data)
            
            # 每chunk_duration秒发送一次音频数据
            if time.time() - start_time >= self.chunk_duration:
                if frames:
                    # 将音频数据保存为WAV格式
                    audio_data = b''.join(frames)
                    self.audio_queue.put(audio_data)
                    frames = []
                start_time = time.time()
        
        stream.stop_stream()
        stream.close()
        p.terminate()
    
    def _process_audio(self):
        """处理音频队列线程"""
        while self.is_recording or not self.audio_queue.empty():
            try:
                # 从队列获取音频数据(最多等待1秒)
                audio_data = self.audio_queue.get(timeout=1)
                
                # 转写音频
                result = self.client.transcribe_bytes(
                    audio_data,
                    filename="chunk.wav",
                    language="auto"
                )
                
                print(f"[{time.strftime('%H:%M:%S')}] {result['text']}")
                
            except queue.Empty:
                continue
            except Exception as e:
                print(f"处理音频时出错: {e}")

# 使用示例
if __name__ == "__main__":
    # 创建客户端
    client = SenseVoiceClient()
    
    # 创建实时转写器
    transcriber = RealTimeTranscriber(client, chunk_duration=3)
    
    try:
        # 开始实时转写
        transcriber.start_recording()
        
        # 录制30秒
        time.sleep(30)
        
    finally:
        # 停止录制
        transcriber.stop_recording()

3.3 集成到Web应用

如果你正在开发Web应用,可以这样集成SenseVoice服务:

from flask import Flask, request, jsonify, render_template
import requests
import os
from werkzeug.utils import secure_filename

app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB限制

# SenseVoice服务配置
SENSEVOICE_API = "http://localhost:7860/api/transcribe"

@app.route('/')
def index():
    """首页"""
    return render_template('index.html')

@app.route('/upload', methods=['POST'])
def upload_audio():
    """上传并转写音频文件"""
    if 'audio' not in request.files:
        return jsonify({'error': '没有上传文件'}), 400
    
    file = request.files['audio']
    
    if file.filename == '':
        return jsonify({'error': '没有选择文件'}), 400
    
    # 保存上传的文件
    filename = secure_filename(file.filename)
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(filepath)
    
    try:
        # 调用SenseVoice API
        with open(filepath, 'rb') as f:
            files = {'file': (filename, f)}
            data = {
                'language': request.form.get('language', 'auto'),
                'use_itn': request.form.get('use_itn', 'true')
            }
            
            response = requests.post(SENSEVOICE_API, files=files, data=data)
            
            if response.status_code == 200:
                result = response.json()
                
                # 返回转写结果
                return jsonify({
                    'success': True,
                    'text': result['text'],
                    'language': result.get('language', 'auto'),
                    'filename': filename
                })
            else:
                return jsonify({
                    'error': f'转写失败: {response.status_code}',
                    'details': response.text
                }), 500
                
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    
    finally:
        # 清理临时文件
        if os.path.exists(filepath):
            os.remove(filepath)

@app.route('/batch_upload', methods=['POST'])
def batch_upload():
    """批量上传音频文件"""
    if 'audio_files' not in request.files:
        return jsonify({'error': '没有上传文件'}), 400
    
    files = request.files.getlist('audio_files')
    language = request.form.get('language', 'auto')
    results = []
    
    for file in files:
        if file.filename:
            filename = secure_filename(file.filename)
            filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            file.save(filepath)
            
            try:
                with open(filepath, 'rb') as f:
                    files_data = {'file': (filename, f)}
                    data = {'language': language, 'use_itn': 'true'}
                    
                    response = requests.post(SENSEVOICE_API, files=files_data, data=data)
                    
                    if response.status_code == 200:
                        result = response.json()
                        results.append({
                            'filename': filename,
                            'text': result['text'],
                            'language': result.get('language', 'auto')
                        })
                    else:
                        results.append({
                            'filename': filename,
                            'error': f'转写失败: {response.status_code}'
                        })
                        
            except Exception as e:
                results.append({
                    'filename': filename,
                    'error': str(e)
                })
            
            finally:
                if os.path.exists(filepath):
                    os.remove(filepath)
    
    return jsonify({
        'success': True,
        'total': len(results),
        'results': results
    })

if __name__ == '__main__':
    # 创建上传目录
    os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
    
    # 启动Flask应用
    app.run(host='0.0.0.0', port=5000, debug=True)

对应的HTML模板(templates/index.html):

<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>SenseVoice语音转写</title>
    <style>
        body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }
        .container { background: #f5f5f5; padding: 20px; border-radius: 10px; }
        h1 { color: #333; }
        .upload-section { margin: 20px 0; }
        .result-section { margin-top: 30px; }
        textarea { width: 100%; height: 150px; margin: 10px 0; padding: 10px; }
        button { background: #007bff; color: white; border: none; padding: 10px 20px; cursor: pointer; }
        button:hover { background: #0056b3; }
        .loading { display: none; color: #666; }
    </style>
</head>
<body>
    <div class="container">
        <h1>SenseVoice语音转写服务</h1>
        
        <div class="upload-section">
            <h3>上传音频文件</h3>
            <form id="uploadForm">
                <input type="file" id="audioFile" accept="audio/*" required>
                <br><br>
                
                <label>语言设置:</label>
                <select id="language">
                    <option value="auto">自动检测</option>
                    <option value="zh">中文</option>
                    <option value="en">英语</option>
                    <option value="yue">粤语</option>
                    <option value="ja">日语</option>
                    <option value="ko">韩语</option>
                </select>
                
                <br><br>
                <button type="submit">开始转写</button>
                <div id="loading" class="loading">处理中,请稍候...</div>
            </form>
        </div>
        
        <div class="result-section">
            <h3>转写结果</h3>
            <textarea id="resultText" readonly placeholder="转写结果将显示在这里..."></textarea>
            <div id="resultInfo"></div>
        </div>
    </div>

    <script>
        document.getElementById('uploadForm').addEventListener('submit', async function(e) {
            e.preventDefault();
            
            const fileInput = document.getElementById('audioFile');
            const language = document.getElementById('language').value;
            const resultText = document.getElementById('resultText');
            const resultInfo = document.getElementById('resultInfo');
            const loading = document.getElementById('loading');
            
            if (!fileInput.files[0]) {
                alert('请选择音频文件');
                return;
            }
            
            const formData = new FormData();
            formData.append('audio', fileInput.files[0]);
            formData.append('language', language);
            formData.append('use_itn', 'true');
            
            // 显示加载状态
            loading.style.display = 'block';
            resultText.value = '';
            resultInfo.innerHTML = '';
            
            try {
                const response = await fetch('/upload', {
                    method: 'POST',
                    body: formData
                });
                
                const data = await response.json();
                
                if (data.success) {
                    resultText.value = data.text;
                    resultInfo.innerHTML = `
                        <p>语言: ${data.language}</p>
                        <p>文件名: ${data.filename}</p>
                    `;
                } else {
                    resultText.value = `错误: ${data.error}`;
                }
            } catch (error) {
                resultText.value = `请求失败: ${error.message}`;
            } finally {
                loading.style.display = 'none';
            }
        });
    </script>
</body>
</html>

4. 实际应用场景与最佳实践

现在我们已经掌握了如何部署和调用SenseVoice服务,接下来看看在实际项目中如何应用,以及一些最佳实践建议。

4.1 常见应用场景

场景一:会议记录自动化

class MeetingTranscriber:
    """会议记录自动化工具"""
    
    def __init__(self, client):
        self.client = client
    
    def process_meeting_recording(self, audio_path, output_format="txt"):
        """
        处理会议录音
        
        Args:
            audio_path: 音频文件路径
            output_format: 输出格式,支持txt、srt、json
            
        Returns:
            转换后的文本
        """
        # 转写音频
        result = self.client.transcribe_file(audio_path, language="zh")
        
        # 根据格式输出
        if output_format == "txt":
            return self._format_as_txt(result)
        elif output_format == "srt":
            return self._format_as_srt(result)
        elif output_format == "json":
            return self._format_as_json(result)
        else:
            return result['text']
    
    def _format_as_txt(self, result):
        """格式化为纯文本"""
        text = result['text']
        
        # 添加简单的分段(每50个字符换行)
        lines = []
        for i in range(0, len(text), 50):
            lines.append(text[i:i+50])
        
        return "\n".join(lines)
    
    def _format_as_srt(self, result):
        """格式化为SRT字幕格式"""
        if 'timestamp' not in result:
            return result['text']
        
        srt_content = []
        for i, (start, end, text_segment) in enumerate(result['timestamp'], 1):
            # 转换时间格式
            start_time = self._seconds_to_srt_time(start)
            end_time = self._seconds_to_srt_time(end)
            
            srt_content.append(f"{i}")
            srt_content.append(f"{start_time} --> {end_time}")
            srt_content.append(text_segment)
            srt_content.append("")  # 空行分隔
        
        return "\n".join(srt_content)
    
    def _seconds_to_srt_time(self, seconds):
        """秒数转换为SRT时间格式"""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        millis = int((seconds - int(seconds)) * 1000)
        
        return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
    
    def _format_as_json(self, result):
        """格式化为JSON"""
        import json
        return json.dumps(result, ensure_ascii=False, indent=2)

# 使用示例
client = SenseVoiceClient()
transcriber = MeetingTranscriber(client)

# 处理会议录音
meeting_text = transcriber.process_meeting_recording(
    "meeting_20240520.wav",
    output_format="srt"
)

# 保存为字幕文件
with open("meeting_20240520.srt", "w", encoding="utf-8") as f:
    f.write(meeting_text)

场景二:多语言视频字幕生成

import subprocess
import os

class VideoSubtitleGenerator:
    """视频字幕生成器"""
    
    def __init__(self, client):
        self.client = client
    
    def generate_subtitles(self, video_path, language="auto"):
        """
        为视频生成字幕
        
        Args:
            video_path: 视频文件路径
            language: 音频语言
            
        Returns:
            字幕文件路径
        """
        # 提取音频
        audio_path = self._extract_audio(video_path)
        
        try:
            # 转写音频
            result = self.client.transcribe_file(audio_path, language=language)
            
            # 生成SRT字幕
            if 'timestamp' in result:
                srt_content = self._create_srt_from_timestamps(result['timestamp'])
            else:
                # 如果没有时间戳,生成简单的字幕
                srt_content = self._create_simple_srt(result['text'])
            
            # 保存字幕文件
            base_name = os.path.splitext(video_path)[0]
            srt_path = f"{base_name}.srt"
            
            with open(srt_path, "w", encoding="utf-8") as f:
                f.write(srt_content)
            
            return srt_path
            
        finally:
            # 清理临时音频文件
            if os.path.exists(audio_path):
                os.remove(audio_path)
    
    def _extract_audio(self, video_path):
        """使用ffmpeg提取音频"""
        base_name = os.path.splitext(video_path)[0]
        audio_path = f"{base_name}_temp.wav"
        
        # 提取音频为WAV格式
        cmd = [
            "ffmpeg", "-i", video_path,
            "-vn", "-acodec", "pcm_s16le",
            "-ar", "16000", "-ac", "1",
            "-y", audio_path
        ]
        
        try:
            subprocess.run(cmd, check=True, capture_output=True)
            return audio_path
        except subprocess.CalledProcessError as e:
            raise Exception(f"音频提取失败: {e.stderr.decode()}")
    
    def _create_srt_from_timestamps(self, timestamps):
        """根据时间戳生成SRT字幕"""
        srt_lines = []
        
        for i, (start, end, text) in enumerate(timestamps, 1):
            # 格式化时间
            start_str = self._format_time(start)
            end_str = self._format_time(end)
            
            srt_lines.append(str(i))
            srt_lines.append(f"{start_str} --> {end_str}")
            srt_lines.append(text)
            srt_lines.append("")  # 空行
        
        return "\n".join(srt_lines)
    
    def _create_simple_srt(self, text, chunk_duration=5):
        """为没有时间戳的文本生成简单字幕"""
        # 将文本按标点分割
        import re
        sentences = re.split(r'[。!?.!?]', text)
        sentences = [s.strip() for s in sentences if s.strip()]
        
        srt_lines = []
        current_time = 0
        
        for i, sentence in enumerate(sentences, 1):
            start_time = current_time
            end_time = start_time + chunk_duration
            current_time = end_time
            
            start_str = self._format_time(start_time)
            end_str = self._format_time(end_time)
            
            srt_lines.append(str(i))
            srt_lines.append(f"{start_str} --> {end_str}")
            srt_lines.append(sentence)
            srt_lines.append("")
        
        return "\n".join(srt_lines)
    
    def _format_time(self, seconds):
        """格式化时间为SRT格式"""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        millis = int((seconds - int(seconds)) * 1000)
        
        return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"

# 使用示例
client = SenseVoiceClient()
generator = VideoSubtitleGenerator(client)

# 为视频生成字幕
srt_file = generator.generate_subtitles(
    "presentation.mp4",
    language="en"  # 指定英语
)

print(f"字幕已生成: {srt_file}")

4.2 性能优化建议

建议一:批量处理优化

import concurrent.futures
from typing import List

class BatchProcessor:
    """批量处理器"""
    
    def __init__(self, client, max_workers=4):
        self.client = client
        self.max_workers = max_workers
    
    def process_batch(self, file_paths: List[str], language="auto") -> List[dict]:
        """
        批量处理音频文件
        
        Args:
            file_paths: 音频文件路径列表
            language: 语言代码
            
        Returns:
            处理结果列表
        """
        results = []
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # 创建任务
            future_to_file = {
                executor.submit(self._process_single, file_path, language): file_path
                for file_path in file_paths
            }
            
            # 收集结果
            for future in concurrent.futures.as_completed(future_to_file):
                file_path = future_to_file[future]
                try:
                    result = future.result()
                    results.append({
                        'file': file_path,
                        'success': True,
                        'result': result
                    })
                except Exception as e:
                    results.append({
                        'file': file_path,
                        'success': False,
                        'error': str(e)
                    })
        
        return results
    
    def _process_single(self, file_path: str, language: str) -> dict:
        """处理单个文件"""
        return self.client.transcribe_file(file_path, language=language)

# 使用示例
client = SenseVoiceClient()
processor = BatchProcessor(client, max_workers=3)

# 批量处理文件
audio_files = ["audio1.wav", "audio2.mp3", "audio3.m4a"]
results = processor.process_batch(audio_files, language="auto")

# 输出结果
for result in results:
    if result['success']:
        print(f"{result['file']}: {result['result']['text'][:50]}...")
    else:
        print(f"{result['file']}: 失败 - {result['error']}")

建议二:缓存优化

import hashlib
import json
import os
from datetime import datetime, timedelta

class CachedTranscriber:
    """带缓存的转写器"""
    
    def __init__(self, client, cache_dir=".sensevoice_cache", cache_days=7):
        self.client = client
        self.cache_dir = cache_dir
        self.cache_days = cache_days
        
        # 创建缓存目录
        os.makedirs(cache_dir, exist_ok=True)
    
    def transcribe_with_cache(self, file_path, language="auto", use_itn=True):
        """
        带缓存的转写
        
        Args:
            file_path: 音频文件路径
            language: 语言代码
            use_itn: 是否启用逆文本正则化
            
        Returns:
            转写结果
        """
        # 生成缓存键
        cache_key = self._generate_cache_key(file_path, language, use_itn)
        cache_file = os.path.join(self.cache_dir, f"{cache_key}.json")
        
        # 检查缓存是否存在且未过期
        if os.path.exists(cache_file):
            cache_age = datetime.now() - datetime.fromtimestamp(os.path.getmtime(cache_file))
            if cache_age.days < self.cache_days:
                with open(cache_file, 'r', encoding='utf-8') as f:
                    print(f"使用缓存: {file_path}")
                    return json.load(f)
        
        # 调用API转写
        print(f"调用API: {file_path}")
        result = self.client.transcribe_file(file_path, language=language, use_itn=use_itn)
        
        # 保存到缓存
        with open(cache_file, 'w', encoding='utf-8') as f:
            json.dump(result, f, ensure_ascii=False, indent=2)
        
        return result
    
    def _generate_cache_key(self, file_path, language, use_itn):
        """生成缓存键"""
        # 使用文件内容哈希作为基础
        with open(file_path, 'rb') as f:
            file_hash = hashlib.md5(f.read()).hexdigest()
        
        # 结合参数生成唯一键
        params = f"{language}_{use_itn}"
        return f"{file_hash}_{hashlib.md5(params.encode()).hexdigest()[:8]}"
    
    def clear_old_cache(self):
        """清理过期缓存"""
        now = datetime.now()
        removed_count = 0
        
        for filename in os.listdir(self.cache_dir):
            filepath = os.path.join(self.cache_dir, filename)
            if filename.endswith('.json'):
                file_age = now - datetime.fromtimestamp(os.path.getmtime(filepath))
                if file_age.days >= self.cache_days:
                    os.remove(filepath)
                    removed_count += 1
        
        print(f"清理了 {removed_count} 个过期缓存文件")

# 使用示例
client = SenseVoiceClient()
cached_transcriber = CachedTranscriber(client, cache_days=30)

# 第一次调用会调用API
result1 = cached_transcriber.transcribe_with_cache("audio.wav", language="zh")

# 第二次调用相同文件会使用缓存
result2 = cached_transcriber.transcribe_with_cache("audio.wav", language="zh")

# 定期清理过期缓存
cached_transcriber.clear_old_cache()

4.3 错误处理与监控

import logging
from typing import Optional, Callable
import time

class RobustTranscriber:
    """健壮的转写器,带重试和监控"""
    
    def __init__(self, client, max_retries=3, retry_delay=1):
        self.client = client
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.logger = self._setup_logger()
    
    def _setup_logger(self):
        """设置日志"""
        logger = logging.getLogger("SenseVoiceTranscriber")
        logger.setLevel(logging.INFO)
        
        # 控制台处理器
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        
        # 文件处理器
        file_handler = logging.FileHandler("transcriber.log")
        file_handler.setLevel(logging.INFO)
        
        # 格式化
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        console_handler.setFormatter(formatter)
        file_handler.setFormatter(formatter)
        
        logger.addHandler(console_handler)
        logger.addHandler(file_handler)
        
        return logger
    
    def transcribe_with_retry(
        self,
        file_path: str,
        language: str = "auto",
        use_itn: bool = True,
        progress_callback: Optional[Callable] = None
    ) -> dict:
        """
        带重试机制的转写
        
        Args:
            file_path: 音频文件路径
            language: 语言代码
            use_itn: 是否启用逆文本正则化
            progress_callback: 进度回调函数
            
        Returns:
            转写结果
        """
        last_error = None
        
        for attempt in range(self.max_retries):
            try:
                if progress_callback:
                    progress_callback(f"第 {attempt + 1} 次尝试...")
                
                self.logger.info(f"尝试转写 {file_path} (尝试 {attempt + 1}/{self.max_retries})")
                
                # 调用转写
                result = self.client.transcribe_file(file_path, language=language, use_itn=use_itn)
                
                self.logger.info(f"成功转写 {file_path}: {len(result.get('text', ''))} 字符")
                
                if progress_callback:
                    progress_callback("转写完成")
                
                return result
                
            except Exception as e:
                last_error = e
                self.logger.error(f"转写失败 (尝试 {attempt + 1}): {str(e)}")
                
                if attempt < self.max_retries - 1:
                    # 等待后重试
                    wait_time = self.retry_delay * (2 ** attempt)  # 指数退避
                    self.logger.info(f"等待 {wait_time} 秒后重试...")
                    time.sleep(wait_time)
        
        # 所有重试都失败
        error_msg = f"转写失败,已重试 {self.max_retries} 次: {str(last_error)}"
        self.logger.error(error_msg)
        raise Exception(error_msg)
    
    def monitor_service(self, check_interval=60):
        """监控服务状态"""
        import threading
        
        def monitor():
            while True:
                try:
                    is_healthy = self.client.check_health()
                    status = "健康" if is_healthy else "异常"
                    self.logger.info(f"服务状态检查: {status}")
                    
                    if not is_healthy:
                        self.logger.warning("服务异常,可能需要重启")
                        
                except Exception as e:
                    self.logger.error(f"服务监控错误: {str(e)}")
                
                time.sleep(check_interval)
        
        # 启动监控线程
        monitor_thread = threading.Thread(target=monitor, daemon=True)
        monitor_thread.start()
        self.logger.info("服务监控已启动")
        
        return monitor_thread

# 使用示例
if __name__ == "__main__":
    client = SenseVoiceClient()
    robust_transcriber = RobustTranscriber(client, max_retries=3)
    
    # 启动服务监控
    robust_transcriber.monitor_service(check_interval=300)  # 每5分钟检查一次
    
    # 定义进度回调
    def on_progress(message):
        print(f"进度: {message}")
    
    # 带重试和进度监控的转写
    try:
        result = robust_transcriber.transcribe_with_retry(
            "important_audio.wav",
            language="zh",
            progress_callback=on_progress
        )
        print(f"转写成功: {result['text'][:100]}...")
        
    except Exception as e:
        print(f"转写失败: {e}")

5. 总结

通过本文的详细介绍,相信你已经掌握了SenseVoice语音识别服务的完整部署和调用方法。让我们回顾一下关键要点:

5.1 核心收获

部署简单快捷:SenseVoice服务提供了开箱即用的解决方案,只需几条命令就能启动完整的语音识别服务。无论是使用预构建的镜像,还是自己从零部署,整个过程都非常简单。

API设计友好:REST API接口设计简洁明了,支持多种音频格式和语言选项。通过简单的HTTP请求就能实现语音转文字功能,非常适合集成到各种应用中。

多语言支持强大:不仅支持中文、英语、日语、韩语、粤语等主要语言,还能自动检测超过50种语言,满足国际化应用的需求。

性能表现优秀:基于ONNX量化技术,模型体积小、推理速度快,10秒音频仅需约70毫秒处理时间,适合实时应用场景。

5.2 实用建议

在实际使用中,我有几个建议:

  1. 选择合适的部署方式:如果只是测试或小规模使用,可以直接使用提供的镜像。如果需要大规模部署,考虑使用Docker容器化部署,便于管理和扩展。

  2. 合理利用缓存:对于重复处理的音频文件,实现缓存机制可以显著提升响应速度,减少API调用。

  3. 注意错误处理:网络请求可能失败,服务可能暂时不可用。实现重试机制和优雅降级策略,确保应用的稳定性。

  4. 监控服务状态:在生产环境中,定期检查服务健康状态,设置告警机制,及时发现并解决问题。

  5. 考虑扩展性:如果预计有大量并发请求,可以考虑部署多个服务实例,使用负载均衡器分发请求。

5.3 扩展思考

SenseVoice服务不仅是一个语音转文字工具,它还可以作为更复杂应用的基础组件。你可以考虑:

  • 结合大语言模型:将转写结果输入到LLM中,实现会议纪要自动总结、要点提取、行动项识别等功能。

  • 实时字幕系统:结合WebRTC技术,构建实时的视频会议字幕系统。

  • 语音数据分析:对大量语音数据进行批量处理和分析,挖掘有价值的信息。

  • 智能客服系统:作为客服对话的语音输入模块,实现自动化的客户服务。

语音识别技术正在快速发展,像SenseVoice这样的开源解决方案让高质量语音转文字服务变得更加普及和易用。无论你是开发者、研究者还是普通用户,现在都可以轻松地将语音识别能力集成到自己的项目中。

希望本文能帮助你快速上手SenseVoice服务,在实际项目中发挥它的价值。如果在使用过程中遇到任何问题,或者有新的使用场景想要分享,欢迎交流讨论。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐