Qwen3-ASR-1.7B语音识别实操:音频切片工具集成+批量转写+结果Excel导出自动化脚本

你是不是也遇到过这样的烦恼?手头有一堆会议录音、访谈音频或者课程录像,需要把它们转成文字。一个个上传、等待、复制结果,不仅效率低下,还容易出错。特别是当音频文件很长,或者数量很多的时候,手动操作简直就是一场噩梦。

今天,我们就来解决这个痛点。我将带你深入体验Qwen3-ASR-1.7B这个高精度语音识别模型,并分享一套我自用的“懒人”自动化方案。这套方案的核心是:自动切分长音频 + 批量识别转写 + 结果自动整理到Excel。整个过程无需人工干预,解放你的双手,让你从繁琐的重复劳动中解脱出来。

1. 为什么选择Qwen3-ASR-1.7B?

在开始动手之前,我们先简单了解一下今天的主角。Qwen3-ASR-1.7B是阿里云通义千问团队推出的开源语音识别模型,属于其ASR系列中的“高精度”版本。

它有几个让我非常心动的特点:

  • 识别准:1.7B的参数量,相比0.6B版本,在复杂场景下的识别准确率有明显提升,尤其是带有口音或背景噪音的音频。
  • 听得懂方言:这是我选择它的一个重要原因。它支持包括粤语、四川话、上海话在内的22种中文方言,对于处理地方性访谈或会议录音非常友好。
  • 不用猜语言:模型内置了自动语言检测功能,上传音频后,它能自己判断说的是中文、英文还是日语,省去了手动选择的麻烦。
  • 开箱即用:官方提供了预置的Docker镜像,部署非常简单,自带Web界面,点点鼠标就能用。

当然,它也有个小“缺点”:对硬件要求稍高,需要至少6GB的GPU显存。不过,考虑到它带来的精度提升,这个投入是值得的。

2. 环境准备与快速部署

我们的自动化流程需要一个稳定的Qwen3-ASR服务作为后端。这里我们使用官方预置的镜像,最快几分钟就能跑起来。

2.1 获取并启动镜像

如果你在支持GPU的云平台或本地服务器上,可以按照以下步骤操作。这里假设你已经有了一个可以运行Docker的环境。

# 1. 拉取官方镜像(请以平台实际提供的镜像名称为准)
# 通常平台会提供一键部署,这一步可能不需要手动执行

# 2. 运行容器(示例命令,端口和路径请根据实际情况调整)
docker run -d --gpus all \
  -p 7860:7860 \
  -v /your/local/data:/app/data \
  --name qwen3-asr \
  registry.cn-hangzhou.aliyuncs.com/qwen/qwen3-asr:latest

# 3. 查看服务日志,确认启动成功
docker logs -f qwen3-asr

更常见的情况是,你在云服务平台直接使用预置的“Qwen3-ASR-1.7B”应用镜像。启动后,你会获得一个类似 https://gpu-xxxx-7860.web.gpu.example.com/ 的访问地址。打开这个地址,你应该能看到一个简洁的上传界面。

2.2 验证服务

在浏览器中打开Web界面,上传一个简短的测试音频(比如一段自己录制的普通话或英文),点击识别。如果很快返回了正确的文字结果,并且显示了检测到的语言(如“zh”或“en”),说明服务已经正常运转。

我们的自动化脚本将通过这个服务提供的API接口进行通信,所以确保这个Web服务可访问是第一步。

3. 自动化脚本核心思路拆解

手动操作的瓶颈在于“等待”和“重复”。我们的脚本就是要消除这些瓶颈。整个流程可以分解为三个核心步骤:

  1. 音频预处理(切片):长音频(如1小时的会议)直接识别,可能出错,且失败后重试成本高。我们先用工具把它切成15-30秒的小段。
  2. 批量识别(转写):编写一个脚本,自动遍历所有切片后的小音频文件,依次调用Qwen3-ASR的API进行识别,并保存结果。
  3. 结果后处理(导出):将所有小段的识别文本,按照时间顺序拼接起来,并生成一个结构清晰的Excel文件,包含文件名、时间戳、文本内容等信息。

