1. 项目概述:当实时音视频遇上AI转录

最近在做一个挺有意思的集成项目,核心是把Agora的实时音视频通信能力和AssemblyAI最新的Universal-3 Pro语音转文字模型结合起来,打造一个能实时转录会议、直播、客服对话的“智能速记员”。这个组合听起来简单,但实际做下来,从架构设计到性能调优,每一步都有不少门道。Agora大家应该不陌生,做实时音视频通信的头部服务商,SDK成熟稳定,延迟能做到毫秒级。AssemblyAI则是语音AI领域的佼佼者,他们的Universal-3 Pro模型号称在准确率、多语言支持和实时性上都有显著提升,特别适合需要高精度转录的场景。

这个转录机器人的核心价值在于,它能把实时发生的语音对话,几乎是同步地转换成结构化的文字稿。想象一下在线教育场景,老师讲课的同时,侧边栏就实时滚动着字幕和重点摘要;或者是在跨国会议里,不同语言的发言能被实时翻译并转写成文字,打破沟通壁垒;再比如内容审核,对直播语音进行实时关键词监测。这些场景对延迟和准确率的要求都非常苛刻,传统的录音后处理方案根本满足不了。

我自己在搭建过程中,主要解决了几个核心问题:如何低延迟、高可靠地从Agora拉取音频流;如何将音频流适配成AssemblyAI API要求的格式并进行高效传输;如何处理转录结果的返回、展示与存储;以及如何应对网络波动、服务中断等异常情况。整个项目涉及前端、后端、音视频处理、网络通信多个层面,是一个典型的全栈集成项目。接下来,我就把整个实现思路、关键步骤以及踩过的坑,详细拆解一遍。

2. 核心架构设计与技术选型

2.1 为什么选择Agora + AssemblyAI这个组合?

在做技术选型时,我对比过几种方案。比如直接用浏览器的 Web Speech API WebRTC getUserMedia 配合本地语音识别库。前者识别准确率一般,且对中文支持不佳;后者则对客户端性能要求高,模型体积大,不适合复杂场景。也考虑过用FFmpeg拉流推到其他云服务商的语音识别接口,但整体链路长,延迟控制复杂。

最终选择Agora + AssemblyAI,是基于以下几个核心考量:

  1. 专业性分工 :Agora专注解决实时音视频的“传输”问题,在全球节点部署、抗弱网、超低延迟方面是专家。AssemblyAI专注解决“识别”问题,其Universal-3 Pro模型在嘈杂环境、多人对话、专业术语识别上表现突出。让专业的人做专业的事,集成起来反而更可靠。
  2. 延迟与实时性 :Agora的音频流可以极低延迟(通常<200ms)地推送到自建服务端。AssemblyAI的实时转录API(Realtime API)支持WebSocket长连接,音频流分片发送,转录结果流式返回,端到端延迟可以控制在1-2秒内,对于大多数交互场景是可接受的。
  3. 可控性与成本 :相比于使用某些打包好的“一站式”解决方案,自己集成Agora和AssemblyAI,对数据流、业务逻辑有完全的控制权。你可以决定哪些频道的音频需要转录,转录结果如何处理(是实时显示、存储还是触发其他业务动作)。成本上也更清晰,Agora按语音时长计费,AssemblyAI按音频处理时长计费,用多少算多少。
  4. 功能扩展性 :AssemblyAI Universal-3 Pro不仅提供转写,还支持说话人分离(谁在说话)、情感分析、内容摘要、实体识别(人名、地点)等高级功能。通过API可以灵活启用这些功能,为业务赋能,比如自动生成会议纪要要点,或标记出讨论中的关键决策。

基于这些原因,我决定采用 服务端集中处理 的架构。所有Agora频道内的音频流,由我们部署的一个中间服务(转录机器人)来订阅、处理并转发给AssemblyAI,再将结果分发出去。这样避免了客户端性能差异和网络环境的影响,也便于统一管理和监控。

2.2 系统架构全景图与组件职责

整个系统的数据流是这样的:

[Agora客户端 (说话者)] --(音频流)--> [Agora云服务]
                                           |
                                           | (通过Agora服务端SDK订阅)
                                           v
                                [转录机器人服务 (Node.js/Python)]
                                           |
                              (音频格式转换、分片、流式发送)
                                           v
                                [AssemblyAI Realtime API]
                                           |
                              (流式返回JSON格式的转录结果)
                                           v
                                [转录机器人服务]
                                           |
             (结果处理:合并、说话人标签、分发到WebSocket/存储/业务系统)
                                           v
                     [Web客户端 / 数据存储 / 消息队列]

