Qwen3-ASR-0.6B保姆级教程:环境搭建到语音识别全流程

1. 为什么选择Qwen3-ASR-0.6B

语音识别正在从云端走向边缘,从服务器走向设备端。无论是智能音箱、车载语音助手,还是工业控制设备,都希望能在本地完成语音转文字,而不是把音频数据上传到云端。这背后有三个核心需求:隐私保护、低延迟响应、离线可用。

Qwen3-ASR-0.6B就是为这个趋势而生的轻量级语音识别模型。它只有6亿参数,却能支持52种语言和方言,包括22种中文方言。更关键的是,它在128并发下能达到2000倍吞吐量——这意味着用10秒钟就能处理5小时的音频数据。对于嵌入式设备和边缘计算场景来说,这个效率太重要了。

我最近在做一个智能会议记录设备,需要实时把会议内容转成文字。最初尝试用云端API,结果网络一波动,识别就中断,用户体验很差。后来找到了Qwen3-ASR-0.6B,发现它不仅能离线运行,识别准确率还相当不错。更重要的是,官方提供了Gradio界面,让部署和测试变得特别简单。

这篇文章就是我的实战记录。我会带你从零开始,一步步搭建Qwen3-ASR-0.6B的完整环境,让你在10分钟内就能跑起自己的语音识别服务。无论你是AI新手还是有一定经验的开发者,都能跟着这个教程快速上手。

2. 环境准备与快速部署

2.1 系统要求检查

在开始之前,我们先确认一下你的环境是否满足要求。Qwen3-ASR-0.6B对硬件要求不高,但有些基础依赖需要提前准备好。

基础要求:

  • 操作系统:Linux(Ubuntu 20.04+推荐)、macOS、Windows(WSL2)
  • Python版本:3.8-3.11
  • 内存:至少8GB RAM(16GB更佳)
  • 磁盘空间:至少10GB可用空间

Python依赖包:

  • PyTorch 2.0+
  • Transformers 4.35+
  • Gradio 4.0+
  • 其他音频处理库

如果你不确定自己的环境,可以打开终端,运行以下命令检查:

# 检查Python版本
python3 --version

# 检查PyTorch是否安装
python3 -c "import torch; print(f'PyTorch版本: {torch.__version__}')"

# 检查CUDA是否可用(如果有GPU)
python3 -c "import torch; print(f'CUDA可用: {torch.cuda.is_available()}')"

如果你的Python版本低于3.8,或者没有安装PyTorch,别担心,我们接下来会一步步安装。

2.2 一键安装脚本

为了让大家快速上手,我准备了一个一键安装脚本。这个脚本会自动检查环境,安装所有必要的依赖。

创建一个名为install_qwen_asr.sh的文件:

#!/bin/bash

echo "开始安装Qwen3-ASR-0.6B环境..."

# 检查Python版本
python_version=$(python3 --version 2>&1 | cut -d' ' -f2)
echo "当前Python版本: $python_version"

# 检查并安装pip
if ! command -v pip3 &> /dev/null; then
    echo "pip3未安装,正在安装..."
    sudo apt-get update
    sudo apt-get install -y python3-pip
fi

# 升级pip
pip3 install --upgrade pip

# 安装PyTorch(根据是否有GPU选择不同版本)
if python3 -c "import torch; print(torch.cuda.is_available())" 2>/dev/null | grep -q "True"; then
    echo "检测到GPU,安装GPU版本的PyTorch..."
    pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
else
    echo "未检测到GPU,安装CPU版本的PyTorch..."
    pip3 install torch torchvision torchaudio
fi

# 安装其他依赖
echo "安装transformers和gradio..."
pip3 install transformers gradio

# 安装音频处理库
echo "安装音频处理库..."
pip3 install soundfile librosa pydub

# 验证安装
echo "验证安装..."
python3 -c "
import torch
import transformers
import gradio
print('✓ PyTorch版本:', torch.__version__)
print('✓ Transformers版本:', transformers.__version__)
print('✓ Gradio版本:', gradio.__version__)
print('✓ 所有依赖安装完成!')
"

echo "安装完成!现在可以运行Qwen3-ASR-0.6B了。"

给脚本添加执行权限并运行:

# 添加执行权限
chmod +x install_qwen_asr.sh

# 运行安装脚本
./install_qwen_asr.sh

脚本运行过程中,你会看到各种包的安装进度。如果网络正常,整个过程大概需要5-10分钟。

2.3 快速验证安装

安装完成后,我们写一个简单的测试脚本来验证环境是否正常。

创建test_environment.py文件:

import torch
import transformers
import gradio as gr
import soundfile as sf
import numpy as np

print("=== 环境测试开始 ===")