听起来是不是很简单?接下来,我们一步步用代码实现它。

4. 实战:一步步构建自动化脚本

我们将使用Python来编写这个脚本,因为它有丰富的音频处理和HTTP请求库。请确保你的环境已安装Python 3.8+。

4.1 第一步:安装依赖库

首先,创建一个新的项目目录,并安装我们需要的Python包。

# 创建项目目录并进入
mkdir asr_automation && cd asr_automation

# 创建虚拟环境(可选,但推荐)
python -m venv venv
source venv/bin/activate  # Linux/Mac
# venv\Scripts\activate  # Windows

# 安装依赖库
pip install requests pydub pandas openpyxl
  • requests: 用于调用Qwen3-ASR的HTTP API。
  • pydub: 一个非常强大的音频处理库,我们用它来切割音频。
  • pandas & openpyxl: 用于处理数据和生成Excel文件。

注意pydub依赖于ffmpeg。你需要确保系统已安装ffmpeg

  • Ubuntu/Debian: sudo apt install ffmpeg
  • Mac: brew install ffmpeg
  • Windows: 从官网下载并添加至系统PATH。

4.2 第二步:编写音频切片工具

我们创建一个 audio_slicer.py 文件,它的任务是把一个长音频文件,按固定时长切成小段。

# audio_slicer.py
import os
from pydub import AudioSegment

