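GLM-ASR-Nano-2512 Deployment Tutorial: Scheduled Speech Transcription with Airflow and Results Stored in MySQL

1. Project Overview and Environment Setup

GLM-ASR-Nano-2512 is an open-source speech recognition model with 1.5 billion parameters. It is designed for complex, real-world speech conditions. It is reported to outperform OpenAI Whisper V3 on several benchmarks while keeping a relatively small model size.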

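In practice we often need to process large numbers of audio files on a schedule: daily meeting recordings, customer-service call logs, or podcast episodes. Uploading and transcribing them one at a time by hand is not practical, so an automated solution is needed.

This tutorial walks you step by step through building a complete speech recognition pipeline:

  • Deploy the GLM-ASR-Nano-2512 speech recognition service with Docker
  • Configure Airflow to schedule recognition tasks
  • Save recognition results to a MySQL database automatically
  • Automate the whole speech-to-text flow end to end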

Requirements

  • NVIDIA GPU (RTX 4090/3090 recommended) or CPU
  • 16 GB or more RAM
  • 10 GB or more free disk space
  • CUDA 12.4+ (if using a GPU)
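You can check a host against the list above before deploying. The sketch below is a minimal example and is not part of the original project. The helper names preflight and total_ram_gb are hypothetical. RAM detection uses os.sysconf, which is available on Linux but not on Windows. Finding nvidia-smi on PATH only suggests that an NVIDIA driver is present; it does not verify the CUDA version.

```python
import os
import shutil

# Thresholds from the requirements list above
MIN_RAM_GB = 16
MIN_DISK_GB = 10


def total_ram_gb():
    """Total physical RAM in GB via os.sysconf, or None where that is unavailable."""
    try:
        pages = os.sysconf("SC_PHYS_PAGES")
        page_size = os.sysconf("SC_PAGE_SIZE")
    except (ValueError, OSError, AttributeError):
        # os.sysconf does not exist on Windows, and some names are missing on other platforms
        return None
    return pages * page_size / 1024**3


def preflight(path="."):
    """Roughly check this host against the requirements; returns a dict of findings."""
    free_gb = shutil.disk_usage(path).free / 1024**3
    ram_gb = total_ram_gb()
    return {
        # nvidia-smi on PATH only suggests an NVIDIA driver is installed; it does not check CUDA 12.4+
        "nvidia_smi_found": shutil.which("nvidia-smi") is not None,
        "ram_ok": ram_gb is not None and ram_gb >= MIN_RAM_GB,
        "disk_ok": free_gb >= MIN_DISK_GB,
        "free_disk_gb": round(free_gb, 1),
    }


if __name__ == "__main__":
    for key, value in preflight().items():
        print(f"{key}: {value}")
```

Run it in the directory where the project and model files will live, because the disk check measures the file system that contains path.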

2. Deploying the Speech Recognition Service

2.1 Docker Deployment (Recommended)

First, create the project directory and the subdirectories it needs:

mkdir glm-asr-pipeline && cd glm-asr-pipeline
mkdir -p scripts config data/audio data/output

Create the Dockerfile:

FROM nvidia/cuda:12.4.0-runtime-ubuntu22.04

# Install system dependencies
RUN apt-get update && apt-get install -y \
    python3 \
    python3-pip \
    git \
    git-lfs \
    ffmpeg \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
RUN pip3 install --no-cache-dir \
    torch \
    torchaudio \
    transformers \
    gradio \
    pandas \
    mysql-connector-python

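# Create the application directory
WORKDIR /app

# Copy the project files
COPY . /app/

# Initialize git lfs and fetch the model weights
# (the build context must be a git repository, including its .git directory, for git lfs pull to work)
RUN git lfs install && git lfs pull

# Expose the Gradio port
EXPOSE 7860

# Start the service
CMD ["python3", "app.py"]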

Create docker-compose.yml to simplify deployment:

version: '3.8'

services:
  glm-asr:
    build: .
    ports:
      - "7860:7860"
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: all
              capabilities: [gpu]
    volumes:
      - ./data:/app/data
    restart: unless-stopped

Build and start the service:

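# Clone the GLM-ASR project that provides app.py and the model files (placeholder URL).
# Put the Dockerfile, docker-compose.yml, and data/ directory created above in its root:
# the build copies the repository and runs git lfs pull.
git clone <your-glm-asr-repo>
cd <your-glm-asr-repo>

# Build and run in the background
docker-compose up -d --build

# Follow the logs
docker-compose logs -f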
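docker-compose up -d returns as soon as the container starts. The model may still be loading at that point, so the web UI can take a while to answer. The polling sketch below assumes the service is ready once http://localhost:7860/ returns HTTP 200. The helper wait_for_service and its fetch parameter are hypothetical; fetch exists so the logic can be tested without a server.

```python
import time
import urllib.request


def wait_for_service(url="http://localhost:7860/", timeout_s=600, interval_s=5, fetch=None):
    """Poll `url` until it returns HTTP 200; True on success, False after timeout_s seconds.

    `fetch` is an optional function url -> status code, mainly so tests can inject a fake.
    """
    if fetch is None:
        def fetch(u):
            # urlopen raises HTTPError (an OSError subclass) for 4xx/5xx responses
            with urllib.request.urlopen(u, timeout=10) as resp:
                return resp.status

    deadline = time.monotonic() + timeout_s
    while True:
        try:
            if fetch(url) == 200:
                return True
        except OSError:
            pass  # connection refused or reset: container still starting or model still loading
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)
```

You could call wait_for_service() before the first batch run, or at the start of the Airflow health-check task, instead of failing immediately.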

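2.2 Verifying the Service

Once the service has started, you can reach it in two ways:

  1. Web UI: open http://localhost:7860 in a browser
  2. API: call it programmatically through http://localhost:7860/gradio_api/

Test whether the API works: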

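import requests

def test_asr_api(audio_file_path):
    """Smoke-test the speech recognition API.

    Assumes the service accepts a multipart upload in an 'audio' field and returns JSON.
    A stock Gradio app is normally called through gradio_client instead, so adapt the
    URL and payload to what your app.py actually exposes.
    """
    url = "http://localhost:7860/gradio_api/"

    with open(audio_file_path, 'rb') as f:
        files = {'audio': f}
        response = requests.post(url, files=files, timeout=300)

    if response.status_code == 200:
        return response.json()
    return {"error": f"Request failed: HTTP {response.status_code}"}

# Example call
result = test_asr_api("test_audio.wav")
print("Recognition result:", result)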
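Gradio apps are usually called from Python through the official gradio_client package rather than a hand-built multipart POST. The sketch below assumes gradio_client 1.x, which provides Client and handle_file. It also assumes an app.py whose transcription endpoint takes one audio file. The api_name "/predict" is a placeholder. Check the real endpoint name first with Client("http://localhost:7860/").view_api().

```python
from pathlib import Path


def transcribe_with_gradio_client(audio_path, server="http://localhost:7860/", api_name="/predict"):
    """Send one audio file to the Gradio app and return whatever the endpoint returns.

    ASSUMPTION: api_name="/predict" and a single audio input are placeholders;
    inspect Client(server).view_api() to see the endpoints your app.py really exposes.
    """
    audio_path = Path(audio_path)
    if not audio_path.is_file():
        raise FileNotFoundError(audio_path)

    # Imported lazily so this module loads even where gradio_client is not installed
    from gradio_client import Client, handle_file

    client = Client(server)
    return client.predict(handle_file(str(audio_path)), api_name=api_name)
```

If this works while the requests-based test does not, the batch script's process_audio_file would need the same change.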

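3. Configuring the Database and the Batch Script

3.1 Creating the MySQL Database

First, create the database and the table that store the recognition results: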

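-- utf8mb4 so that Chinese (and other non-ASCII) transcripts are stored correctly
CREATE DATABASE voice_transcription CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;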

USE voice_transcription;

CREATE TABLE transcriptions (
    id INT AUTO_INCREMENT PRIMARY KEY,
    audio_file_path VARCHAR(500) NOT NULL,
    transcription_text TEXT,
    confidence_score FLOAT,
    processing_time FLOAT,
    language_detected VARCHAR(10),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    status ENUM('pending', 'processing', 'completed', 'failed') DEFAULT 'pending',
    error_message TEXT
);

CREATE INDEX idx_status ON transcriptions(status);
CREATE INDEX idx_created_at ON transcriptions(created_at);
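The status column effectively defines a small state machine. The mapping below is a hypothetical guard, written down from how the batch script in 3.2 uses the column. Nothing in the original code enforces it. It does make the intended lifecycle explicit, and you could check it before each UPDATE.

```python
# Lifecycle of the `status` column. 'pending' is the column default, but the batch
# script inserts rows directly as 'processing', and it re-processes any file whose
# row is not 'completed' (including rows left in 'processing' by an interrupted run).
ALLOWED_TRANSITIONS = {
    "pending": {"processing"},
    "processing": {"processing", "completed", "failed"},
    "failed": {"processing"},   # failed files are retried on the next run
    "completed": set(),          # terminal: completed files are skipped
}


def can_transition(current, new):
    """Return True if a row may move from status `current` to status `new`."""
    if current not in ALLOWED_TRANSITIONS:
        raise ValueError(f"unknown status: {current!r}")
    return new in ALLOWED_TRANSITIONS[current]
```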

3.2 Writing the Batch Script

Create the batch script scripts/batch_process.py:

import time
from pathlib import Path

import mysql.connector
import requests

class AudioBatchProcessor:
    def __init__(self, api_url="http://localhost:7860/gradio_api/"):
        self.api_url = api_url
        # Placeholder credentials: replace with your own (ideally read from environment variables)
        self.db_config = {
            'host': 'localhost',
            'user': 'your_username',
            'password': 'your_password',
            'database': 'voice_transcription'
        }
    
    def get_db_connection(self):
        """Open a new database connection."""
        return mysql.connector.connect(**self.db_config)
    
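    def process_audio_file(self, audio_path):
        """Transcribe one audio file through the ASR API.

        Assumes the API answers with JSON containing 'text', 'language' and 'confidence';
        adjust the parsing below to whatever your service actually returns.
        """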
        try:
            start_time = time.time()
            
            with open(audio_path, 'rb') as f:
                files = {'audio': f}
                response = requests.post(self.api_url, files=files, timeout=300)
            
            processing_time = time.time() - start_time
            
            if response.status_code == 200:
                result = response.json()
                return {
                    'success': True,
                    'text': result.get('text', ''),
                    'processing_time': processing_time,
                    'language': result.get('language', ''),
                    'confidence': result.get('confidence', 0.0)
                }
            else:
                return {
                    'success': False,
                    'error': f"API request failed: HTTP {response.status_code}"
                }
                
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }
    
    def process_directory(self, audio_dir, file_extensions=('.wav', '.mp3', '.flac', '.ogg')):
        """Process every audio file in a directory (non-recursive)."""
        audio_dir = Path(audio_dir)
        processed_files = []
        
        # Collect all audio files; sorted so each run processes them in a stable order
        audio_files = []
        for ext in file_extensions:
            audio_files.extend(audio_dir.glob(f'*{ext}'))
        audio_files.sort()
        
        print(f"Found {len(audio_files)} audio files to process")
        
        for audio_file in audio_files:
            print(f"Processing: {audio_file.name}")
            
            # Skip files that already have a completed transcription
            if self.is_file_processed(audio_file):
                print(f"Already processed: {audio_file.name}")
                continue
            
            # Mark the file as in progress
            self.update_db_status(audio_file, 'processing')
            
            # Call the recognition API
            result = self.process_audio_file(audio_file)
            
            # Save the result to the database
            if result['success']:
                self.save_success_result(audio_file, result)
                print(f"✓ Done: {audio_file.name}")
            else:
                self.save_error_result(audio_file, result['error'])
                print(f"✗ Failed: {audio_file.name} - {result['error']}")
            
            processed_files.append({
                'file': audio_file.name,
                'success': result['success']
            })
            
            # Throttle requests so the ASR service is not flooded
            time.sleep(1)
        
        return processed_files
    
    def is_file_processed(self, audio_file):
        """Return True if the file already has a completed transcription."""
        conn = self.get_db_connection()
        cursor = conn.cursor()
        
        query = "SELECT id FROM transcriptions WHERE audio_file_path = %s AND status = 'completed'"
        cursor.execute(query, (str(audio_file),))
        result = cursor.fetchone()
        
        cursor.close()
        conn.close()
        
        return result is not None
    
    def update_db_status(self, audio_file, status, error_message=None):
        """Insert or update the status row for a file."""
        conn = self.get_db_connection()
        cursor = conn.cursor()
        
        # Check whether a row for this file already exists
        check_query = "SELECT id FROM transcriptions WHERE audio_file_path = %s"
        cursor.execute(check_query, (str(audio_file),))
        existing_record = cursor.fetchone()
        
        if existing_record:
            # Update the existing row
            update_query = """
                UPDATE transcriptions 
                SET status = %s, error_message = %s, updated_at = NOW() 
                WHERE audio_file_path = %s
            """
            cursor.execute(update_query, (status, error_message, str(audio_file)))
        else:
            # Insert a new row
            insert_query = """
                INSERT INTO transcriptions 
                (audio_file_path, status, error_message) 
                VALUES (%s, %s, %s)
            """
            cursor.execute(insert_query, (str(audio_file), status, error_message))
        
        conn.commit()
        cursor.close()
        conn.close()
    
    def save_success_result(self, audio_file, result):
        """Store a successful transcription in the database."""
        conn = self.get_db_connection()
        cursor = conn.cursor()
        
        update_query = """
            UPDATE transcriptions 
            SET transcription_text = %s, 
                confidence_score = %s,
                processing_time = %s,
                language_detected = %s,
                status = 'completed',
                error_message = NULL,
                updated_at = NOW()
            WHERE audio_file_path = %s
        """
        
        cursor.execute(update_query, (
            result['text'],
            result.get('confidence', 0.0),
            result['processing_time'],
            result.get('language', ''),
            str(audio_file)
        ))
        
        conn.commit()
        cursor.close()
        conn.close()
    
    def save_error_result(self, audio_file, error_message):
        """Mark the file as failed and record the error message."""
        self.update_db_status(audio_file, 'failed', error_message)

# Example usage (run from the project root so that ./data/audio resolves)
if __name__ == "__main__":
    processor = AudioBatchProcessor()
    
    # Process one directory
    results = processor.process_directory('./data/audio')
    
    print(f"Done! Succeeded: {sum(1 for r in results if r['success'])},"
          f" failed: {sum(1 for r in results if not r['success'])}")
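process_directory collects files with patterns such as Path.glob('*.wav'). On Linux that match is case-sensitive, so a file named MEETING.WAV would be skipped. The sketch below is a minimal alternative that assumes a flat (non-recursive) audio directory. find_audio_files is a hypothetical helper that could replace the glob loop.

```python
from pathlib import Path

AUDIO_EXTENSIONS = (".wav", ".mp3", ".flac", ".ogg")


def find_audio_files(audio_dir, extensions=AUDIO_EXTENSIONS):
    """Return regular files in audio_dir whose extension matches, sorted by path.

    Extensions are compared case-insensitively, so 'MEETING.WAV' matches '.wav'.
    Subdirectories are ignored, even if their names end in an audio extension.
    """
    wanted = {ext.lower() for ext in extensions}
    return sorted(
        p for p in Path(audio_dir).iterdir()
        if p.is_file() and p.suffix.lower() in wanted
    )
```

Inside process_directory, audio_files = find_audio_files(audio_dir, file_extensions) would replace the loop. The stored audio_file_path values keep the same form, because both approaches build each path by joining the same audio_dir with the file name.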

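4. Configuring Airflow Scheduling

4.1 Installing and Configuring Airflow

First, install Apache Airflow together with the MySQL provider that the DAG imports:

# The commands in this section target Airflow 2.x; Airflow 3 renamed or removed several of them.
# apache-airflow-providers-mysql depends on mysqlclient, which may need the MySQL client development headers to build.
pip install "apache-airflow>=2.7,<3" apache-airflow-providers-mysql mysql-connector-python requests

# Initialize the Airflow metadata database (`airflow db init` is deprecated since 2.7)
airflow db migrate

# Create an admin user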
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com \
    --password admin

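Connections are stored in Airflow's metadata database, not in airflow.cfg. Add them in the web UI (see 4.3), with the `airflow connections add` CLI, or through an AIRFLOW_CONN_<CONN_ID> environment variable. Then start the Airflow services, each in its own terminal:

# Start the Airflow web server
airflow webserver --port 8080

# Start the scheduler
airflow scheduler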

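4.2 Creating the Airflow DAG

Create the DAG file dags/voice_transcription_dag.py. Here dags/ is Airflow's DAGs folder ($AIRFLOW_HOME/dags, which is ~/airflow/dags by default):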

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
import requests

default_args = {
    'owner': 'audio_team',
    'depends_on_past': False,
    'email_on_failure': True,   # requires SMTP to be configured in Airflow
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2024, 1, 1),
}

def check_asr_service():
    """Fail the task (and trigger retries) if the ASR service is not reachable."""
    try:
        response = requests.get('http://localhost:7860/', timeout=30)
    except requests.RequestException as e:
        raise RuntimeError(f"ASR service check failed: {e}") from e
    if response.status_code != 200:
        raise RuntimeError(f"ASR service unhealthy: HTTP {response.status_code}")
    print("ASR service is up")
    return True

def process_audio_files():
    """Main task: transcribe new audio files and return the counts via XCom."""
    import sys
    # batch_process.py must be copied into this directory on the Airflow host
    sys.path.append('/opt/airflow/scripts')
    
    from batch_process import AudioBatchProcessor
    
    processor = AudioBatchProcessor()
    
    # The Airflow worker reads the files, so this must be a path it can see.
    # /app/data/audio exists only inside the ASR container unless Airflow shares that volume.
    audio_dir = '/app/data/audio'
    results = processor.process_directory(audio_dir)
    
    # Summarize the run
    success_count = sum(1 for r in results if r['success'])
    failure_count = sum(1 for r in results if not r['success'])
    
    print(f"Done! Succeeded: {success_count}, failed: {failure_count}")
    
    # The return value is pushed to XCom and read by send_summary_email
    return {
        'success_count': success_count,
        'failure_count': failure_count,
        'total_files': len(results)
    }

def cleanup_processed_files():
    """Clean up processed files (optional placeholder)."""
    # Add cleanup logic here, e.g. move processed files to an archive directory
    print("Cleanup not implemented yet")
    return "cleanup skipped"

def send_summary_email(**context):
    """Build a summary of the run (sending the email is only simulated)."""
    task_instance = context['ti']
    process_result = task_instance.xcom_pull(task_ids='process_audio_files')
    
    if not process_result:
        return "No processing result available"
    
    subject = f"Transcription run finished - {datetime.now().strftime('%Y-%m-%d')}"
    body = f"""
    Transcription run finished:
    
    Total files: {process_result['total_files']}
    Succeeded: {process_result['success_count']}
    Failed: {process_result['failure_count']}
    
    Finished at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
    """
    
    # Add real email sending here; for now the message is only printed
    print("Simulated email:")
    print(f"Subject: {subject}")
    print(f"Body: {body}")
    
    return "Email simulated"

# Define the DAG
with DAG(
    'voice_transcription_pipeline',
    default_args=default_args,
    description='Scheduled speech transcription pipeline',
    schedule=timedelta(hours=1),  # hourly; `schedule` replaces the deprecated schedule_interval
    catchup=False,
    tags=['audio', 'transcription', 'batch'],
) as dag:

    # Task 1: check the service
    check_service = PythonOperator(
        task_id='check_asr_service',
        python_callable=check_asr_service,
    )

    # Task 2: process audio files
    process_files = PythonOperator(
        task_id='process_audio_files',
        python_callable=process_audio_files,
    )

    # Task 3: clean up files (optional)
    cleanup_files = PythonOperator(
        task_id='cleanup_processed_files',
        python_callable=cleanup_processed_files,
    )

    # Task 4: send the notification
    send_notification = PythonOperator(
        task_id='send_summary_email',
        python_callable=send_summary_email,
    )

    # Task dependencies
    check_service >> process_files >> cleanup_files >> send_notification

    # Optional: database maintenance. MySqlOperator is deprecated in the MySQL provider
    # in favor of SQLExecuteQueryOperator from the common.sql provider.
    db_maintenance = SQLExecuteQueryOperator(
        task_id='database_maintenance',
        conn_id='mysql_conn',
        sql=[
            # Delete failed records older than 30 days
            """
            DELETE FROM transcriptions
            WHERE status = 'failed'
            AND created_at < DATE_SUB(NOW(), INTERVAL 30 DAY)
            """,
            # Reclaim space and rebuild indexes
            "OPTIMIZE TABLE transcriptions",
        ],
    )

    # Note: as part of this hourly DAG, maintenance runs every hour, not weekly.
    # For a weekly job, move it into a separate DAG with schedule='@weekly'.
    send_notification >> db_maintenance
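cleanup_processed_files is only a placeholder; its comment suggests moving processed files to an archive directory. The sketch below shows one way to do that under a few assumptions. archive_completed is a hypothetical helper. completed_paths would come from SELECT audio_file_path FROM transcriptions WHERE status = 'completed'; that query is omitted to keep the sketch self-contained. Matching only works if audio_dir is the same string the batch script used, because audio_file_path stores str() of that path.

```python
import shutil
from pathlib import Path


def archive_completed(audio_dir, archive_dir, completed_paths):
    """Move files listed in completed_paths from audio_dir into archive_dir.

    completed_paths: audio_file_path values of rows with status = 'completed'.
    Returns the names of the files that were moved.
    """
    archive = Path(archive_dir)
    archive.mkdir(parents=True, exist_ok=True)
    completed = {str(Path(p)) for p in completed_paths}
    moved = []
    for f in sorted(Path(audio_dir).iterdir()):
        if f.is_file() and str(f) in completed:
            # On POSIX, an existing file with the same name in archive_dir is replaced
            shutil.move(str(f), str(archive / f.name))
            moved.append(f.name)
    return moved
```

Moving completed files out of the input directory also keeps the next run from re-querying the database for every old file.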

4.3 Configuring the Airflow Connection

Add the MySQL connection in the Airflow web UI:

  1. Open http://localhost:8080
  2. Go to Admin → Connections
  3. Add a new connection:
    • Conn Id: mysql_conn
    • Conn Type: MySQL
    • Host: your_mysql_host
    • Schema: voice_transcription
    • Login: your_username
    • Password: your_password
    • Port: 3306
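As an alternative to the web UI, Airflow can read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> whose value is a connection URI. The sketch below builds that variable for mysql_conn. It percent-encodes the login and password so that characters such as @ or / do not break the URI. airflow_conn_env is a hypothetical helper, and the values are the same placeholders used above.

```python
from urllib.parse import quote


def airflow_conn_env(conn_id, host, login, password, schema, port=3306):
    """Build the (name, value) pair for an AIRFLOW_CONN_<CONN_ID> environment variable."""
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    # safe='' also encodes '/', which would otherwise be read as the start of the schema path
    uri = (
        f"mysql://{quote(login, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{schema}"
    )
    return name, uri


if __name__ == "__main__":
    name, uri = airflow_conn_env(
        "mysql_conn", "your_mysql_host", "your_username", "your_password", "voice_transcription"
    )
    print(f"export {name}='{uri}'")
```

Export the printed variable in the environment where the scheduler and its tasks run. Connections defined this way are not stored in the metadata database and do not appear under Admin → Connections.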

5. End-to-End Testing and Monitoring

5.1 Testing the Whole Pipeline

Create a test script that verifies the whole flow:

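#!/bin/bash
# scripts/test_pipeline.sh (run from the project root)
set -e

echo "=== Testing the speech transcription pipeline ==="

# 1. Prepare a test audio file
echo "Preparing test audio..."
cp sample_audio.wav ./data/audio/test_audio_$(date +%s).wav

# 2. Run the batch script
echo "Running the batch script..."
python scripts/batch_process.py

# 3. Check the results in the database
# -p without a value prompts for the password; "-p your_password" with a space
# would make mysql treat your_password as the database name.
# CHAR_LENGTH counts characters rather than bytes, which matters for Chinese text.
echo "Checking database results..."
mysql -u your_username -p voice_transcription -e "
SELECT audio_file_path, status, CHAR_LENGTH(transcription_text) AS text_length
FROM transcriptions
ORDER BY created_at DESC
LIMIT 5;
"

echo "=== Test finished ==="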
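The script assumes a sample_audio.wav in the project root, and the tutorial does not supply one. A synthetic tone is enough if you only want to exercise the plumbing (file → API → MySQL). Because it contains no speech, expect an empty or meaningless transcript. The sketch below uses only the standard library. write_test_tone is a hypothetical helper, and 16 kHz mono is an assumption, chosen because it is a common ASR input format.

```python
import math
import struct
import wave


def write_test_tone(path, seconds=1.0, freq_hz=440.0, sample_rate=16000):
    """Write a mono 16-bit PCM sine-wave WAV file and return its frame count."""
    n_frames = int(seconds * sample_rate)
    frames = b"".join(
        # little-endian signed 16-bit samples at 30% of full scale
        struct.pack("<h", int(0.3 * 32767 * math.sin(2 * math.pi * freq_hz * i / sample_rate)))
        for i in range(n_frames)
    )
    with wave.open(str(path), "wb") as wav:
        wav.setnchannels(1)
        wav.setsampwidth(2)  # 2 bytes = 16-bit samples
        wav.setframerate(sample_rate)
        wav.writeframes(frames)
    return n_frames
```

Calling write_test_tone("sample_audio.wav") in the project root creates the file that the test script copies.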

5.2 Adding Monitoring and Alerting

Create the monitoring script scripts/monitor.py:

import mysql.connector
import requests
from datetime import datetime, timedelta

class PipelineMonitor:
    def __init__(self):
        self.db_config = {
            'host': 'localhost',
            'user': 'your_username',
            'password': 'your_password',
            'database': 'voice_transcription'
        }
    
    def check_service_health(self):
        """Check whether the ASR service and the database are reachable."""
        checks = {}
        
        # Check the ASR service
        try:
            response = requests.get('http://localhost:7860/', timeout=10)
            checks['asr_service'] = response.status_code == 200
        except requests.RequestException:
            checks['asr_service'] = False
        
        # Check the database connection
        try:
            conn = mysql.connector.connect(**self.db_config)
            cursor = conn.cursor()
            cursor.execute("SELECT 1")
            cursor.fetchall()  # consume the result before closing
            checks['database'] = True
            cursor.close()
            conn.close()
        except mysql.connector.Error:
            checks['database'] = False
        
        return checks
    
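    def get_pipeline_stats(self, hours=24):
        """Per-status counts plus average processing time and confidence for the last `hours` hours."""
        conn = mysql.connector.connect(**self.db_config)
        cursor = conn.cursor(dictionary=True)
        
        # Aggregate rows created within the time window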
        query = """
            SELECT 
                status,
                COUNT(*) as count,
                AVG(processing_time) as avg_time,
                AVG(confidence_score) as avg_confidence
            FROM transcriptions 
            WHERE created_at >= %s
            GROUP BY status
        """
        
        start_time = datetime.now() - timedelta(hours=hours)
        cursor.execute(query, (start_time,))
        stats = cursor.fetchall()
        
        cursor.close()
        conn.close()
        
        return stats
    
    def check_for_issues(self):
        """Look for warning signs in the last 24 hours."""
        issues = []
        
        # Check the failure rate
        stats = self.get_pipeline_stats(24)
        total = sum(item['count'] for item in stats)
        failed = next((item for item in stats if item['status'] == 'failed'), {'count': 0})['count']
        
        if total > 0 and failed / total > 0.1:  # failure rate above 10%
            issues.append(f"High failure rate: {failed/total:.1%}")
        
        # Check processing time (avg_time is NULL if no completed row has a processing time)
        completed = next((item for item in stats if item['status'] == 'completed'), None)
        if completed and completed['avg_time'] and completed['avg_time'] > 300:  # average above 5 minutes
            issues.append(f"Slow processing: {completed['avg_time']:.1f} s")
        
        return issues

# Example usage
if __name__ == "__main__":
    monitor = PipelineMonitor()
    
    print("Service health:", monitor.check_service_health())
    print("Last 24 hours:", monitor.get_pipeline_stats(24))
    print("Issues found:", monitor.check_for_issues())
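check_for_issues only returns a list; nothing is sent anywhere yet. The minimal alerting sketch below assumes a generic incoming-webhook endpoint that accepts a JSON body of the form {"text": "..."}. The URL and both helper names are hypothetical, so adapt the payload to your chat or alerting tool.

```python
import json
import urllib.request


def build_alert_request(issues, webhook_url):
    """Build a JSON POST for a generic webhook; returns None when there is nothing to report.

    ASSUMPTION: the receiver accepts {"text": "..."}.
    """
    if not issues:
        return None
    payload = {"text": "Transcription pipeline alert:\n" + "\n".join(f"- {i}" for i in issues)}
    return urllib.request.Request(
        webhook_url,
        data=json.dumps(payload, ensure_ascii=False).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_alert(issues, webhook_url, timeout_s=10):
    """Send the alert if there are issues; True if the webhook answered with a 2xx status."""
    req = build_alert_request(issues, webhook_url)
    if req is None:
        return False
    with urllib.request.urlopen(req, timeout=timeout_s) as resp:
        return 200 <= resp.status < 300
```

For example, send_alert(PipelineMonitor().check_for_issues(), "https://hooks.example.com/your-webhook") could run from cron or as an extra Airflow task.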

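6. Summary and Next Steps

In this tutorial we built a complete, automated speech recognition pipeline. It processes audio files on a schedule, transcribes them to text, and stores the results in a database.

Key results

  1. A deployed speech recognition service: GLM-ASR-Nano-2512 running in Docker, with the batch script picking up WAV, MP3, FLAC, and OGG files
  2. An automated processing flow: Airflow triggers the pipeline every hour with no manual intervention
  3. An end-to-end data pipeline: from audio input to text output to database storage
  4. Basic monitoring: a script that reports service health, failure rate, average processing time, and average confidence

Use cases

  • Automatic transcription and archiving of daily meeting recordings
  • Converting customer-service call recordings to text
  • Podcast transcripts (subtitles would also need timestamps, which this pipeline does not store)
  • Converting audio learning materials to text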

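Possible improvements

  1. Performance: process several files concurrently instead of one at a time
  2. Quality monitoring: add automatic evaluation of recognition quality
  3. Scalability: distribute processing to handle large volumes of audio
  4. Features: add preprocessing such as audio segmentation and speaker diarization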
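For the first suggestion, the per-file HTTP calls are I/O-bound, so a thread pool is a simple way to keep several requests in flight. The sketch below rests on a few assumptions. process_concurrently is a hypothetical helper, and transcribe could be AudioBatchProcessor().process_audio_file. Keep max_workers small, because GPU throughput on the ASR server is the real limit.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def process_concurrently(paths, transcribe, max_workers=2):
    """Run transcribe(path) for several files in parallel threads.

    transcribe: any callable returning a dict with a 'success' key.
    Unexpected exceptions become failure results so one bad file does not stop the batch.
    Results are returned in the same order as `paths`.
    """
    results = [None] * len(paths)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(transcribe, p): i for i, p in enumerate(paths)}
        for fut in as_completed(futures):
            i = futures[fut]
            try:
                results[i] = fut.result()
            except Exception as exc:
                results[i] = {"success": False, "error": str(exc)}
    return results
```

Writing results to MySQL from several threads is fine as long as each thread uses its own connection. The batch script already does this, because it opens a new connection per call through get_db_connection.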

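This setup saves a great deal of manual transcription time. It also keeps processing consistent and traceable, because each file's status, transcript, and error message is recorded in MySQL. Adjust the schedule, processing rules, and storage strategy to fit your needs.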