各核心组件职责详解:

  1. Agora客户端 :负责采集和发送用户的音频。对于转录机器人来说,它是音频源。我们需要确保客户端使用合适的音频编码格式(如OPUS)和参数(采样率、声道数),以保证音质的同时不过度占用带宽。
  2. Agora云服务 :负责全球范围内的音频流路由、转发和混音。这是我们无法干预的黑盒,但提供了强大的服务端SDK(如Agora的RESTful API或更底层的云端录制SDK),允许我们以“特权用户”身份加入频道,订阅所有或指定用户的音频流。
  3. 转录机器人服务(核心) :这是我们自己实现的后端服务,是整个系统的“大脑”。它需要完成以下关键任务:
    • 频道管理 :根据配置,动态加入或离开Agora频道。
    • 音频流接收与解码 :通过Agora服务端SDK接收音频包(可能是编码后的)。由于AssemblyAI API通常接受PCM音频数据,因此可能需要先对音频进行解码(如果Agora SDK输出的是编码格式)。
    • 音频预处理与分片 :将连续的音频流按固定时长(如200-500毫秒)分片。同时,可能需要进行重采样(确保采样率与AssemblyAI要求一致,如16kHz)、声道转换(单声道)等操作。
    • 与AssemblyAI建立并维护WebSocket连接 :为每个需要转录的音频源(或整个混合后的音频)建立一个到AssemblyAI Realtime API的WebSocket连接。需要处理连接建立、认证(API Key)、重连逻辑。
    • 流式发送与接收 :将音频分片通过WebSocket实时发送。同时监听WebSocket返回的消息,解析JSON格式的转录结果(包括临时性的 PartialTranscript 和最终确认的 FinalTranscript )。
    • 结果后处理与分发 :对返回的文本进行必要的处理,如根据说话人ID区分不同发言者,将零散的分片结果合并成连贯的句子。然后通过WebSocket服务器将结果推送给前端页面,或者存入数据库(如MongoDB用于全文检索),又或者发布到消息队列(如Redis Pub/Sub, Kafka)供其他业务系统消费。
  4. 结果消费端 :可以是显示实时字幕的Web页面(通过WebSocket连接机器人服务),可以是用于存档的关系型数据库或搜索引擎,也可以是与Slack、Teams集成的聊天机器人接口。

这个架构的关键在于 异步和非阻塞 。机器人服务必须能够同时处理多个Agora频道的音频流,每个音频流对应一个独立的AssemblyAI WebSocket连接。这就要求服务采用事件驱动模型(Node.js的天然优势)或异步框架(如Python的asyncio)。

注意: 一个常见的误区是试图在一个WebSocket连接里发送多个人的混合音频,然后指望AssemblyAI能自动区分说话人。对于多人对话场景,更好的做法是 利用Agora服务端SDK获取每个用户的独立音频流 ,为每个用户建立独立的AssemblyAI连接。这样Universal-3 Pro模型能更精准地为每个音频源进行识别和说话人标注。如果只能拿到混合音频,则需要在建立AssemblyAI连接时,显式启用 speaker_labels 参数,模型会尝试区分,但在多人同时讲话(重叠语音)时,效果会打折扣。

3. 关键实现步骤与代码拆解

3.1 环境准备与依赖安装

首先,你需要准备好以下账号和资源:

  1. Agora账号 :在Agora控制台创建项目,获取 App ID 。更重要的是,要启用并获取 服务端相关凭证 。对于云端订阅音频流,你可能需要:
    • RESTful API的Customer ID和Certificate :用于调用云端录制等高级REST API。
    • 或者,使用 Agora RTC SDK for Node.js (Server-side) :这允许你的Node.js服务像客户端一样加入频道,但需要动态生成Token(使用App ID, App Certificate, Channel Name, User ID)。这是更直接获取原始音频包的方式。
  2. AssemblyAI账号 :注册后,在设置中生成一个 API Key 。确保你的账户有足够的额度,并且注意Realtime API可能处于Beta阶段或有特殊申请流程。
  3. 服务器环境 :推荐使用Node.js (>= 16) 或 Python (>= 3.8) 环境。服务器需要有公网IP,并且网络能够稳定访问Agora和AssemblyAI的服务器(通常都在海外主流云上,确保低延迟)。

Node.js项目初始化与核心依赖:

mkdir agora-assemblyai-bot && cd agora-assemblyai-bot
npm init -y
npm install agora-access-token agora-rtc-sdk-ng assemblyai ws express
  • agora-access-token : 用于在服务端生成加入频道所需的动态Token。
  • agora-rtc-sdk-ng : Agora的Node.js服务端SDK,注意这是用于 服务端 的版本,它允许你的Node.js程序作为一个“客户端”加入频道并拉流。
  • assemblyai : AssemblyAI的官方Node.js SDK,封装了Realtime API的WebSocket连接。
  • ws : 一个简单强大的WebSocket库,用于创建向浏览器推送转录结果的WebSocket服务器。
  • express : 可选,用于提供简单的管理界面或健康检查接口。

关键配置管理: 创建一个 .env 文件或使用配置管理工具,安全地存储以下信息:

AGORA_APP_ID=你的Agora App ID
AGORA_APP_CERTIFICATE=你的Agora App Certificate
ASSEMBLYAI_API_KEY=你的AssemblyAI API Key
SERVER_PORT=3000 # 你的机器人服务端口

3.2 构建Agora音频流订阅服务

这部分是整个流程的源头,目标是让我们的Node.js服务能“听到”Agora频道里的声音。

步骤一:生成加入频道所需的Token Agora服务端SDK加入频道需要Token。我们可以在服务端动态生成。

// utils/agoraToken.js
const { RtcTokenBuilder, RtcRole } = require('agora-access-token');

