1. 项目概述:当实时语音转录遇上实时音视频

最近在做一个需要实时记录会议讨论内容的项目,客户要求不仅要把语音转成文字,还得能实时显示在屏幕上,方便远程参会的同事跟上节奏。这让我想起了两个老朋友: Agora AssemblyAI 。前者是实时音视频(RTC)领域的标杆,后者在语音AI,特别是转录和实时流处理上表现惊艳。把它们俩结合起来,做一个“实时转录机器人”(Transcription Bot),听起来是个挺酷的活儿。

简单来说,这个项目就是搭建一个服务,它能“旁听”Agora频道里的所有语音对话,然后通过AssemblyAI的 Universal-3 Pro 模型,把这些语音实时、高精度地转换成文字,再通过某种方式(比如WebSocket推送到前端,或者存入数据库)呈现出来。想象一下,在线教育场景里,老师的讲解实时变成字幕;或者在企业会议中,自动生成带发言人的会议纪要——这就是它的核心价值。

Universal-3 Pro是AssemblyAI最新的旗舰级语音识别模型,支持多语言、具备强大的抗噪和说话人分离(Speaker Diarization)能力,对于多人会议场景简直是神器。而Agora则提供了稳定、低延迟的音频流接入能力。这个项目的难点不在于调用单个API,而在于如何让这两个云端服务“流畅对话”,处理好实时音频流的接收、转发、状态管理和错误恢复。

2. 核心架构设计与技术选型

2.1 为什么是“Bot”架构?

首先得明确,这个转录机器人(Bot)并不是运行在终端用户设备上的。让每个用户的客户端同时进行音频录制和上传转录,既不经济(消耗用户流量和算力),也难以管理(多路音频合并、去重、同步是噩梦)。因此, 服务端Bot架构 是更合理的选择。

它的工作流程是这样的:

  1. Bot作为“隐身用户”加入频道 :我们的服务端程序,使用Agora的SDK,以一个“机器人”虚拟用户的身份,加入到指定的Agora音视频频道。
  2. 订阅并接收音频流 :Bot订阅频道内所有(或指定)用户的音频流。Agora服务端会将混合后的音频流(或分轨的音频流,取决于配置)推送给Bot。
  3. 流转发至AssemblyAI :Bot将收到的原始音频数据(通常是PCM格式),按照AssemblyAI实时流API的要求进行封装(如转为合适的编码、分块),并通过WebSocket建立持久连接,持续发送。
  4. 接收并处理转录结果 :AssemblyAI处理音频流,实时返回转录的文本片段(包括中间结果和最终结果)、说话人标签、时间戳等信息。Bot需要接收这些结果,进行必要的后处理(如合并句子、管理说话人切换)。
  5. 结果分发 :处理好的转录文本,通过WebSocket、Server-Sent Events (SSE) 或写入消息队列(如Redis Pub/Sub),分发给前端应用或其他下游服务(如纪要生成服务、数据库)。

这种架构的优点是集中化、可控性强。一个Bot可以服务一个频道,转录质量稳定,且不会干扰真实用户的体验。

2.2 关键组件与技术栈拆解

要实现这个流程,我们需要几个核心组件:

  1. Bot服务(核心逻辑层)

    • 语言 :Node.js (推荐) / Python / Go。考虑到实时I/O密集和生态,Node.js是很好的选择,其事件驱动模型天然适合处理WebSocket和流数据。Python在AI集成和快速原型上也有优势。
    • 核心库
      • Agora RTC SDK (Server-side) :用于以虚拟用户身份加入频道。Agora提供了专门的“录制SDK”或“云端录制”方案,但对于自定义转录Bot,我们更常用的是 RTM (Real-time Messaging) SDK RESTful API 来获取频道内的音频流地址(Token),然后结合低层级的媒体流处理库。实际上,更直接的方式是使用Agora的 云端录制 服务获取音频流,或者使用支持服务端订阅的SDK(如某些版本的Node.js SDK)。
      • AssemblyAI Realtime SDK :官方提供了WebSocket客户端库,极大简化了连接建立、音频发送和结果接收的流程。
    • 音频处理库 :如 ffmpeg (通过命令行或 fluent-ffmpeg 库调用)或专门的音频处理库(如 node-lame 用于MP3编码)。因为从Agora接收的音频可能是特定的编码格式(如OPUS),而AssemblyAI实时API通常支持PCM、MP3等格式,可能需要转码或重采样。
  2. 前后端通信层

    • 结果推送 :使用 Socket.IO 或纯 WebSocket ( ws 库) 将转录结果实时推送到网页客户端。Socket.IO提供了房间、自动重连等便利功能。
    • API接口 :使用 Express.js (Node.js) 或 FastAPI (Python) 提供RESTful API,用于创建转录任务、管理Bot生命周期(启动/停止)、查询历史记录等。
  3. 状态与数据持久层

    • 内存/缓存 :使用 Redis 存储活跃的转录会话状态、临时结果、用户-Bot映射关系。Redis的Pub/Sub功能也可用于内部微服务通信。
    • 数据库 :使用 PostgreSQL MongoDB 存储最终的转录文稿、会议元数据(频道ID、时间、参与者)等,以供回放和分析。
  4. 基础设施与部署

    • 容器化 :使用 Docker 封装Bot服务,确保环境一致性。
    • 编排与部署 :使用 Kubernetes 或简单的 Docker Compose 进行部署和管理。由于Bot是有状态服务(每个Bot关联一个频道),需要谨慎管理其生命周期。
    • 网络 :确保服务器有良好的网络连接,低延迟地连接Agora和AssemblyAI的全球服务节点。可以考虑将Bot部署在离主要用户区域或Agora数据中心较近的云服务器上。