# 测试PyTorch
print(f"1. PyTorch版本: {torch.__version__}")
print(f"2. CUDA可用: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"3. GPU设备: {torch.cuda.get_device_name(0)}")

# 测试Transformers
print(f"4. Transformers版本: {transformers.__version__}")

# 测试Gradio
print(f"5. Gradio版本: {gr.__version__}")

# 测试音频库
try:
    # 创建一个测试音频
    sample_rate = 16000
    duration = 1.0  # 1秒
    t = np.linspace(0, duration, int(sample_rate * duration))
    audio_data = 0.5 * np.sin(2 * np.pi * 440 * t)  # 440Hz正弦波
    
    # 保存为WAV文件
    sf.write('test_audio.wav', audio_data, sample_rate)
    print("6. 音频库测试: ✓ 成功创建测试音频")
    
    # 读取音频文件
    data, sr = sf.read('test_audio.wav')
    print(f"7. 音频读取测试: ✓ 采样率{sr}Hz, 长度{len(data)}个样本")
    
    # 清理测试文件
    import os
    os.remove('test_audio.wav')
    
except Exception as e:
    print(f"6. 音频库测试: ✗ 失败 - {e}")

print("=== 环境测试完成 ===")
print("如果所有测试都通过,说明环境配置正确!")

运行测试脚本:

python3 test_environment.py

如果看到所有测试项都显示成功(✓),恭喜你!环境已经准备好了。如果有任何失败,脚本会告诉你具体问题,你可以根据错误信息进行修复。

3. 模型下载与基础使用

3.1 下载Qwen3-ASR-0.6B模型

Qwen3-ASR-0.6B模型可以通过Hugging Face直接下载。官方提供了两种方式:通过Python代码自动下载,或者手动下载后使用。

方式一:自动下载(推荐)

这是最简单的方式,模型会在第一次使用时自动下载。创建一个简单的Python脚本:

from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
import torch

# 指定模型名称
model_name = "Qwen/Qwen3-ASR-0.6B"

print(f"开始下载模型: {model_name}")
print("这可能需要一些时间,请耐心等待...")

try:
    # 下载模型和处理器
    model = AutoModelForSpeechSeq2Seq.from_pretrained(
        model_name,
        torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
        low_cpu_mem_usage=True,
        use_safetensors=True
    )
    
    processor = AutoProcessor.from_pretrained(model_name)
    
    print("✓ 模型下载完成!")
    print(f"模型类型: {type(model).__name__}")
    print(f"处理器类型: {type(processor).__name__}")
    
except Exception as e:
    print(f"✗ 下载失败: {e}")
    print("请检查网络连接,或尝试方式二手动下载")

运行这个脚本,它会自动从Hugging Face下载模型。第一次运行会比较慢,因为需要下载大约1.2GB的模型文件。下载完成后,模型会缓存在本地,下次使用就不需要重新下载了。

方式二:手动下载

如果自动下载速度太慢,或者网络有问题,可以手动下载:

  1. 访问Hugging Face模型页面:https://huggingface.co/Qwen/Qwen3-ASR-0.6B
  2. 点击"Files and versions"标签页
  3. 下载以下文件:
    • config.json
    • generation_config.json
    • model.safetensors(主要模型文件)
    • preprocessor_config.json
    • tokenizer.json
    • vocab.json
  4. 将所有文件放在同一个目录下,比如./models/qwen3-asr-0.6b/

然后修改代码,从本地加载模型:

from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor

# 指定本地模型路径
model_path = "./models/qwen3-asr-0.6b"

model = AutoModelForSpeechSeq2Seq.from_pretrained(model_path)
processor = AutoProcessor.from_pretrained(model_path)

3.2 第一个语音识别程序

现在我们来写一个最简单的语音识别程序,感受一下Qwen3-ASR-0.6B的能力。

创建first_asr.py文件:

import torch
import librosa
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor

def transcribe_audio(audio_path):
    """
    将音频文件转换为文字
    """
    print(f"处理音频文件: {audio_path}")
    
    # 1. 加载模型和处理器
    print("加载模型...")
    model_name = "Qwen/Qwen3-ASR-0.6B"
    
    # 根据是否有GPU选择数据类型
    if torch.cuda.is_available():
        print("使用GPU加速")
        torch_dtype = torch.float16
        device = "cuda"
    else:
        print("使用CPU")
        torch_dtype = torch.float32
        device = "cpu"
    
    model = AutoModelForSpeechSeq2Seq.from_pretrained(
        model_name,
        torch_dtype=torch_dtype,
        low_cpu_mem_usage=True,
        use_safetensors=True
    ).to(device)
    
    processor = AutoProcessor.from_pretrained(model_name)
    
    # 2. 加载音频文件
    print("加载音频...")
    audio, sample_rate = librosa.load(audio_path, sr=16000)
    
    # 3. 预处理音频
    print("预处理音频...")
    inputs = processor(
        audio,
        sampling_rate=sample_rate,
        return_tensors="pt"
    ).to(device)
    
    # 4. 语音识别
    print("开始识别...")
    with torch.no_grad():
        generated_ids = model.generate(
            inputs["input_features"],
            max_new_tokens=256
        )
    
    # 5. 解码结果
    transcription = processor.batch_decode(
        generated_ids,
        skip_special_tokens=True
    )[0]
    
    print("识别完成!")
    return transcription

if __name__ == "__main__":
    # 测试用的音频文件路径
    # 你可以录制一段自己的语音,或者使用示例音频
    audio_file = "test_audio.wav"  # 替换为你的音频文件
    
    try:
        result = transcribe_audio(audio_file)
        print("\n" + "="*50)
        print("识别结果:")
        print(result)
        print("="*50)
    except FileNotFoundError:
        print(f"错误: 找不到音频文件 {audio_file}")
        print("请先录制一段语音,保存为test_audio.wav")
    except Exception as e:
        print(f"错误: {e}")

这个程序做了几件事:

  1. 加载Qwen3-ASR-0.6B模型和处理器
  2. 读取音频文件(需要是16kHz采样率的WAV文件)
  3. 对音频进行预处理,转换成模型能理解的格式
  4. 运行模型进行语音识别
  5. 输出识别结果

要测试这个程序,你需要先准备一个音频文件。可以用手机录制一段语音,保存为WAV格式,采样率设为16000Hz。或者用Python生成一个测试音频:

import soundfile as sf
import numpy as np

# 生成测试音频:说"你好,世界"
sample_rate = 16000
duration = 3.0  # 3秒

# 创建一个简单的音频信号(实际应用中应该用真实录音)
t = np.linspace(0, duration, int(sample_rate * duration))
# 模拟语音的简单波形
audio = 0.3 * np.sin(2 * np.pi * 200 * t) * np.exp(-0.5 * t)

# 保存为WAV文件
sf.write('test_audio.wav', audio, sample_rate)
print("测试音频已生成: test_audio.wav")

运行识别程序:

python3 first_asr.py

第一次运行会比较慢,因为需要加载模型。如果一切正常,你会看到模型加载的进度,然后输出识别结果。即使识别结果不准确也没关系,这只是一个基础测试,我们后面会优化识别效果。

3.3 支持多种音频格式

实际应用中,音频格式多种多样,可能是MP3、M4A、WAV等。Qwen3-ASR-0.6B需要16kHz单声道的PCM数据,所以我们需要处理各种格式的转换。

创建一个音频处理工具函数:

import librosa
import soundfile as sf
from pydub import AudioSegment
import numpy as np
import os

def load_audio_file(audio_path, target_sr=16000):
    """
    加载各种格式的音频文件,并转换为模型需要的格式
    """
    print(f"加载音频文件: {audio_path}")
    
    # 检查文件是否存在
    if not os.path.exists(audio_path):
        raise FileNotFoundError(f"音频文件不存在: {audio_path}")
    
    # 根据文件扩展名选择加载方式
    file_ext = os.path.splitext(audio_path)[1].lower()
    
    try:
        if file_ext in ['.wav', '.flac', '.ogg']:
            # 使用soundfile加载无损格式
            audio, sr = sf.read(audio_path)
            
        elif file_ext in ['.mp3', '.m4a', '.aac']:
            # 使用pydub加载压缩格式
            audio_segment = AudioSegment.from_file(audio_path)
            
            # 转换为单声道
            if audio_segment.channels > 1:
                audio_segment = audio_segment.set_channels(1)
            
            # 转换为目标采样率
            if audio_segment.frame_rate != target_sr:
                audio_segment = audio_segment.set_frame_rate(target_sr)
            
            # 转换为numpy数组
            audio = np.array(audio_segment.get_array_of_samples()).astype(np.float32)
            sr = target_sr
            
            # 归一化到[-1, 1]
            if audio_segment.sample_width == 2:  # 16-bit
                audio = audio / 32768.0
            elif audio_segment.sample_width == 3:  # 24-bit
                audio = audio / 8388608.0
            elif audio_segment.sample_width == 1:  # 8-bit
                audio = (audio - 128) / 128.0
                
        else:
            # 其他格式使用librosa
            audio, sr = librosa.load(audio_path, sr=target_sr, mono=True)
            
    except Exception as e:
        # 如果上述方法都失败,尝试用librosa
        print(f"使用备用方法加载音频: {e}")
        audio, sr = librosa.load(audio_path, sr=target_sr, mono=True)
    
    # 确保采样率正确
    if sr != target_sr:
        print(f"重新采样: {sr}Hz -> {target_sr}Hz")
        audio = librosa.resample(audio, orig_sr=sr, target_sr=target_sr)
        sr = target_sr
    
    # 确保是单声道
    if len(audio.shape) > 1:
        print("转换为单声道")
        audio = np.mean(audio, axis=1)
    
    # 确保数据类型正确
    audio = audio.astype(np.float32)
    
    print(f"音频信息: 时长={len(audio)/sr:.2f}秒, 采样率={sr}Hz, 形状={audio.shape}")
    return audio, sr

def save_audio_file(audio, sample_rate, output_path):
    """
    保存音频文件
    """
    # 确保音频在[-1, 1]范围内
    audio = np.clip(audio, -1.0, 1.0)
    
    # 根据扩展名选择保存格式
    file_ext = os.path.splitext(output_path)[1].lower()
    
    if file_ext == '.wav':
        # WAV格式使用16-bit PCM
        audio_int16 = (audio * 32767).astype(np.int16)
        sf.write(output_path, audio_int16, sample_rate, subtype='PCM_16')
    elif file_ext == '.flac':
        # FLAC格式
        sf.write(output_path, audio, sample_rate, format='FLAC')
    elif file_ext == '.mp3':
        # MP3格式需要pydub
        from pydub import AudioSegment
        # 先保存为WAV,再转MP3
        temp_wav = "temp.wav"
        audio_int16 = (audio * 32767).astype(np.int16)
        sf.write(temp_wav, audio_int16, sample_rate)
        
        audio_segment = AudioSegment.from_wav(temp_wav)
        audio_segment.export(output_path, format="mp3", bitrate="192k")
        os.remove(temp_wav)
    else:
        # 默认保存为WAV
        audio_int16 = (audio * 32767).astype(np.int16)
        sf.write(output_path, audio_int16, sample_rate)
    
    print(f"音频已保存: {output_path}")

这个工具函数可以处理多种音频格式,并自动转换为模型需要的格式。你可以这样使用它:

# 加载任意格式的音频
audio, sr = load_audio_file("my_recording.mp3")

# 现在audio就是16kHz单声道的numpy数组
# 可以直接用于语音识别

# 如果需要,可以保存为WAV格式
save_audio_file(audio, sr, "converted.wav")

4. 使用Gradio构建Web界面

4.1 为什么选择Gradio

Gradio是一个Python库,可以快速为机器学习模型创建Web界面。对于语音识别应用来说,Gradio有几个明显优势:

  1. 简单易用:几行代码就能创建功能完整的界面
  2. 实时交互:支持录音、上传、实时识别
  3. 无需前端知识:纯Python实现,不需要HTML/CSS/JavaScript
  4. 自动部署:可以轻松分享给他人使用

对于Qwen3-ASR-0.6B这样的语音识别模型,Gradio特别适合,因为它内置了音频录制和播放功能。

4.2 创建基础语音识别界面

让我们创建一个完整的Gradio应用。创建gradio_app.py文件:

import gradio as gr
import torch
import numpy as np
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
import tempfile
import os
from datetime import datetime

class QwenASRApp:
    def __init__(self):
        """初始化应用"""
        self.model = None
        self.processor = None
        self.device = None
        self.initialized = False
        
    def initialize_model(self):
        """初始化模型"""
        if self.initialized:
            return "模型已初始化"
        
        try:
            print("开始初始化模型...")
            
            # 检查是否有GPU
            if torch.cuda.is_available():
                self.device = "cuda"
                torch_dtype = torch.float16
                print("使用GPU加速")
            else:
                self.device = "cpu"
                torch_dtype = torch.float32
                print("使用CPU")
            
            # 加载模型和处理器
            model_name = "Qwen/Qwen3-ASR-0.6B"
            
            print("加载处理器...")
            self.processor = AutoProcessor.from_pretrained(model_name)
            
            print("加载模型...")
            self.model = AutoModelForSpeechSeq2Seq.from_pretrained(
                model_name,
                torch_dtype=torch_dtype,
                low_cpu_mem_usage=True,
                use_safetensors=True
            ).to(self.device)
            
            self.initialized = True
            print("模型初始化完成!")
            return "✓ 模型初始化成功!"
            
        except Exception as e:
            error_msg = f"模型初始化失败: {str(e)}"
            print(error_msg)
            return f"✗ {error_msg}"
    
    def transcribe_audio(self, audio_input, language="自动检测"):
        """转录音频"""
        if not self.initialized:
            return "错误:模型未初始化,请先点击'初始化模型'按钮", None
        
        try:
            # 获取音频数据
            if audio_input is None:
                return "错误:请提供音频文件或录制语音", None
            
            sample_rate, audio_data = audio_input
            
            # 转换为单声道和16kHz
            if len(audio_data.shape) > 1:
                audio_data = np.mean(audio_data, axis=1)
            
            # 如果采样率不是16kHz,需要重采样
            if sample_rate != 16000:
                import librosa
                audio_data = librosa.resample(
                    audio_data.astype(np.float32),
                    orig_sr=sample_rate,
                    target_sr=16000
                )
                sample_rate = 16000
            
            print(f"处理音频: {len(audio_data)/sample_rate:.2f}秒, {sample_rate}Hz")
            
            # 预处理
            inputs = self.processor(
                audio_data,
                sampling_rate=sample_rate,
                return_tensors="pt"
            ).to(self.device)
            
            # 设置生成参数
            generate_kwargs = {
                "input_features": inputs["input_features"],
                "max_new_tokens": 256,
                "do_sample": False,  # 使用贪婪解码,速度更快
            }
            
            # 如果指定了语言,添加语言提示
            if language != "自动检测":
                generate_kwargs["forced_decoder_ids"] = self.processor.get_decoder_prompt_ids(
                    language=language,
                    task="transcribe"
                )
            
            # 语音识别
            with torch.no_grad():
                generated_ids = self.model.generate(**generate_kwargs)
            
            # 解码结果
            transcription = self.processor.batch_decode(
                generated_ids,
                skip_special_tokens=True
            )[0]
            
            # 添加时间戳
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            result_text = f"[{timestamp}] {transcription}"
            
            # 保存结果到文件
            self.save_result(result_text)
            
            return result_text, transcription
            
        except Exception as e:
            error_msg = f"识别失败: {str(e)}"
            print(error_msg)
            return error_msg, None
    
    def save_result(self, text):
        """保存识别结果到文件"""
        try:
            os.makedirs("results", exist_ok=True)
            today = datetime.now().strftime("%Y%m%d")
            filename = f"results/transcriptions_{today}.txt"
            
            with open(filename, "a", encoding="utf-8") as f:
                f.write(text + "\n")
            
            print(f"结果已保存到: {filename}")
        except Exception as e:
            print(f"保存结果失败: {e}")
    
    def create_interface(self):
        """创建Gradio界面"""
        with gr.Blocks(title="Qwen3-ASR-0.6B 语音识别", theme=gr.themes.Soft()) as demo:
            gr.Markdown("# 🎤 Qwen3-ASR-0.6B 语音识别系统")
            gr.Markdown("支持52种语言和方言的语音转文字")
            
            with gr.Row():
                with gr.Column(scale=1):
                    # 模型初始化部分
                    gr.Markdown("## 1. 初始化模型")
                    init_btn = gr.Button("初始化模型", variant="primary")
                    init_status = gr.Textbox(label="初始化状态", interactive=False)
                    
                    # 语言选择
                    gr.Markdown("## 2. 选择语言")
                    language = gr.Dropdown(
                        choices=[
                            "自动检测",
                            "中文",
                            "英语",
                            "日语",
                            "韩语",
                            "法语",
                            "德语",
                            "西班牙语"
                        ],
                        value="自动检测",
                        label="识别语言"
                    )
                    
                    gr.Markdown("### 使用说明")
                    gr.Markdown("""
                    1. 点击"初始化模型"按钮加载模型(首次使用需要下载模型)
                    2. 选择识别语言(或使用自动检测)
                    3. 上传音频文件或直接录制语音
                    4. 点击"开始识别"按钮
                    5. 查看识别结果
                    """)
                    
                with gr.Column(scale=2):
                    # 音频输入部分
                    gr.Markdown("## 3. 音频输入")
                    
                    with gr.Tabs():
                        with gr.TabItem("🎤 录制语音"):
                            audio_input = gr.Audio(
                                sources=["microphone"],
                                type="numpy",
                                label="录制语音",
                                interactive=True
                            )
                        
                        with gr.TabItem("📁 上传文件"):
                            file_input = gr.Audio(
                                sources=["upload"],
                                type="numpy",
                                label="上传音频文件",
                                interactive=True
                            )
                    
                    # 识别按钮
                    transcribe_btn = gr.Button("开始识别", variant="primary", size="lg")
                    
                    # 结果显示部分
                    gr.Markdown("## 4. 识别结果")
                    result_with_time = gr.Textbox(
                        label="带时间戳的结果",
                        interactive=False,
                        lines=3
                    )
                    
                    result_raw = gr.Textbox(
                        label="原始文本",
                        interactive=True,
                        lines=3
                    )
                    
                    # 保存按钮
                    save_btn = gr.Button("保存结果", variant="secondary")
            
            # 绑定事件
            init_btn.click(
                fn=self.initialize_model,
                outputs=init_status
            )
            
            def process_audio(audio, lang):
                return self.transcribe_audio(audio, lang)
            
            transcribe_btn.click(
                fn=process_audio,
                inputs=[audio_input, language],
                outputs=[result_with_time, result_raw]
            )
            
            # 文件输入也绑定相同的事件
            file_input.change(
                fn=lambda x: x,
                inputs=[file_input],
                outputs=[audio_input]
            )
            
            # 保存结果
            def save_text(text):
                if text:
                    self.save_result(f"[手动保存] {text}")
                    return "结果已保存"
                return "无内容可保存"
            
            save_btn.click(
                fn=save_text,
                inputs=result_raw,
                outputs=gr.Textbox(label="保存状态", visible=False)
            )
            
            # 添加示例
            gr.Markdown("## 示例音频")
            gr.Examples(
                examples=[
                    ["examples/hello_chinese.wav", "中文"],
                    ["examples/hello_english.wav", "英语"],
                ],
                inputs=[file_input, language],
                outputs=[result_with_time, result_raw],
                fn=process_audio,
                cache_examples=False
            )
        
        return demo

def main():
    """主函数"""
    app = QwenASRApp()
    demo = app.create_interface()
    
    # 启动服务
    print("启动Gradio服务...")
    print("访问 http://localhost:7860 使用语音识别系统")
    
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,  # 设置为True可以生成公共链接
        debug=True
    )

