Qwen3-ASR-0.6B实时语音转写:WebSocket流式传输实现

想象一下,你正在主持一场线上会议,参会者来自世界各地,说着不同的语言和方言。会议进行到一半,突然有人问:“刚才那段中文方言说的是什么?” 或者,你正在直播一场产品发布会,需要实时生成字幕,让全球观众都能跟上节奏。

传统的语音转写方案,要么需要等整段音频上传完才能处理,要么就是延迟高得让人着急。而今天要聊的Qwen3-ASR-0.6B配合WebSocket流式传输,能让你在音频还在录制的时候,就看到文字在屏幕上“流”出来,就像有人在同步打字一样。

1. 为什么需要实时语音转写?

我们先来看看几个真实的场景。

在线会议场景:一场跨国团队会议,有说英语的、说中文的、还有说粤语的同事。传统方案是会议结束后上传录音文件,等几分钟甚至更久才能拿到文字稿。但会议中如果有人没听清,或者需要即时确认某个细节,这种延迟就让人很头疼。

直播字幕场景:一场技术分享直播,观众里有听力障碍的朋友,也有想边看边做笔记的开发者。如果字幕延迟十几秒,观众就会觉得跟不上节奏,体验大打折扣。

客服对话场景:智能客服系统需要实时理解用户语音,给出准确回应。如果转写延迟太高,用户说完了还得等几秒才能得到回复,这种“卡顿感”会让用户觉得系统不够智能。

这些场景都有一个共同的需求:低延迟、高准确率、支持多语言。而Qwen3-ASR-0.6B正好能满足这些需求——它支持52种语言和方言,包括22种中国方言,而且0.6B的模型大小在效率和性能之间找到了很好的平衡。

但光有好的模型还不够,传输方式也很关键。这就是为什么我们要用WebSocket来实现流式传输。

2. WebSocket:实时通信的“高速公路”

你可能听说过HTTP,那是我们平时浏览网页用的协议。HTTP有个特点:每次请求都要“握手”,服务器响应完就“断开”。这种“一问一答”的模式,对于实时传输音频流来说,效率太低了。

WebSocket就不一样了。它像是一条持久连接的高速公路,一旦建立连接,数据就可以双向、持续地流动。对于语音转写来说,这意味着:

  • 客户端可以一边录音,一边把音频数据切成小块,源源不断地发送给服务器
  • 服务器收到一点音频,就处理一点,然后把转写结果实时返回
  • 整个过程几乎没有延迟,用户几乎感觉不到等待

更重要的是,WebSocket是全双工的——客户端和服务器可以同时发送和接收数据。这为实时交互提供了可能,比如在转写过程中随时调整参数,或者根据转写结果触发其他操作。

3. 搭建实时转写系统:从零开始

好了,理论说完了,我们来看看具体怎么实现。我会用一个Python示例来展示完整的流程,你可以根据自己的需求调整。

3.1 环境准备

首先,你需要安装必要的依赖。我建议创建一个虚拟环境,避免包冲突:

# 创建虚拟环境
python -m venv asr_env
source asr_env/bin/activate  # Linux/Mac
# 或者 asr_env\Scripts\activate  # Windows

# 安装核心依赖
pip install qwen-asr websockets aiohttp sounddevice numpy

这里用到的几个包:

  • qwen-asr:Qwen3-ASR的Python接口
  • websockets:WebSocket客户端和服务器库
  • aiohttp:异步HTTP客户端/服务器
  • sounddevice:音频录制
  • numpy:音频数据处理

3.2 WebSocket服务器端实现

服务器端负责接收音频流、调用Qwen3-ASR模型、返回转写结果。我们用一个简单的异步服务器来实现:

# server.py
import asyncio
import websockets
import json
import torch
from qwen_asr import Qwen3ASRModel
import io
import wave