注意:关于音频流获取的深度解析 这是本项目最大的技术难点之一。Agora官方并不直接提供一个“拉取频道内所有音频”的简单API。常见方案有:

  1. 云端录制 :开通Agora云端录制,录制文件会存储在云端(如阿里云OSS)。但这不是真正的“实时”,有分钟级延迟。我们可以通过订阅录制事件,在文件生成后立即下载并转给AssemblyAI,适用于对实时性要求不高(如会后纪要生成)的场景。
  2. 服务端SDK订阅 :某些版本的Agora服务端SDK(需联系销售确认)支持以“监听者”角色加入频道,直接接收音频流。这是最理想的实时方案。
  3. 客户端转发 :让一个“影子”客户端(由服务端控制)加入频道,录制音频并上传到自己的服务器。这种方法不稳定,且违反大多数服务条款, 不推荐 。 在本博文中,我们将基于 方案2(服务端SDK订阅) 进行假设和设计,这是构建低延迟实时转录Bot的正统路径。如果你的Agora项目不支持此功能,方案1(云端录制+异步处理)是可行的备选。

3. 实操搭建:从零构建转录Bot

3.1 环境准备与依赖安装

假设我们选择Node.js作为主要开发语言。首先初始化项目并安装核心依赖。

mkdir agora-assemblyai-bot
cd agora-assemblyai-bot
npm init -y

安装必要的npm包:

npm install agora-access-token express socket.io redis dotenv
npm install assemblyai  # AssemblyAI官方Node.js SDK,包含实时转录功能
# 注意:Agora的服务端Node.js SDK可能需要从官方渠道特定获取,这里假设包名为 `agora-rtc-sdk-server`
# npm install agora-rtc-sdk-server

对于音频处理,我们可能需要 ffmpeg 。在Ubuntu系统上:

sudo apt update
sudo apt install ffmpeg

在项目中,我们可以使用 fluent-ffmpeg 来编程式调用:

npm install fluent-ffmpeg

创建项目基础结构:

agora-assemblyai-bot/
├── .env                    # 环境变量
├── package.json
├── server.js              # 主入口文件
├── bot/                   # Bot核心逻辑
│   ├── AgoraClient.js    # Agora客户端封装
│   ├── AssemblyAIClient.js # AssemblyAI客户端封装
│   └── TranscriptionSession.js # 转录会话管理
├── routes/               # API路由
│   └── api.js
├── utils/                # 工具函数
│   └── audioProcessor.js
└── public/               # 前端静态文件(可选)

3.2 核心模块实现:Agora客户端封装

bot/AgoraClient.js 负责连接Agora频道并获取音频流。这里我们模拟使用一个支持服务端订阅的SDK。

// bot/AgoraClient.js
const { EventEmitter } = require('events');
// 假设引入的是Agora服务端SDK
const AgoraSDK = require('agora-rtc-sdk-server'); // 示例,实际包名可能不同

class AgoraClient extends EventEmitter {
  constructor(appId, appCertificate, channelName, uid) {
    super();
    this.appId = appId;
    this.appCertificate = appCertificate;
    this.channelName = channelName;
    this.uid = uid || 0; // 服务端Bot的UID,通常用一个数字
    this.client = null;
    this.audioStream = null;
    this.isJoined = false;
  }