function generateRtcToken(channelName, uid) {
  const appId = process.env.AGORA_APP_ID;
  const appCertificate = process.env.AGORA_APP_CERTIFICATE;
  // Token有效期,单位秒。设置一个合理的值,如3600(1小时)
  const expirationTimeInSeconds = 3600;
  // 服务端加入频道,通常使用一个固定的、高权限的UID,如100000
  const role = RtcRole.PUBLISHER; // 服务端需要发布权限吗?通常只需要SUBSCRIBER,但SDK可能需要PUBLISHER角色来初始化。

  const currentTimestamp = Math.floor(Date.now() / 1000);
  const privilegeExpiredTs = currentTimestamp + expirationTimeInSeconds;

  // 注意:服务端作为SUBSCRIBER加入时,理论上不需要发布权限。但根据SDK文档,生成Token时可能仍需指定角色。
  // 更安全的做法是生成一个包含SUBSCRIBER权限的Token。这里使用PUBLISHER是为了兼容性。
  const token = RtcTokenBuilder.buildTokenWithUid(
    appId,
    appCertificate,
    channelName,
    uid,
    role,
    privilegeExpiredTs
  );
  return token;
}

module.exports = { generateRtcToken };

步骤二:使用Agora Node.js SDK加入频道并处理音频 这是核心部分。Agora的服务端SDK允许我们创建一个“客户端”实例,加入频道,并订阅远程用户的音频流。

// services/agoraService.js
const AgoraRTC = require('agora-rtc-sdk-ng');
const { generateRtcToken } = require('../utils/agoraToken');

class AgoraService {
  constructor(channelName, uid = 100000) {
    this.channelName = channelName;
    this.uid = uid;
    this.client = null;
    this.audioStream = null;
    this.onAudioFrame = null; // 回调函数,用于接收音频帧
  }

  async joinChannel() {
    try {
      // 创建客户端实例,模式为直播模式(live)
      this.client = AgoraRTC.createClient({ mode: 'live', codec: 'vp8' });

      // 监听远端用户发布流的事件
      this.client.on('user-published', async (user, mediaType) => {
        if (mediaType === 'audio') {
          // 订阅远端用户的音频流
          await this.client.subscribe(user, mediaType);
          console.log(`成功订阅用户 ${user.uid} 的音频`);

          // 获取远端音频轨道
          const remoteAudioTrack = user.audioTrack;
          // 这里的关键:我们需要获取原始的音频数据帧
          // Agora SDK可能不直接提供获取PCM数据的回调。一种常见做法是:
          // 1. 通过SDK将音频播放到一个虚拟的AudioContext中(在Node.js环境需要模拟)
          // 2. 或者,更直接的方法是使用Agora的“云端录制”或“扩展服务”REST API来获取音频流。
          // 由于Node.js环境没有Web Audio API,直接通过客户端SDK获取原始音频帧比较复杂。

          // 重要提示:在Node.js服务端环境中,使用Agora的客户端SDK来获取原始音频数据流并非标准做法,且可能受限。
          // 更可靠、官方的方案是使用 Agora 的“云端录制”REST API 或 “媒体流加速”扩展服务。
          // 这些服务可以将频道内的音视频流推送到你指定的服务器地址(RTMP/RTMPS)。
          // 然后,你可以使用FFmpeg等工具拉取这个RTMP流,解码得到PCM数据。
        }
      });

      const token = generateRtcToken(this.channelName, this.uid);
      await this.client.join(
        process.env.AGORA_APP_ID,
        this.channelName,
        token,
        this.uid
      );
      console.log(`服务端加入频道 ${this.channelName} 成功,UID: ${this.uid}`);
    } catch (error) {
      console.error('加入Agora频道失败:', error);
      throw error;
    }
  }

  async leaveChannel() {
    if (this.client) {
      await this.client.leave();
      this.client = null;
      console.log(`已离开频道 ${this.channelName}`);
    }
  }
}

module.exports = AgoraService;

实操心得: 上面代码揭示了一个关键挑战:在Node.js服务端,通过Agora的 客户端SDK 直接获取原始音频帧(PCM数据)非常困难,因为SDK设计初衷是用于浏览器环境,依赖WebRTC和Web Audio API。因此,对于生产环境,我强烈推荐使用 Agora云端录制 方案。你可以通过Agora的RESTful API,启动一个云端录制任务,将指定频道的音频录制下来,并 实时推送(RTMP) 到你自己的服务器。你的转录机器人服务则运行一个RTMP拉流客户端(如使用 node-media-server ffmpeg 的Node.js封装),拉取音频流并解码为PCM。这是更稳定、官方支持的获取高音质音频流的方式。

3.3 集成AssemblyAI实时转录API

假设我们已经通过某种方式(如云端录制RTMP拉流)获得了连续的PCM音频数据。接下来,我们需要将这些数据流式地发送给AssemblyAI。

步骤一:建立并管理AssemblyAI Realtime WebSocket连接 AssemblyAI的Realtime API使用WebSocket协议。我们需要建立一个连接,并在连接上发送音频数据和接收转录结果。

