基于Agora与AssemblyAI构建实时语音转录机器人的架构与实现
1. 项目概述:当实时语音转录遇上实时音视频
最近在做一个需要实时记录会议讨论内容的项目,客户要求不仅要把语音转成文字,还得能实时显示在屏幕上,方便远程参会的同事跟上节奏。这让我想起了两个老朋友: Agora 和 AssemblyAI 。前者是实时音视频(RTC)领域的标杆,后者在语音AI,特别是转录和实时流处理上表现惊艳。把它们俩结合起来,做一个“实时转录机器人”(Transcription Bot),听起来是个挺酷的活儿。
简单来说,这个项目就是搭建一个服务,它能“旁听”Agora频道里的所有语音对话,然后通过AssemblyAI的 Universal-3 Pro 模型,把这些语音实时、高精度地转换成文字,再通过某种方式(比如WebSocket推送到前端,或者存入数据库)呈现出来。想象一下,在线教育场景里,老师的讲解实时变成字幕;或者在企业会议中,自动生成带发言人的会议纪要——这就是它的核心价值。
Universal-3 Pro是AssemblyAI最新的旗舰级语音识别模型,支持多语言、具备强大的抗噪和说话人分离(Speaker Diarization)能力,对于多人会议场景简直是神器。而Agora则提供了稳定、低延迟的音频流接入能力。这个项目的难点不在于调用单个API,而在于如何让这两个云端服务“流畅对话”,处理好实时音频流的接收、转发、状态管理和错误恢复。
2. 核心架构设计与技术选型
2.1 为什么是“Bot”架构?
首先得明确,这个转录机器人(Bot)并不是运行在终端用户设备上的。让每个用户的客户端同时进行音频录制和上传转录,既不经济(消耗用户流量和算力),也难以管理(多路音频合并、去重、同步是噩梦)。因此, 服务端Bot架构 是更合理的选择。
它的工作流程是这样的:
- Bot作为“隐身用户”加入频道 :我们的服务端程序,使用Agora的SDK,以一个“机器人”虚拟用户的身份,加入到指定的Agora音视频频道。
- 订阅并接收音频流 :Bot订阅频道内所有(或指定)用户的音频流。Agora服务端会将混合后的音频流(或分轨的音频流,取决于配置)推送给Bot。
- 流转发至AssemblyAI :Bot将收到的原始音频数据(通常是PCM格式),按照AssemblyAI实时流API的要求进行封装(如转为合适的编码、分块),并通过WebSocket建立持久连接,持续发送。
- 接收并处理转录结果 :AssemblyAI处理音频流,实时返回转录的文本片段(包括中间结果和最终结果)、说话人标签、时间戳等信息。Bot需要接收这些结果,进行必要的后处理(如合并句子、管理说话人切换)。
- 结果分发 :处理好的转录文本,通过WebSocket、Server-Sent Events (SSE) 或写入消息队列(如Redis Pub/Sub),分发给前端应用或其他下游服务(如纪要生成服务、数据库)。
这种架构的优点是集中化、可控性强。一个Bot可以服务一个频道,转录质量稳定,且不会干扰真实用户的体验。
2.2 关键组件与技术栈拆解
要实现这个流程,我们需要几个核心组件:
-
Bot服务(核心逻辑层) :
- 语言 :Node.js (推荐) / Python / Go。考虑到实时I/O密集和生态,Node.js是很好的选择,其事件驱动模型天然适合处理WebSocket和流数据。Python在AI集成和快速原型上也有优势。
- 核心库 :
- Agora RTC SDK (Server-side) :用于以虚拟用户身份加入频道。Agora提供了专门的“录制SDK”或“云端录制”方案,但对于自定义转录Bot,我们更常用的是 RTM (Real-time Messaging) SDK 或 RESTful API 来获取频道内的音频流地址(Token),然后结合低层级的媒体流处理库。实际上,更直接的方式是使用Agora的 云端录制 服务获取音频流,或者使用支持服务端订阅的SDK(如某些版本的Node.js SDK)。
- AssemblyAI Realtime SDK :官方提供了WebSocket客户端库,极大简化了连接建立、音频发送和结果接收的流程。
- 音频处理库 :如
ffmpeg(通过命令行或fluent-ffmpeg库调用)或专门的音频处理库(如node-lame用于MP3编码)。因为从Agora接收的音频可能是特定的编码格式(如OPUS),而AssemblyAI实时API通常支持PCM、MP3等格式,可能需要转码或重采样。
-
前后端通信层 :
- 结果推送 :使用 Socket.IO 或纯 WebSocket (
ws库) 将转录结果实时推送到网页客户端。Socket.IO提供了房间、自动重连等便利功能。 - API接口 :使用 Express.js (Node.js) 或 FastAPI (Python) 提供RESTful API,用于创建转录任务、管理Bot生命周期(启动/停止)、查询历史记录等。
- 结果推送 :使用 Socket.IO 或纯 WebSocket (
-
状态与数据持久层 :
- 内存/缓存 :使用 Redis 存储活跃的转录会话状态、临时结果、用户-Bot映射关系。Redis的Pub/Sub功能也可用于内部微服务通信。
- 数据库 :使用 PostgreSQL 或 MongoDB 存储最终的转录文稿、会议元数据(频道ID、时间、参与者)等,以供回放和分析。
-
基础设施与部署 :
- 容器化 :使用 Docker 封装Bot服务,确保环境一致性。
- 编排与部署 :使用 Kubernetes 或简单的 Docker Compose 进行部署和管理。由于Bot是有状态服务(每个Bot关联一个频道),需要谨慎管理其生命周期。
- 网络 :确保服务器有良好的网络连接,低延迟地连接Agora和AssemblyAI的全球服务节点。可以考虑将Bot部署在离主要用户区域或Agora数据中心较近的云服务器上。
注意:关于音频流获取的深度解析 这是本项目最大的技术难点之一。Agora官方并不直接提供一个“拉取频道内所有音频”的简单API。常见方案有:
- 云端录制 :开通Agora云端录制,录制文件会存储在云端(如阿里云OSS)。但这不是真正的“实时”,有分钟级延迟。我们可以通过订阅录制事件,在文件生成后立即下载并转给AssemblyAI,适用于对实时性要求不高(如会后纪要生成)的场景。
- 服务端SDK订阅 :某些版本的Agora服务端SDK(需联系销售确认)支持以“监听者”角色加入频道,直接接收音频流。这是最理想的实时方案。
- 客户端转发 :让一个“影子”客户端(由服务端控制)加入频道,录制音频并上传到自己的服务器。这种方法不稳定,且违反大多数服务条款, 不推荐 。 在本博文中,我们将基于 方案2(服务端SDK订阅) 进行假设和设计,这是构建低延迟实时转录Bot的正统路径。如果你的Agora项目不支持此功能,方案1(云端录制+异步处理)是可行的备选。
3. 实操搭建:从零构建转录Bot
3.1 环境准备与依赖安装
假设我们选择Node.js作为主要开发语言。首先初始化项目并安装核心依赖。
mkdir agora-assemblyai-bot
cd agora-assemblyai-bot
npm init -y
安装必要的npm包:
npm install agora-access-token express socket.io redis dotenv
npm install assemblyai # AssemblyAI官方Node.js SDK,包含实时转录功能
# 注意:Agora的服务端Node.js SDK可能需要从官方渠道特定获取,这里假设包名为 `agora-rtc-sdk-server`
# npm install agora-rtc-sdk-server
对于音频处理,我们可能需要 ffmpeg 。在Ubuntu系统上:
sudo apt update
sudo apt install ffmpeg
在项目中,我们可以使用 fluent-ffmpeg 来编程式调用:
npm install fluent-ffmpeg
创建项目基础结构:
agora-assemblyai-bot/
├── .env # 环境变量
├── package.json
├── server.js # 主入口文件
├── bot/ # Bot核心逻辑
│ ├── AgoraClient.js # Agora客户端封装
│ ├── AssemblyAIClient.js # AssemblyAI客户端封装
│ └── TranscriptionSession.js # 转录会话管理
├── routes/ # API路由
│ └── api.js
├── utils/ # 工具函数
│ └── audioProcessor.js
└── public/ # 前端静态文件(可选)
3.2 核心模块实现:Agora客户端封装
bot/AgoraClient.js 负责连接Agora频道并获取音频流。这里我们模拟使用一个支持服务端订阅的SDK。
// bot/AgoraClient.js
const { EventEmitter } = require('events');
// 假设引入的是Agora服务端SDK
const AgoraSDK = require('agora-rtc-sdk-server'); // 示例,实际包名可能不同
class AgoraClient extends EventEmitter {
constructor(appId, appCertificate, channelName, uid) {
super();
this.appId = appId;
this.appCertificate = appCertificate;
this.channelName = channelName;
this.uid = uid || 0; // 服务端Bot的UID,通常用一个数字
this.client = null;
this.audioStream = null;
this.isJoined = false;
}
async join() {
try {
// 1. 创建客户端实例 (假设SDK提供createClient方法)
this.client = AgoraSDK.createClient({ mode: 'live', codec: 'vp8' }); // 音频场景下codec影响不大
// 2. 初始化客户端
await this.client.init(this.appId);
// 3. 加入频道
// 需要生成Token。生产环境应从安全的后端服务获取,此处为演示。
const token = this._generateToken(); // 实现一个Token生成函数
await this.client.join(this.appCertificate, this.channelName, this.uid, token);
this.isJoined = true;
console.log(`Bot joined channel: ${this.channelName} with UID: ${this.uid}`);
// 4. 订阅频道内所有远程音频流
this.client.on('stream-added', (evt) => {
const stream = evt.stream;
if (stream.getType() === 'audio') {
this.client.subscribe(stream, { audio: true, video: false }, (err) => {
if (!err) {
console.log('Subscribed to remote audio stream:', stream.getId());
this._setupAudioStream(stream);
}
});
}
});
// 5. 处理流移除等事件
this.client.on('stream-removed', (evt) => {
console.log('Remote audio stream removed:', evt.stream.getId());
// 处理说话人离开的逻辑
});
} catch (error) {
console.error('Failed to join Agora channel:', error);
throw error;
}
}
_setupAudioStream(remoteStream) {
// 这里是关键:获取原始的音频数据
// 具体方法取决于SDK,可能需要监听 'audio-frame' 事件或使用 `getAudioTrack` 等方法
// 假设SDK提供了 `on('audio-frame')` 事件
remoteStream.on('audio-frame', (frame) => {
// frame 可能包含 buffer, sampleRate, sampleBits, channelCount 等信息
// 将音频帧数据发射出去,供后续处理
this.emit('audioData', {
buffer: frame.buffer,
sampleRate: frame.sampleRate,
channelCount: frame.channelCount,
timestamp: Date.now()
});
});
this.audioStream = remoteStream;
}
_generateToken() {
// 简化版Token生成,生产环境请使用Agora官方Token服务器或安全的后端逻辑
const { RtcTokenBuilder, RtcRole } = require('agora-access-token');
const expirationTimeInSeconds = 3600; // 1小时
const currentTimestamp = Math.floor(Date.now() / 1000);
const privilegeExpiredTs = currentTimestamp + expirationTimeInSeconds;
return RtcTokenBuilder.buildTokenWithUid(
this.appId,
this.appCertificate,
this.channelName,
this.uid,
RtcRole.SUBSCRIBER, // Bot是订阅者角色
privilegeExpiredTs
);
}
async leave() {
if (this.client && this.isJoined) {
await this.client.leave();
this.isJoined = false;
console.log(`Bot left channel: ${this.channelName}`);
}
}
}
module.exports = AgoraClient;
实操心得:Token管理 上面的
_generateToken函数仅用于演示。在生产环境中, 绝对不要 将App Certificate硬编码在客户端或Bot服务端。正确做法是:建立一个独立的、安全的 Token服务器 (可以是一个简单的微服务)。当Bot需要加入频道时,向这个Token服务器请求一个临时Token。Token服务器使用App Certificate生成Token后返回给Bot。这样保证了核心密钥的安全。
3.3 核心模块实现:AssemblyAI实时转录客户端
bot/AssemblyAIClient.js 负责连接AssemblyAI的实时API,发送音频数据并接收转录结果。
// bot/AssemblyAIClient.js
const { AssemblyAI } = require('assemblyai');
const { EventEmitter } = require('events');
class AssemblyAIClient extends EventEmitter {
constructor(apiKey, config = {}) {
super();
this.client = new AssemblyAI({ apiKey });
this.realtimeSession = null;
this.config = {
sampleRate: 16000, // AssemblyAI实时API常用采样率
encoding: 'pcm_s16le', // 常用编码
language: 'en', // 默认英语,Universal-3 Pro支持多语言
...config
};
this.isConnected = false;
}
async connect() {
try {
// 使用SDK创建实时会话
this.realtimeSession = this.client.realtime.transcriber({
sample_rate: this.config.sampleRate,
encoding: this.config.encoding,
language_code: this.config.language,
// 启用说话人分离
speaker_diarization: true,
// 使用Universal-3 Pro模型
model: 'universal-3-pro',
// 可选:启用实时端点检测,优化句子切分
// endpointing: 500, // 静音500ms后判定句子结束
});
// 监听转录结果
this.realtimeSession.on('transcript', (transcript) => {
if (!transcript.text) return; // 忽略空结果
// 发射转录结果,包含文本、说话人、时间戳、是否为最终结果
this.emit('transcription', {
text: transcript.text,
speaker: transcript.speaker, // 说话人标签,如'A', 'B'
isFinal: transcript.type === 'final', // 'partial'为中间结果,'final'为最终结果
timestamp: new Date(),
confidence: transcript.confidence,
words: transcript.words // 词级时间戳(如果启用)
});
});
// 监听会话打开
this.realtimeSession.on('open', () => {
console.log('Connected to AssemblyAI Realtime API');
this.isConnected = true;
this.emit('connected');
});
// 监听错误和关闭
this.realtimeSession.on('error', (error) => {
console.error('AssemblyAI Realtime Error:', error);
this.emit('error', error);
});
this.realtimeSession.on('close', () => {
console.log('Disconnected from AssemblyAI Realtime API');
this.isConnected = false;
this.emit('disconnected');
});
// 建立连接
await this.realtimeSession.connect();
} catch (error) {
console.error('Failed to connect to AssemblyAI:', error);
throw error;
}
}
// 发送音频数据块
sendAudio(audioBuffer) {
if (this.realtimeSession && this.isConnected) {
// 确保音频数据格式正确(如采样率、位深、通道数)
// 这里可能需要根据Agora传来的数据做转换
this.realtimeSession.sendAudio(audioBuffer);
} else {
console.warn('Cannot send audio, AssemblyAI session not ready.');
}
}
async disconnect() {
if (this.realtimeSession) {
await this.realtimeSession.close();
this.realtimeSession = null;
this.isConnected = false;
}
}
}
module.exports = AssemblyAIClient;
3.4 音频处理桥梁:连接两个世界
从Agora接收的音频帧格式(可能是48kHz采样率、OPUS编码)很可能与AssemblyAI实时API要求的格式(如16kHz、PCM S16LE)不匹配。我们需要一个音频处理模块。
utils/audioProcessor.js 负责格式转换。这里我们使用 ffmpeg 进行重采样和转码,因为它能处理几乎所有格式。
// utils/audioProcessor.js
const { spawn } = require('child_process');
const { Writable } = require('stream');
class AudioProcessor {
/**
* 将输入的音频Buffer转换为目标格式
* @param {Buffer} inputBuffer - 原始音频数据
* @param {Object} inputFormat - { sampleRate: 48000, channels: 2, format: 's16le' }
* @param {Object} outputFormat - { sampleRate: 16000, channels: 1, format: 's16le' }
* @returns {Promise<Buffer>} - 转换后的音频Buffer
*/
static async convert(inputBuffer, inputFormat, outputFormat) {
return new Promise((resolve, reject) => {
const args = [
'-f', inputFormat.format, // 输入格式
'-ar', inputFormat.sampleRate.toString(), // 输入采样率
'-ac', inputFormat.channels.toString(), // 输入声道数
'-i', 'pipe:0', // 从标准输入读取
'-f', outputFormat.format, // 输出格式
'-ar', outputFormat.sampleRate.toString(), // 输出采样率
'-ac', outputFormat.channels.toString(), // 输出声道数
'-', // 输出到标准输出
];
const ffmpeg = spawn('ffmpeg', args);
const chunks = [];
ffmpeg.stdout.on('data', (chunk) => chunks.push(chunk));
ffmpeg.stdout.on('end', () => resolve(Buffer.concat(chunks)));
ffmpeg.stderr.on('data', (data) => {
// 可以记录ffmpeg日志,用于调试
// console.debug(`ffmpeg stderr: ${data}`);
});
ffmpeg.on('error', reject);
ffmpeg.on('close', (code) => {
if (code !== 0) {
reject(new Error(`ffmpeg process exited with code ${code}`));
}
});
ffmpeg.stdin.write(inputBuffer);
ffmpeg.stdin.end();
});
}
/**
* 创建一个持续的音频转换流
* 适用于实时音频流处理,避免为每个小帧频繁启动ffmpeg进程(性能差)
* 更高级的实现:使用 `ffmpeg-stream` 或 `node-fluent-ffmpeg` 创建持久化的转换流。
* 此处为简化示例,实际生产环境建议使用更高效的流式处理库。
*/
static createConversionStream(inputFormat, outputFormat) {
// 这是一个复杂但更高效的方法,需要管理ffmpeg子进程的生命周期和流式I/O。
// 由于篇幅,这里仅给出概念提示。
console.warn('createConversionStream not fully implemented. Consider using fluent-ffmpeg for robust streaming.');
// 返回一个Transform流,将输入Buffer转换为输出Buffer
}
}
module.exports = AudioProcessor;
3.5 会话管理:粘合一切的核心
bot/TranscriptionSession.js 是大脑,它实例化AgoraClient和AssemblyAIClient,管理音频数据的流动和转录结果的分发。
// bot/TranscriptionSession.js
const AgoraClient = require('./AgoraClient');
const AssemblyAIClient = require('./AssemblyAIClient');
const AudioProcessor = require('../utils/audioProcessor');
const EventEmitter = require('events');
class TranscriptionSession extends EventEmitter {
constructor(channelName, sessionId, io) {
super();
this.channelName = channelName;
this.sessionId = sessionId; // 唯一会话ID
this.io = io; // Socket.IO实例,用于广播结果
this.agoraClient = null;
this.assemblyAIClient = null;
this.isActive = false;
this.transcriptionBuffer = []; // 用于临时存储和合并句子
}
async start(agoraConfig, assemblyAIConfig) {
if (this.isActive) {
console.warn(`Session ${this.sessionId} is already active.`);
return;
}
try {
// 1. 初始化并连接Agora客户端
this.agoraClient = new AgoraClient(
agoraConfig.appId,
agoraConfig.appCertificate,
this.channelName,
agoraConfig.uid
);
// 监听Agora的音频数据
this.agoraClient.on('audioData', async (audioFrame) => {
if (!this.assemblyAIClient?.isConnected) return;
// 2. 音频格式转换 (示例:Agora OPUS 48kHz -> AssemblyAI PCM 16kHz)
// 注意:需要根据实际从Agora SDK获取的音频帧格式调整参数
const inputFormat = {
sampleRate: audioFrame.sampleRate || 48000,
channels: audioFrame.channelCount || 1,
format: 's16le' // 假设Agora SDK已解码为PCM
};
const outputFormat = {
sampleRate: 16000,
channels: 1, // AssemblyAI实时API通常推荐单声道
format: 's16le'
};
try {
const convertedBuffer = await AudioProcessor.convert(
audioFrame.buffer,
inputFormat,
outputFormat
);
// 3. 发送转换后的音频到AssemblyAI
this.assemblyAIClient.sendAudio(convertedBuffer);
} catch (convError) {
console.error('Audio conversion failed:', convError);
}
});
await this.agoraClient.join();
// 4. 初始化并连接AssemblyAI客户端
this.assemblyAIClient = new AssemblyAIClient(assemblyAIConfig.apiKey, {
sampleRate: 16000,
language: assemblyAIConfig.language || 'en',
});
this.assemblyAIClient.on('transcription', (result) => {
// 5. 处理转录结果
this._handleTranscriptionResult(result);
});
this.assemblyAIClient.on('error', (error) => {
this.emit('error', error);
});
await this.assemblyAIClient.connect();
this.isActive = true;
console.log(`Transcription session started for channel: ${this.channelName}, ID: ${this.sessionId}`);
this.emit('started');
} catch (error) {
console.error(`Failed to start transcription session ${this.sessionId}:`, error);
await this.stop(); // 清理资源
throw error;
}
}
_handleTranscriptionResult(result) {
// 简单的结果后处理:合并中间结果,管理最终结果
if (result.isFinal) {
// 最终结果,可以存储到数据库
console.log(`[Final] Speaker ${result.speaker}: ${result.text}`);
// 广播给所有连接到这个会话的Web客户端
if (this.io) {
this.io.to(this.sessionId).emit('transcription_final', {
sessionId: this.sessionId,
speaker: result.speaker,
text: result.text,
timestamp: result.timestamp,
words: result.words
});
}
// 也可以存入数据库
// await this.saveToDatabase(result);
} else {
// 中间结果,可以实时显示在UI上,增加用户体验
// console.log(`[Partial] Speaker ${result.speaker}: ${result.text}`);
if (this.io) {
this.io.to(this.sessionId).emit('transcription_partial', {
sessionId: this.sessionId,
speaker: result.speaker,
text: result.text,
timestamp: result.timestamp
});
}
}
// 触发自定义事件,供其他服务监听
this.emit('result', result);
}
async stop() {
this.isActive = false;
if (this.assemblyAIClient) {
await this.assemblyAIClient.disconnect();
}
if (this.agoraClient) {
await this.agoraClient.leave();
}
console.log(`Transcription session stopped: ${this.sessionId}`);
this.emit('stopped');
}
getStatus() {
return {
sessionId: this.sessionId,
channelName: this.channelName,
isActive: this.isActive,
agoraJoined: this.agoraClient?.isJoined || false,
assemblyAIConnected: this.assemblyAIClient?.isConnected || false,
};
}
}
module.exports = TranscriptionSession;
3.6 主服务与API路由
server.js 和 routes/api.js 负责启动HTTP/Socket服务器,并提供管理API。
// server.js
require('dotenv').config();
const express = require('express');
const http = require('http');
const socketIo = require('socket.io');
const Redis = require('redis');
const apiRoutes = require('./routes/api');
const app = express();
const server = http.createServer(app);
const io = socketIo(server, {
cors: {
origin: process.env.CLIENT_URL || '*', // 生产环境应严格限制
methods: ['GET', 'POST']
}
});
// 内存存储活跃会话(生产环境应用Redis持久化)
const activeSessions = new Map();
// 简单的会话管理器
const sessionManager = {
createSession: (channelName, sessionId) => {
// 从环境变量或配置中心获取密钥
const agoraConfig = {
appId: process.env.AGORA_APP_ID,
appCertificate: process.env.AGORA_APP_CERTIFICATE,
uid: Math.floor(Math.random() * 100000) // 生成一个随机UID
};
const assemblyAIConfig = {
apiKey: process.env.ASSEMBLYAI_API_KEY,
language: 'en' // 可从请求中动态获取
};
const session = new TranscriptionSession(channelName, sessionId, io);
activeSessions.set(sessionId, session);
session.on('error', (err) => {
console.error(`Session ${sessionId} error:`, err);
// 可以考虑错误恢复逻辑
});
session.on('stopped', () => {
activeSessions.delete(sessionId);
});
return session;
},
getSession: (sessionId) => activeSessions.get(sessionId),
stopSession: async (sessionId) => {
const session = activeSessions.get(sessionId);
if (session) {
await session.stop();
}
}
};
// 将管理器附加到app实例,方便路由访问
app.set('sessionManager', sessionManager);
app.use(express.json());
app.use('/api', apiRoutes);
// Socket.IO连接处理
io.on('connection', (socket) => {
console.log('New client connected:', socket.id);
socket.on('join_session', (data) => {
const { sessionId } = data;
if (sessionId) {
socket.join(sessionId);
console.log(`Client ${socket.id} joined session ${sessionId}`);
socket.emit('session_joined', { sessionId });
}
});
socket.on('disconnect', () => {
console.log('Client disconnected:', socket.id);
});
});
const PORT = process.env.PORT || 3000;
server.listen(PORT, () => {
console.log(`Transcription Bot Server running on port ${PORT}`);
});
// routes/api.js
const express = require('express');
const router = express.Router();
// POST /api/session/start - 启动一个新的转录会话
router.post('/session/start', async (req, res) => {
const { channelName, sessionId } = req.body;
const sessionManager = req.app.get('sessionManager');
if (!channelName || !sessionId) {
return res.status(400).json({ error: 'channelName and sessionId are required' });
}
if (sessionManager.getSession(sessionId)) {
return res.status(409).json({ error: 'Session already exists' });
}
try {
const session = sessionManager.createSession(channelName, sessionId);
await session.start(
{
appId: process.env.AGORA_APP_ID,
appCertificate: process.env.AGORA_APP_CERTIFICATE,
uid: Math.floor(Math.random() * 100000)
},
{
apiKey: process.env.ASSEMBLYAI_API_KEY,
language: req.body.language || 'en'
}
);
res.status(201).json({
message: 'Transcription session started',
sessionId,
channelName,
status: session.getStatus()
});
} catch (error) {
console.error('Failed to start session:', error);
res.status(500).json({ error: 'Failed to start transcription session', details: error.message });
}
});
// GET /api/session/:sessionId/status - 获取会话状态
router.get('/session/:sessionId/status', (req, res) => {
const session = req.app.get('sessionManager').getSession(req.params.sessionId);
if (!session) {
return res.status(404).json({ error: 'Session not found' });
}
res.json({ status: session.getStatus() });
});
// POST /api/session/:sessionId/stop - 停止转录会话
router.post('/session/:sessionId/stop', async (req, res) => {
const sessionManager = req.app.get('sessionManager');
try {
await sessionManager.stopSession(req.params.sessionId);
res.json({ message: 'Transcription session stopped' });
} catch (error) {
res.status(500).json({ error: 'Failed to stop session' });
}
});
module.exports = router;
4. 部署、优化与问题排查
4.1 部署到生产环境
-
环境变量配置 :创建
.env文件(但不要提交到Git),并在生产环境(如Docker或服务器环境变量)中设置。AGORA_APP_ID=your_agora_app_id AGORA_APP_CERTIFICATE=your_agora_app_certificate ASSEMBLYAI_API_KEY=your_assemblyai_api_key REDIS_URL=redis://redis:6379 PORT=3000 CLIENT_URL=https://your-frontend.com -
Docker化 :创建
Dockerfile和docker-compose.yml。# Dockerfile FROM node:18-alpine WORKDIR /app COPY package*.json ./ RUN npm ci --only=production RUN apk add --no-cache ffmpeg COPY . . EXPOSE 3000 CMD ["node", "server.js"]# docker-compose.yml version: '3.8' services: bot: build: . ports: - "3000:3000" environment: - NODE_ENV=production - AGORA_APP_ID=${AGORA_APP_ID} - AGORA_APP_CERTIFICATE=${AGORA_APP_CERTIFICATE} - ASSEMBLYAI_API_KEY=${ASSEMBLYAI_API_KEY} - REDIS_URL=redis://redis:6379 depends_on: - redis restart: unless-stopped redis: image: redis:7-alpine ports: - "6379:6379" volumes: - redis_data:/data volumes: redis_data: -
使用PM2或Kubernetes管理进程 :对于高可用部署,使用PM2进行进程管理,或使用Kubernetes部署多个Bot实例,并通过负载均衡器分配频道。注意:每个Bot实例是有状态的(绑定特定频道),因此负载均衡策略需要是“会话亲和性”的。
4.2 性能优化与成本控制
-
音频处理优化 :
- 批处理音频帧 :不要每收到一个音频帧(可能只有几十毫秒)就调用一次
ffmpeg转换。可以缓冲几百毫秒的音频数据,然后批量转换发送,减少进程创建开销。 - 使用原生音频处理库 :对于固定的格式转换(如48kHz OPUS到16kHz PCM),可以考虑使用Node.js的
node-opus、speaker等原生绑定库,性能远高于启动外部ffmpeg进程。 - 调整音频参数 :AssemblyAI Universal-3 Pro在16kHz单声道下已有优秀表现。确保从Agora接收的音频经过 回声消除 和 噪声抑制 处理,能显著提升转录准确率。
- 批处理音频帧 :不要每收到一个音频帧(可能只有几十毫秒)就调用一次
-
AssemblyAI API 使用优化 :
- 合理使用
endpointing参数 :设置合适的静音检测阈值(如endpointing: 500),让模型能更准确地切分句子,减少“一句话说一半就输出”的中间结果,提升最终结果的连贯性。 - 选择性启用功能 :如果不需要词级时间戳,可以关闭
word_boost等高级功能以减少延迟和成本。 - 监控用量 :AssemblyAI按音频时长计费。实现会话自动超时关闭(如频道空闲30分钟后自动停止Bot),避免产生不必要的费用。
- 合理使用
-
Agora 端优化 :
- 使用合流模式 :如果Bot只需要整体的会议录音,可以在Agora云端录制服务中配置合流模式,让Agora服务器将多路音频混合成一路,Bot只需处理一个流,简化逻辑。
- 控制订阅流 :如果频道人数很多,但只有少数人发言,可以动态调整Bot订阅的音频流,而不是订阅所有人。
4.3 常见问题与排查技巧
问题1:转录延迟很高(超过5秒)。
- 排查 :
- 网络链路 :使用
ping和traceroute检查Bot服务器到Agora和AssemblyAI服务节点的延迟。考虑将Bot部署在离用户主要区域或云服务商更近的位置。 - 音频缓冲 :检查音频处理环节的缓冲大小。过大的缓冲会增加延迟。尝试将缓冲时间从500ms减少到200ms。
- AssemblyAI连接 :确保使用的是AssemblyAI的 实时流API ,而不是异步文件上传API。检查WebSocket连接是否稳定,有无频繁重连。
- Bot服务器负载 :检查服务器CPU和内存使用率。音频转码(尤其是
ffmpeg)是CPU密集型操作。
- 网络链路 :使用
问题2:转录准确率低,尤其在多人同时说话时。
- 排查与解决 :
- 确认模型 :确保在AssemblyAIClient配置中明确指定了
model: 'universal-3-pro',这是目前准确率最高的模型。 - 启用说话人分离 :检查
speaker_diarization: true是否已设置。Universal-3 Pro的说话人分离能力很强。 - 音频质量 :检查从Agora接收的音频质量。如果原始音频有严重回声、啸叫或背景噪声,再好的模型也无能为力。确保Agora频道内的用户使用了合适的音频设备并开启了Agora SDK的音频预处理(如AEC、ANS)。
- 声道问题 :确保发送给AssemblyAI的是 单声道 音频。立体声可能会干扰模型。
- 确认模型 :确保在AssemblyAIClient配置中明确指定了
问题3:Bot加入Agora频道失败,报错“Token无效”或“动态密钥过期”。
- 解决 :
- Token生成逻辑 :确保Token服务器使用的
appId、appCertificate、channelName和uid与Bot尝试加入时使用的完全一致。 - Token过期 :Token有有效期(通常1小时)。实现Token自动刷新机制。在Bot中加入逻辑,在Token过期前(如剩余5分钟时)向Token服务器申请新Token,并使用Agora SDK的
renewToken方法更新。 - 角色权限 :Bot加入频道时使用的Token,其角色(
RtcRole)必须包含订阅流的权限(即SUBSCRIBER)。
- Token生成逻辑 :确保Token服务器使用的
问题4:WebSocket连接(Socket.IO)不稳定,前端收不到实时转录。
- 排查 :
- 防火墙/代理 :确保服务器防火墙开放了Socket.IO使用的端口(默认是3000,以及可能的WebSocket升级端口)。
- 心跳与重连 :Socket.IO客户端应配置自动重连。服务器端也需要处理连接中断后的清理工作。
- 房间管理 :确保前端
socket.emit('join_session', {sessionId})时,sessionId与后端创建的会话ID完全匹配,且后端正确执行了socket.join(sessionId)。
问题5:内存泄漏,Bot运行一段时间后崩溃。
- 排查 :
- 事件监听器 :确保在
TranscriptionSession的stop方法中,移除了所有自定义事件监听器,并正确关闭了Agora和AssemblyAI的连接。 - 音频缓冲 :检查
transcriptionBuffer或任何用于累积音频/文本的数组是否被定期清理。 - 使用内存分析工具 :在Node.js中,可以使用
--inspect标志启动服务,然后使用Chrome DevTools或clinic.js等工具进行堆内存快照分析,查找未被释放的对象引用。
- 事件监听器 :确保在
构建这样一个实时转录Bot,就像在Agora和AssemblyAI这两座强大的云服务之间架起一座高精度、低延迟的数据桥梁。每一个环节——从音频流的抓取、格式转换、实时传输,到结果的处理与分发——都需要仔细设计和调试。当看到第一句清晰的、带说话人标签的文字从嘈杂的会议音频中实时呈现出来时,你会觉得这一切的折腾都是值得的。这个项目不仅是一个工具,更是一个理解现代实时音视频与AI语音处理如何协同工作的绝佳范例。
更多推荐

所有评论(0)