  async join() {
    try {
      // 1. 创建客户端实例 (假设SDK提供createClient方法)
      this.client = AgoraSDK.createClient({ mode: 'live', codec: 'vp8' }); // 音频场景下codec影响不大

      // 2. 初始化客户端
      await this.client.init(this.appId);

      // 3. 加入频道
      // 需要生成Token。生产环境应从安全的后端服务获取,此处为演示。
      const token = this._generateToken(); // 实现一个Token生成函数
      await this.client.join(this.appCertificate, this.channelName, this.uid, token);

      this.isJoined = true;
      console.log(`Bot joined channel: ${this.channelName} with UID: ${this.uid}`);

      // 4. 订阅频道内所有远程音频流
      this.client.on('stream-added', (evt) => {
        const stream = evt.stream;
        if (stream.getType() === 'audio') {
          this.client.subscribe(stream, { audio: true, video: false }, (err) => {
            if (!err) {
              console.log('Subscribed to remote audio stream:', stream.getId());
              this._setupAudioStream(stream);
            }
          });
        }
      });

      // 5. 处理流移除等事件
      this.client.on('stream-removed', (evt) => {
        console.log('Remote audio stream removed:', evt.stream.getId());
        // 处理说话人离开的逻辑
      });

    } catch (error) {
      console.error('Failed to join Agora channel:', error);
      throw error;
    }
  }

  _setupAudioStream(remoteStream) {
    // 这里是关键:获取原始的音频数据
    // 具体方法取决于SDK,可能需要监听 'audio-frame' 事件或使用 `getAudioTrack` 等方法
    // 假设SDK提供了 `on('audio-frame')` 事件
    remoteStream.on('audio-frame', (frame) => {
      // frame 可能包含 buffer, sampleRate, sampleBits, channelCount 等信息
      // 将音频帧数据发射出去,供后续处理
      this.emit('audioData', {
        buffer: frame.buffer,
        sampleRate: frame.sampleRate,
        channelCount: frame.channelCount,
        timestamp: Date.now()
      });
    });

    this.audioStream = remoteStream;
  }

  _generateToken() {
    // 简化版Token生成,生产环境请使用Agora官方Token服务器或安全的后端逻辑
    const { RtcTokenBuilder, RtcRole } = require('agora-access-token');
    const expirationTimeInSeconds = 3600; // 1小时
    const currentTimestamp = Math.floor(Date.now() / 1000);
    const privilegeExpiredTs = currentTimestamp + expirationTimeInSeconds;

    return RtcTokenBuilder.buildTokenWithUid(
      this.appId,
      this.appCertificate,
      this.channelName,
      this.uid,
      RtcRole.SUBSCRIBER, // Bot是订阅者角色
      privilegeExpiredTs
    );
  }

  async leave() {
    if (this.client && this.isJoined) {
      await this.client.leave();
      this.isJoined = false;
      console.log(`Bot left channel: ${this.channelName}`);
    }
  }
}

module.exports = AgoraClient;

实操心得:Token管理 上面的 _generateToken 函数仅用于演示。在生产环境中, 绝对不要 将App Certificate硬编码在客户端或Bot服务端。正确做法是:建立一个独立的、安全的 Token服务器 (可以是一个简单的微服务)。当Bot需要加入频道时,向这个Token服务器请求一个临时Token。Token服务器使用App Certificate生成Token后返回给Bot。这样保证了核心密钥的安全。

3.3 核心模块实现:AssemblyAI实时转录客户端

bot/AssemblyAIClient.js 负责连接AssemblyAI的实时API,发送音频数据并接收转录结果。

// bot/AssemblyAIClient.js
const { AssemblyAI } = require('assemblyai');
const { EventEmitter } = require('events');

class AssemblyAIClient extends EventEmitter {
  constructor(apiKey, config = {}) {
    super();
    this.client = new AssemblyAI({ apiKey });
    this.realtimeSession = null;
    this.config = {
      sampleRate: 16000, // AssemblyAI实时API常用采样率
      encoding: 'pcm_s16le', // 常用编码
      language: 'en', // 默认英语,Universal-3 Pro支持多语言
      ...config
    };
    this.isConnected = false;
  }