// services/assemblyAIService.js
const WebSocket = require('ws');
const { Readable } = require('stream');

class AssemblyAIRealtimeService {
  constructor(apiKey, sampleRate = 16000) {
    this.apiKey = apiKey;
    this.sampleRate = sampleRate;
    this.socket = null;
    this.sessionId = null;
    this.isConnected = false;
    this.onTranscript = null; // 回调函数,用于接收转录结果
    this.onError = null;
    this.onClose = null;
  }

  connect() {
    return new Promise((resolve, reject) => {
      // AssemblyAI Realtime API的WebSocket端点
      const wsUrl = 'wss://api.assemblyai.com/v2/realtime/ws?sample_rate=16000';
      this.socket = new WebSocket(wsUrl, {
        headers: {
          'Authorization': this.apiKey,
        },
      });

      this.socket.on('open', () => {
        console.log('已连接到AssemblyAI Realtime API');
        this.isConnected = true;
        resolve();
      });

      this.socket.on('message', (data) => {
        try {
          const message = JSON.parse(data.toString());
          this.handleMessage(message);
        } catch (error) {
          console.error('解析AssemblyAI消息失败:', error);
        }
      });

      this.socket.on('error', (error) => {
        console.error('AssemblyAI WebSocket错误:', error);
        this.isConnected = false;
        if (this.onError) this.onError(error);
        reject(error);
      });

      this.socket.on('close', (code, reason) => {
        console.log(`AssemblyAI连接关闭,代码: ${code}, 原因: ${reason}`);
        this.isConnected = false;
        this.sessionId = null;
        if (this.onClose) this.onClose(code, reason);
      });
    });
  }

  handleMessage(message) {
    // 处理不同类型的返回消息
    switch (message.message_type) {
      case 'SessionBegins':
        this.sessionId = message.session_id;
        console.log(`会话开始,ID: ${this.sessionId}`);
        break;
      case 'PartialTranscript':
        // 临时性转录结果,文本可能还会变化
        if (this.onTranscript) {
          this.onTranscript({
            type: 'partial',
            text: message.text,
            audioStart: message.audio_start,
            audioEnd: message.audio_end,
            words: message.words, // 包含每个词的时间戳
          });
        }
        break;
      case 'FinalTranscript':
        // 最终确认的转录结果
        if (this.onTranscript) {
          this.onTranscript({
            type: 'final',
            text: message.text,
            audioStart: message.audio_start,
            audioEnd: message.audio_end,
            words: message.words,
            speaker: message.speaker, // 说话人标识(如果启用了说话人分离)
          });
        }
        break;
      case 'Error':
        console.error('AssemblyAI返回错误:', message.error);
        break;
      default:
        // 忽略其他消息类型,如`UtteranceEnd`等
        break;
    }
  }

  // 发送音频数据分片。audioData应该是一个包含PCM数据的Buffer,采样率16kHz,单声道,16位有符号整数。
  sendAudio(audioBuffer) {
    if (!this.isConnected || !this.socket) {
      console.warn('WebSocket未连接,无法发送音频');
      return false;
    }
    // AssemblyAI要求音频数据以Base64编码的字符串形式发送
    const audioBase64 = audioBuffer.toString('base64');
    const message = {
      audio_data: audioBase64,
    };
    this.socket.send(JSON.stringify(message));
    return true;
  }

  // 发送一个标记,表示音频流结束
  sendTerminate() {
    if (this.isConnected && this.socket) {
      const message = { terminate_session: true };
      this.socket.send(JSON.stringify(message));
    }
  }

  disconnect() {
    if (this.socket) {
      this.socket.close();
      this.socket = null;
    }
    this.isConnected = false;
  }
}

module.exports = AssemblyAIRealtimeService;

步骤二:音频格式处理与流式发送 从Agora云端录制RTMP流拉取到的音频,很可能不是AssemblyAI要求的格式(16kHz, 单声道, 16-bit PCM)。我们需要使用 ffmpeg 进行实时转码。

// utils/audioProcessor.js
const { spawn } = require('child_process');
const { Readable } = require('stream');

class AudioProcessor {
  /**
   * 创建一个FFmpeg进程,将输入音频流实时转码为AssemblyAI要求的格式。
   * @param {Readable} inputStream - 输入的音频流(例如来自RTMP拉流)
   * @returns {Readable} - 输出流(PCM数据流)
   */
  static createTranscodeStream(inputStream) {
    // FFmpeg命令参数:
    // -i pipe:0 从标准输入读取
    // -ar 16000 重采样到16kHz
    // -ac 1 转换为单声道
    // -f s16le 输出格式为有符号16位小端PCM
    // pipe:1 输出到标准输出
    const ffmpegArgs = [
      '-i', 'pipe:0',
      '-ar', '16000', // 采样率
      '-ac', '1',     // 声道数
      '-f', 's16le',  // 输出格式
      'pipe:1'
    ];

    const ffmpegProcess = spawn('ffmpeg', ffmpegArgs);

    // 将输入流导入FFmpeg的标准输入
    inputStream.pipe(ffmpegProcess.stdin);

    // 处理错误
    ffmpegProcess.stderr.on('data', (data) => {
      // 可以记录日志,但通常不需要处理,除非出错
      // console.error('FFmpeg stderr:', data.toString());
    });

    ffmpegProcess.on('error', (err) => {
      console.error('启动FFmpeg进程失败:', err);
    });

    // FFmpeg的标准输出就是转码后的PCM流
    return ffmpegProcess.stdout;
  }