if __name__ == "__main__":
    main()

这个应用提供了完整的功能:

  1. 模型初始化按钮
  2. 语言选择(支持自动检测和指定语言)
  3. 两种音频输入方式:录制和上传
  4. 实时识别结果显示
  5. 结果保存功能
  6. 示例音频

4.3 运行和测试应用

在运行应用之前,我们需要创建示例音频文件。创建一个examples目录,并添加一些测试音频:

# 创建示例目录
mkdir -p examples

# 创建示例音频(可以使用文本转语音工具生成)
# 这里我们用Python生成简单的测试音频
python3 create_examples.py

create_examples.py内容:

import numpy as np
import soundfile as sf
import os

def create_sine_wave(freq, duration, sample_rate=16000):
    """创建正弦波音频"""
    t = np.linspace(0, duration, int(sample_rate * duration))
    audio = 0.3 * np.sin(2 * np.pi * freq * t)
    return audio

def create_chirp(start_freq, end_freq, duration, sample_rate=16000):
    """创建扫频信号"""
    t = np.linspace(0, duration, int(sample_rate * duration))
    freq = np.linspace(start_freq, end_freq, len(t))
    audio = 0.3 * np.sin(2 * np.pi * freq * t)
    return audio

# 创建中文示例(模拟"你好,世界")
print("创建中文示例音频...")
chinese_audio = create_chirp(200, 400, 1.5)
sf.write('examples/hello_chinese.wav', chinese_audio, 16000)