  async connect() {
    try {
      // 使用SDK创建实时会话
      this.realtimeSession = this.client.realtime.transcriber({
        sample_rate: this.config.sampleRate,
        encoding: this.config.encoding,
        language_code: this.config.language,
        // 启用说话人分离
        speaker_diarization: true,
        // 使用Universal-3 Pro模型
        model: 'universal-3-pro',
        // 可选:启用实时端点检测,优化句子切分
        // endpointing: 500, // 静音500ms后判定句子结束
      });

      // 监听转录结果
      this.realtimeSession.on('transcript', (transcript) => {
        if (!transcript.text) return; // 忽略空结果
        // 发射转录结果,包含文本、说话人、时间戳、是否为最终结果
        this.emit('transcription', {
          text: transcript.text,
          speaker: transcript.speaker, // 说话人标签,如'A', 'B'
          isFinal: transcript.type === 'final', // 'partial'为中间结果,'final'为最终结果
          timestamp: new Date(),
          confidence: transcript.confidence,
          words: transcript.words // 词级时间戳(如果启用)
        });
      });

      // 监听会话打开
      this.realtimeSession.on('open', () => {
        console.log('Connected to AssemblyAI Realtime API');
        this.isConnected = true;
        this.emit('connected');
      });

      // 监听错误和关闭
      this.realtimeSession.on('error', (error) => {
        console.error('AssemblyAI Realtime Error:', error);
        this.emit('error', error);
      });

      this.realtimeSession.on('close', () => {
        console.log('Disconnected from AssemblyAI Realtime API');
        this.isConnected = false;
        this.emit('disconnected');
      });

      // 建立连接
      await this.realtimeSession.connect();

    } catch (error) {
      console.error('Failed to connect to AssemblyAI:', error);
      throw error;
    }
  }

  // 发送音频数据块
  sendAudio(audioBuffer) {
    if (this.realtimeSession && this.isConnected) {
      // 确保音频数据格式正确(如采样率、位深、通道数)
      // 这里可能需要根据Agora传来的数据做转换
      this.realtimeSession.sendAudio(audioBuffer);
    } else {
      console.warn('Cannot send audio, AssemblyAI session not ready.');
    }
  }

  async disconnect() {
    if (this.realtimeSession) {
      await this.realtimeSession.close();
      this.realtimeSession = null;
      this.isConnected = false;
    }
  }
}

module.exports = AssemblyAIClient;

3.4 音频处理桥梁:连接两个世界

从Agora接收的音频帧格式(可能是48kHz采样率、OPUS编码)很可能与AssemblyAI实时API要求的格式(如16kHz、PCM S16LE)不匹配。我们需要一个音频处理模块。

utils/audioProcessor.js 负责格式转换。这里我们使用 ffmpeg 进行重采样和转码,因为它能处理几乎所有格式。

// utils/audioProcessor.js
const { spawn } = require('child_process');
const { Writable } = require('stream');

class AudioProcessor {
  /**
   * 将输入的音频Buffer转换为目标格式
   * @param {Buffer} inputBuffer - 原始音频数据
   * @param {Object} inputFormat - { sampleRate: 48000, channels: 2, format: 's16le' }
   * @param {Object} outputFormat - { sampleRate: 16000, channels: 1, format: 's16le' }
   * @returns {Promise<Buffer>} - 转换后的音频Buffer
   */
  static async convert(inputBuffer, inputFormat, outputFormat) {
    return new Promise((resolve, reject) => {
      const args = [
        '-f', inputFormat.format, // 输入格式
        '-ar', inputFormat.sampleRate.toString(), // 输入采样率
        '-ac', inputFormat.channels.toString(), // 输入声道数
        '-i', 'pipe:0', // 从标准输入读取
        '-f', outputFormat.format, // 输出格式
        '-ar', outputFormat.sampleRate.toString(), // 输出采样率
        '-ac', outputFormat.channels.toString(), // 输出声道数
        '-', // 输出到标准输出
      ];

      const ffmpeg = spawn('ffmpeg', args);
      const chunks = [];

      ffmpeg.stdout.on('data', (chunk) => chunks.push(chunk));
      ffmpeg.stdout.on('end', () => resolve(Buffer.concat(chunks)));
      
      ffmpeg.stderr.on('data', (data) => {
        // 可以记录ffmpeg日志,用于调试
        // console.debug(`ffmpeg stderr: ${data}`);
      });

      ffmpeg.on('error', reject);
      ffmpeg.on('close', (code) => {
        if (code !== 0) {
          reject(new Error(`ffmpeg process exited with code ${code}`));
        }
      });

      ffmpeg.stdin.write(inputBuffer);
      ffmpeg.stdin.end();
    });
  }