  /**
   * 将PCM流按固定时长分片。
   * @param {Readable} pcmStream - PCM数据流
   * @param {number} chunkDurationMs - 分片时长,毫秒(如200)
   * @returns {AsyncGenerator<Buffer>} - 返回一个异步生成器,每次yield一个音频分片Buffer
   */
  static async *chunkPCMStream(pcmStream, chunkDurationMs = 200) {
    // 计算每个分片的大小(字节)
    // PCM格式:16位 = 2字节,单声道,采样率16000。
    // 每秒字节数 = 采样率 * 样本大小(字节) * 声道数 = 16000 * 2 * 1 = 32000 字节/秒
    const bytesPerSecond = 16000 * 2; // 32000
    const chunkSize = Math.floor(bytesPerSecond * (chunkDurationMs / 1000));

    let buffer = Buffer.alloc(0);

    for await (const chunk of pcmStream) {
      buffer = Buffer.concat([buffer, chunk]);

      while (buffer.length >= chunkSize) {
        const slice = buffer.slice(0, chunkSize);
        buffer = buffer.slice(chunkSize);
        yield slice;
      }
    }

    // 发送剩余不足一个分片的数据(如果有)
    if (buffer.length > 0) {
      yield buffer;
    }
  }
}

module.exports = AudioProcessor;

3.4 核心协调服务:串联音频流与转录

现在我们需要一个“大脑”来协调Agora音频流拉取、转码分片、AssemblyAI连接和结果分发。

// services/transcriptionBot.js
const AgoraService = require('./agoraService'); // 假设这是改造后能提供音频流的服务
const AssemblyAIRealtimeService = require('./assemblyAIService');
const AudioProcessor = require('../utils/audioProcessor');
const WebSocket = require('ws');

class TranscriptionBot {
  constructor(channelName, resultWebSocketServer) {
    this.channelName = channelName;
    this.agoraService = null;
    this.assemblyAIService = null;
    this.isRunning = false;
    this.resultWSS = resultWebSocketServer; // 用于向前端广播结果的WebSocket服务器实例
  }

  async start() {
    if (this.isRunning) {
      console.log(`转录机器人已在运行中: ${this.channelName}`);
      return;
    }
    console.log(`启动转录机器人,频道: ${this.channelName}`);

    try {
      // 1. 初始化AssemblyAI服务
      this.assemblyAIService = new AssemblyAIRealtimeService(process.env.ASSEMBLYAI_API_KEY);
      this.assemblyAIService.onTranscript = this.handleTranscript.bind(this);
      this.assemblyAIService.onError = this.handleAssemblyAIError.bind(this);
      this.assemblyAIService.onClose = this.handleAssemblyAIClose.bind(this);

      // 2. 连接AssemblyAI
      await this.assemblyAIService.connect();

      // 3. 初始化并加入Agora频道(这里假设AgoraService已改造为能提供音频流Readable)
      this.agoraService = new AgoraService(this.channelName);
      // 假设 agoraService.getAudioStream() 返回一个Readable流(例如来自RTMP拉流后的PCM流)
      const rawAudioStream = await this.agoraService.joinAndGetStream();

      // 4. 音频处理流水线
      const pcmStream = AudioProcessor.createTranscodeStream(rawAudioStream);

      // 5. 流式分片并发送
      this.isRunning = true;
      this.streamAudioToAssemblyAI(pcmStream);

      console.log(`转录机器人启动成功: ${this.channelName}`);
    } catch (error) {
      console.error(`启动转录机器人失败: ${this.channelName}`, error);
      this.stop();
      throw error;
    }
  }

  async streamAudioToAssemblyAI(pcmStream) {
    // 使用异步生成器按分片发送
    for await (const audioChunk of AudioProcessor.chunkPCMStream(pcmStream, 200)) {
      if (!this.isRunning || !this.assemblyAIService?.isConnected) {
        break;
      }
      this.assemblyAIService.sendAudio(audioChunk);
      // 可以在这里加入简单的流量控制,避免发送过快
      await new Promise(resolve => setTimeout(resolve, 50)); // 模拟实时性,约200ms发送一次
    }
    // 音频流结束,发送终止信号
    if (this.assemblyAIService?.isConnected) {
      this.assemblyAIService.sendTerminate();
    }
  }

  handleTranscript(transcript) {
    console.log(`[${transcript.type}] ${transcript.text}`);
    
    // 将结果广播给所有连接的WebSocket客户端(例如前端页面)
    const resultToSend = {
      channel: this.channelName,
      timestamp: Date.now(),
      ...transcript
    };

    if (this.resultWSS) {
      this.resultWSS.clients.forEach(client => {
        if (client.readyState === WebSocket.OPEN) {
          client.send(JSON.stringify(resultToSend));
        }
      });
    }

    // 也可以同时存储到数据库或发送到消息队列
    // this.saveToDatabase(resultToSend);
  }