class ASRServer:
    def __init__(self):
        # 加载模型 - 这里使用0.6B版本,平衡性能和效率
        self.model = Qwen3ASRModel.from_pretrained(
            "Qwen/Qwen3-ASR-0.6B",
            dtype=torch.bfloat16,
            device_map="cuda:0",  # 如果有GPU
            max_inference_batch_size=32,
            max_new_tokens=256,
        )
        print("模型加载完成,等待连接...")
    
    async def process_audio_chunk(self, audio_data, sample_rate=16000):
        """处理单个音频块"""
        try:
            # 将字节数据转换为numpy数组
            import numpy as np
            audio_array = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32768.0
            
            # 调用模型进行转写
            results = self.model.transcribe(
                audio=audio_array,
                sample_rate=sample_rate,
                language=None,  # 自动检测语言
                streaming=True,  # 启用流式模式
            )
            
            # 返回转写结果
            if results and len(results) > 0:
                return {
                    "text": results[0].text,
                    "language": results[0].language,
                    "confidence": getattr(results[0], 'confidence', 0.9)
                }
            return {"text": "", "language": "unknown", "confidence": 0.0}
            
        except Exception as e:
            print(f"处理音频时出错: {e}")
            return {"error": str(e)}
    
    async def handle_client(self, websocket, path):
        """处理客户端连接"""
        print(f"客户端连接: {websocket.remote_address}")
        
        try:
            async for message in websocket:
                # 解析消息
                data = json.loads(message)
                
                if data.get("type") == "audio_chunk":
                    # 处理音频数据
                    audio_data = bytes(data["data"])
                    sample_rate = data.get("sample_rate", 16000)
                    
                    # 处理并返回结果
                    result = await self.process_audio_chunk(audio_data, sample_rate)
                    await websocket.send(json.dumps({
                        "type": "transcription",
                        "data": result
                    }))
                    
                elif data.get("type") == "end_stream":
                    # 流结束,可以做一些清理工作
                    await websocket.send(json.dumps({
                        "type": "stream_end",
                        "message": "转写完成"
                    }))
                    
        except websockets.exceptions.ConnectionClosed:
            print("客户端断开连接")
        except Exception as e:
            print(f"处理连接时出错: {e}")

async def main():
    server = ASRServer()
    
    # 启动WebSocket服务器
    async with websockets.serve(
        server.handle_client,
        "0.0.0.0",  # 监听所有接口
        8765,  # 端口号
        ping_interval=20,
        ping_timeout=60
    ):
        print("WebSocket服务器启动在 ws://0.0.0.0:8765")
        await asyncio.Future()  # 永久运行

if __name__ == "__main__":
    asyncio.run(main())

这个服务器做了几件事:

  1. 启动时加载Qwen3-ASR-0.6B模型
  2. 监听WebSocket连接
  3. 接收音频数据块
  4. 调用模型进行流式转写
  5. 实时返回转写结果

3.3 客户端实现:实时录音和传输

客户端需要录制音频、分块、通过WebSocket发送。我们用一个简单的Python客户端:

# client.py
import asyncio
import websockets
import json
import sounddevice as sd
import numpy as np
from queue import Queue
import threading

class AudioRecorder:
    def __init__(self, sample_rate=16000, chunk_duration=0.5):
        self.sample_rate = sample_rate
        self.chunk_size = int(sample_rate * chunk_duration)  # 每块0.5秒
        self.audio_queue = Queue()
        self.recording = False
        
    def callback(self, indata, frames, time, status):
        """音频回调函数"""
        if status:
            print(f"音频录制状态: {status}")
        if self.recording:
            # 将音频数据放入队列
            self.audio_queue.put(indata.copy())
    
    def start_recording(self):
        """开始录音"""
        self.recording = True
        self.stream = sd.InputStream(
            callback=self.callback,
            channels=1,  # 单声道
            samplerate=self.sample_rate,
            dtype='int16'
        )
        self.stream.start()
        print("开始录音...")
    
    def stop_recording(self):
        """停止录音"""
        self.recording = False
        if hasattr(self, 'stream'):
            self.stream.stop()
            self.stream.close()
        print("停止录音")
    
    def get_chunk(self):
        """获取一个音频块"""
        if not self.audio_queue.empty():
            return self.audio_queue.get()
        return None

async def send_audio_stream(websocket, recorder):
    """发送音频流到服务器"""
    print("开始发送音频流...")
    
    while recorder.recording:
        chunk = recorder.get_chunk()
        if chunk is not None:
            # 准备数据
            audio_data = chunk.tobytes()
            
            # 发送到服务器
            await websocket.send(json.dumps({
                "type": "audio_chunk",
                "data": list(audio_data),  # 转换为列表便于JSON序列化
                "sample_rate": recorder.sample_rate
            }))
            
            # 控制发送频率,避免太快
            await asyncio.sleep(0.1)
    
    # 发送流结束信号
    await websocket.send(json.dumps({
        "type": "end_stream"
    }))

