Qwen3-ASR-0.6B保姆级教程:环境搭建到语音识别全流程
本文介绍了如何在星图GPU平台上自动化部署Qwen3-ASR-0.6B镜像,快速搭建本地语音识别服务。该平台简化了环境配置流程,用户可一键部署此轻量级模型,并将其应用于智能会议记录、实时语音转文字等典型场景,实现高效、离线的语音处理。
Qwen3-ASR-0.6B保姆级教程:环境搭建到语音识别全流程
1. 为什么选择Qwen3-ASR-0.6B
语音识别正在从云端走向边缘,从服务器走向设备端。无论是智能音箱、车载语音助手,还是工业控制设备,都希望能在本地完成语音转文字,而不是把音频数据上传到云端。这背后有三个核心需求:隐私保护、低延迟响应、离线可用。
Qwen3-ASR-0.6B就是为这个趋势而生的轻量级语音识别模型。它只有6亿参数,却能支持52种语言和方言,包括22种中文方言。更关键的是,它在128并发下能达到2000倍吞吐量——这意味着用10秒钟就能处理5小时的音频数据。对于嵌入式设备和边缘计算场景来说,这个效率太重要了。
我最近在做一个智能会议记录设备,需要实时把会议内容转成文字。最初尝试用云端API,结果网络一波动,识别就中断,用户体验很差。后来找到了Qwen3-ASR-0.6B,发现它不仅能离线运行,识别准确率还相当不错。更重要的是,官方提供了Gradio界面,让部署和测试变得特别简单。
这篇文章就是我的实战记录。我会带你从零开始,一步步搭建Qwen3-ASR-0.6B的完整环境,让你在10分钟内就能跑起自己的语音识别服务。无论你是AI新手还是有一定经验的开发者,都能跟着这个教程快速上手。
2. 环境准备与快速部署
2.1 系统要求检查
在开始之前,我们先确认一下你的环境是否满足要求。Qwen3-ASR-0.6B对硬件要求不高,但有些基础依赖需要提前准备好。
基础要求:
- 操作系统:Linux(Ubuntu 20.04+推荐)、macOS、Windows(WSL2)
- Python版本:3.8-3.11
- 内存:至少8GB RAM(16GB更佳)
- 磁盘空间:至少10GB可用空间
Python依赖包:
- PyTorch 2.0+
- Transformers 4.35+
- Gradio 4.0+
- 其他音频处理库
如果你不确定自己的环境,可以打开终端,运行以下命令检查:
# 检查Python版本
python3 --version
# 检查PyTorch是否安装
python3 -c "import torch; print(f'PyTorch版本: {torch.__version__}')"
# 检查CUDA是否可用(如果有GPU)
python3 -c "import torch; print(f'CUDA可用: {torch.cuda.is_available()}')"
如果你的Python版本低于3.8,或者没有安装PyTorch,别担心,我们接下来会一步步安装。
2.2 一键安装脚本
为了让大家快速上手,我准备了一个一键安装脚本。这个脚本会自动检查环境,安装所有必要的依赖。
创建一个名为install_qwen_asr.sh的文件:
#!/bin/bash
echo "开始安装Qwen3-ASR-0.6B环境..."
# 检查Python版本
python_version=$(python3 --version 2>&1 | cut -d' ' -f2)
echo "当前Python版本: $python_version"
# 检查并安装pip
if ! command -v pip3 &> /dev/null; then
echo "pip3未安装,正在安装..."
sudo apt-get update
sudo apt-get install -y python3-pip
fi
# 升级pip
pip3 install --upgrade pip
# 安装PyTorch(根据是否有GPU选择不同版本)
if python3 -c "import torch; print(torch.cuda.is_available())" 2>/dev/null | grep -q "True"; then
echo "检测到GPU,安装GPU版本的PyTorch..."
pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
else
echo "未检测到GPU,安装CPU版本的PyTorch..."
pip3 install torch torchvision torchaudio
fi
# 安装其他依赖
echo "安装transformers和gradio..."
pip3 install transformers gradio
# 安装音频处理库
echo "安装音频处理库..."
pip3 install soundfile librosa pydub
# 验证安装
echo "验证安装..."
python3 -c "
import torch
import transformers
import gradio
print('✓ PyTorch版本:', torch.__version__)
print('✓ Transformers版本:', transformers.__version__)
print('✓ Gradio版本:', gradio.__version__)
print('✓ 所有依赖安装完成!')
"
echo "安装完成!现在可以运行Qwen3-ASR-0.6B了。"
给脚本添加执行权限并运行:
# 添加执行权限
chmod +x install_qwen_asr.sh
# 运行安装脚本
./install_qwen_asr.sh
脚本运行过程中,你会看到各种包的安装进度。如果网络正常,整个过程大概需要5-10分钟。
2.3 快速验证安装
安装完成后,我们写一个简单的测试脚本来验证环境是否正常。
创建test_environment.py文件:
import torch
import transformers
import gradio as gr
import soundfile as sf
import numpy as np
print("=== 环境测试开始 ===")
# 测试PyTorch
print(f"1. PyTorch版本: {torch.__version__}")
print(f"2. CUDA可用: {torch.cuda.is_available()}")
if torch.cuda.is_available():
print(f"3. GPU设备: {torch.cuda.get_device_name(0)}")
# 测试Transformers
print(f"4. Transformers版本: {transformers.__version__}")
# 测试Gradio
print(f"5. Gradio版本: {gr.__version__}")
# 测试音频库
try:
# 创建一个测试音频
sample_rate = 16000
duration = 1.0 # 1秒
t = np.linspace(0, duration, int(sample_rate * duration))
audio_data = 0.5 * np.sin(2 * np.pi * 440 * t) # 440Hz正弦波
# 保存为WAV文件
sf.write('test_audio.wav', audio_data, sample_rate)
print("6. 音频库测试: ✓ 成功创建测试音频")
# 读取音频文件
data, sr = sf.read('test_audio.wav')
print(f"7. 音频读取测试: ✓ 采样率{sr}Hz, 长度{len(data)}个样本")
# 清理测试文件
import os
os.remove('test_audio.wav')
except Exception as e:
print(f"6. 音频库测试: ✗ 失败 - {e}")
print("=== 环境测试完成 ===")
print("如果所有测试都通过,说明环境配置正确!")
运行测试脚本:
python3 test_environment.py
如果看到所有测试项都显示成功(✓),恭喜你!环境已经准备好了。如果有任何失败,脚本会告诉你具体问题,你可以根据错误信息进行修复。
3. 模型下载与基础使用
3.1 下载Qwen3-ASR-0.6B模型
Qwen3-ASR-0.6B模型可以通过Hugging Face直接下载。官方提供了两种方式:通过Python代码自动下载,或者手动下载后使用。
方式一:自动下载(推荐)
这是最简单的方式,模型会在第一次使用时自动下载。创建一个简单的Python脚本:
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
import torch
# 指定模型名称
model_name = "Qwen/Qwen3-ASR-0.6B"
print(f"开始下载模型: {model_name}")
print("这可能需要一些时间,请耐心等待...")
try:
# 下载模型和处理器
model = AutoModelForSpeechSeq2Seq.from_pretrained(
model_name,
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
low_cpu_mem_usage=True,
use_safetensors=True
)
processor = AutoProcessor.from_pretrained(model_name)
print("✓ 模型下载完成!")
print(f"模型类型: {type(model).__name__}")
print(f"处理器类型: {type(processor).__name__}")
except Exception as e:
print(f"✗ 下载失败: {e}")
print("请检查网络连接,或尝试方式二手动下载")
运行这个脚本,它会自动从Hugging Face下载模型。第一次运行会比较慢,因为需要下载大约1.2GB的模型文件。下载完成后,模型会缓存在本地,下次使用就不需要重新下载了。
方式二:手动下载
如果自动下载速度太慢,或者网络有问题,可以手动下载:
- 访问Hugging Face模型页面:
https://huggingface.co/Qwen/Qwen3-ASR-0.6B - 点击"Files and versions"标签页
- 下载以下文件:
config.jsongeneration_config.jsonmodel.safetensors(主要模型文件)preprocessor_config.jsontokenizer.jsonvocab.json
- 将所有文件放在同一个目录下,比如
./models/qwen3-asr-0.6b/
然后修改代码,从本地加载模型:
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
# 指定本地模型路径
model_path = "./models/qwen3-asr-0.6b"
model = AutoModelForSpeechSeq2Seq.from_pretrained(model_path)
processor = AutoProcessor.from_pretrained(model_path)
3.2 第一个语音识别程序
现在我们来写一个最简单的语音识别程序,感受一下Qwen3-ASR-0.6B的能力。
创建first_asr.py文件:
import torch
import librosa
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
def transcribe_audio(audio_path):
"""
将音频文件转换为文字
"""
print(f"处理音频文件: {audio_path}")
# 1. 加载模型和处理器
print("加载模型...")
model_name = "Qwen/Qwen3-ASR-0.6B"
# 根据是否有GPU选择数据类型
if torch.cuda.is_available():
print("使用GPU加速")
torch_dtype = torch.float16
device = "cuda"
else:
print("使用CPU")
torch_dtype = torch.float32
device = "cpu"
model = AutoModelForSpeechSeq2Seq.from_pretrained(
model_name,
torch_dtype=torch_dtype,
low_cpu_mem_usage=True,
use_safetensors=True
).to(device)
processor = AutoProcessor.from_pretrained(model_name)
# 2. 加载音频文件
print("加载音频...")
audio, sample_rate = librosa.load(audio_path, sr=16000)
# 3. 预处理音频
print("预处理音频...")
inputs = processor(
audio,
sampling_rate=sample_rate,
return_tensors="pt"
).to(device)
# 4. 语音识别
print("开始识别...")
with torch.no_grad():
generated_ids = model.generate(
inputs["input_features"],
max_new_tokens=256
)
# 5. 解码结果
transcription = processor.batch_decode(
generated_ids,
skip_special_tokens=True
)[0]
print("识别完成!")
return transcription
if __name__ == "__main__":
# 测试用的音频文件路径
# 你可以录制一段自己的语音,或者使用示例音频
audio_file = "test_audio.wav" # 替换为你的音频文件
try:
result = transcribe_audio(audio_file)
print("\n" + "="*50)
print("识别结果:")
print(result)
print("="*50)
except FileNotFoundError:
print(f"错误: 找不到音频文件 {audio_file}")
print("请先录制一段语音,保存为test_audio.wav")
except Exception as e:
print(f"错误: {e}")
这个程序做了几件事:
- 加载Qwen3-ASR-0.6B模型和处理器
- 读取音频文件(需要是16kHz采样率的WAV文件)
- 对音频进行预处理,转换成模型能理解的格式
- 运行模型进行语音识别
- 输出识别结果
要测试这个程序,你需要先准备一个音频文件。可以用手机录制一段语音,保存为WAV格式,采样率设为16000Hz。或者用Python生成一个测试音频:
import soundfile as sf
import numpy as np
# 生成测试音频:说"你好,世界"
sample_rate = 16000
duration = 3.0 # 3秒
# 创建一个简单的音频信号(实际应用中应该用真实录音)
t = np.linspace(0, duration, int(sample_rate * duration))
# 模拟语音的简单波形
audio = 0.3 * np.sin(2 * np.pi * 200 * t) * np.exp(-0.5 * t)
# 保存为WAV文件
sf.write('test_audio.wav', audio, sample_rate)
print("测试音频已生成: test_audio.wav")
运行识别程序:
python3 first_asr.py
第一次运行会比较慢,因为需要加载模型。如果一切正常,你会看到模型加载的进度,然后输出识别结果。即使识别结果不准确也没关系,这只是一个基础测试,我们后面会优化识别效果。
3.3 支持多种音频格式
实际应用中,音频格式多种多样,可能是MP3、M4A、WAV等。Qwen3-ASR-0.6B需要16kHz单声道的PCM数据,所以我们需要处理各种格式的转换。
创建一个音频处理工具函数:
import librosa
import soundfile as sf
from pydub import AudioSegment
import numpy as np
import os
def load_audio_file(audio_path, target_sr=16000):
"""
加载各种格式的音频文件,并转换为模型需要的格式
"""
print(f"加载音频文件: {audio_path}")
# 检查文件是否存在
if not os.path.exists(audio_path):
raise FileNotFoundError(f"音频文件不存在: {audio_path}")
# 根据文件扩展名选择加载方式
file_ext = os.path.splitext(audio_path)[1].lower()
try:
if file_ext in ['.wav', '.flac', '.ogg']:
# 使用soundfile加载无损格式
audio, sr = sf.read(audio_path)
elif file_ext in ['.mp3', '.m4a', '.aac']:
# 使用pydub加载压缩格式
audio_segment = AudioSegment.from_file(audio_path)
# 转换为单声道
if audio_segment.channels > 1:
audio_segment = audio_segment.set_channels(1)
# 转换为目标采样率
if audio_segment.frame_rate != target_sr:
audio_segment = audio_segment.set_frame_rate(target_sr)
# 转换为numpy数组
audio = np.array(audio_segment.get_array_of_samples()).astype(np.float32)
sr = target_sr
# 归一化到[-1, 1]
if audio_segment.sample_width == 2: # 16-bit
audio = audio / 32768.0
elif audio_segment.sample_width == 3: # 24-bit
audio = audio / 8388608.0
elif audio_segment.sample_width == 1: # 8-bit
audio = (audio - 128) / 128.0
else:
# 其他格式使用librosa
audio, sr = librosa.load(audio_path, sr=target_sr, mono=True)
except Exception as e:
# 如果上述方法都失败,尝试用librosa
print(f"使用备用方法加载音频: {e}")
audio, sr = librosa.load(audio_path, sr=target_sr, mono=True)
# 确保采样率正确
if sr != target_sr:
print(f"重新采样: {sr}Hz -> {target_sr}Hz")
audio = librosa.resample(audio, orig_sr=sr, target_sr=target_sr)
sr = target_sr
# 确保是单声道
if len(audio.shape) > 1:
print("转换为单声道")
audio = np.mean(audio, axis=1)
# 确保数据类型正确
audio = audio.astype(np.float32)
print(f"音频信息: 时长={len(audio)/sr:.2f}秒, 采样率={sr}Hz, 形状={audio.shape}")
return audio, sr
def save_audio_file(audio, sample_rate, output_path):
"""
保存音频文件
"""
# 确保音频在[-1, 1]范围内
audio = np.clip(audio, -1.0, 1.0)
# 根据扩展名选择保存格式
file_ext = os.path.splitext(output_path)[1].lower()
if file_ext == '.wav':
# WAV格式使用16-bit PCM
audio_int16 = (audio * 32767).astype(np.int16)
sf.write(output_path, audio_int16, sample_rate, subtype='PCM_16')
elif file_ext == '.flac':
# FLAC格式
sf.write(output_path, audio, sample_rate, format='FLAC')
elif file_ext == '.mp3':
# MP3格式需要pydub
from pydub import AudioSegment
# 先保存为WAV,再转MP3
temp_wav = "temp.wav"
audio_int16 = (audio * 32767).astype(np.int16)
sf.write(temp_wav, audio_int16, sample_rate)
audio_segment = AudioSegment.from_wav(temp_wav)
audio_segment.export(output_path, format="mp3", bitrate="192k")
os.remove(temp_wav)
else:
# 默认保存为WAV
audio_int16 = (audio * 32767).astype(np.int16)
sf.write(output_path, audio_int16, sample_rate)
print(f"音频已保存: {output_path}")
这个工具函数可以处理多种音频格式,并自动转换为模型需要的格式。你可以这样使用它:
# 加载任意格式的音频
audio, sr = load_audio_file("my_recording.mp3")
# 现在audio就是16kHz单声道的numpy数组
# 可以直接用于语音识别
# 如果需要,可以保存为WAV格式
save_audio_file(audio, sr, "converted.wav")
4. 使用Gradio构建Web界面
4.1 为什么选择Gradio
Gradio是一个Python库,可以快速为机器学习模型创建Web界面。对于语音识别应用来说,Gradio有几个明显优势:
- 简单易用:几行代码就能创建功能完整的界面
- 实时交互:支持录音、上传、实时识别
- 无需前端知识:纯Python实现,不需要HTML/CSS/JavaScript
- 自动部署:可以轻松分享给他人使用
对于Qwen3-ASR-0.6B这样的语音识别模型,Gradio特别适合,因为它内置了音频录制和播放功能。
4.2 创建基础语音识别界面
让我们创建一个完整的Gradio应用。创建gradio_app.py文件:
import gradio as gr
import torch
import numpy as np
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
import tempfile
import os
from datetime import datetime
class QwenASRApp:
def __init__(self):
"""初始化应用"""
self.model = None
self.processor = None
self.device = None
self.initialized = False
def initialize_model(self):
"""初始化模型"""
if self.initialized:
return "模型已初始化"
try:
print("开始初始化模型...")
# 检查是否有GPU
if torch.cuda.is_available():
self.device = "cuda"
torch_dtype = torch.float16
print("使用GPU加速")
else:
self.device = "cpu"
torch_dtype = torch.float32
print("使用CPU")
# 加载模型和处理器
model_name = "Qwen/Qwen3-ASR-0.6B"
print("加载处理器...")
self.processor = AutoProcessor.from_pretrained(model_name)
print("加载模型...")
self.model = AutoModelForSpeechSeq2Seq.from_pretrained(
model_name,
torch_dtype=torch_dtype,
low_cpu_mem_usage=True,
use_safetensors=True
).to(self.device)
self.initialized = True
print("模型初始化完成!")
return "✓ 模型初始化成功!"
except Exception as e:
error_msg = f"模型初始化失败: {str(e)}"
print(error_msg)
return f"✗ {error_msg}"
def transcribe_audio(self, audio_input, language="自动检测"):
"""转录音频"""
if not self.initialized:
return "错误:模型未初始化,请先点击'初始化模型'按钮", None
try:
# 获取音频数据
if audio_input is None:
return "错误:请提供音频文件或录制语音", None
sample_rate, audio_data = audio_input
# 转换为单声道和16kHz
if len(audio_data.shape) > 1:
audio_data = np.mean(audio_data, axis=1)
# 如果采样率不是16kHz,需要重采样
if sample_rate != 16000:
import librosa
audio_data = librosa.resample(
audio_data.astype(np.float32),
orig_sr=sample_rate,
target_sr=16000
)
sample_rate = 16000
print(f"处理音频: {len(audio_data)/sample_rate:.2f}秒, {sample_rate}Hz")
# 预处理
inputs = self.processor(
audio_data,
sampling_rate=sample_rate,
return_tensors="pt"
).to(self.device)
# 设置生成参数
generate_kwargs = {
"input_features": inputs["input_features"],
"max_new_tokens": 256,
"do_sample": False, # 使用贪婪解码,速度更快
}
# 如果指定了语言,添加语言提示
if language != "自动检测":
generate_kwargs["forced_decoder_ids"] = self.processor.get_decoder_prompt_ids(
language=language,
task="transcribe"
)
# 语音识别
with torch.no_grad():
generated_ids = self.model.generate(**generate_kwargs)
# 解码结果
transcription = self.processor.batch_decode(
generated_ids,
skip_special_tokens=True
)[0]
# 添加时间戳
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
result_text = f"[{timestamp}] {transcription}"
# 保存结果到文件
self.save_result(result_text)
return result_text, transcription
except Exception as e:
error_msg = f"识别失败: {str(e)}"
print(error_msg)
return error_msg, None
def save_result(self, text):
"""保存识别结果到文件"""
try:
os.makedirs("results", exist_ok=True)
today = datetime.now().strftime("%Y%m%d")
filename = f"results/transcriptions_{today}.txt"
with open(filename, "a", encoding="utf-8") as f:
f.write(text + "\n")
print(f"结果已保存到: {filename}")
except Exception as e:
print(f"保存结果失败: {e}")
def create_interface(self):
"""创建Gradio界面"""
with gr.Blocks(title="Qwen3-ASR-0.6B 语音识别", theme=gr.themes.Soft()) as demo:
gr.Markdown("# 🎤 Qwen3-ASR-0.6B 语音识别系统")
gr.Markdown("支持52种语言和方言的语音转文字")
with gr.Row():
with gr.Column(scale=1):
# 模型初始化部分
gr.Markdown("## 1. 初始化模型")
init_btn = gr.Button("初始化模型", variant="primary")
init_status = gr.Textbox(label="初始化状态", interactive=False)
# 语言选择
gr.Markdown("## 2. 选择语言")
language = gr.Dropdown(
choices=[
"自动检测",
"中文",
"英语",
"日语",
"韩语",
"法语",
"德语",
"西班牙语"
],
value="自动检测",
label="识别语言"
)
gr.Markdown("### 使用说明")
gr.Markdown("""
1. 点击"初始化模型"按钮加载模型(首次使用需要下载模型)
2. 选择识别语言(或使用自动检测)
3. 上传音频文件或直接录制语音
4. 点击"开始识别"按钮
5. 查看识别结果
""")
with gr.Column(scale=2):
# 音频输入部分
gr.Markdown("## 3. 音频输入")
with gr.Tabs():
with gr.TabItem("🎤 录制语音"):
audio_input = gr.Audio(
sources=["microphone"],
type="numpy",
label="录制语音",
interactive=True
)
with gr.TabItem("📁 上传文件"):
file_input = gr.Audio(
sources=["upload"],
type="numpy",
label="上传音频文件",
interactive=True
)
# 识别按钮
transcribe_btn = gr.Button("开始识别", variant="primary", size="lg")
# 结果显示部分
gr.Markdown("## 4. 识别结果")
result_with_time = gr.Textbox(
label="带时间戳的结果",
interactive=False,
lines=3
)
result_raw = gr.Textbox(
label="原始文本",
interactive=True,
lines=3
)
# 保存按钮
save_btn = gr.Button("保存结果", variant="secondary")
# 绑定事件
init_btn.click(
fn=self.initialize_model,
outputs=init_status
)
def process_audio(audio, lang):
return self.transcribe_audio(audio, lang)
transcribe_btn.click(
fn=process_audio,
inputs=[audio_input, language],
outputs=[result_with_time, result_raw]
)
# 文件输入也绑定相同的事件
file_input.change(
fn=lambda x: x,
inputs=[file_input],
outputs=[audio_input]
)
# 保存结果
def save_text(text):
if text:
self.save_result(f"[手动保存] {text}")
return "结果已保存"
return "无内容可保存"
save_btn.click(
fn=save_text,
inputs=result_raw,
outputs=gr.Textbox(label="保存状态", visible=False)
)
# 添加示例
gr.Markdown("## 示例音频")
gr.Examples(
examples=[
["examples/hello_chinese.wav", "中文"],
["examples/hello_english.wav", "英语"],
],
inputs=[file_input, language],
outputs=[result_with_time, result_raw],
fn=process_audio,
cache_examples=False
)
return demo
def main():
"""主函数"""
app = QwenASRApp()
demo = app.create_interface()
# 启动服务
print("启动Gradio服务...")
print("访问 http://localhost:7860 使用语音识别系统")
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=False, # 设置为True可以生成公共链接
debug=True
)
if __name__ == "__main__":
main()
这个应用提供了完整的功能:
- 模型初始化按钮
- 语言选择(支持自动检测和指定语言)
- 两种音频输入方式:录制和上传
- 实时识别结果显示
- 结果保存功能
- 示例音频
4.3 运行和测试应用
在运行应用之前,我们需要创建示例音频文件。创建一个examples目录,并添加一些测试音频:
# 创建示例目录
mkdir -p examples
# 创建示例音频(可以使用文本转语音工具生成)
# 这里我们用Python生成简单的测试音频
python3 create_examples.py
create_examples.py内容:
import numpy as np
import soundfile as sf
import os
def create_sine_wave(freq, duration, sample_rate=16000):
"""创建正弦波音频"""
t = np.linspace(0, duration, int(sample_rate * duration))
audio = 0.3 * np.sin(2 * np.pi * freq * t)
return audio
def create_chirp(start_freq, end_freq, duration, sample_rate=16000):
"""创建扫频信号"""
t = np.linspace(0, duration, int(sample_rate * duration))
freq = np.linspace(start_freq, end_freq, len(t))
audio = 0.3 * np.sin(2 * np.pi * freq * t)
return audio
# 创建中文示例(模拟"你好,世界")
print("创建中文示例音频...")
chinese_audio = create_chirp(200, 400, 1.5)
sf.write('examples/hello_chinese.wav', chinese_audio, 16000)
# 创建英文示例(模拟"Hello, world")
print("创建英文示例音频...")
english_audio = create_sine_wave(300, 1.0)
english_audio = np.concatenate([english_audio, create_sine_wave(250, 0.5)])
sf.write('examples/hello_english.wav', english_audio, 16000)
print("示例音频创建完成!")
现在运行Gradio应用:
python3 gradio_app.py
你会看到类似这样的输出:
启动Gradio服务...
访问 http://localhost:7860 使用语音识别系统
Running on local URL: http://0.0.0.0:7860
打开浏览器,访问 http://localhost:7860,你会看到一个漂亮的Web界面。
4.4 使用步骤详解
让我们一步步使用这个应用:
第一步:初始化模型 点击"初始化模型"按钮。第一次运行时会下载模型文件,这可能需要一些时间(取决于网络速度)。下载完成后,会显示"✓ 模型初始化成功!"。
第二步:选择语言 在下拉菜单中选择识别语言。如果你不确定音频的语言,选择"自动检测",模型会自动识别。
第三步:输入音频 有两种方式输入音频:
- 录制语音:点击录音按钮,允许浏览器访问麦克风,然后开始说话
- 上传文件:点击上传按钮,选择本地音频文件(支持WAV、MP3、M4A等格式)
第四步:开始识别 点击"开始识别"按钮。你会看到处理进度,几秒钟后识别结果就会显示在下方。
第五步:查看和保存结果 识别结果会显示在两个文本框中:
- 第一个框显示带时间戳的结果
- 第二个框显示原始文本,你可以编辑和复制
点击"保存结果"按钮,识别结果会被保存到results目录下的文本文件中,按日期分类。
5. 高级功能与优化技巧
5.1 批量处理音频文件
在实际应用中,我们经常需要批量处理多个音频文件。让我们创建一个批量处理脚本:
import os
import glob
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
from tqdm import tqdm
import json
from datetime import datetime
class BatchASRProcessor:
def __init__(self, model_path="Qwen/Qwen3-ASR-0.6B", device=None):
"""初始化批量处理器"""
self.model_path = model_path
self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
self.model = None
self.processor = None
def initialize(self):
"""初始化模型"""
print(f"初始化模型,使用设备: {self.device}")
torch_dtype = torch.float16 if self.device == "cuda" else torch.float32
self.processor = AutoProcessor.from_pretrained(self.model_path)
self.model = AutoModelForSpeechSeq2Seq.from_pretrained(
self.model_path,
torch_dtype=torch_dtype,
low_cpu_mem_usage=True,
use_safetensors=True
).to(self.device)
print("模型初始化完成")
def process_file(self, audio_path, language="自动检测"):
"""处理单个音频文件"""
if self.model is None:
self.initialize()
try:
# 加载音频
import librosa
audio, sr = librosa.load(audio_path, sr=16000, mono=True)
# 预处理
inputs = self.processor(
audio,
sampling_rate=sr,
return_tensors="pt"
).to(self.device)
# 设置生成参数
generate_kwargs = {
"input_features": inputs["input_features"],
"max_new_tokens": 256,
}
# 语言设置
if language != "自动检测":
generate_kwargs["forced_decoder_ids"] = self.processor.get_decoder_prompt_ids(
language=language,
task="transcribe"
)
# 识别
with torch.no_grad():
generated_ids = self.model.generate(**generate_kwargs)
# 解码
transcription = self.processor.batch_decode(
generated_ids,
skip_special_tokens=True
)[0]
return {
"file": audio_path,
"text": transcription,
"language": language,
"status": "success",
"timestamp": datetime.now().isoformat()
}
except Exception as e:
return {
"file": audio_path,
"error": str(e),
"status": "failed",
"timestamp": datetime.now().isoformat()
}
def process_folder(self, folder_path, output_file="results.json", language="自动检测"):
"""处理整个文件夹的音频文件"""
# 查找所有音频文件
audio_extensions = ['*.wav', '*.mp3', '*.m4a', '*.flac', '*.ogg']
audio_files = []
for ext in audio_extensions:
audio_files.extend(glob.glob(os.path.join(folder_path, ext)))
if not audio_files:
print(f"在 {folder_path} 中未找到音频文件")
return []
print(f"找到 {len(audio_files)} 个音频文件")
# 初始化模型
if self.model is None:
self.initialize()
# 批量处理
results = []
for audio_file in tqdm(audio_files, desc="处理音频文件"):
result = self.process_file(audio_file, language)
results.append(result)
# 实时保存进度
if output_file:
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=2)
# 统计结果
success_count = sum(1 for r in results if r["status"] == "success")
failed_count = len(results) - success_count
print(f"\n处理完成!")
print(f"成功: {success_count} 个文件")
print(f"失败: {failed_count} 个文件")
if output_file:
print(f"结果已保存到: {output_file}")
return results
def process_file_list(self, file_list, output_file="results.json", language="自动检测"):
"""处理文件列表"""
if not file_list:
print("文件列表为空")
return []
print(f"处理 {len(file_list)} 个文件")
# 初始化模型
if self.model is None:
self.initialize()
# 批量处理
results = []
for audio_file in tqdm(file_list, desc="处理文件"):
if os.path.exists(audio_file):
result = self.process_file(audio_file, language)
results.append(result)
else:
results.append({
"file": audio_file,
"error": "文件不存在",
"status": "failed",
"timestamp": datetime.now().isoformat()
})
# 实时保存进度
if output_file:
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=2)
return results
def main():
"""批量处理示例"""
import argparse
parser = argparse.ArgumentParser(description="批量语音识别")
parser.add_argument("--input", "-i", required=True, help="输入文件或文件夹路径")
parser.add_argument("--output", "-o", default="results.json", help="输出JSON文件路径")
parser.add_argument("--language", "-l", default="自动检测", help="识别语言")
parser.add_argument("--device", "-d", choices=["cpu", "cuda"], help="运行设备")
args = parser.parse_args()
# 创建处理器
processor = BatchASRProcessor(device=args.device)
# 判断输入是文件还是文件夹
if os.path.isfile(args.input):
# 单个文件
result = processor.process_file(args.input, args.language)
print(f"识别结果: {result['text']}")
# 保存结果
with open(args.output, 'w', encoding='utf-8') as f:
json.dump([result], f, ensure_ascii=False, indent=2)
elif os.path.isdir(args.input):
# 文件夹
processor.process_folder(args.input, args.output, args.language)
else:
print(f"错误: {args.input} 不是有效的文件或文件夹")
if __name__ == "__main__":
main()
使用这个批量处理脚本:
# 处理单个文件
python3 batch_asr.py --input audio.wav --output result.json
# 处理整个文件夹
python3 batch_asr.py --input ./audio_files/ --output all_results.json --language 中文
# 指定使用CPU
python3 batch_asr.py --input audio.wav --device cpu
5.2 流式识别实现
对于实时应用,我们需要流式识别功能。Qwen3-ASR-0.6B支持流式推理,让我们实现一个简单的流式识别示例:
import torch
import numpy as np
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
import queue
import threading
import time
class StreamASR:
def __init__(self, model_name="Qwen/Qwen3-ASR-0.6B", chunk_duration=1.0, overlap=0.5):
"""
流式语音识别
参数:
model_name: 模型名称
chunk_duration: 每次处理的音频时长(秒)
overlap: 重叠时长(秒),用于平滑过渡
"""
self.chunk_duration = chunk_duration
self.overlap = overlap
self.sample_rate = 16000
# 计算chunk大小
self.chunk_samples = int(self.chunk_duration * self.sample_rate)
self.overlap_samples = int(self.overlap * self.sample_rate)
# 初始化模型
self.device = "cuda" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if self.device == "cuda" else torch.float32
print(f"加载模型: {model_name}")
self.processor = AutoProcessor.from_pretrained(model_name)
self.model = AutoModelForSpeechSeq2Seq.from_pretrained(
model_name,
torch_dtype=torch_dtype,
low_cpu_mem_usage=True,
use_safetensors=True
).to(self.device)
# 音频缓冲区
self.audio_buffer = np.array([], dtype=np.float32)
self.buffer_lock = threading.Lock()
# 结果队列
self.result_queue = queue.Queue()
# 处理线程
self.processing = False
self.processing_thread = None
def add_audio(self, audio_chunk):
"""添加音频数据到缓冲区"""
with self.buffer_lock:
self.audio_buffer = np.concatenate([self.audio_buffer, audio_chunk])
def process_chunk(self):
"""处理一个音频chunk"""
with self.buffer_lock:
if len(self.audio_buffer) < self.chunk_samples:
return None
# 取一个chunk
chunk = self.audio_buffer[:self.chunk_samples]
# 保留重叠部分
keep_samples = self.chunk_samples - self.overlap_samples
if keep_samples > 0:
self.audio_buffer = self.audio_buffer[keep_samples:]
else:
self.audio_buffer = np.array([], dtype=np.float32)
return chunk
def transcribe_chunk(self, audio_chunk):
"""转录一个音频chunk"""
# 预处理
inputs = self.processor(
audio_chunk,
sampling_rate=self.sample_rate,
return_tensors="pt"
).to(self.device)
# 识别
with torch.no_grad():
generated_ids = self.model.generate(
inputs["input_features"],
max_new_tokens=128, # 流式识别使用较短的token
do_sample=False
)
# 解码
transcription = self.processor.batch_decode(
generated_ids,
skip_special_tokens=True
)[0]
return transcription
def processing_loop(self):
"""处理循环"""
while self.processing:
# 获取一个chunk
chunk = self.process_chunk()
if chunk is not None:
try:
# 转录
text = self.transcribe_chunk(chunk)
# 放入结果队列
timestamp = time.time()
self.result_queue.put({
"text": text,
"timestamp": timestamp,
"duration": len(chunk) / self.sample_rate
})
print(f"[{timestamp:.3f}] 部分结果: {text}")
except Exception as e:
print(f"处理chunk失败: {e}")
# 短暂休眠,避免CPU占用过高
time.sleep(0.1)
def start(self):
"""开始流式识别"""
if self.processing:
print("已经在运行中")
return
self.processing = True
self.processing_thread = threading.Thread(target=self.processing_loop)
self.processing_thread.start()
print("流式识别已启动")
def stop(self):
"""停止流式识别"""
self.processing = False
if self.processing_thread:
self.processing_thread.join()
print("流式识别已停止")
def get_results(self, block=True, timeout=None):
"""获取识别结果"""
results = []
while not self.result_queue.empty():
try:
result = self.result_queue.get(block=block, timeout=timeout)
results.append(result)
except queue.Empty:
break
return results
def clear_buffer(self):
"""清空缓冲区"""
with self.buffer_lock:
self.audio_buffer = np.array([], dtype=np.float32)
print("缓冲区已清空")
# 使用示例
def stream_asr_example():
"""流式识别示例"""
import pyaudio
import wave
# 创建流式识别器
asr = StreamASR(chunk_duration=2.0, overlap=0.5)
# 音频录制参数
CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
RECORD_SECONDS = 10
p = pyaudio.PyAudio()
print("开始录制音频...")
# 打开音频流
stream = p.open(format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK)
# 开始流式识别
asr.start()
print(f"录制 {RECORD_SECONDS} 秒音频...")
# 录制并处理音频
for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
# 读取音频数据
data = stream.read(CHUNK)
audio_chunk = np.frombuffer(data, dtype=np.int16).astype(np.float32) / 32768.0
# 添加到识别器
asr.add_audio(audio_chunk)
# 获取并显示结果
results = asr.get_results(block=False)
for result in results:
print(f"实时识别: {result['text']}")
# 停止
stream.stop_stream()
stream.close()
p.terminate()
# 等待处理完成
time.sleep(1)
asr.stop()
# 获取所有结果
all_results = asr.get_results()
print(f"\n总共识别到 {len(all_results)} 个片段")
# 合并结果
full_text = " ".join([r["text"] for r in all_results])
print(f"完整文本: {full_text}")
if __name__ == "__main__":
# 注意:这个示例需要pyaudio库
# 安装: pip install pyaudio
try:
stream_asr_example()
except ImportError:
print("需要安装pyaudio: pip install pyaudio")
except Exception as e:
print(f"错误: {e}")
5.3 性能优化技巧
Qwen3-ASR-0.6B已经相当高效,但我们还可以进一步优化:
技巧1:使用半精度推理 如果使用GPU,启用半精度可以显著减少内存使用并提高速度:
# 在模型加载时指定半精度
model = AutoModelForSpeechSeq2Seq.from_pretrained(
"Qwen/Qwen3-ASR-0.6B",
torch_dtype=torch.float16, # 半精度
low_cpu_mem_usage=True,
use_safetensors=True
).to("cuda")
技巧2:批处理推理 如果有多个音频文件需要处理,使用批处理可以提高效率:
def batch_transcribe(audio_files, batch_size=4):
"""批量转录多个音频文件"""
# 加载所有音频
audios = []
for file in audio_files:
audio, sr = librosa.load(file, sr=16000, mono=True)
audios.append(audio)
# 分批处理
results = []
for i in range(0, len(audios), batch_size):
batch = audios[i:i+batch_size]
# 预处理
inputs = processor(
batch,
sampling_rate=16000,
return_tensors="pt",
padding=True # 自动填充
).to(device)
# 批量识别
with torch.no_grad():
generated_ids = model.generate(
inputs["input_features"],
max_new_tokens=256
)
# 解码
batch_results = processor.batch_decode(
generated_ids,
skip_special_tokens=True
)
results.extend(batch_results)
return results
技巧3:缓存模型 如果频繁使用,可以将模型缓存到内存中:
import hashlib
import pickle
import os
class CachedASR:
def __init__(self, model_name="Qwen/Qwen3-ASR-0.6B", cache_dir="./cache"):
self.model_name = model_name
self.cache_dir = cache_dir
self.cache = {}
os.makedirs(cache_dir, exist_ok=True)
def get_cache_key(self, audio_data, language):
"""生成缓存键"""
# 使用音频数据的MD5和语言作为键
audio_hash = hashlib.md5(audio_data.tobytes()).hexdigest()
return f"{audio_hash}_{language}"
def transcribe(self, audio_data, language="自动检测", use_cache=True):
"""带缓存的转录"""
if use_cache:
cache_key = self.get_cache_key(audio_data, language)
cache_file = os.path.join(self.cache_dir, f"{cache_key}.pkl")
# 检查缓存
if os.path.exists(cache_file):
with open(cache_file, 'rb') as f:
return pickle.load(f)
# 实际识别
result = self._transcribe_impl(audio_data, language)
# 保存到缓存
if use_cache:
cache_key = self.get_cache_key(audio_data, language)
cache_file = os.path.join(self.cache_dir, f"{cache_key}.pkl")
with open(cache_file, 'wb') as f:
pickle.dump(result, f)
return result
def _transcribe_impl(self, audio_data, language):
"""实际识别实现"""
# 这里放实际的识别代码
pass
6. 常见问题与解决方案
6.1 安装问题
问题1:PyTorch安装失败
错误:Could not find a version that satisfies the requirement torch
解决方案: 使用官方安装命令,指定正确的版本:
# 对于CPU版本
pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu
# 对于CUDA 11.8
pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
# 对于CUDA 12.1
pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
问题2:内存不足
错误:CUDA out of memory
解决方案:
- 使用CPU版本:
model.to("cpu") - 使用半精度:
torch_dtype=torch.float16 - 减少批处理大小
- 使用
low_cpu_mem_usage=True参数
6.2 运行时问题
问题3:音频格式不支持
错误:libsndfile failed to open file
解决方案: 使用我们的load_audio_file函数,它支持多种格式:
from audio_utils import load_audio_file
# 自动处理各种格式
audio, sr = load_audio_file("your_audio.mp3")
问题4:识别结果不准确
现象:中文识别成英文,或者全是乱码
解决方案:
- 确保音频质量:清晰的语音,背景噪音小
- 指定正确的语言:
# 明确指定语言
generate_kwargs = {
"input_features": inputs["input_features"],
"max_new_tokens": 256,
"forced_decoder_ids": processor.get_decoder_prompt_ids(
language="chinese", # 指定中文
task="transcribe"
)
}
- 调整音频参数:
# 预处理时调整参数
inputs = processor(
audio,
sampling_rate=16000,
return_tensors="pt",
padding="longest", # 对长音频更好
truncation=True, # 截断过长的音频
max_length=480000 # 最大30秒
)
6.3 性能问题
问题5:识别速度慢
现象:处理1分钟音频需要30秒以上
解决方案:
- 使用GPU加速
- 启用半精度推理
- 使用流式识别,分块处理
- 调整生成参数:
# 使用贪婪解码,速度更快但可能质量稍低
generated_ids = model.generate(
inputs["input_features"],
max_new_tokens=128, # 减少最大token数
num_beams=1, # 不使用beam search
do_sample=False, # 贪婪解码
temperature=1.0, # 默认温度
)
问题6:内存占用过高
现象:处理长音频时内存爆满
解决方案:
- 分块处理长音频:
def transcribe_long_audio(audio_path, chunk_duration=30):
"""分块处理长音频"""
audio, sr = librosa.load(audio_path, sr=16000, mono=True)
total_duration = len(audio) / sr
results = []
chunk_samples = chunk_duration * sr
for i in range(0, len(audio), chunk_samples):
chunk = audio[i:i+chunk_samples]
if len(chunk) < sr * 0.5: # 小于0.5秒跳过
continue
# 处理每个chunk
text = transcribe_chunk(chunk)
results.append(text)
# 清理内存
torch.cuda.empty_cache() if torch.cuda.is_available() else None
return " ".join(results)
- 使用内存映射:
# 使用内存映射加载大模型
model = AutoModelForSpeechSeq2Seq.from_pretrained(
"Qwen/Qwen3-ASR-0.6B",
torch_dtype=torch.float16,
low_cpu_mem_usage=True,
device_map="auto", # 自动分配设备
offload_folder="./offload" # 溢出到磁盘
)
7. 实际应用案例
7.1 会议记录系统
让我们构建一个完整的会议记录系统,将语音实时转写成文字,并保存为会议纪要。
创建meeting_minutes.py:
import os
import json
from datetime import datetime
import gradio as gr
from typing import List, Dict
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
class MeetingMinutesSystem:
def __init__(self):
"""会议记录系统"""
self.model = None
self.processor = None
self.device = None
self.current_meeting = None
self.transcriptions = []
def initialize(self):
"""初始化系统"""
if self.model is not None:
return "系统已初始化"
try:
self.device = "cuda" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if self.device == "cuda" else torch.float32
print("加载会议记录模型...")
self.processor = AutoProcessor.from_pretrained("Qwen/Qwen3-ASR-0.6B")
self.model = AutoModelForSpeechSeq2Seq.from_pretrained(
"Qwen/Qwen3-ASR-0.6B",
torch_dtype=torch_dtype,
low_cpu_mem_usage=True
).to(self.device)
# 创建会议记录目录
os.makedirs("meetings", exist_ok=True)
return "✓ 会议记录系统初始化完成"
except Exception as e:
return f"✗ 初始化失败: {str(e)}"
def start_meeting(self, meeting_title: str, participants: str):
"""开始新的会议"""
if self.model is None:
return "错误:请先初始化系统", None
# 创建会议记录
self.current_meeting = {
"title": meeting_title,
"participants": participants.split(","),
"start_time": datetime.now().isoformat(),
"transcriptions": []
}
self.transcriptions = []
# 生成会议ID
meeting_id = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"meetings/meeting_{meeting_id}.json"
return f"会议 '{meeting_title}' 已开始", filename
def transcribe_audio(self, audio_input, speaker: str = ""):
"""转录会议音频"""
if self.model is None or self.current_meeting is None:
return "错误:请先初始化系统并开始会议", ""
try:
# 获取音频数据
sample_rate, audio_data = audio_input
# 预处理
inputs = self.processor(
audio_data,
sampling_rate=sample_rate,
return_tensors="pt"
).to(self.device)
# 识别
with torch.no_grad():
generated_ids = self.model.generate(
inputs["input_features"],
max_new_tokens=256
)
# 解码
text = self.processor.batch_decode(
generated_ids,
skip_special_tokens=True
)[0]
# 添加说话人信息
if speaker:
text = f"{speaker}: {text}"
# 记录时间戳
timestamp = datetime.now().strftime("%H:%M:%S")
entry = {
"time": timestamp,
"speaker": speaker,
"text": text
}
self.transcriptions.append(entry)
self.current_meeting["transcriptions"].append(entry)
# 实时保存
self.save_meeting()
return f"[{timestamp}] {text}", text
except Exception as e:
return f"识别失败: {str(e)}", ""
def save_meeting(self):
"""保存会议记录"""
if self.current_meeting is None:
return
# 更新结束时间
self.current_meeting["end_time"] = datetime.now().isoformat()
self.current_meeting["duration"] = len(self.transcriptions)
# 保存为JSON
meeting_id = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"meetings/meeting_{meeting_id}.json"
with open(filename, 'w', encoding='utf-8') as f:
json.dump(self.current_meeting, f, ensure_ascii=False, indent=2)
# 同时保存为文本格式
txt_filename = f"meetings/meeting_{meeting_id}.txt"
with open(txt_filename, 'w', encoding='utf-8') as f:
f.write(f"会议标题: {self.current_meeting['title']}\n")
f.write(f"参会人员: {', '.join(self.current_meeting['participants'])}\n")
f.write(f"开始时间: {self.current_meeting['start_time']}\n")
f.write(f"结束时间: {self.current_meeting['end_time']}\n")
f.write("=" * 50 + "\n\n")
for entry in self.transcriptions:
f.write(f"[{entry['time']}] {entry['speaker']}: {entry['text']}\n")
return filename, txt_filename
def generate_summary(self):
"""生成会议摘要"""
if not self.transcriptions:
return "暂无会议内容"
# 简单的摘要生成:提取关键信息
all_text = " ".join([entry["text"] for entry in self.transcriptions])
# 这里可以集成文本摘要模型
# 暂时使用简单规则
summary = f"会议摘要:\n"
summary += f"• 会议时长: {len(self.transcriptions)} 条记录\n"
summary += f"• 主要内容: {all_text[:200]}...\n"
summary += f"• 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
return summary
def create_interface(self):
"""创建会议记录界面"""
with gr.Blocks(title="智能会议记录系统", theme=gr.themes.Soft()) as demo:
gr.Markdown("# 📝 智能会议记录系统")
gr.Markdown("基于Qwen3-ASR-0.6B的实时会议转录与纪要生成")
with gr.Row():
with gr.Column(scale=1):
# 系统初始化
gr.Markdown("## 1. 系统初始化")
init_btn = gr.Button("初始化系统", variant="primary")
init_status = gr.Textbox(label="初始化状态", interactive=False)
# 会议设置
gr.Markdown("## 2. 会议设置")
meeting_title = gr.Textbox(label="会议标题", placeholder="例如:项目周会")
participants = gr.Textbox(
label="参会人员",
placeholder="用逗号分隔,例如:张三,李四,王五"
)
start_btn = gr.Button("开始会议", variant="primary")
meeting_status = gr.Textbox(label="会议状态", interactive=False)
# 说话人标识
gr.Markdown("## 3. 说话人标识")
speaker = gr.Dropdown(
choices=["主持人", "发言人1", "发言人2", "发言人3", "其他"],
value="主持人",
label="当前说话人"
)
# 会议控制
gr.Markdown("## 4. 会议控制")
save_btn = gr.Button("保存会议", variant="secondary")
summary_btn = gr.Button("生成摘要", variant="secondary")
with gr.Column(scale=2):
# 音频输入
gr.Markdown("## 5. 音频输入")
audio_input = gr.Audio(
sources=["microphone"],
type="numpy",
label="录制会议语音",
interactive=True
)
# 实时转录
gr.Markdown("## 6. 实时转录")
transcribe_btn = gr.Button("转录当前语音", variant="primary", size="lg")
with gr.Row():
realtime_output = gr.Textbox(
label="带时间戳的记录",
interactive=False,
lines=4
)
raw_output = gr.Textbox(
label="原始文本",
interactive=True,
lines=4
)
# 会议记录显示
gr.Markdown("## 7. 会议记录")
meeting_log = gr.Textbox(
label="完整会议记录",
interactive=False,
lines=10,
value=""
)
# 会议摘要
gr.Markdown("## 8. 会议摘要")
meeting_summary = gr.Textbox(
label="会议摘要",
interactive=False,
lines=5
)
# 绑定事件
init_btn.click(
fn=self.initialize,
outputs=init_status
)
def start_new_meeting(title, participants):
return self.start_meeting(title, participants)
start_btn.click(
fn=start_new_meeting,
inputs=[meeting_title, participants],
outputs=[meeting_status, meeting_status]
)
def transcribe_with_speaker(audio, spk):
return self.transcribe_audio(audio, spk)
transcribe_btn.click(
fn=transcribe_with_speaker,
inputs=[audio_input, speaker],
outputs=[realtime_output, raw_output]
).then(
fn=lambda: "\n".join([f"[{t['time']}] {t['speaker']}: {t['text']}"
for t in self.transcriptions[-10:]]),
outputs=meeting_log
)
save_btn.click(
fn=self.save_meeting,
outputs=[gr.Textbox(visible=False), gr.Textbox(visible=False)]
).then(
fn=lambda: "会议已保存到 meetings/ 目录",
outputs=meeting_status
)
summary_btn.click(
fn=self.generate_summary,
outputs=meeting_summary
)
return demo
def main():
"""主函数"""
system = MeetingMinutesSystem()
demo = system.create_interface()
print("启动智能会议记录系统...")
print("访问 http://localhost:7860 使用系统")
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=False
)
if __name__ == "__main__":
main()
这个会议记录系统提供了完整的功能:
- 会议开始/结束管理
- 参会人员记录
- 实时语音转录
- 说话人标识
- 自动保存会议记录
- 会议摘要生成
7.2 语音指令系统
另一个常见应用是语音指令系统。让我们创建一个简单的智能家居语音控制示例:
import re
from typing import Dict, List, Optional
class VoiceCommandSystem:
def __init__(self):
"""语音指令系统"""
self.commands = {
"开灯": self.turn_on_light,
"关灯": self.turn_off_light,
"打开空调": self.turn_on_ac,
"关闭空调": self.turn_off_ac,
"调高温度": self.increase_temp,
"调低温度": self.decrease_temp,
"打开窗帘": self.open_curtain,
"关闭窗帘": self.close_curtain,
"播放音乐": self.play_music,
"停止音乐": self.stop_music,
"今天天气": self.get_weather,
"现在几点": self.get_time,
}
self.device_status = {
"light": False,
"ac": False,
"temperature": 24,
"curtain": False,
"music": False
}
def parse_command(self, text: str) -> Optional[str]:
"""解析语音指令"""
text = text.lower().strip()
# 模糊匹配指令
for command, func in self.commands.items():
if command in text:
return func()
# 处理带参数的指令
if "温度调到" in text:
match = re.search(r"温度调到(\d+)", text)
if match:
temp = int(match.group(1))
return self.set_temperature(temp)
return "抱歉,我没有听懂这个指令"
def turn_on_light(self) -> str:
self.device_status["light"] = True
return "好的,已打开灯光"
def turn_off_light(self) -> str:
self.更多推荐

所有评论(0)