  handleAssemblyAIError(error) {
    console.error(`AssemblyAI服务错误:`, error);
    // 可以实现重连逻辑
  }

  handleAssemblyAIClose(code, reason) {
    console.log(`AssemblyAI连接关闭,尝试重启...`);
    // 简单的重连逻辑,生产环境需要更健壮
    setTimeout(() => {
      if (this.isRunning) {
        this.assemblyAIService.connect().catch(err => console.error('重连失败:', err));
      }
    }, 3000);
  }

  async stop() {
    this.isRunning = false;
    if (this.assemblyAIService) {
      this.assemblyAIService.disconnect();
      this.assemblyAIService = null;
    }
    if (this.agoraService) {
      await this.agoraService.leaveChannel();
      this.agoraService = null;
    }
    console.log(`转录机器人已停止: ${this.channelName}`);
  }
}

module.exports = TranscriptionBot;

3.5 构建一个简单的WebSocket结果服务器与前端示例

为了让前端能实时看到转录结果,我们需要一个WebSocket服务器。

// server.js
const express = require('express');
const WebSocket = require('ws');
const TranscriptionBotManager = require('./services/transcriptionBotManager'); // 一个管理多个机器人的管理器

const app = express();
const PORT = process.env.SERVER_PORT || 3000;

// 创建HTTP服务器,用于Express和WebSocket共享端口
const server = app.listen(PORT, () => {
  console.log(`转录机器人服务运行在 http://localhost:${PORT}`);
});

// 创建WebSocket服务器,附着到同一个HTTP服务器上
const wss = new WebSocket.Server({ server });

// 存储所有连接的客户端
const clients = new Set();
wss.on('connection', (ws) => {
  console.log('新的WebSocket客户端连接');
  clients.add(ws);
  ws.on('close', () => {
    console.log('WebSocket客户端断开连接');
    clients.delete(ws);
  });
  ws.on('error', (error) => {
    console.error('WebSocket客户端错误:', error);
  });
});

// 初始化机器人管理器,并传入WebSocket服务器用于广播
const botManager = new TranscriptionBotManager(wss);