  /**
   * 创建一个持续的音频转换流
   * 适用于实时音频流处理,避免为每个小帧频繁启动ffmpeg进程(性能差)
   * 更高级的实现:使用 `ffmpeg-stream` 或 `node-fluent-ffmpeg` 创建持久化的转换流。
   * 此处为简化示例,实际生产环境建议使用更高效的流式处理库。
   */
  static createConversionStream(inputFormat, outputFormat) {
    // 这是一个复杂但更高效的方法,需要管理ffmpeg子进程的生命周期和流式I/O。
    // 由于篇幅,这里仅给出概念提示。
    console.warn('createConversionStream not fully implemented. Consider using fluent-ffmpeg for robust streaming.');
    // 返回一个Transform流,将输入Buffer转换为输出Buffer
  }
}

module.exports = AudioProcessor;

3.5 会话管理:粘合一切的核心

bot/TranscriptionSession.js 是大脑,它实例化AgoraClient和AssemblyAIClient,管理音频数据的流动和转录结果的分发。

// bot/TranscriptionSession.js
const AgoraClient = require('./AgoraClient');
const AssemblyAIClient = require('./AssemblyAIClient');
const AudioProcessor = require('../utils/audioProcessor');
const EventEmitter = require('events');

class TranscriptionSession extends EventEmitter {
  constructor(channelName, sessionId, io) {
    super();
    this.channelName = channelName;
    this.sessionId = sessionId; // 唯一会话ID
    this.io = io; // Socket.IO实例,用于广播结果
    this.agoraClient = null;
    this.assemblyAIClient = null;
    this.isActive = false;
    this.transcriptionBuffer = []; // 用于临时存储和合并句子
  }

  async start(agoraConfig, assemblyAIConfig) {
    if (this.isActive) {
      console.warn(`Session ${this.sessionId} is already active.`);
      return;
    }

    try {
      // 1. 初始化并连接Agora客户端
      this.agoraClient = new AgoraClient(
        agoraConfig.appId,
        agoraConfig.appCertificate,
        this.channelName,
        agoraConfig.uid
      );
      
      // 监听Agora的音频数据
      this.agoraClient.on('audioData', async (audioFrame) => {
        if (!this.assemblyAIClient?.isConnected) return;

        // 2. 音频格式转换 (示例:Agora OPUS 48kHz -> AssemblyAI PCM 16kHz)
        // 注意:需要根据实际从Agora SDK获取的音频帧格式调整参数
        const inputFormat = {
          sampleRate: audioFrame.sampleRate || 48000,
          channels: audioFrame.channelCount || 1,
          format: 's16le' // 假设Agora SDK已解码为PCM
        };
        const outputFormat = {
          sampleRate: 16000,
          channels: 1, // AssemblyAI实时API通常推荐单声道
          format: 's16le'
        };

        try {
          const convertedBuffer = await AudioProcessor.convert(
            audioFrame.buffer,
            inputFormat,
            outputFormat
          );
          
          // 3. 发送转换后的音频到AssemblyAI
          this.assemblyAIClient.sendAudio(convertedBuffer);
        } catch (convError) {
          console.error('Audio conversion failed:', convError);
        }
      });

      await this.agoraClient.join();

      // 4. 初始化并连接AssemblyAI客户端
      this.assemblyAIClient = new AssemblyAIClient(assemblyAIConfig.apiKey, {
        sampleRate: 16000,
        language: assemblyAIConfig.language || 'en',
      });

      this.assemblyAIClient.on('transcription', (result) => {
        // 5. 处理转录结果
        this._handleTranscriptionResult(result);
      });

      this.assemblyAIClient.on('error', (error) => {
        this.emit('error', error);
      });

      await this.assemblyAIClient.connect();

      this.isActive = true;
      console.log(`Transcription session started for channel: ${this.channelName}, ID: ${this.sessionId}`);
      this.emit('started');

    } catch (error) {
      console.error(`Failed to start transcription session ${this.sessionId}:`, error);
      await this.stop(); // 清理资源
      throw error;
    }
  }