# 创建英文示例(模拟"Hello, world")
print("创建英文示例音频...")
english_audio = create_sine_wave(300, 1.0)
english_audio = np.concatenate([english_audio, create_sine_wave(250, 0.5)])
sf.write('examples/hello_english.wav', english_audio, 16000)

print("示例音频创建完成!")

现在运行Gradio应用:

python3 gradio_app.py

你会看到类似这样的输出:

启动Gradio服务...
访问 http://localhost:7860 使用语音识别系统
Running on local URL:  http://0.0.0.0:7860

打开浏览器,访问 http://localhost:7860,你会看到一个漂亮的Web界面。

4.4 使用步骤详解

让我们一步步使用这个应用:

第一步:初始化模型 点击"初始化模型"按钮。第一次运行时会下载模型文件,这可能需要一些时间(取决于网络速度)。下载完成后,会显示"✓ 模型初始化成功!"。

第二步:选择语言 在下拉菜单中选择识别语言。如果你不确定音频的语言,选择"自动检测",模型会自动识别。

第三步:输入音频 有两种方式输入音频:

  1. 录制语音:点击录音按钮,允许浏览器访问麦克风,然后开始说话
  2. 上传文件:点击上传按钮,选择本地音频文件(支持WAV、MP3、M4A等格式)

第四步:开始识别 点击"开始识别"按钮。你会看到处理进度,几秒钟后识别结果就会显示在下方。

第五步:查看和保存结果 识别结果会显示在两个文本框中:

  • 第一个框显示带时间戳的结果
  • 第二个框显示原始文本,你可以编辑和复制

点击"保存结果"按钮,识别结果会被保存到results目录下的文本文件中,按日期分类。

5. 高级功能与优化技巧

5.1 批量处理音频文件

在实际应用中,我们经常需要批量处理多个音频文件。让我们创建一个批量处理脚本:

import os
import glob
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
from tqdm import tqdm
import json
from datetime import datetime