async def receive_transcriptions(websocket):
    """接收转写结果"""
    try:
        async for message in websocket:
            data = json.loads(message)
            
            if data.get("type") == "transcription":
                result = data["data"]
                if "text" in result and result["text"].strip():
                    print(f"\r转写结果: {result['text']}", end="", flush=True)
                    
            elif data.get("type") == "stream_end":
                print("\n转写完成")
                break
                
    except Exception as e:
        print(f"\n接收数据时出错: {e}")

async def main():
    # 创建录音器
    recorder = AudioRecorder()
    
    # 连接WebSocket服务器
    uri = "ws://localhost:8765"
    
    async with websockets.connect(uri) as websocket:
        print(f"已连接到服务器: {uri}")
        
        # 启动录音
        recorder.start_recording()
        
        try:
            # 创建两个任务:发送音频和接收转写
            send_task = asyncio.create_task(send_audio_stream(websocket, recorder))
            receive_task = asyncio.create_task(receive_transcriptions(websocket))
            
            # 等待用户按Enter停止
            print("\n按Enter键停止录音...")
            await asyncio.get_event_loop().run_in_executor(None, input)
            
            # 停止录音
            recorder.stop_recording()
            
            # 等待任务完成
            await asyncio.gather(send_task, receive_task, return_exceptions=True)
            
        except KeyboardInterrupt:
            print("\n用户中断")
            recorder.stop_recording()
        except Exception as e:
            print(f"发生错误: {e}")
            recorder.stop_recording()

if __name__ == "__main__":
    asyncio.run(main())

这个客户端实现了:

  1. 实时录音(使用sounddevice)
  2. 将音频切成0.5秒的小块
  3. 通过WebSocket发送到服务器
  4. 接收并显示实时转写结果

3.4 实际运行效果

运行起来后,你会看到这样的效果:

开始录音...
按Enter键停止录音...
转写结果: 大家好欢迎参加今天的线上会议
转写结果: 我们今天要讨论的是实时语音转写技术的应用
转写结果: 特别是在多语言场景下的实际效果
转写结果: 让我们先来看看Qwen3ASR在英语识别上的表现

文字几乎是实时出现的,你说完一句话,文字就显示出来了。对于中文方言,比如粤语,它也能准确识别:

转写结果: 大家好我系广东人今日同大家倾下计
转写结果: 呢个系统真系好犀利连广东话都识听

4. 性能优化和实用技巧

在实际使用中,你可能会遇到一些性能问题。这里分享几个优化技巧:

4.1 调整音频块大小

音频块的大小会影响延迟和准确率。太小的话,模型可能没有足够的上下文;太大的话,延迟会增加。经过测试,0.5-1秒的块大小是个不错的平衡点:

# 根据网络状况动态调整块大小
def adjust_chunk_size(network_latency):
    if network_latency < 100:  # 毫秒
        return 0.3  # 网络好,可以用更小的块
    elif network_latency < 300:
        return 0.5  # 中等网络
    else:
        return 1.0  # 网络差,用大一点的块

4.2 处理网络抖动

网络不稳定时,音频传输可能会中断。我们可以添加重试机制:

async def send_with_retry(websocket, data, max_retries=3):
    """带重试的发送"""
    for attempt in range(max_retries):
        try:
            await websocket.send(data)
            return True
        except Exception as e:
            if attempt == max_retries - 1:
                print(f"发送失败,已重试{max_retries}次: {e}")
                return False
            await asyncio.sleep(0.1 * (attempt + 1))  # 指数退避
    return False

4.3 内存和性能优化

对于长时间运行的转写服务,内存管理很重要:

class OptimizedASRServer:
    def __init__(self):
        # 使用vLLM后端提升性能
        from qwen_asr import Qwen3ASRModel
        
        self.model = Qwen3ASRModel.LLM(
            model="Qwen/Qwen3-ASR-0.6B",
            gpu_memory_utilization=0.7,  # 控制GPU内存使用
            max_inference_batch_size=128,  # 增大批处理大小
            max_new_tokens=4096,  # 支持更长的文本
        )
        
        # 启用流式推理优化
        self.model.enable_streaming()
        
    async def process_stream(self, audio_stream):
        """处理音频流"""
        # 使用异步处理,避免阻塞
        loop = asyncio.get_event_loop()
        
        # 将CPU密集型的模型推理放到线程池中执行
        result = await loop.run_in_executor(
            None,
            self.model.transcribe_stream,
            audio_stream
        )
        
        return result

4.4 多语言自动检测

Qwen3-ASR支持52种语言和方言的自动检测。在实际使用中,你可以:

# 设置语言提示,提高准确率
async def transcribe_with_context(self, audio_data, context_hint=None):
    """带上下文提示的转写"""
    
    if context_hint:
        # 如果有语言提示,可以设置系统提示
        system_prompt = f"当前对话可能包含{context_hint}语言"
    else:
        system_prompt = "请自动检测语言并进行转写"
    
    results = self.model.transcribe(
        audio=audio_data,
        language=None,  # 自动检测
        system_prompt=system_prompt,
        streaming=True
    )
    
    return results

5. 实际应用场景扩展

这个实时转写系统可以应用到很多场景,不仅仅是会议和直播。

5.1 智能客服系统

想象一个跨境电商客服系统,客户可能用英语、中文、或者各种方言咨询。实时转写可以让客服系统:

  1. 实时理解客户问题
  2. 自动识别语言并路由到对应语种的客服
  3. 生成对话摘要
  4. 提供实时翻译辅助
class CustomerServiceSystem:
    def __init__(self):
        self.asr_server = ASRServer()
        self.language_routing = {
            "English": "en_agent",
            "Chinese": "zh_agent",
            "Cantonese": "yue_agent",
            # ... 其他语言
        }
    
    async def handle_customer_call(self, audio_stream):
        """处理客户来电"""
        transcriptions = []
        
        async for chunk in audio_stream:
            # 实时转写
            result = await self.asr_server.process_audio_chunk(chunk)
            
            if result["text"]:
                transcriptions.append(result["text"])
                
                # 根据语言路由
                if result["language"] in self.language_routing:
                    agent = self.language_routing[result["language"]]
                    print(f"路由到 {agent} 处理")
                
                # 实时分析客户情绪(简单示例)
                sentiment = self.analyze_sentiment(result["text"])
                if sentiment == "urgent":
                    print("检测到紧急情况,提升处理优先级")
        
        return " ".join(transcriptions)

5.2 教育场景:实时字幕和笔记

在线教育平台可以用这个系统为课程提供实时字幕,特别是对于有听力障碍的学生。还可以自动生成课程笔记:

class EducationAssistant:
    def __init__(self):
        self.asr_server = ASRServer()
        self.keyword_detector = KeywordDetector()
        
    async def process_lecture(self, audio_stream, subject="computer_science"):
        """处理讲座音频"""
        full_transcript = []
        key_points = []
        
        async for chunk in audio_stream:
            result = await self.asr_server.process_audio_chunk(chunk)
            
            if result["text"]:
                full_transcript.append(result["text"])
                
                # 检测关键词(根据学科)
                keywords = self.keyword_detector.detect(
                    result["text"], 
                    subject=subject
                )
                
                if keywords:
                    key_points.append({
                        "text": result["text"],
                        "keywords": keywords,
                        "timestamp": time.time()
                    })
                    print(f"关键点: {keywords}")
        
        # 生成结构化笔记
        notes = self.generate_notes(full_transcript, key_points)
        return notes

5.3 医疗场景:医患对话记录

在医疗场景中,医生和患者的对话需要准确记录。实时转写可以帮助:

class MedicalTranscription:
    def __init__(self):
        self.asr_server = ASRServer()
        self.medical_terms = self.load_medical_terms()
        
    async def transcribe_consultation(self, audio_stream, doctor_id, patient_id):
        """转写医患咨询"""
        consultation_text = []
        
        async for chunk in audio_stream:
            result = await self.asr_server.process_audio_chunk(chunk)
            
            if result["text"]:
                # 自动标注医学术语
                annotated_text = self.annotate_medical_terms(result["text"])
                consultation_text.append(annotated_text)
                
                # 实时提取关键信息
                symptoms = self.extract_symptoms(annotated_text)
                if symptoms:
                    print(f"检测到症状: {symptoms}")
                
                medications = self.extract_medications(annotated_text)
                if medications:
                    print(f"提到药物: {medications}")
        
        # 生成结构化病历
        medical_record = self.generate_record(
            consultation_text,
            doctor_id,
            patient_id
        )
        
        return medical_record

6. 部署和扩展建议

如果你要把这个系统部署到生产环境,有几个建议:

6.1 使用Docker容器化

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    ffmpeg \
    libsndfile1 \
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .

# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 下载模型(或者从volume挂载)
RUN python -c "from qwen_asr import Qwen3ASRModel; \
    Qwen3ASRModel.from_pretrained('Qwen/Qwen3-ASR-0.6B', cache_dir='/models')"