// 简单的REST API接口,用于控制机器人(例如启动/停止某个频道的转录)
app.use(express.json());
app.post('/api/start', async (req, res) => {
  const { channelName } = req.body;
  if (!channelName) {
    return res.status(400).json({ error: '缺少 channelName 参数' });
  }
  try {
    await botManager.startBot(channelName);
    res.json({ success: true, message: `已启动频道 ${channelName} 的转录` });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

app.post('/api/stop', async (req, res) => {
  const { channelName } = req.body;
  if (!channelName) {
    return res.status(400).json({ error: '缺少 channelName 参数' });
  }
  try {
    await botManager.stopBot(channelName);
    res.json({ success: true, message: `已停止频道 ${channelName} 的转录` });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// 健康检查
app.get('/health', (req, res) => {
  res.json({ status: 'ok', timestamp: new Date().toISOString() });
});

一个简单的前端页面,用于连接WebSocket并显示实时字幕:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Agora转录实时字幕</title>
    <style>
        body { font-family: sans-serif; padding: 20px; }
        #transcript-container {
            border: 1px solid #ccc;
            padding: 20px;
            height: 400px;
            overflow-y: auto;
            margin-bottom: 20px;
        }
        .message { margin-bottom: 10px; padding: 8px; border-radius: 4px; }
        .partial { background-color: #e0f7fa; border-left: 4px solid #00bcd4; }
        .final { background-color: #f1f8e9; border-left: 4px solid #8bc34a; }
        .speaker { font-weight: bold; color: #555; }
    </style>
</head>
<body>
    <h1>实时转录字幕</h1>
    <div>频道: <span id="channel-name">--</span></div>
    <div id="transcript-container"></div>
    <div>
        <button onclick="connectWS()">连接</button>
        <button onclick="disconnectWS()">断开</button>
    </div>

    <script>
        let ws = null;
        const transcriptContainer = document.getElementById('transcript-container');
        const channelNameSpan = document.getElementById('channel-name');

        function connectWS() {
            // 假设你的机器人服务运行在 localhost:3000
            const wsUrl = `ws://localhost:3000`;
            ws = new WebSocket(wsUrl);

            ws.onopen = () => {
                console.log('WebSocket连接已打开');
                addMessage('系统', '已连接到转录服务', 'system');
            };

            ws.onmessage = (event) => {
                const data = JSON.parse(event.data);
                console.log('收到转录:', data);
                channelNameSpan.textContent = data.channel;
                const speaker = data.speaker ? `发言人 ${data.speaker}: ` : '';
                addMessage(speaker, data.text, data.type);
            };

            ws.onerror = (error) => {
                console.error('WebSocket错误:', error);
                addMessage('系统', '连接出错', 'system-error');
            };

            ws.onclose = () => {
                console.log('WebSocket连接已关闭');
                addMessage('系统', '连接已断开', 'system');
            };
        }

        function disconnectWS() {
            if (ws) {
                ws.close();
                ws = null;
            }
        }

        function addMessage(speaker, text, type) {
            const msgDiv = document.createElement('div');
            msgDiv.className = `message ${type}`;
            msgDiv.innerHTML = `<span class="speaker">${speaker}</span> ${text}`;
            transcriptContainer.appendChild(msgDiv);
            // 自动滚动到底部
            transcriptContainer.scrollTop = transcriptContainer.scrollHeight;
        }

        // 页面加载后自动连接(可选)
        // window.onload = connectWS;
    </script>
</body>
</html>

4. 部署、优化与故障排查实录

4.1 服务器部署与性能考量

这个转录机器人服务对CPU和网络有一定要求,尤其是在处理多个频道并发时。

  1. 服务器规格 :对于单频道,1核2GB内存的云服务器可能勉强够用。但FFmpeg转码是CPU密集型操作,多频道并发时,CPU会成为瓶颈。建议从2核4GB起步,并根据实际负载监控进行升级。使用 top htop 命令监控CPU使用率,确保在音频处理高峰期不超过80%。
  2. 网络带宽 :主要考虑两个方向的流量:
    • 入向 :从Agora云端录制RTMP拉流的带宽。一个单声道、16kHz的PCM流带宽很低(约256 kbps),但原始RTMP流可能包含更高质量的音频,带宽会更高。
    • 出向 :向AssemblyAI发送Base64编码的音频数据(数据量约为原始PCM的4/3),以及接收返回的JSON文本。流量不大。
    • 建议 :选择网络质量好、到Agora和AssemblyAI服务器延迟低的云服务商区域(如北美、欧洲、新加坡等)。可以使用 ping traceroute 测试网络质量。
  3. 进程管理 :使用 PM2 Docker 管理Node.js进程,确保服务崩溃后能自动重启。PM2的配置文件 ecosystem.config.js 可以这样写:
    module.exports = {
      apps: [{
        name: 'transcription-bot',
        script: './server.js',
        instances: 1, // 根据CPU核心数调整
        exec_mode: 'fork', // 或者'cluster'模式利用多核
        env: {
          NODE_ENV: 'production',
        },
        max_memory_restart: '500M', // 内存超过500M重启
        error_file: './logs/err.log',
        out_file: './logs/out.log',
        log_file: './logs/combined.log',
        time: true,
      }]
    };
    
  4. 日志与监控 :务必记录详细的日志,包括机器人的启动/停止、AssemblyAI连接状态、音频发送频率、错误信息等。可以使用 winston pino 等日志库。同时,监控关键指标:各频道转录延迟(从音频发生到收到文字的时间)、服务CPU/内存使用率、WebSocket连接数、AssemblyAI API调用错误率。

4.2 延迟优化实战技巧

实时转录的延迟是核心体验指标。目标是将端到端延迟控制在2秒以内。

  1. 音频分片大小 AudioProcessor.chunkPCMStream 函数中的 chunkDurationMs 参数是关键。太小(如50ms)会导致WebSocket通信开销增大,增加网络往返延迟;太大(如500ms)会导致AssemblyAI处理延迟增加,因为模型需要积累更多音频才能做出准确判断。 经过实测,200ms是一个比较好的平衡点 。AssemblyAI官方也推荐200-500ms。
  2. 网络链路优化
    • 服务器位置 :你的转录机器人服务器、Agora云端录制服务器、AssemblyAI服务器三者之间的网络延迟应尽可能低。如果主要用户在中国,可以考虑使用Agora在中国大陆的数据中心,并将机器人部署在离该数据中心近的海外区域(如新加坡),同时该区域到AssemblyAI(通常在北美)的网络也要好。
    • WebSocket连接复用 :为每个音频源建立独立的AssemblyAI连接是必要的,但连接建立本身有开销。可以考虑连接池或长连接保活机制,避免频繁重建连接。
  3. AssemblyAI参数调优
    • 在建立WebSocket连接时,可以传递 word_boost 参数来提升特定词汇(如产品名、专业术语)的识别准确率,减少因识别错误导致的反复修正延迟。
    • 启用 endpointing (端点检测)可以让模型更智能地判断一句话何时结束,从而更快地返回 FinalTranscript 。但过于敏感的端点检测可能导致长句被切分。
  4. 前端渲染优化 :前端收到 PartialTranscript 时立即显示,收到 FinalTranscript 时更新替换。这种“流式”显示能极大提升用户感知的实时性,即使最终确认稍有延迟,用户也能看到实时识别的过程。

4.3 常见问题与排查清单

在实际部署和运行中,我遇到了不少问题,这里整理成一个排查清单:

问题现象 可能原因 排查步骤与解决方案
无法连接到Agora频道 1. Token无效或过期。
2. App ID或Certificate错误。
3. 频道名包含非法字符。
4. 网络防火墙阻止。
1. 检查Token生成逻辑和时间戳。确保服务端时间准确(使用NTP)。
2. 核对控制台的App ID和Certificate。
3. 频道名只允许数字、字母和下划线。
4. 在服务器上使用 telnet nc 测试Agora服务端地址的特定端口(如TCP 443)是否可达。
能连接Agora,但收不到音频 1. 服务端加入的UID与频道内已有用户冲突。
2. 没有用户发布音频流。
3. 使用了错误的云端录制配置。
1. 使用一个频道内唯一的、高数字的UID(如100000+)。
2. 确认有真实用户客户端在频道内并开启了麦克风。
3. 如果使用云端录制,确保录制模式配置正确(如合流模式),并且RTMP地址配置无误,服务器能成功拉流。
AssemblyAI WebSocket连接失败 1. API Key无效或额度不足。
2. 服务器网络无法访问 api.assemblyai.com
3. 采样率参数不匹配。
1. 在AssemblyAI控制台检查API Key状态和用量。
2. 在服务器上 curl -v https://api.assemblyai.com/v2/realtime/ws ,看是否能建立WebSocket握手。
3. 确保连接URL中的 sample_rate 参数与发送的音频数据采样率一致(通常为16000)。
AssemblyAI连接成功,但不返回转录结果 1. 发送的音频数据格式错误。
2. 音频数据发送过快或过慢。
3. 音频数据是静音或音量过低。
1. 最关键的一步 :验证音频格式。确保是16kHz、单声道、16位有符号整数PCM。可以先将一段已知内容的WAV文件(符合格式)通过脚本发送测试。
2. 控制发送节奏,模拟实时。每发送一个200ms的分片后,等待约200ms再发下一个。
3. 检查音频源是否有有效声音。可以在发送前计算音频分片的RMS(均方根)值,过滤掉静音帧。
转录结果延迟非常高(>5秒) 1. 网络延迟高。
2. 音频分片太大。
3. 服务器CPU负载高,处理不过来。
1. 使用 ping mtr 诊断到AssemblyAI服务器的网络延迟和丢包。
2. 将分片大小从500ms调整为200ms试试。
3. 监控服务器CPU,升级配置或优化代码(如FFmpeg参数调优,使用更高效的Buffer处理)。
转录准确率低 1. 音频质量差(有噪音、回声)。
2. 包含大量专业术语或口音。
3. 多人同时说话。
1. 建议Agora客户端开启音频降噪、回声消除(AEC)等预处理。
2. 使用AssemblyAI的 word_boost 参数,提供术语列表。
3. 如果可能,为每个用户建立独立的音频流和转录连接。如果只能混合音频,确保在连接AssemblyAI时启用 speaker_labels: true
服务运行一段时间后内存泄漏 1. 未正确关闭WebSocket连接或Agora客户端。
2. 未清理事件监听器。
3. Buffer或大对象未及时释放。
1. 在 stop() 方法中确保调用 disconnect() leaveChannel()
2. 使用Node.js的 --inspect flag和Chrome DevTools进行内存堆快照分析。
3. 确保 for await...of 循环中的临时变量能被正常垃圾回收。对于长时间运行的服务,定期重启(如通过PM2)也是一个简单有效的策略。
前端收不到WebSocket消息 1. WebSocket服务器未正确广播。
2. 前端WebSocket URL错误。
3. 浏览器跨域问题(如果前端与服务器不同域)。
1. 在机器人服务的 handleTranscript 方法中打印日志,确认是否执行到广播代码。
2. 检查前端连接的WebSocket URL端口和路径是否正确。
3. 如果跨域,需要在WebSocket服务器端设置正确的 Origin 验证,或者使用Nginx反向代理将前后端统一到同域。

4.4 成本控制与扩展建议

  1. 成本监控
    • Agora :费用主要来自语音时长(服务端加入频道也会产生时长)和云端录制服务(如果使用)。在控制台设置用量告警。
    • AssemblyAI :按音频处理时长计费。Universal-3 Pro是高级模型,费率可能比标准模型高。务必在代码中做好异常处理,避免在无音频时持续发送数据产生费用。可以在发送前进行静音检测。
  2. 扩展性设计
    • 多频道支持 :上述架构中的 TranscriptionBotManager 就是为管理多个机器人实例设计的。你需要一个机制(如数据库或配置中心)来动态管理需要转录的频道列表。
    • 水平扩展 :当单个服务器无法承载更多频道时,可以考虑水平扩展。引入一个消息队列(如Redis或RabbitMQ),将“需要启动转录”的任务发布到队列,由多个工人(Worker)服务消费并执行。工人服务是无状态的,可以方便地扩容。
    • 结果聚合 :如果多个工人服务处理同一个频道的不同用户流,可能需要一个聚合服务将不同说话人的文字按时间线合并,再推送给前端。
  3. 功能增强
    • 实时翻译 :在收到AssemblyAI的转录文本后,可以立即调用翻译API(如Google Cloud Translation, DeepL),实现同声传译效果。
    • 敏感词过滤 :对转录文本进行实时敏感词匹配,触发告警或自动屏蔽。
    • 摘要生成 :定期(如每5分钟)将一段时间内的 FinalTranscript 发送给AssemblyAI的摘要API,生成阶段性会议摘要。

这个项目从构想到实现,涉及了实时音视频、网络编程、音频处理、AI服务集成等多个领域。最大的挑战不在于调用某个API,而在于如何将不同服务稳定、高效、低延迟地串联起来,并处理好各种边界情况和异常。希望这份详细的拆解,能帮你避开我踩过的那些坑,顺利搭建起自己的实时语音转录系统。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