class BatchASRProcessor:
    def __init__(self, model_path="Qwen/Qwen3-ASR-0.6B", device=None):
        """初始化批量处理器"""
        self.model_path = model_path
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        self.model = None
        self.processor = None
        
    def initialize(self):
        """初始化模型"""
        print(f"初始化模型,使用设备: {self.device}")
        
        torch_dtype = torch.float16 if self.device == "cuda" else torch.float32
        
        self.processor = AutoProcessor.from_pretrained(self.model_path)
        self.model = AutoModelForSpeechSeq2Seq.from_pretrained(
            self.model_path,
            torch_dtype=torch_dtype,
            low_cpu_mem_usage=True,
            use_safetensors=True
        ).to(self.device)
        
        print("模型初始化完成")
        
    def process_file(self, audio_path, language="自动检测"):
        """处理单个音频文件"""
        if self.model is None:
            self.initialize()
        
        try:
            # 加载音频
            import librosa
            audio, sr = librosa.load(audio_path, sr=16000, mono=True)
            
            # 预处理
            inputs = self.processor(
                audio,
                sampling_rate=sr,
                return_tensors="pt"
            ).to(self.device)
            
            # 设置生成参数
            generate_kwargs = {
                "input_features": inputs["input_features"],
                "max_new_tokens": 256,
            }
            
            # 语言设置
            if language != "自动检测":
                generate_kwargs["forced_decoder_ids"] = self.processor.get_decoder_prompt_ids(
                    language=language,
                    task="transcribe"
                )
            
            # 识别
            with torch.no_grad():
                generated_ids = self.model.generate(**generate_kwargs)
            
            # 解码
            transcription = self.processor.batch_decode(
                generated_ids,
                skip_special_tokens=True
            )[0]
            
            return {
                "file": audio_path,
                "text": transcription,
                "language": language,
                "status": "success",
                "timestamp": datetime.now().isoformat()
            }
            
        except Exception as e:
            return {
                "file": audio_path,
                "error": str(e),
                "status": "failed",
                "timestamp": datetime.now().isoformat()
            }
    
    def process_folder(self, folder_path, output_file="results.json", language="自动检测"):
        """处理整个文件夹的音频文件"""
        # 查找所有音频文件
        audio_extensions = ['*.wav', '*.mp3', '*.m4a', '*.flac', '*.ogg']
        audio_files = []
        
        for ext in audio_extensions:
            audio_files.extend(glob.glob(os.path.join(folder_path, ext)))
        
        if not audio_files:
            print(f"在 {folder_path} 中未找到音频文件")
            return []
        
        print(f"找到 {len(audio_files)} 个音频文件")
        
        # 初始化模型
        if self.model is None:
            self.initialize()
        
        # 批量处理
        results = []
        for audio_file in tqdm(audio_files, desc="处理音频文件"):
            result = self.process_file(audio_file, language)
            results.append(result)
            
            # 实时保存进度
            if output_file:
                with open(output_file, 'w', encoding='utf-8') as f:
                    json.dump(results, f, ensure_ascii=False, indent=2)
        
        # 统计结果
        success_count = sum(1 for r in results if r["status"] == "success")
        failed_count = len(results) - success_count
        
        print(f"\n处理完成!")
        print(f"成功: {success_count} 个文件")
        print(f"失败: {failed_count} 个文件")
        
        if output_file:
            print(f"结果已保存到: {output_file}")
        
        return results
    
    def process_file_list(self, file_list, output_file="results.json", language="自动检测"):
        """处理文件列表"""
        if not file_list:
            print("文件列表为空")
            return []
        
        print(f"处理 {len(file_list)} 个文件")
        
        # 初始化模型
        if self.model is None:
            self.initialize()
        
        # 批量处理
        results = []
        for audio_file in tqdm(file_list, desc="处理文件"):
            if os.path.exists(audio_file):
                result = self.process_file(audio_file, language)
                results.append(result)
            else:
                results.append({
                    "file": audio_file,
                    "error": "文件不存在",
                    "status": "failed",
                    "timestamp": datetime.now().isoformat()
                })
            
            # 实时保存进度
            if output_file:
                with open(output_file, 'w', encoding='utf-8') as f:
                    json.dump(results, f, ensure_ascii=False, indent=2)
        
        return results

def main():
    """批量处理示例"""
    import argparse
    
    parser = argparse.ArgumentParser(description="批量语音识别")
    parser.add_argument("--input", "-i", required=True, help="输入文件或文件夹路径")
    parser.add_argument("--output", "-o", default="results.json", help="输出JSON文件路径")
    parser.add_argument("--language", "-l", default="自动检测", help="识别语言")
    parser.add_argument("--device", "-d", choices=["cpu", "cuda"], help="运行设备")
    
    args = parser.parse_args()
    
    # 创建处理器
    processor = BatchASRProcessor(device=args.device)
    
    # 判断输入是文件还是文件夹
    if os.path.isfile(args.input):
        # 单个文件
        result = processor.process_file(args.input, args.language)
        print(f"识别结果: {result['text']}")
        
        # 保存结果
        with open(args.output, 'w', encoding='utf-8') as f:
            json.dump([result], f, ensure_ascii=False, indent=2)
            
    elif os.path.isdir(args.input):
        # 文件夹
        processor.process_folder(args.input, args.output, args.language)
    else:
        print(f"错误: {args.input} 不是有效的文件或文件夹")

if __name__ == "__main__":
    main()

使用这个批量处理脚本:

# 处理单个文件
python3 batch_asr.py --input audio.wav --output result.json

# 处理整个文件夹
python3 batch_asr.py --input ./audio_files/ --output all_results.json --language 中文

# 指定使用CPU
python3 batch_asr.py --input audio.wav --device cpu

5.2 流式识别实现

对于实时应用,我们需要流式识别功能。Qwen3-ASR-0.6B支持流式推理,让我们实现一个简单的流式识别示例:

import torch
import numpy as np
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
import queue
import threading
import time