# 暴露端口
EXPOSE 8765

# 启动命令
CMD ["python", "server.py"]

6.2 添加负载均衡

对于高并发场景,可以使用多个实例:

# load_balancer.py
import asyncio
import websockets
import json
from typing import List
import random

class ASRLoadBalancer:
    def __init__(self, server_list: List[str]):
        self.servers = server_list
        self.server_weights = {server: 1.0 for server in server_list}
        self.connections = {}
        
    async def route_request(self, websocket, path):
        """路由请求到最空闲的服务器"""
        # 选择服务器(简单轮询或基于权重)
        selected_server = self.select_server()
        
        try:
            # 连接到选中的服务器
            async with websockets.connect(selected_server) as server_ws:
                # 双向代理
                client_to_server = asyncio.create_task(
                    self.forward(websocket, server_ws, "client→server")
                )
                server_to_client = asyncio.create_task(
                    self.forward(server_ws, websocket, "server→client")
                )
                
                await asyncio.gather(client_to_server, server_to_client)
                
        except Exception as e:
            print(f"路由失败: {e}")
            # 降低该服务器的权重
            self.server_weights[selected_server] *= 0.8
    
    def select_server(self):
        """选择服务器(加权随机)"""
        servers = list(self.server_weights.keys())
        weights = list(self.server_weights.values())
        return random.choices(servers, weights=weights, k=1)[0]
    
    async def forward(self, source, destination, label):
        """转发消息"""
        try:
            async for message in source:
                await destination.send(message)
        except websockets.exceptions.ConnectionClosed:
            print(f"{label} 连接关闭")

6.3 监控和日志

添加监控可以帮助你了解系统运行状况:

# monitoring.py
import time
import psutil
from prometheus_client import start_http_server, Gauge, Counter

class ASRMonitor:
    def __init__(self):
        # 定义监控指标
        self.requests_total = Counter('asr_requests_total', '总请求数')
        self.request_duration = Gauge('asr_request_duration_seconds', '请求耗时')
        self.memory_usage = Gauge('asr_memory_usage_bytes', '内存使用')
        self.gpu_usage = Gauge('asr_gpu_usage_percent', 'GPU使用率')
        
        # 启动Prometheus metrics服务器
        start_http_server(8000)
    
    def record_request(self, duration):
        """记录请求"""
        self.requests_total.inc()
        self.request_duration.set(duration)
    
    def update_system_metrics(self):
        """更新系统指标"""
        # 内存使用
        memory = psutil.virtual_memory()
        self.memory_usage.set(memory.used)
        
        # GPU使用(如果有)
        try:
            import pynvml
            pynvml.nvmlInit()
            handle = pynvml.nvmlDeviceGetHandleByIndex(0)
            util = pynvml.nvmlDeviceGetUtilizationRates(handle)
            self.gpu_usage.set(util.gpu)
        except:
            pass  # 没有GPU或pynvml不可用

7. 总结

把Qwen3-ASR-0.6B和WebSocket结合起来做实时语音转写,效果确实不错。我实际测试下来,延迟能控制在几百毫秒以内,对于大多数实时应用来说完全够用。准确率方面,特别是对中文和英语的识别,基本能达到商用水平。

这套方案最大的优势是灵活。你可以根据实际需求调整音频块大小、网络重试策略、错误处理逻辑。比如在网络条件好的情况下,可以用更小的块来降低延迟;在网络不稳定的时候,自动增大块大小,牺牲一点实时性来保证稳定性。

部署方面,用Docker打包确实方便,特别是需要批量部署的时候。监控系统也很重要,能帮你及时发现性能瓶颈。我建议至少监控请求延迟、内存使用、GPU使用率这几个关键指标。

如果你要处理特别大的并发量,可以考虑用负载均衡把请求分发到多个实例。Qwen3-ASR-0.6B本身效率就不错,128并发下RTF能到0.064,相当于1秒能处理15秒的音频,这个性能对于大多数场景都足够了。

实际用的时候,你会发现一些小技巧很有用。比如在转写开始前给一点语言提示,准确率会更高;又比如根据上下文动态调整模型参数,能更好地处理专业术语。这些都需要在实际场景中慢慢摸索。

总的来说,这套方案把先进的语音识别模型和高效的网络传输协议结合得很好,既保证了实时性,又保持了高准确率。无论是做在线会议、直播字幕,还是智能客服、教育辅助,都能派上用场。如果你有类似的需求,不妨试试看,应该会有不错的体验。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