  _handleTranscriptionResult(result) {
    // 简单的结果后处理:合并中间结果,管理最终结果
    if (result.isFinal) {
      // 最终结果,可以存储到数据库
      console.log(`[Final] Speaker ${result.speaker}: ${result.text}`);
      // 广播给所有连接到这个会话的Web客户端
      if (this.io) {
        this.io.to(this.sessionId).emit('transcription_final', {
          sessionId: this.sessionId,
          speaker: result.speaker,
          text: result.text,
          timestamp: result.timestamp,
          words: result.words
        });
      }
      // 也可以存入数据库
      // await this.saveToDatabase(result);
    } else {
      // 中间结果,可以实时显示在UI上,增加用户体验
      // console.log(`[Partial] Speaker ${result.speaker}: ${result.text}`);
      if (this.io) {
        this.io.to(this.sessionId).emit('transcription_partial', {
          sessionId: this.sessionId,
          speaker: result.speaker,
          text: result.text,
          timestamp: result.timestamp
        });
      }
    }
    // 触发自定义事件,供其他服务监听
    this.emit('result', result);
  }

  async stop() {
    this.isActive = false;
    if (this.assemblyAIClient) {
      await this.assemblyAIClient.disconnect();
    }
    if (this.agoraClient) {
      await this.agoraClient.leave();
    }
    console.log(`Transcription session stopped: ${this.sessionId}`);
    this.emit('stopped');
  }

  getStatus() {
    return {
      sessionId: this.sessionId,
      channelName: this.channelName,
      isActive: this.isActive,
      agoraJoined: this.agoraClient?.isJoined || false,
      assemblyAIConnected: this.assemblyAIClient?.isConnected || false,
    };
  }
}

module.exports = TranscriptionSession;

3.6 主服务与API路由

server.js routes/api.js 负责启动HTTP/Socket服务器,并提供管理API。

// server.js
require('dotenv').config();
const express = require('express');
const http = require('http');
const socketIo = require('socket.io');
const Redis = require('redis');
const apiRoutes = require('./routes/api');

const app = express();
const server = http.createServer(app);
const io = socketIo(server, {
  cors: {
    origin: process.env.CLIENT_URL || '*', // 生产环境应严格限制
    methods: ['GET', 'POST']
  }
});

// 内存存储活跃会话(生产环境应用Redis持久化)
const activeSessions = new Map();

// 简单的会话管理器
const sessionManager = {
  createSession: (channelName, sessionId) => {
    // 从环境变量或配置中心获取密钥
    const agoraConfig = {
      appId: process.env.AGORA_APP_ID,
      appCertificate: process.env.AGORA_APP_CERTIFICATE,
      uid: Math.floor(Math.random() * 100000) // 生成一个随机UID
    };
    const assemblyAIConfig = {
      apiKey: process.env.ASSEMBLYAI_API_KEY,
      language: 'en' // 可从请求中动态获取
    };

    const session = new TranscriptionSession(channelName, sessionId, io);
    activeSessions.set(sessionId, session);

    session.on('error', (err) => {
      console.error(`Session ${sessionId} error:`, err);
      // 可以考虑错误恢复逻辑
    });

    session.on('stopped', () => {
      activeSessions.delete(sessionId);
    });

    return session;
  },
  getSession: (sessionId) => activeSessions.get(sessionId),
  stopSession: async (sessionId) => {
    const session = activeSessions.get(sessionId);
    if (session) {
      await session.stop();
    }
  }
};

// 将管理器附加到app实例,方便路由访问
app.set('sessionManager', sessionManager);

app.use(express.json());
app.use('/api', apiRoutes);

// Socket.IO连接处理
io.on('connection', (socket) => {
  console.log('New client connected:', socket.id);

  socket.on('join_session', (data) => {
    const { sessionId } = data;
    if (sessionId) {
      socket.join(sessionId);
      console.log(`Client ${socket.id} joined session ${sessionId}`);
      socket.emit('session_joined', { sessionId });
    }
  });

  socket.on('disconnect', () => {
    console.log('Client disconnected:', socket.id);
  });
});

const PORT = process.env.PORT || 3000;
server.listen(PORT, () => {
  console.log(`Transcription Bot Server running on port ${PORT}`);
});
// routes/api.js
const express = require('express');
const router = express.Router();