class StreamASR:
    def __init__(self, model_name="Qwen/Qwen3-ASR-0.6B", chunk_duration=1.0, overlap=0.5):
        """
        流式语音识别
        
        参数:
            model_name: 模型名称
            chunk_duration: 每次处理的音频时长(秒)
            overlap: 重叠时长(秒),用于平滑过渡
        """
        self.chunk_duration = chunk_duration
        self.overlap = overlap
        self.sample_rate = 16000
        
        # 计算chunk大小
        self.chunk_samples = int(self.chunk_duration * self.sample_rate)
        self.overlap_samples = int(self.overlap * self.sample_rate)
        
        # 初始化模型
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        torch_dtype = torch.float16 if self.device == "cuda" else torch.float32
        
        print(f"加载模型: {model_name}")
        self.processor = AutoProcessor.from_pretrained(model_name)
        self.model = AutoModelForSpeechSeq2Seq.from_pretrained(
            model_name,
            torch_dtype=torch_dtype,
            low_cpu_mem_usage=True,
            use_safetensors=True
        ).to(self.device)
        
        # 音频缓冲区
        self.audio_buffer = np.array([], dtype=np.float32)
        self.buffer_lock = threading.Lock()
        
        # 结果队列
        self.result_queue = queue.Queue()
        
        # 处理线程
        self.processing = False
        self.processing_thread = None
        
    def add_audio(self, audio_chunk):
        """添加音频数据到缓冲区"""
        with self.buffer_lock:
            self.audio_buffer = np.concatenate([self.audio_buffer, audio_chunk])
    
    def process_chunk(self):
        """处理一个音频chunk"""
        with self.buffer_lock:
            if len(self.audio_buffer) < self.chunk_samples:
                return None
            
            # 取一个chunk
            chunk = self.audio_buffer[:self.chunk_samples]
            
            # 保留重叠部分
            keep_samples = self.chunk_samples - self.overlap_samples
            if keep_samples > 0:
                self.audio_buffer = self.audio_buffer[keep_samples:]
            else:
                self.audio_buffer = np.array([], dtype=np.float32)
            
            return chunk
    
    def transcribe_chunk(self, audio_chunk):
        """转录一个音频chunk"""
        # 预处理
        inputs = self.processor(
            audio_chunk,
            sampling_rate=self.sample_rate,
            return_tensors="pt"
        ).to(self.device)
        
        # 识别
        with torch.no_grad():
            generated_ids = self.model.generate(
                inputs["input_features"],
                max_new_tokens=128,  # 流式识别使用较短的token
                do_sample=False
            )
        
        # 解码
        transcription = self.processor.batch_decode(
            generated_ids,
            skip_special_tokens=True
        )[0]
        
        return transcription
    
    def processing_loop(self):
        """处理循环"""
        while self.processing:
            # 获取一个chunk
            chunk = self.process_chunk()
            
            if chunk is not None:
                try:
                    # 转录
                    text = self.transcribe_chunk(chunk)
                    
                    # 放入结果队列
                    timestamp = time.time()
                    self.result_queue.put({
                        "text": text,
                        "timestamp": timestamp,
                        "duration": len(chunk) / self.sample_rate
                    })
                    
                    print(f"[{timestamp:.3f}] 部分结果: {text}")
                    
                except Exception as e:
                    print(f"处理chunk失败: {e}")
            
            # 短暂休眠,避免CPU占用过高
            time.sleep(0.1)
    
    def start(self):
        """开始流式识别"""
        if self.processing:
            print("已经在运行中")
            return
        
        self.processing = True
        self.processing_thread = threading.Thread(target=self.processing_loop)
        self.processing_thread.start()
        print("流式识别已启动")
    
    def stop(self):
        """停止流式识别"""
        self.processing = False
        if self.processing_thread:
            self.processing_thread.join()
        print("流式识别已停止")
    
    def get_results(self, block=True, timeout=None):
        """获取识别结果"""
        results = []
        while not self.result_queue.empty():
            try:
                result = self.result_queue.get(block=block, timeout=timeout)
                results.append(result)
            except queue.Empty:
                break
        
        return results
    
    def clear_buffer(self):
        """清空缓冲区"""
        with self.buffer_lock:
            self.audio_buffer = np.array([], dtype=np.float32)
        print("缓冲区已清空")

# 使用示例
def stream_asr_example():
    """流式识别示例"""
    import pyaudio
    import wave
    
    # 创建流式识别器
    asr = StreamASR(chunk_duration=2.0, overlap=0.5)
    
    # 音频录制参数
    CHUNK = 1024
    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    RATE = 16000
    RECORD_SECONDS = 10
    
    p = pyaudio.PyAudio()
    
    print("开始录制音频...")
    
    # 打开音频流
    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    rate=RATE,
                    input=True,
                    frames_per_buffer=CHUNK)
    
    # 开始流式识别
    asr.start()
    
    print(f"录制 {RECORD_SECONDS} 秒音频...")
    
    # 录制并处理音频
    for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
        # 读取音频数据
        data = stream.read(CHUNK)
        audio_chunk = np.frombuffer(data, dtype=np.int16).astype(np.float32) / 32768.0
        
        # 添加到识别器
        asr.add_audio(audio_chunk)
        
        # 获取并显示结果
        results = asr.get_results(block=False)
        for result in results:
            print(f"实时识别: {result['text']}")
    
    # 停止
    stream.stop_stream()
    stream.close()
    p.terminate()
    
    # 等待处理完成
    time.sleep(1)
    asr.stop()
    
    # 获取所有结果
    all_results = asr.get_results()
    print(f"\n总共识别到 {len(all_results)} 个片段")
    
    # 合并结果
    full_text = " ".join([r["text"] for r in all_results])
    print(f"完整文本: {full_text}")

if __name__ == "__main__":
    # 注意:这个示例需要pyaudio库
    # 安装: pip install pyaudio
    try:
        stream_asr_example()
    except ImportError:
        print("需要安装pyaudio: pip install pyaudio")
    except Exception as e:
        print(f"错误: {e}")

5.3 性能优化技巧

Qwen3-ASR-0.6B已经相当高效,但我们还可以进一步优化:

技巧1:使用半精度推理 如果使用GPU,启用半精度可以显著减少内存使用并提高速度:

# 在模型加载时指定半精度
model = AutoModelForSpeechSeq2Seq.from_pretrained(
    "Qwen/Qwen3-ASR-0.6B",
    torch_dtype=torch.float16,  # 半精度
    low_cpu_mem_usage=True,
    use_safetensors=True
).to("cuda")

技巧2:批处理推理 如果有多个音频文件需要处理,使用批处理可以提高效率:

def batch_transcribe(audio_files, batch_size=4):
    """批量转录多个音频文件"""
    # 加载所有音频
    audios = []
    for file in audio_files:
        audio, sr = librosa.load(file, sr=16000, mono=True)
        audios.append(audio)
    
    # 分批处理
    results = []
    for i in range(0, len(audios), batch_size):
        batch = audios[i:i+batch_size]
        
        # 预处理
        inputs = processor(
            batch,
            sampling_rate=16000,
            return_tensors="pt",
            padding=True  # 自动填充
        ).to(device)
        
        # 批量识别
        with torch.no_grad():
            generated_ids = model.generate(
                inputs["input_features"],
                max_new_tokens=256
            )
        
        # 解码
        batch_results = processor.batch_decode(
            generated_ids,
            skip_special_tokens=True
        )
        
        results.extend(batch_results)
    
    return results

技巧3:缓存模型 如果频繁使用,可以将模型缓存到内存中:

import hashlib
import pickle
import os

class CachedASR:
    def __init__(self, model_name="Qwen/Qwen3-ASR-0.6B", cache_dir="./cache"):
        self.model_name = model_name
        self.cache_dir = cache_dir
        self.cache = {}
        
        os.makedirs(cache_dir, exist_ok=True)
    
    def get_cache_key(self, audio_data, language):
        """生成缓存键"""
        # 使用音频数据的MD5和语言作为键
        audio_hash = hashlib.md5(audio_data.tobytes()).hexdigest()
        return f"{audio_hash}_{language}"
    
    def transcribe(self, audio_data, language="自动检测", use_cache=True):
        """带缓存的转录"""
        if use_cache:
            cache_key = self.get_cache_key(audio_data, language)
            cache_file = os.path.join(self.cache_dir, f"{cache_key}.pkl")
            
            # 检查缓存
            if os.path.exists(cache_file):
                with open(cache_file, 'rb') as f:
                    return pickle.load(f)
        
        # 实际识别
        result = self._transcribe_impl(audio_data, language)
        
        # 保存到缓存
        if use_cache:
            cache_key = self.get_cache_key(audio_data, language)
            cache_file = os.path.join(self.cache_dir, f"{cache_key}.pkl")
            with open(cache_file, 'wb') as f:
                pickle.dump(result, f)
        
        return result
    
    def _transcribe_impl(self, audio_data, language):
        """实际识别实现"""
        # 这里放实际的识别代码
        pass