def slice_audio(input_path, output_dir, segment_length_ms=30000):
    """
    将长音频切片为指定时长的小段。
    
    参数:
        input_path (str): 输入音频文件路径。
        output_dir (str): 切片后音频的输出目录。
        segment_length_ms (int): 每个切片的时长,单位毫秒。默认30秒。
    """
    # 创建输出目录
    os.makedirs(output_dir, exist_ok=True)
    
    # 加载音频文件
    print(f"正在加载音频文件: {input_path}")
    audio = AudioSegment.from_file(input_path)
    total_duration = len(audio)  # 总时长(毫秒)
    print(f"音频总时长: {total_duration/1000:.2f} 秒")
    
    # 计算需要切多少段
    num_segments = (total_duration // segment_length_ms) + 1
    print(f"将切分为 {num_segments} 段 (每段约 {segment_length_ms/1000} 秒)")
    
    slices_info = []  # 用于记录切片信息
    base_name = os.path.splitext(os.path.basename(input_path))[0]
    
    for i in range(num_segments):
        start_time = i * segment_length_ms
        end_time = min((i + 1) * segment_length_ms, total_duration)
        
        # 提取音频片段
        segment = audio[start_time:end_time]
        
        # 生成输出文件名
        segment_filename = f"{base_name}_part{i+1:03d}.wav"
        segment_path = os.path.join(output_dir, segment_filename)
        
        # 导出为wav格式(Qwen3-ASR兼容性好)
        segment.export(segment_path, format="wav")
        
        # 记录信息:文件名,开始时间(秒),结束时间(秒)
        slices_info.append({
            'filename': segment_filename,
            'start_s': start_time / 1000.0,
            'end_s': end_time / 1000.0,
            'filepath': segment_path
        })
        
        print(f"  已保存: {segment_filename} ({start_time/1000:.1f}s - {end_time/1000:.1f}s)")
    
    print(f"音频切片完成!所有片段保存在: {output_dir}")
    return slices_info  # 返回切片信息列表,供后续使用

if __name__ == "__main__":
    # 示例用法
    input_audio = "your_long_meeting.mp3"  # 替换为你的长音频文件
    output_folder = "sliced_audios"
    
    if os.path.exists(input_audio):
        info = slice_audio(input_audio, output_folder)
        print(f"\n共生成 {len(info)} 个切片文件。")
    else:
        print(f"错误:找不到文件 {input_audio}")

这个脚本会把你的长音频,比如 meeting.mp3,在 sliced_audios 文件夹里生成一系列 meeting_part001.wavmeeting_part002.wav 这样的文件。

4.3 第三步:编写批量识别脚本

现在,我们创建 batch_transcribe.py 脚本。它的工作是调用Qwen3-ASR服务,把上一步切好的所有小音频文件转成文字。

# batch_transcribe.py
import os
import requests
import time
import json
from typing import List, Dict

class QwenASRClient:
    """Qwen3-ASR API 客户端"""
    
    def __init__(self, base_url="http://localhost:7860"):
        """
        初始化客户端。
        参数:
            base_url (str): Qwen3-ASR Web服务的地址。
                            例如: "http://localhost:7860" 或 "https://gpu-xxxx-7860.web.gpu.example.com"
        """
        # 确保URL以/结尾,方便拼接
        self.base_url = base_url.rstrip('/')
        self.api_url = f"{self.base_url}/transcribe"
        
    def transcribe_audio(self, audio_file_path, language="auto"):
        """
        识别单个音频文件。
        
        参数:
            audio_file_path (str): 音频文件路径。
            language (str): 语言代码,如 'zh', 'en', 'ja',或 'auto'(自动检测)。
        
        返回:
            dict: 识别结果,包含'text'和'language'字段。
        """
        try:
            with open(audio_file_path, 'rb') as f:
                files = {'file': (os.path.basename(audio_file_path), f, 'audio/wav')}
                data = {'language': language}
                
                response = requests.post(self.api_url, files=files, data=data, timeout=60)
                response.raise_for_status()  # 检查HTTP错误
                
                result = response.json()
                return result
        except requests.exceptions.RequestException as e:
            print(f"  请求失败: {e}")
            return {'text': f'[识别错误: {e}]', 'language': 'error'}
        except json.JSONDecodeError:
            print(f"  响应解析失败")
            return {'text': '[响应解析失败]', 'language': 'error'}
    
    def batch_transcribe(self, audio_files: List[str], language="auto", delay=1.0):
        """
        批量识别多个音频文件。
        
        参数:
            audio_files (list): 音频文件路径列表。
            language (str): 语言代码。
            delay (float): 每次请求之间的延迟(秒),避免服务器压力过大。
        
        返回:
            list: 每个文件的识别结果列表。
        """
        results = []
        total = len(audio_files)
        
        for idx, file_path in enumerate(audio_files, 1):
            if not os.path.exists(file_path):
                print(f"[{idx}/{total}] 文件不存在: {file_path}")
                results.append({'filename': os.path.basename(file_path), 'text': '[文件不存在]', 'language': 'error'})
                continue
                
            print(f"[{idx}/{total}] 正在识别: {os.path.basename(file_path)}")
            start_time = time.time()
            
            # 调用识别接口
            transcribe_result = self.transcribe_audio(file_path, language)
            
            elapsed = time.time() - start_time
            detected_lang = transcribe_result.get('language', 'unknown')
            text_preview = transcribe_result.get('text', '')[:50] + "..." if len(transcribe_result.get('text', '')) > 50 else transcribe_result.get('text', '')
            
            print(f"      结果 -> 语言: {detected_lang}, 耗时: {elapsed:.1f}s, 文本: {text_preview}")
            
            # 保存结果
            result_entry = {
                'filename': os.path.basename(file_path),
                'filepath': file_path,
                'text': transcribe_result.get('text', ''),
                'language': detected_lang,
                'time_cost': elapsed
            }
            results.append(result_entry)
            
            # 延迟一下,避免请求过快
            if idx < total:
                time.sleep(delay)
        
        print(f"\n批量识别完成!共处理 {len(results)} 个文件。")
        return results

def get_audio_files_from_dir(directory, extensions=['.wav', '.mp3', '.flac', '.m4a']):
    """从目录中获取所有指定格式的音频文件"""
    audio_files = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            if any(file.lower().endswith(ext) for ext in extensions):
                audio_files.append(os.path.join(root, file))
    # 按文件名排序,保证顺序
    audio_files.sort()
    return audio_files

if __name__ == "__main__":
    # ====== 配置区域 ======
    ASR_SERVER_URL = "http://localhost:7860"  # 修改为你的Qwen3-ASR服务地址
    AUDIO_DIR = "sliced_audios"                # 存放切片音频的目录
    OUTPUT_JSON = "transcription_results.json" # 中间结果保存文件
    LANGUAGE = "auto"                          # 识别语言,'auto'为自动检测
    # ======================
    
    # 1. 初始化客户端
    client = QwenASRClient(ASR_SERVER_URL)
    
    # 2. 获取待识别的音频文件列表
    print(f"正在扫描目录: {AUDIO_DIR}")
    audio_file_list = get_audio_files_from_dir(AUDIO_DIR)
    
    if not audio_file_list:
        print(f"在 '{AUDIO_DIR}' 目录下未找到音频文件。请先运行音频切片脚本。")
        exit(1)
        
    print(f"找到 {len(audio_file_list)} 个音频文件。")
    
    # 3. 执行批量识别
    all_results = client.batch_transcribe(audio_file_list, language=LANGUAGE, delay=0.5)
    
    # 4. 将结果保存为JSON文件(方便后续处理)
    with open(OUTPUT_JSON, 'w', encoding='utf-8') as f:
        json.dump(all_results, f, ensure_ascii=False, indent=2)
    print(f"识别结果已保存至: {OUTPUT_JSON}")

运行这个脚本前,记得把 ASR_SERVER_URL 改成你实际的Qwen3-ASR服务地址。脚本会依次处理 sliced_audios 文件夹里的所有音频,并把识别结果(文本、检测到的语言、耗时)保存到一个 transcription_results.json 文件里。

4.4 第四步:编写Excel导出脚本

最后,我们把JSON中的结果,连同之前切片时记录的时间信息,整理成一个清晰易读的Excel文件。

# export_to_excel.py
import json
import pandas as pd
import os

def load_slices_info(slices_info_file="slices_info.json"):
    """加载音频切片时生成的时间信息"""
    if os.path.exists(slices_info_file):
        with open(slices_info_file, 'r', encoding='utf-8') as f:
            return json.load(f)
    return None

def merge_results_with_slices(transcription_results, slices_info_list):
    """
    将识别结果与切片时间信息合并。
    根据文件名进行匹配。
    """
    merged_data = []
    
    # 将切片信息转为字典,方便通过文件名查找
    slices_dict = {item['filename']: item for item in slices_info_list} if slices_info_list else {}
    
    for result in transcription_results:
        filename = result['filename']
        merged_entry = {
            '音频片段': filename,
            '识别文本': result['text'],
            '检测语言': result['language'],
            '识别耗时(秒)': result.get('time_cost', 0)
        }
        
        # 如果有时序信息,则添加
        if filename in slices_dict:
            slice_info = slices_dict[filename]
            merged_entry['开始时间(秒)'] = slice_info['start_s']
            merged_entry['结束时间(秒)'] = slice_info['end_s']
            merged_entry['时长(秒)'] = slice_info['end_s'] - slice_info['start_s']
        else:
            merged_entry['开始时间(秒)'] = 'N/A'
            merged_entry['结束时间(秒)'] = 'N/A'
            merged_entry['时长(秒)'] = 'N/A'
            
        merged_data.append(merged_entry)
    
    return merged_data

def generate_summary_text(merged_data):
    """根据合并的数据,生成完整的、按时间顺序拼接的文本摘要。"""
    # 首先,确保数据按开始时间排序(如果有时间信息)
    if all('开始时间(秒)' in item and item['开始时间(秒)'] != 'N/A' for item in merged_data):
        sorted_data = sorted(merged_data, key=lambda x: x['开始时间(秒)'])
    else:
        # 否则按文件名排序(假设文件名包含顺序)
        sorted_data = sorted(merged_data, key=lambda x: x['音频片段'])
    
    # 拼接文本
    full_text_parts = []
    for item in sorted_data:
        text = item['识别文本'].strip()
        if text and not text.startswith('['):  # 忽略错误信息
            # 可以添加时间戳标记
            if item['开始时间(秒)'] != 'N/A':
                time_mark = f"[{item['开始时间(秒)']:.1f}s] "
                full_text_parts.append(time_mark + text)
            else:
                full_text_parts.append(text)
    
    return "\n".join(full_text_parts)

def export_to_excel(transcription_file="transcription_results.json", 
                    slices_info_file="slices_info.json",
                    output_excel="语音识别结果.xlsx"):
    """主函数:读取结果,合并信息,导出Excel"""
    
    # 1. 加载识别结果
    print("正在加载识别结果...")
    with open(transcription_file, 'r', encoding='utf-8') as f:
        transcription_results = json.load(f)
    
    # 2. 加载切片信息(如果存在)
    slices_info_list = load_slices_info(slices_info_file)
    
    # 3. 合并数据
    print("正在合并数据...")
    merged_data = merge_results_with_slices(transcription_results, slices_info_list)
    
    # 4. 生成完整文本摘要
    print("正在生成完整文本...")
    full_text = generate_summary_text(merged_data)
    
    # 5. 创建DataFrame并导出Excel
    df_details = pd.DataFrame(merged_data)
    
    # 设置列顺序
    column_order = ['音频片段', '开始时间(秒)', '结束时间(秒)', '时长(秒)', '识别文本', '检测语言', '识别耗时(秒)']
    # 只保留实际存在的列
    existing_columns = [col for col in column_order if col in df_details.columns]
    df_details = df_details[existing_columns]
    
    # 创建第二个DataFrame存放完整文本
    df_summary = pd.DataFrame({
        '完整转写文本': [full_text]
    })
    
    print(f"导出到Excel文件: {output_excel}")
    with pd.ExcelWriter(output_excel, engine='openpyxl') as writer:
        df_details.to_excel(writer, sheet_name='详细结果', index=False)
        df_summary.to_excel(writer, sheet_name='完整文本', index=False)
        
        # 自动调整列宽(近似)
        for sheet_name in writer.sheets:
            worksheet = writer.sheets[sheet_name]
            for column in worksheet.columns:
                max_length = 0
                column_letter = column[0].column_letter
                for cell in column:
                    try:
                        if len(str(cell.value)) > max_length:
                            max_length = len(str(cell.value))
                    except:
                        pass
                adjusted_width = min(max_length + 2, 50)  # 设置最大宽度
                worksheet.column_dimensions[column_letter].width = adjusted_width
    
    print("导出完成!")
    print(f"  详细结果在 '详细结果' 工作表")
    print(f"  合并后的完整文本在 '完整文本' 工作表")
    
    # 打印统计信息
    total_chars = sum(len(str(item['识别文本'])) for item in merged_data)
    print(f"\n统计信息:")
    print(f"  共处理音频片段: {len(merged_data)} 个")
    print(f"  总文本字符数: {total_chars}")
    if full_text:
        print(f"  完整文本已保存,共 {len(full_text.splitlines())} 行")

if __name__ == "__main__":
    # 修改这里的文件名,如果你使用了不同的命名
    export_to_excel(
        transcription_file="transcription_results.json",
        slices_info_file="slices_info.json",  # 需要修改audio_slicer.py来保存这个文件
        output_excel="语音识别结果_自动化.xlsx"
    )

注意:为了让 export_to_excel.py 能获取时间信息,我们需要稍微修改一下第一步的 audio_slicer.py,让它把切片信息也保存下来。在 slice_audio 函数的最后,添加几行代码:

# 在 audio_slicer.py 的 slice_audio 函数末尾,return之前添加:
    # 保存切片信息到JSON文件,方便后续合并
    import json
    info_file = os.path.join(output_dir, "slices_info.json")
    with open(info_file, 'w', encoding='utf-8') as f:
        # 只保存必要信息,去掉文件绝对路径
        simplified_info = []
        for info in slices_info:
            simplified_info.append({
                'filename': info['filename'],
                'start_s': info['start_s'],
                'end_s': info['end_s']
            })
        json.dump(simplified_info, f, ensure_ascii=False, indent=2)
    print(f"切片信息已保存至: {info_file}")

这样,切片工具会在输出目录生成一个 slices_info.json 文件,记录每个切片文件的时间范围。

5. 一键运行:整合所有步骤

每次都手动运行三个脚本太麻烦。我们创建一个主脚本 main.py 来串联整个流程。

# main.py
import os
import subprocess
import sys

def run_automation_pipeline():
    """运行完整的自动化流程"""
    
    print("=" * 50)
    print("Qwen3-ASR 语音识别自动化流程启动")
    print("=" * 50)
    
    # 配置参数
    long_audio_file = "input/your_audio.mp3"  # 请修改为你的长音频文件路径
    sliced_output_dir = "output/sliced"
    asr_server_url = "http://localhost:7860"  # 请修改为你的ASR服务地址
    final_excel = "output/最终转写结果.xlsx"
    
    # 步骤1: 音频切片
    print("\n[步骤1/3] 音频切片")
    print("-" * 30)
    if not os.path.exists(long_audio_file):
        print(f"错误:找不到输入音频文件 '{long_audio_file}'")
        print("请将你的长音频文件放入 'input' 文件夹,并修改 main.py 中的 'long_audio_file' 变量。")
        return
    
    # 创建输出目录
    os.makedirs(sliced_output_dir, exist_ok=True)
    
    # 这里直接调用我们写好的函数,为了演示,我们导入模块
    # 在实际打包时,可以将函数定义放在同一个文件或通过模块导入
    print("提示:请确保已运行 audio_slicer.py 完成音频切片。")
    print(f"切片文件将保存在: {sliced_output_dir}")
    # 假设切片已完成,生成了 slices_info.json
    
    # 步骤2: 批量识别
    print("\n[步骤2/3] 批量语音识别")
    print("-" * 30)
    print(f"ASR 服务器: {asr_server_url}")
    print("提示:请确保 batch_transcribe.py 中的 ASR_SERVER_URL 已正确配置,并运行该脚本。")
    
    # 步骤3: 导出Excel
    print("\n[步骤3/3] 导出结果到Excel")
    print("-" * 30)
    print("提示:请运行 export_to_excel.py 生成最终报告。")
    
    print("\n" + "=" * 50)
    print("手动执行顺序建议:")
    print("1. 修改 audio_slicer.py 中的输入文件路径,运行它进行切片。")
    print("2. 修改 batch_transcribe.py 中的 ASR_SERVER_URL,运行它进行识别。")
    print("3. 运行 export_to_excel.py 生成最终Excel文件。")
    print("=" * 50)
    print(f"最终结果将生成在: {final_excel}")
    print("完成!")

if __name__ == "__main__":
    run_automation_pipeline()

这个主脚本给出了清晰的指引。在实际使用中,你可以依次运行:

  1. python audio_slicer.py (修改输入文件路径)
  2. python batch_transcribe.py (修改服务器地址)
  3. python export_to_excel.py

或者,你也可以进一步优化,将三个脚本的逻辑整合到一个真正的自动化流水线中,实现完全的一键执行。

6. 总结与进阶思考

通过以上步骤,我们成功搭建了一个从长音频到结构化文字稿的自动化流水线。回顾一下我们实现的核心价值:

  • 效率倍增:告别手动上传和复制粘贴,尤其适合处理数十甚至上百个音频文件。
  • 结果规整:自动生成的Excel文件,包含时间戳、分段文本和完整文稿,查找和校对都非常方便。
  • 灵活可扩展:脚本是模块化的,你可以很容易地修改切片时长、调整识别参数,或者将结果导入数据库。

一些进阶优化思路

  1. 错误重试与容错:在 batch_transcribe.py 中添加重试逻辑,当某个片段识别失败时自动重试几次。
  2. 并行处理:如果服务器性能允许,可以改用多线程或异步请求,同时发送多个识别任务,大幅缩短总耗时。
  3. Web界面化:使用 GradioStreamlit 快速为这个脚本套上一个可视化界面,上传音频后点击按钮即可得到Excel,对非技术人员更友好。
  4. 集成更多功能:在导出Excel时,可以调用大语言模型API(如通义千问本身)对转写文本进行总结、提炼要点或翻译,实现“语音→文字→洞察”的全链路自动化。

Qwen3-ASR-1.7B提供了一个强大且易用的语音识别基础能力。通过简单的脚本集成,我们就能将其变成解决实际工作流痛点的利器。希望这套方案能给你带来启发,也欢迎你在此基础上进行改造,让它更适合你的具体场景。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