// POST /api/session/start - 启动一个新的转录会话
router.post('/session/start', async (req, res) => {
  const { channelName, sessionId } = req.body;
  const sessionManager = req.app.get('sessionManager');

  if (!channelName || !sessionId) {
    return res.status(400).json({ error: 'channelName and sessionId are required' });
  }

  if (sessionManager.getSession(sessionId)) {
    return res.status(409).json({ error: 'Session already exists' });
  }

  try {
    const session = sessionManager.createSession(channelName, sessionId);
    await session.start(
      {
        appId: process.env.AGORA_APP_ID,
        appCertificate: process.env.AGORA_APP_CERTIFICATE,
        uid: Math.floor(Math.random() * 100000)
      },
      {
        apiKey: process.env.ASSEMBLYAI_API_KEY,
        language: req.body.language || 'en'
      }
    );
    res.status(201).json({ 
      message: 'Transcription session started',
      sessionId,
      channelName,
      status: session.getStatus()
    });
  } catch (error) {
    console.error('Failed to start session:', error);
    res.status(500).json({ error: 'Failed to start transcription session', details: error.message });
  }
});

// GET /api/session/:sessionId/status - 获取会话状态
router.get('/session/:sessionId/status', (req, res) => {
  const session = req.app.get('sessionManager').getSession(req.params.sessionId);
  if (!session) {
    return res.status(404).json({ error: 'Session not found' });
  }
  res.json({ status: session.getStatus() });
});

// POST /api/session/:sessionId/stop - 停止转录会话
router.post('/session/:sessionId/stop', async (req, res) => {
  const sessionManager = req.app.get('sessionManager');
  try {
    await sessionManager.stopSession(req.params.sessionId);
    res.json({ message: 'Transcription session stopped' });
  } catch (error) {
    res.status(500).json({ error: 'Failed to stop session' });
  }
});

module.exports = router;

4. 部署、优化与问题排查

4.1 部署到生产环境

  1. 环境变量配置 :创建 .env 文件(但不要提交到Git),并在生产环境(如Docker或服务器环境变量)中设置。

    AGORA_APP_ID=your_agora_app_id
    AGORA_APP_CERTIFICATE=your_agora_app_certificate
    ASSEMBLYAI_API_KEY=your_assemblyai_api_key
    REDIS_URL=redis://redis:6379
    PORT=3000
    CLIENT_URL=https://your-frontend.com
    
  2. Docker化 :创建 Dockerfile docker-compose.yml

    # Dockerfile
    FROM node:18-alpine
    WORKDIR /app
    COPY package*.json ./
    RUN npm ci --only=production
    RUN apk add --no-cache ffmpeg
    COPY . .
    EXPOSE 3000
    CMD ["node", "server.js"]
    
    # docker-compose.yml
    version: '3.8'
    services:
      bot:
        build: .
        ports:
          - "3000:3000"
        environment:
          - NODE_ENV=production
          - AGORA_APP_ID=${AGORA_APP_ID}
          - AGORA_APP_CERTIFICATE=${AGORA_APP_CERTIFICATE}
          - ASSEMBLYAI_API_KEY=${ASSEMBLYAI_API_KEY}
          - REDIS_URL=redis://redis:6379
        depends_on:
          - redis
        restart: unless-stopped
      redis:
        image: redis:7-alpine
        ports:
          - "6379:6379"
        volumes:
          - redis_data:/data
    volumes:
      redis_data:
    
  3. 使用PM2或Kubernetes管理进程 :对于高可用部署,使用PM2进行进程管理,或使用Kubernetes部署多个Bot实例,并通过负载均衡器分配频道。注意:每个Bot实例是有状态的(绑定特定频道),因此负载均衡策略需要是“会话亲和性”的。

4.2 性能优化与成本控制

  1. 音频处理优化

    • 批处理音频帧 :不要每收到一个音频帧(可能只有几十毫秒)就调用一次 ffmpeg 转换。可以缓冲几百毫秒的音频数据,然后批量转换发送,减少进程创建开销。
    • 使用原生音频处理库 :对于固定的格式转换(如48kHz OPUS到16kHz PCM),可以考虑使用Node.js的 node-opus speaker 等原生绑定库,性能远高于启动外部 ffmpeg 进程。
    • 调整音频参数 :AssemblyAI Universal-3 Pro在16kHz单声道下已有优秀表现。确保从Agora接收的音频经过 回声消除 噪声抑制 处理,能显著提升转录准确率。
  2. AssemblyAI API 使用优化

    • 合理使用 endpointing 参数 :设置合适的静音检测阈值(如 endpointing: 500 ),让模型能更准确地切分句子,减少“一句话说一半就输出”的中间结果,提升最终结果的连贯性。
    • 选择性启用功能 :如果不需要词级时间戳,可以关闭 word_boost 等高级功能以减少延迟和成本。
    • 监控用量 :AssemblyAI按音频时长计费。实现会话自动超时关闭(如频道空闲30分钟后自动停止Bot),避免产生不必要的费用。
  3. Agora 端优化

    • 使用合流模式 :如果Bot只需要整体的会议录音,可以在Agora云端录制服务中配置合流模式,让Agora服务器将多路音频混合成一路,Bot只需处理一个流,简化逻辑。
    • 控制订阅流 :如果频道人数很多,但只有少数人发言,可以动态调整Bot订阅的音频流,而不是订阅所有人。