6. 常见问题与解决方案

6.1 安装问题

问题1:PyTorch安装失败

错误:Could not find a version that satisfies the requirement torch

解决方案: 使用官方安装命令,指定正确的版本:

# 对于CPU版本
pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu

# 对于CUDA 11.8
pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

# 对于CUDA 12.1
pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

问题2:内存不足

错误:CUDA out of memory

解决方案:

  • 使用CPU版本:model.to("cpu")
  • 使用半精度:torch_dtype=torch.float16
  • 减少批处理大小
  • 使用low_cpu_mem_usage=True参数

6.2 运行时问题

问题3:音频格式不支持

错误:libsndfile failed to open file

解决方案: 使用我们的load_audio_file函数,它支持多种格式:

from audio_utils import load_audio_file

# 自动处理各种格式
audio, sr = load_audio_file("your_audio.mp3")

问题4:识别结果不准确

现象:中文识别成英文,或者全是乱码

解决方案:

  1. 确保音频质量:清晰的语音,背景噪音小
  2. 指定正确的语言:
# 明确指定语言
generate_kwargs = {
    "input_features": inputs["input_features"],
    "max_new_tokens": 256,
    "forced_decoder_ids": processor.get_decoder_prompt_ids(
        language="chinese",  # 指定中文
        task="transcribe"
    )
}
  1. 调整音频参数:
# 预处理时调整参数
inputs = processor(
    audio,
    sampling_rate=16000,
    return_tensors="pt",
    padding="longest",  # 对长音频更好
    truncation=True,    # 截断过长的音频
    max_length=480000   # 最大30秒
)

6.3 性能问题

问题5:识别速度慢

现象:处理1分钟音频需要30秒以上

解决方案:

  1. 使用GPU加速
  2. 启用半精度推理
  3. 使用流式识别,分块处理
  4. 调整生成参数:
# 使用贪婪解码,速度更快但可能质量稍低
generated_ids = model.generate(
    inputs["input_features"],
    max_new_tokens=128,      # 减少最大token数
    num_beams=1,            # 不使用beam search
    do_sample=False,        # 贪婪解码
    temperature=1.0,        # 默认温度
)

问题6:内存占用过高

现象:处理长音频时内存爆满

解决方案:

  1. 分块处理长音频:
def transcribe_long_audio(audio_path, chunk_duration=30):
    """分块处理长音频"""
    audio, sr = librosa.load(audio_path, sr=16000, mono=True)
    total_duration = len(audio) / sr
    
    results = []
    chunk_samples = chunk_duration * sr
    
    for i in range(0, len(audio), chunk_samples):
        chunk = audio[i:i+chunk_samples]
        if len(chunk) < sr * 0.5:  # 小于0.5秒跳过
            continue
            
        # 处理每个chunk
        text = transcribe_chunk(chunk)
        results.append(text)
        
        # 清理内存
        torch.cuda.empty_cache() if torch.cuda.is_available() else None
    
    return " ".join(results)
  1. 使用内存映射:
# 使用内存映射加载大模型
model = AutoModelForSpeechSeq2Seq.from_pretrained(
    "Qwen/Qwen3-ASR-0.6B",
    torch_dtype=torch.float16,
    low_cpu_mem_usage=True,
    device_map="auto",  # 自动分配设备
    offload_folder="./offload"  # 溢出到磁盘
)

7. 实际应用案例

7.1 会议记录系统

让我们构建一个完整的会议记录系统,将语音实时转写成文字,并保存为会议纪要。

创建meeting_minutes.py

import os
import json
from datetime import datetime
import gradio as gr
from typing import List, Dict
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor

class MeetingMinutesSystem:
    def __init__(self):
        """会议记录系统"""
        self.model = None
        self.processor = None
        self.device = None
        self.current_meeting = None
        self.transcriptions = []
        
    def initialize(self):
        """初始化系统"""
        if self.model is not None:
            return "系统已初始化"
        
        try:
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
            torch_dtype = torch.float16 if self.device == "cuda" else torch.float32
            
            print("加载会议记录模型...")
            self.processor = AutoProcessor.from_pretrained("Qwen/Qwen3-ASR-0.6B")
            self.model = AutoModelForSpeechSeq2Seq.from_pretrained(
                "Qwen/Qwen3-ASR-0.6B",
                torch_dtype=torch_dtype,
                low_cpu_mem_usage=True
            ).to(self.device)
            
            # 创建会议记录目录
            os.makedirs("meetings", exist_ok=True)
            
            return "✓ 会议记录系统初始化完成"
            
        except Exception as e:
            return f"✗ 初始化失败: {str(e)}"
    
    def start_meeting(self, meeting_title: str, participants: str):
        """开始新的会议"""
        if self.model is None:
            return "错误:请先初始化系统", None
        
        # 创建会议记录
        self.current_meeting = {
            "title": meeting_title,
            "participants": participants.split(","),
            "start_time": datetime.now().isoformat(),
            "transcriptions": []
        }
        
        self.transcriptions = []
        
        # 生成会议ID
        meeting_id = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"meetings/meeting_{meeting_id}.json"
        
        return f"会议 '{meeting_title}' 已开始", filename
    
    def transcribe_audio(self, audio_input, speaker: str = ""):
        """转录会议音频"""
        if self.model is None or self.current_meeting is None:
            return "错误:请先初始化系统并开始会议", ""
        
        try:
            # 获取音频数据
            sample_rate, audio_data = audio_input
            
            # 预处理
            inputs = self.processor(
                audio_data,
                sampling_rate=sample_rate,
                return_tensors="pt"
            ).to(self.device)
            
            # 识别
            with torch.no_grad():
                generated_ids = self.model.generate(
                    inputs["input_features"],
                    max_new_tokens=256
                )
            
            # 解码
            text = self.processor.batch_decode(
                generated_ids,
                skip_special_tokens=True
            )[0]
            
            # 添加说话人信息
            if speaker:
                text = f"{speaker}: {text}"
            
            # 记录时间戳
            timestamp = datetime.now().strftime("%H:%M:%S")
            entry = {
                "time": timestamp,
                "speaker": speaker,
                "text": text
            }
            
            self.transcriptions.append(entry)
            self.current_meeting["transcriptions"].append(entry)
            
            # 实时保存
            self.save_meeting()
            
            return f"[{timestamp}] {text}", text
            
        except Exception as e:
            return f"识别失败: {str(e)}", ""
    
    def save_meeting(self):
        """保存会议记录"""
        if self.current_meeting is None:
            return
        
        # 更新结束时间
        self.current_meeting["end_time"] = datetime.now().isoformat()
        self.current_meeting["duration"] = len(self.transcriptions)
        
        # 保存为JSON
        meeting_id = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"meetings/meeting_{meeting_id}.json"
        
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.current_meeting, f, ensure_ascii=False, indent=2)
        
        # 同时保存为文本格式
        txt_filename = f"meetings/meeting_{meeting_id}.txt"
        with open(txt_filename, 'w', encoding='utf-8') as f:
            f.write(f"会议标题: {self.current_meeting['title']}\n")
            f.write(f"参会人员: {', '.join(self.current_meeting['participants'])}\n")
            f.write(f"开始时间: {self.current_meeting['start_time']}\n")
            f.write(f"结束时间: {self.current_meeting['end_time']}\n")
            f.write("=" * 50 + "\n\n")
            
            for entry in self.transcriptions:
                f.write(f"[{entry['time']}] {entry['speaker']}: {entry['text']}\n")
        
        return filename, txt_filename
    
    def generate_summary(self):
        """生成会议摘要"""
        if not self.transcriptions:
            return "暂无会议内容"
        
        # 简单的摘要生成:提取关键信息
        all_text = " ".join([entry["text"] for entry in self.transcriptions])
        
        # 这里可以集成文本摘要模型
        # 暂时使用简单规则
        summary = f"会议摘要:\n"
        summary += f"• 会议时长: {len(self.transcriptions)} 条记录\n"
        summary += f"• 主要内容: {all_text[:200]}...\n"
        summary += f"• 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        
        return summary
    
    def create_interface(self):
        """创建会议记录界面"""
        with gr.Blocks(title="智能会议记录系统", theme=gr.themes.Soft()) as demo:
            gr.Markdown("# 📝 智能会议记录系统")
            gr.Markdown("基于Qwen3-ASR-0.6B的实时会议转录与纪要生成")
            
            with gr.Row():
                with gr.Column(scale=1):
                    # 系统初始化
                    gr.Markdown("## 1. 系统初始化")
                    init_btn = gr.Button("初始化系统", variant="primary")
                    init_status = gr.Textbox(label="初始化状态", interactive=False)
                    
                    # 会议设置
                    gr.Markdown("## 2. 会议设置")
                    meeting_title = gr.Textbox(label="会议标题", placeholder="例如:项目周会")
                    participants = gr.Textbox(
                        label="参会人员",
                        placeholder="用逗号分隔,例如:张三,李四,王五"
                    )
                    start_btn = gr.Button("开始会议", variant="primary")
                    meeting_status = gr.Textbox(label="会议状态", interactive=False)
                    
                    # 说话人标识
                    gr.Markdown("## 3. 说话人标识")
                    speaker = gr.Dropdown(
                        choices=["主持人", "发言人1", "发言人2", "发言人3", "其他"],
                        value="主持人",
                        label="当前说话人"
                    )
                    
                    # 会议控制
                    gr.Markdown("## 4. 会议控制")
                    save_btn = gr.Button("保存会议", variant="secondary")
                    summary_btn = gr.Button("生成摘要", variant="secondary")
                    
                with gr.Column(scale=2):
                    # 音频输入
                    gr.Markdown("## 5. 音频输入")
                    audio_input = gr.Audio(
                        sources=["microphone"],
                        type="numpy",
                        label="录制会议语音",
                        interactive=True
                    )
                    
                    # 实时转录
                    gr.Markdown("## 6. 实时转录")
                    transcribe_btn = gr.Button("转录当前语音", variant="primary", size="lg")
                    
                    with gr.Row():
                        realtime_output = gr.Textbox(
                            label="带时间戳的记录",
                            interactive=False,
                            lines=4
                        )
                        raw_output = gr.Textbox(
                            label="原始文本",
                            interactive=True,
                            lines=4
                        )
                    
                    # 会议记录显示
                    gr.Markdown("## 7. 会议记录")
                    meeting_log = gr.Textbox(
                        label="完整会议记录",
                        interactive=False,
                        lines=10,
                        value=""
                    )
                    
                    # 会议摘要
                    gr.Markdown("## 8. 会议摘要")
                    meeting_summary = gr.Textbox(
                        label="会议摘要",
                        interactive=False,
                        lines=5
                    )
            
            # 绑定事件
            init_btn.click(
                fn=self.initialize,
                outputs=init_status
            )
            
            def start_new_meeting(title, participants):
                return self.start_meeting(title, participants)
            
            start_btn.click(
                fn=start_new_meeting,
                inputs=[meeting_title, participants],
                outputs=[meeting_status, meeting_status]
            )
            
            def transcribe_with_speaker(audio, spk):
                return self.transcribe_audio(audio, spk)
            
            transcribe_btn.click(
                fn=transcribe_with_speaker,
                inputs=[audio_input, speaker],
                outputs=[realtime_output, raw_output]
            ).then(
                fn=lambda: "\n".join([f"[{t['time']}] {t['speaker']}: {t['text']}" 
                                     for t in self.transcriptions[-10:]]),
                outputs=meeting_log
            )
            
            save_btn.click(
                fn=self.save_meeting,
                outputs=[gr.Textbox(visible=False), gr.Textbox(visible=False)]
            ).then(
                fn=lambda: "会议已保存到 meetings/ 目录",
                outputs=meeting_status
            )
            
            summary_btn.click(
                fn=self.generate_summary,
                outputs=meeting_summary
            )
        
        return demo

def main():
    """主函数"""
    system = MeetingMinutesSystem()
    demo = system.create_interface()
    
    print("启动智能会议记录系统...")
    print("访问 http://localhost:7860 使用系统")
    
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False
    )

if __name__ == "__main__":
    main()

这个会议记录系统提供了完整的功能:

  1. 会议开始/结束管理
  2. 参会人员记录
  3. 实时语音转录
  4. 说话人标识
  5. 自动保存会议记录
  6. 会议摘要生成

7.2 语音指令系统

另一个常见应用是语音指令系统。让我们创建一个简单的智能家居语音控制示例:

import re
from typing import Dict, List, Optional

class VoiceCommandSystem:
    def __init__(self):
        """语音指令系统"""
        self.commands = {
            "开灯": self.turn_on_light,
            "关灯": self.turn_off_light,
            "打开空调": self.turn_on_ac,
            "关闭空调": self.turn_off_ac,
            "调高温度": self.increase_temp,
            "调低温度": self.decrease_temp,
            "打开窗帘": self.open_curtain,
            "关闭窗帘": self.close_curtain,
            "播放音乐": self.play_music,
            "停止音乐": self.stop_music,
            "今天天气": self.get_weather,
            "现在几点": self.get_time,
        }
        
        self.device_status = {
            "light": False,
            "ac": False,
            "temperature": 24,
            "curtain": False,
            "music": False
        }
    
    def parse_command(self, text: str) -> Optional[str]:
        """解析语音指令"""
        text = text.lower().strip()
        
        # 模糊匹配指令
        for command, func in self.commands.items():
            if command in text:
                return func()
        
        # 处理带参数的指令
        if "温度调到" in text:
            match = re.search(r"温度调到(\d+)", text)
            if match:
                temp = int(match.group(1))
                return self.set_temperature(temp)
        
        return "抱歉,我没有听懂这个指令"
    
    def turn_on_light(self) -> str:
        self.device_status["light"] = True
        return "好的,已打开灯光"
    
    def turn_off_light(self) -> str:
        self.
Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