4.3 常见问题与排查技巧

问题1:转录延迟很高(超过5秒)。

  • 排查
    1. 网络链路 :使用 ping traceroute 检查Bot服务器到Agora和AssemblyAI服务节点的延迟。考虑将Bot部署在离用户主要区域或云服务商更近的位置。
    2. 音频缓冲 :检查音频处理环节的缓冲大小。过大的缓冲会增加延迟。尝试将缓冲时间从500ms减少到200ms。
    3. AssemblyAI连接 :确保使用的是AssemblyAI的 实时流API ,而不是异步文件上传API。检查WebSocket连接是否稳定,有无频繁重连。
    4. Bot服务器负载 :检查服务器CPU和内存使用率。音频转码(尤其是 ffmpeg )是CPU密集型操作。

问题2:转录准确率低,尤其在多人同时说话时。

  • 排查与解决
    1. 确认模型 :确保在AssemblyAIClient配置中明确指定了 model: 'universal-3-pro' ,这是目前准确率最高的模型。
    2. 启用说话人分离 :检查 speaker_diarization: true 是否已设置。Universal-3 Pro的说话人分离能力很强。
    3. 音频质量 :检查从Agora接收的音频质量。如果原始音频有严重回声、啸叫或背景噪声,再好的模型也无能为力。确保Agora频道内的用户使用了合适的音频设备并开启了Agora SDK的音频预处理(如AEC、ANS)。
    4. 声道问题 :确保发送给AssemblyAI的是 单声道 音频。立体声可能会干扰模型。

问题3:Bot加入Agora频道失败,报错“Token无效”或“动态密钥过期”。

  • 解决
    1. Token生成逻辑 :确保Token服务器使用的 appId appCertificate channelName uid 与Bot尝试加入时使用的完全一致。
    2. Token过期 :Token有有效期(通常1小时)。实现Token自动刷新机制。在Bot中加入逻辑,在Token过期前(如剩余5分钟时)向Token服务器申请新Token,并使用Agora SDK的 renewToken 方法更新。
    3. 角色权限 :Bot加入频道时使用的Token,其角色( RtcRole )必须包含订阅流的权限(即 SUBSCRIBER )。

问题4:WebSocket连接(Socket.IO)不稳定,前端收不到实时转录。

  • 排查
    1. 防火墙/代理 :确保服务器防火墙开放了Socket.IO使用的端口(默认是3000,以及可能的WebSocket升级端口)。
    2. 心跳与重连 :Socket.IO客户端应配置自动重连。服务器端也需要处理连接中断后的清理工作。
    3. 房间管理 :确保前端 socket.emit('join_session', {sessionId}) 时, sessionId 与后端创建的会话ID完全匹配,且后端正确执行了 socket.join(sessionId)

问题5:内存泄漏,Bot运行一段时间后崩溃。

  • 排查
    1. 事件监听器 :确保在 TranscriptionSession stop 方法中,移除了所有自定义事件监听器,并正确关闭了Agora和AssemblyAI的连接。
    2. 音频缓冲 :检查 transcriptionBuffer 或任何用于累积音频/文本的数组是否被定期清理。
    3. 使用内存分析工具 :在Node.js中,可以使用 --inspect 标志启动服务,然后使用Chrome DevTools或 clinic.js 等工具进行堆内存快照分析,查找未被释放的对象引用。

构建这样一个实时转录Bot,就像在Agora和AssemblyAI这两座强大的云服务之间架起一座高精度、低延迟的数据桥梁。每一个环节——从音频流的抓取、格式转换、实时传输,到结果的处理与分发——都需要仔细设计和调试。当看到第一句清晰的、带说话人标签的文字从嘈杂的会议音频中实时呈现出来时,你会觉得这一切的折腾都是值得的。这个项目不仅是一个工具,更是一个理解现代实时音视频与AI语音处理如何协同工作的绝佳范例。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