Agora与AssemblyAI集成:构建低延迟实时语音转录系统
1. 项目概述:当实时音视频遇上AI转录
最近在做一个挺有意思的集成项目,核心是把Agora的实时音视频通信能力和AssemblyAI最新的Universal-3 Pro语音转文字模型结合起来,打造一个能实时转录会议、直播、客服对话的“智能速记员”。这个组合听起来简单,但实际做下来,从架构设计到性能调优,每一步都有不少门道。Agora大家应该不陌生,做实时音视频通信的头部服务商,SDK成熟稳定,延迟能做到毫秒级。AssemblyAI则是语音AI领域的佼佼者,他们的Universal-3 Pro模型号称在准确率、多语言支持和实时性上都有显著提升,特别适合需要高精度转录的场景。
这个转录机器人的核心价值在于,它能把实时发生的语音对话,几乎是同步地转换成结构化的文字稿。想象一下在线教育场景,老师讲课的同时,侧边栏就实时滚动着字幕和重点摘要;或者是在跨国会议里,不同语言的发言能被实时翻译并转写成文字,打破沟通壁垒;再比如内容审核,对直播语音进行实时关键词监测。这些场景对延迟和准确率的要求都非常苛刻,传统的录音后处理方案根本满足不了。
我自己在搭建过程中,主要解决了几个核心问题:如何低延迟、高可靠地从Agora拉取音频流;如何将音频流适配成AssemblyAI API要求的格式并进行高效传输;如何处理转录结果的返回、展示与存储;以及如何应对网络波动、服务中断等异常情况。整个项目涉及前端、后端、音视频处理、网络通信多个层面,是一个典型的全栈集成项目。接下来,我就把整个实现思路、关键步骤以及踩过的坑,详细拆解一遍。
2. 核心架构设计与技术选型
2.1 为什么选择Agora + AssemblyAI这个组合?
在做技术选型时,我对比过几种方案。比如直接用浏览器的 Web Speech API 或 WebRTC 的 getUserMedia 配合本地语音识别库。前者识别准确率一般,且对中文支持不佳;后者则对客户端性能要求高,模型体积大,不适合复杂场景。也考虑过用FFmpeg拉流推到其他云服务商的语音识别接口,但整体链路长,延迟控制复杂。
最终选择Agora + AssemblyAI,是基于以下几个核心考量:
- 专业性分工 :Agora专注解决实时音视频的“传输”问题,在全球节点部署、抗弱网、超低延迟方面是专家。AssemblyAI专注解决“识别”问题,其Universal-3 Pro模型在嘈杂环境、多人对话、专业术语识别上表现突出。让专业的人做专业的事,集成起来反而更可靠。
- 延迟与实时性 :Agora的音频流可以极低延迟(通常<200ms)地推送到自建服务端。AssemblyAI的实时转录API(Realtime API)支持WebSocket长连接,音频流分片发送,转录结果流式返回,端到端延迟可以控制在1-2秒内,对于大多数交互场景是可接受的。
- 可控性与成本 :相比于使用某些打包好的“一站式”解决方案,自己集成Agora和AssemblyAI,对数据流、业务逻辑有完全的控制权。你可以决定哪些频道的音频需要转录,转录结果如何处理(是实时显示、存储还是触发其他业务动作)。成本上也更清晰,Agora按语音时长计费,AssemblyAI按音频处理时长计费,用多少算多少。
- 功能扩展性 :AssemblyAI Universal-3 Pro不仅提供转写,还支持说话人分离(谁在说话)、情感分析、内容摘要、实体识别(人名、地点)等高级功能。通过API可以灵活启用这些功能,为业务赋能,比如自动生成会议纪要要点,或标记出讨论中的关键决策。
基于这些原因,我决定采用 服务端集中处理 的架构。所有Agora频道内的音频流,由我们部署的一个中间服务(转录机器人)来订阅、处理并转发给AssemblyAI,再将结果分发出去。这样避免了客户端性能差异和网络环境的影响,也便于统一管理和监控。
2.2 系统架构全景图与组件职责
整个系统的数据流是这样的:
[Agora客户端 (说话者)] --(音频流)--> [Agora云服务]
|
| (通过Agora服务端SDK订阅)
v
[转录机器人服务 (Node.js/Python)]
|
(音频格式转换、分片、流式发送)
v
[AssemblyAI Realtime API]
|
(流式返回JSON格式的转录结果)
v
[转录机器人服务]
|
(结果处理:合并、说话人标签、分发到WebSocket/存储/业务系统)
v
[Web客户端 / 数据存储 / 消息队列]
各核心组件职责详解:
- Agora客户端 :负责采集和发送用户的音频。对于转录机器人来说,它是音频源。我们需要确保客户端使用合适的音频编码格式(如OPUS)和参数(采样率、声道数),以保证音质的同时不过度占用带宽。
- Agora云服务 :负责全球范围内的音频流路由、转发和混音。这是我们无法干预的黑盒,但提供了强大的服务端SDK(如Agora的RESTful API或更底层的云端录制SDK),允许我们以“特权用户”身份加入频道,订阅所有或指定用户的音频流。
- 转录机器人服务(核心) :这是我们自己实现的后端服务,是整个系统的“大脑”。它需要完成以下关键任务:
- 频道管理 :根据配置,动态加入或离开Agora频道。
- 音频流接收与解码 :通过Agora服务端SDK接收音频包(可能是编码后的)。由于AssemblyAI API通常接受PCM音频数据,因此可能需要先对音频进行解码(如果Agora SDK输出的是编码格式)。
- 音频预处理与分片 :将连续的音频流按固定时长(如200-500毫秒)分片。同时,可能需要进行重采样(确保采样率与AssemblyAI要求一致,如16kHz)、声道转换(单声道)等操作。
- 与AssemblyAI建立并维护WebSocket连接 :为每个需要转录的音频源(或整个混合后的音频)建立一个到AssemblyAI Realtime API的WebSocket连接。需要处理连接建立、认证(API Key)、重连逻辑。
- 流式发送与接收 :将音频分片通过WebSocket实时发送。同时监听WebSocket返回的消息,解析JSON格式的转录结果(包括临时性的
PartialTranscript和最终确认的FinalTranscript)。 - 结果后处理与分发 :对返回的文本进行必要的处理,如根据说话人ID区分不同发言者,将零散的分片结果合并成连贯的句子。然后通过WebSocket服务器将结果推送给前端页面,或者存入数据库(如MongoDB用于全文检索),又或者发布到消息队列(如Redis Pub/Sub, Kafka)供其他业务系统消费。
- 结果消费端 :可以是显示实时字幕的Web页面(通过WebSocket连接机器人服务),可以是用于存档的关系型数据库或搜索引擎,也可以是与Slack、Teams集成的聊天机器人接口。
这个架构的关键在于 异步和非阻塞 。机器人服务必须能够同时处理多个Agora频道的音频流,每个音频流对应一个独立的AssemblyAI WebSocket连接。这就要求服务采用事件驱动模型(Node.js的天然优势)或异步框架(如Python的asyncio)。
注意: 一个常见的误区是试图在一个WebSocket连接里发送多个人的混合音频,然后指望AssemblyAI能自动区分说话人。对于多人对话场景,更好的做法是 利用Agora服务端SDK获取每个用户的独立音频流 ,为每个用户建立独立的AssemblyAI连接。这样Universal-3 Pro模型能更精准地为每个音频源进行识别和说话人标注。如果只能拿到混合音频,则需要在建立AssemblyAI连接时,显式启用
speaker_labels参数,模型会尝试区分,但在多人同时讲话(重叠语音)时,效果会打折扣。
3. 关键实现步骤与代码拆解
3.1 环境准备与依赖安装
首先,你需要准备好以下账号和资源:
- Agora账号 :在Agora控制台创建项目,获取
App ID。更重要的是,要启用并获取 服务端相关凭证 。对于云端订阅音频流,你可能需要:- RESTful API的Customer ID和Certificate :用于调用云端录制等高级REST API。
- 或者,使用 Agora RTC SDK for Node.js (Server-side) :这允许你的Node.js服务像客户端一样加入频道,但需要动态生成Token(使用App ID, App Certificate, Channel Name, User ID)。这是更直接获取原始音频包的方式。
- AssemblyAI账号 :注册后,在设置中生成一个
API Key。确保你的账户有足够的额度,并且注意Realtime API可能处于Beta阶段或有特殊申请流程。 - 服务器环境 :推荐使用Node.js (>= 16) 或 Python (>= 3.8) 环境。服务器需要有公网IP,并且网络能够稳定访问Agora和AssemblyAI的服务器(通常都在海外主流云上,确保低延迟)。
Node.js项目初始化与核心依赖:
mkdir agora-assemblyai-bot && cd agora-assemblyai-bot
npm init -y
npm install agora-access-token agora-rtc-sdk-ng assemblyai ws express
agora-access-token: 用于在服务端生成加入频道所需的动态Token。agora-rtc-sdk-ng: Agora的Node.js服务端SDK,注意这是用于 服务端 的版本,它允许你的Node.js程序作为一个“客户端”加入频道并拉流。assemblyai: AssemblyAI的官方Node.js SDK,封装了Realtime API的WebSocket连接。ws: 一个简单强大的WebSocket库,用于创建向浏览器推送转录结果的WebSocket服务器。express: 可选,用于提供简单的管理界面或健康检查接口。
关键配置管理: 创建一个 .env 文件或使用配置管理工具,安全地存储以下信息:
AGORA_APP_ID=你的Agora App ID
AGORA_APP_CERTIFICATE=你的Agora App Certificate
ASSEMBLYAI_API_KEY=你的AssemblyAI API Key
SERVER_PORT=3000 # 你的机器人服务端口
3.2 构建Agora音频流订阅服务
这部分是整个流程的源头,目标是让我们的Node.js服务能“听到”Agora频道里的声音。
步骤一:生成加入频道所需的Token Agora服务端SDK加入频道需要Token。我们可以在服务端动态生成。
// utils/agoraToken.js
const { RtcTokenBuilder, RtcRole } = require('agora-access-token');
function generateRtcToken(channelName, uid) {
const appId = process.env.AGORA_APP_ID;
const appCertificate = process.env.AGORA_APP_CERTIFICATE;
// Token有效期,单位秒。设置一个合理的值,如3600(1小时)
const expirationTimeInSeconds = 3600;
// 服务端加入频道,通常使用一个固定的、高权限的UID,如100000
const role = RtcRole.PUBLISHER; // 服务端需要发布权限吗?通常只需要SUBSCRIBER,但SDK可能需要PUBLISHER角色来初始化。
const currentTimestamp = Math.floor(Date.now() / 1000);
const privilegeExpiredTs = currentTimestamp + expirationTimeInSeconds;
// 注意:服务端作为SUBSCRIBER加入时,理论上不需要发布权限。但根据SDK文档,生成Token时可能仍需指定角色。
// 更安全的做法是生成一个包含SUBSCRIBER权限的Token。这里使用PUBLISHER是为了兼容性。
const token = RtcTokenBuilder.buildTokenWithUid(
appId,
appCertificate,
channelName,
uid,
role,
privilegeExpiredTs
);
return token;
}
module.exports = { generateRtcToken };
步骤二:使用Agora Node.js SDK加入频道并处理音频 这是核心部分。Agora的服务端SDK允许我们创建一个“客户端”实例,加入频道,并订阅远程用户的音频流。
// services/agoraService.js
const AgoraRTC = require('agora-rtc-sdk-ng');
const { generateRtcToken } = require('../utils/agoraToken');
class AgoraService {
constructor(channelName, uid = 100000) {
this.channelName = channelName;
this.uid = uid;
this.client = null;
this.audioStream = null;
this.onAudioFrame = null; // 回调函数,用于接收音频帧
}
async joinChannel() {
try {
// 创建客户端实例,模式为直播模式(live)
this.client = AgoraRTC.createClient({ mode: 'live', codec: 'vp8' });
// 监听远端用户发布流的事件
this.client.on('user-published', async (user, mediaType) => {
if (mediaType === 'audio') {
// 订阅远端用户的音频流
await this.client.subscribe(user, mediaType);
console.log(`成功订阅用户 ${user.uid} 的音频`);
// 获取远端音频轨道
const remoteAudioTrack = user.audioTrack;
// 这里的关键:我们需要获取原始的音频数据帧
// Agora SDK可能不直接提供获取PCM数据的回调。一种常见做法是:
// 1. 通过SDK将音频播放到一个虚拟的AudioContext中(在Node.js环境需要模拟)
// 2. 或者,更直接的方法是使用Agora的“云端录制”或“扩展服务”REST API来获取音频流。
// 由于Node.js环境没有Web Audio API,直接通过客户端SDK获取原始音频帧比较复杂。
// 重要提示:在Node.js服务端环境中,使用Agora的客户端SDK来获取原始音频数据流并非标准做法,且可能受限。
// 更可靠、官方的方案是使用 Agora 的“云端录制”REST API 或 “媒体流加速”扩展服务。
// 这些服务可以将频道内的音视频流推送到你指定的服务器地址(RTMP/RTMPS)。
// 然后,你可以使用FFmpeg等工具拉取这个RTMP流,解码得到PCM数据。
}
});
const token = generateRtcToken(this.channelName, this.uid);
await this.client.join(
process.env.AGORA_APP_ID,
this.channelName,
token,
this.uid
);
console.log(`服务端加入频道 ${this.channelName} 成功,UID: ${this.uid}`);
} catch (error) {
console.error('加入Agora频道失败:', error);
throw error;
}
}
async leaveChannel() {
if (this.client) {
await this.client.leave();
this.client = null;
console.log(`已离开频道 ${this.channelName}`);
}
}
}
module.exports = AgoraService;
实操心得: 上面代码揭示了一个关键挑战:在Node.js服务端,通过Agora的 客户端SDK 直接获取原始音频帧(PCM数据)非常困难,因为SDK设计初衷是用于浏览器环境,依赖WebRTC和Web Audio API。因此,对于生产环境,我强烈推荐使用 Agora云端录制 方案。你可以通过Agora的RESTful API,启动一个云端录制任务,将指定频道的音频录制下来,并 实时推送(RTMP) 到你自己的服务器。你的转录机器人服务则运行一个RTMP拉流客户端(如使用
node-media-server或ffmpeg的Node.js封装),拉取音频流并解码为PCM。这是更稳定、官方支持的获取高音质音频流的方式。
3.3 集成AssemblyAI实时转录API
假设我们已经通过某种方式(如云端录制RTMP拉流)获得了连续的PCM音频数据。接下来,我们需要将这些数据流式地发送给AssemblyAI。
步骤一:建立并管理AssemblyAI Realtime WebSocket连接 AssemblyAI的Realtime API使用WebSocket协议。我们需要建立一个连接,并在连接上发送音频数据和接收转录结果。
// services/assemblyAIService.js
const WebSocket = require('ws');
const { Readable } = require('stream');
class AssemblyAIRealtimeService {
constructor(apiKey, sampleRate = 16000) {
this.apiKey = apiKey;
this.sampleRate = sampleRate;
this.socket = null;
this.sessionId = null;
this.isConnected = false;
this.onTranscript = null; // 回调函数,用于接收转录结果
this.onError = null;
this.onClose = null;
}
connect() {
return new Promise((resolve, reject) => {
// AssemblyAI Realtime API的WebSocket端点
const wsUrl = 'wss://api.assemblyai.com/v2/realtime/ws?sample_rate=16000';
this.socket = new WebSocket(wsUrl, {
headers: {
'Authorization': this.apiKey,
},
});
this.socket.on('open', () => {
console.log('已连接到AssemblyAI Realtime API');
this.isConnected = true;
resolve();
});
this.socket.on('message', (data) => {
try {
const message = JSON.parse(data.toString());
this.handleMessage(message);
} catch (error) {
console.error('解析AssemblyAI消息失败:', error);
}
});
this.socket.on('error', (error) => {
console.error('AssemblyAI WebSocket错误:', error);
this.isConnected = false;
if (this.onError) this.onError(error);
reject(error);
});
this.socket.on('close', (code, reason) => {
console.log(`AssemblyAI连接关闭,代码: ${code}, 原因: ${reason}`);
this.isConnected = false;
this.sessionId = null;
if (this.onClose) this.onClose(code, reason);
});
});
}
handleMessage(message) {
// 处理不同类型的返回消息
switch (message.message_type) {
case 'SessionBegins':
this.sessionId = message.session_id;
console.log(`会话开始,ID: ${this.sessionId}`);
break;
case 'PartialTranscript':
// 临时性转录结果,文本可能还会变化
if (this.onTranscript) {
this.onTranscript({
type: 'partial',
text: message.text,
audioStart: message.audio_start,
audioEnd: message.audio_end,
words: message.words, // 包含每个词的时间戳
});
}
break;
case 'FinalTranscript':
// 最终确认的转录结果
if (this.onTranscript) {
this.onTranscript({
type: 'final',
text: message.text,
audioStart: message.audio_start,
audioEnd: message.audio_end,
words: message.words,
speaker: message.speaker, // 说话人标识(如果启用了说话人分离)
});
}
break;
case 'Error':
console.error('AssemblyAI返回错误:', message.error);
break;
default:
// 忽略其他消息类型,如`UtteranceEnd`等
break;
}
}
// 发送音频数据分片。audioData应该是一个包含PCM数据的Buffer,采样率16kHz,单声道,16位有符号整数。
sendAudio(audioBuffer) {
if (!this.isConnected || !this.socket) {
console.warn('WebSocket未连接,无法发送音频');
return false;
}
// AssemblyAI要求音频数据以Base64编码的字符串形式发送
const audioBase64 = audioBuffer.toString('base64');
const message = {
audio_data: audioBase64,
};
this.socket.send(JSON.stringify(message));
return true;
}
// 发送一个标记,表示音频流结束
sendTerminate() {
if (this.isConnected && this.socket) {
const message = { terminate_session: true };
this.socket.send(JSON.stringify(message));
}
}
disconnect() {
if (this.socket) {
this.socket.close();
this.socket = null;
}
this.isConnected = false;
}
}
module.exports = AssemblyAIRealtimeService;
步骤二:音频格式处理与流式发送 从Agora云端录制RTMP流拉取到的音频,很可能不是AssemblyAI要求的格式(16kHz, 单声道, 16-bit PCM)。我们需要使用 ffmpeg 进行实时转码。
// utils/audioProcessor.js
const { spawn } = require('child_process');
const { Readable } = require('stream');
class AudioProcessor {
/**
* 创建一个FFmpeg进程,将输入音频流实时转码为AssemblyAI要求的格式。
* @param {Readable} inputStream - 输入的音频流(例如来自RTMP拉流)
* @returns {Readable} - 输出流(PCM数据流)
*/
static createTranscodeStream(inputStream) {
// FFmpeg命令参数:
// -i pipe:0 从标准输入读取
// -ar 16000 重采样到16kHz
// -ac 1 转换为单声道
// -f s16le 输出格式为有符号16位小端PCM
// pipe:1 输出到标准输出
const ffmpegArgs = [
'-i', 'pipe:0',
'-ar', '16000', // 采样率
'-ac', '1', // 声道数
'-f', 's16le', // 输出格式
'pipe:1'
];
const ffmpegProcess = spawn('ffmpeg', ffmpegArgs);
// 将输入流导入FFmpeg的标准输入
inputStream.pipe(ffmpegProcess.stdin);
// 处理错误
ffmpegProcess.stderr.on('data', (data) => {
// 可以记录日志,但通常不需要处理,除非出错
// console.error('FFmpeg stderr:', data.toString());
});
ffmpegProcess.on('error', (err) => {
console.error('启动FFmpeg进程失败:', err);
});
// FFmpeg的标准输出就是转码后的PCM流
return ffmpegProcess.stdout;
}
/**
* 将PCM流按固定时长分片。
* @param {Readable} pcmStream - PCM数据流
* @param {number} chunkDurationMs - 分片时长,毫秒(如200)
* @returns {AsyncGenerator<Buffer>} - 返回一个异步生成器,每次yield一个音频分片Buffer
*/
static async *chunkPCMStream(pcmStream, chunkDurationMs = 200) {
// 计算每个分片的大小(字节)
// PCM格式:16位 = 2字节,单声道,采样率16000。
// 每秒字节数 = 采样率 * 样本大小(字节) * 声道数 = 16000 * 2 * 1 = 32000 字节/秒
const bytesPerSecond = 16000 * 2; // 32000
const chunkSize = Math.floor(bytesPerSecond * (chunkDurationMs / 1000));
let buffer = Buffer.alloc(0);
for await (const chunk of pcmStream) {
buffer = Buffer.concat([buffer, chunk]);
while (buffer.length >= chunkSize) {
const slice = buffer.slice(0, chunkSize);
buffer = buffer.slice(chunkSize);
yield slice;
}
}
// 发送剩余不足一个分片的数据(如果有)
if (buffer.length > 0) {
yield buffer;
}
}
}
module.exports = AudioProcessor;
3.4 核心协调服务:串联音频流与转录
现在我们需要一个“大脑”来协调Agora音频流拉取、转码分片、AssemblyAI连接和结果分发。
// services/transcriptionBot.js
const AgoraService = require('./agoraService'); // 假设这是改造后能提供音频流的服务
const AssemblyAIRealtimeService = require('./assemblyAIService');
const AudioProcessor = require('../utils/audioProcessor');
const WebSocket = require('ws');
class TranscriptionBot {
constructor(channelName, resultWebSocketServer) {
this.channelName = channelName;
this.agoraService = null;
this.assemblyAIService = null;
this.isRunning = false;
this.resultWSS = resultWebSocketServer; // 用于向前端广播结果的WebSocket服务器实例
}
async start() {
if (this.isRunning) {
console.log(`转录机器人已在运行中: ${this.channelName}`);
return;
}
console.log(`启动转录机器人,频道: ${this.channelName}`);
try {
// 1. 初始化AssemblyAI服务
this.assemblyAIService = new AssemblyAIRealtimeService(process.env.ASSEMBLYAI_API_KEY);
this.assemblyAIService.onTranscript = this.handleTranscript.bind(this);
this.assemblyAIService.onError = this.handleAssemblyAIError.bind(this);
this.assemblyAIService.onClose = this.handleAssemblyAIClose.bind(this);
// 2. 连接AssemblyAI
await this.assemblyAIService.connect();
// 3. 初始化并加入Agora频道(这里假设AgoraService已改造为能提供音频流Readable)
this.agoraService = new AgoraService(this.channelName);
// 假设 agoraService.getAudioStream() 返回一个Readable流(例如来自RTMP拉流后的PCM流)
const rawAudioStream = await this.agoraService.joinAndGetStream();
// 4. 音频处理流水线
const pcmStream = AudioProcessor.createTranscodeStream(rawAudioStream);
// 5. 流式分片并发送
this.isRunning = true;
this.streamAudioToAssemblyAI(pcmStream);
console.log(`转录机器人启动成功: ${this.channelName}`);
} catch (error) {
console.error(`启动转录机器人失败: ${this.channelName}`, error);
this.stop();
throw error;
}
}
async streamAudioToAssemblyAI(pcmStream) {
// 使用异步生成器按分片发送
for await (const audioChunk of AudioProcessor.chunkPCMStream(pcmStream, 200)) {
if (!this.isRunning || !this.assemblyAIService?.isConnected) {
break;
}
this.assemblyAIService.sendAudio(audioChunk);
// 可以在这里加入简单的流量控制,避免发送过快
await new Promise(resolve => setTimeout(resolve, 50)); // 模拟实时性,约200ms发送一次
}
// 音频流结束,发送终止信号
if (this.assemblyAIService?.isConnected) {
this.assemblyAIService.sendTerminate();
}
}
handleTranscript(transcript) {
console.log(`[${transcript.type}] ${transcript.text}`);
// 将结果广播给所有连接的WebSocket客户端(例如前端页面)
const resultToSend = {
channel: this.channelName,
timestamp: Date.now(),
...transcript
};
if (this.resultWSS) {
this.resultWSS.clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(resultToSend));
}
});
}
// 也可以同时存储到数据库或发送到消息队列
// this.saveToDatabase(resultToSend);
}
handleAssemblyAIError(error) {
console.error(`AssemblyAI服务错误:`, error);
// 可以实现重连逻辑
}
handleAssemblyAIClose(code, reason) {
console.log(`AssemblyAI连接关闭,尝试重启...`);
// 简单的重连逻辑,生产环境需要更健壮
setTimeout(() => {
if (this.isRunning) {
this.assemblyAIService.connect().catch(err => console.error('重连失败:', err));
}
}, 3000);
}
async stop() {
this.isRunning = false;
if (this.assemblyAIService) {
this.assemblyAIService.disconnect();
this.assemblyAIService = null;
}
if (this.agoraService) {
await this.agoraService.leaveChannel();
this.agoraService = null;
}
console.log(`转录机器人已停止: ${this.channelName}`);
}
}
module.exports = TranscriptionBot;
3.5 构建一个简单的WebSocket结果服务器与前端示例
为了让前端能实时看到转录结果,我们需要一个WebSocket服务器。
// server.js
const express = require('express');
const WebSocket = require('ws');
const TranscriptionBotManager = require('./services/transcriptionBotManager'); // 一个管理多个机器人的管理器
const app = express();
const PORT = process.env.SERVER_PORT || 3000;
// 创建HTTP服务器,用于Express和WebSocket共享端口
const server = app.listen(PORT, () => {
console.log(`转录机器人服务运行在 http://localhost:${PORT}`);
});
// 创建WebSocket服务器,附着到同一个HTTP服务器上
const wss = new WebSocket.Server({ server });
// 存储所有连接的客户端
const clients = new Set();
wss.on('connection', (ws) => {
console.log('新的WebSocket客户端连接');
clients.add(ws);
ws.on('close', () => {
console.log('WebSocket客户端断开连接');
clients.delete(ws);
});
ws.on('error', (error) => {
console.error('WebSocket客户端错误:', error);
});
});
// 初始化机器人管理器,并传入WebSocket服务器用于广播
const botManager = new TranscriptionBotManager(wss);
// 简单的REST API接口,用于控制机器人(例如启动/停止某个频道的转录)
app.use(express.json());
app.post('/api/start', async (req, res) => {
const { channelName } = req.body;
if (!channelName) {
return res.status(400).json({ error: '缺少 channelName 参数' });
}
try {
await botManager.startBot(channelName);
res.json({ success: true, message: `已启动频道 ${channelName} 的转录` });
} catch (error) {
res.status(500).json({ error: error.message });
}
});
app.post('/api/stop', async (req, res) => {
const { channelName } = req.body;
if (!channelName) {
return res.status(400).json({ error: '缺少 channelName 参数' });
}
try {
await botManager.stopBot(channelName);
res.json({ success: true, message: `已停止频道 ${channelName} 的转录` });
} catch (error) {
res.status(500).json({ error: error.message });
}
});
// 健康检查
app.get('/health', (req, res) => {
res.json({ status: 'ok', timestamp: new Date().toISOString() });
});
一个简单的前端页面,用于连接WebSocket并显示实时字幕:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Agora转录实时字幕</title>
<style>
body { font-family: sans-serif; padding: 20px; }
#transcript-container {
border: 1px solid #ccc;
padding: 20px;
height: 400px;
overflow-y: auto;
margin-bottom: 20px;
}
.message { margin-bottom: 10px; padding: 8px; border-radius: 4px; }
.partial { background-color: #e0f7fa; border-left: 4px solid #00bcd4; }
.final { background-color: #f1f8e9; border-left: 4px solid #8bc34a; }
.speaker { font-weight: bold; color: #555; }
</style>
</head>
<body>
<h1>实时转录字幕</h1>
<div>频道: <span id="channel-name">--</span></div>
<div id="transcript-container"></div>
<div>
<button onclick="connectWS()">连接</button>
<button onclick="disconnectWS()">断开</button>
</div>
<script>
let ws = null;
const transcriptContainer = document.getElementById('transcript-container');
const channelNameSpan = document.getElementById('channel-name');
function connectWS() {
// 假设你的机器人服务运行在 localhost:3000
const wsUrl = `ws://localhost:3000`;
ws = new WebSocket(wsUrl);
ws.onopen = () => {
console.log('WebSocket连接已打开');
addMessage('系统', '已连接到转录服务', 'system');
};
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('收到转录:', data);
channelNameSpan.textContent = data.channel;
const speaker = data.speaker ? `发言人 ${data.speaker}: ` : '';
addMessage(speaker, data.text, data.type);
};
ws.onerror = (error) => {
console.error('WebSocket错误:', error);
addMessage('系统', '连接出错', 'system-error');
};
ws.onclose = () => {
console.log('WebSocket连接已关闭');
addMessage('系统', '连接已断开', 'system');
};
}
function disconnectWS() {
if (ws) {
ws.close();
ws = null;
}
}
function addMessage(speaker, text, type) {
const msgDiv = document.createElement('div');
msgDiv.className = `message ${type}`;
msgDiv.innerHTML = `<span class="speaker">${speaker}</span> ${text}`;
transcriptContainer.appendChild(msgDiv);
// 自动滚动到底部
transcriptContainer.scrollTop = transcriptContainer.scrollHeight;
}
// 页面加载后自动连接(可选)
// window.onload = connectWS;
</script>
</body>
</html>
4. 部署、优化与故障排查实录
4.1 服务器部署与性能考量
这个转录机器人服务对CPU和网络有一定要求,尤其是在处理多个频道并发时。
- 服务器规格 :对于单频道,1核2GB内存的云服务器可能勉强够用。但FFmpeg转码是CPU密集型操作,多频道并发时,CPU会成为瓶颈。建议从2核4GB起步,并根据实际负载监控进行升级。使用
top或htop命令监控CPU使用率,确保在音频处理高峰期不超过80%。 - 网络带宽 :主要考虑两个方向的流量:
- 入向 :从Agora云端录制RTMP拉流的带宽。一个单声道、16kHz的PCM流带宽很低(约256 kbps),但原始RTMP流可能包含更高质量的音频,带宽会更高。
- 出向 :向AssemblyAI发送Base64编码的音频数据(数据量约为原始PCM的4/3),以及接收返回的JSON文本。流量不大。
- 建议 :选择网络质量好、到Agora和AssemblyAI服务器延迟低的云服务商区域(如北美、欧洲、新加坡等)。可以使用
ping和traceroute测试网络质量。
- 进程管理 :使用
PM2或Docker管理Node.js进程,确保服务崩溃后能自动重启。PM2的配置文件ecosystem.config.js可以这样写:module.exports = { apps: [{ name: 'transcription-bot', script: './server.js', instances: 1, // 根据CPU核心数调整 exec_mode: 'fork', // 或者'cluster'模式利用多核 env: { NODE_ENV: 'production', }, max_memory_restart: '500M', // 内存超过500M重启 error_file: './logs/err.log', out_file: './logs/out.log', log_file: './logs/combined.log', time: true, }] }; - 日志与监控 :务必记录详细的日志,包括机器人的启动/停止、AssemblyAI连接状态、音频发送频率、错误信息等。可以使用
winston或pino等日志库。同时,监控关键指标:各频道转录延迟(从音频发生到收到文字的时间)、服务CPU/内存使用率、WebSocket连接数、AssemblyAI API调用错误率。
4.2 延迟优化实战技巧
实时转录的延迟是核心体验指标。目标是将端到端延迟控制在2秒以内。
- 音频分片大小 :
AudioProcessor.chunkPCMStream函数中的chunkDurationMs参数是关键。太小(如50ms)会导致WebSocket通信开销增大,增加网络往返延迟;太大(如500ms)会导致AssemblyAI处理延迟增加,因为模型需要积累更多音频才能做出准确判断。 经过实测,200ms是一个比较好的平衡点 。AssemblyAI官方也推荐200-500ms。 - 网络链路优化 :
- 服务器位置 :你的转录机器人服务器、Agora云端录制服务器、AssemblyAI服务器三者之间的网络延迟应尽可能低。如果主要用户在中国,可以考虑使用Agora在中国大陆的数据中心,并将机器人部署在离该数据中心近的海外区域(如新加坡),同时该区域到AssemblyAI(通常在北美)的网络也要好。
- WebSocket连接复用 :为每个音频源建立独立的AssemblyAI连接是必要的,但连接建立本身有开销。可以考虑连接池或长连接保活机制,避免频繁重建连接。
- AssemblyAI参数调优 :
- 在建立WebSocket连接时,可以传递
word_boost参数来提升特定词汇(如产品名、专业术语)的识别准确率,减少因识别错误导致的反复修正延迟。 - 启用
endpointing(端点检测)可以让模型更智能地判断一句话何时结束,从而更快地返回FinalTranscript。但过于敏感的端点检测可能导致长句被切分。
- 在建立WebSocket连接时,可以传递
- 前端渲染优化 :前端收到
PartialTranscript时立即显示,收到FinalTranscript时更新替换。这种“流式”显示能极大提升用户感知的实时性,即使最终确认稍有延迟,用户也能看到实时识别的过程。
4.3 常见问题与排查清单
在实际部署和运行中,我遇到了不少问题,这里整理成一个排查清单:
| 问题现象 | 可能原因 | 排查步骤与解决方案 |
|---|---|---|
| 无法连接到Agora频道 | 1. Token无效或过期。 2. App ID或Certificate错误。 3. 频道名包含非法字符。 4. 网络防火墙阻止。 |
1. 检查Token生成逻辑和时间戳。确保服务端时间准确(使用NTP)。 2. 核对控制台的App ID和Certificate。 3. 频道名只允许数字、字母和下划线。 4. 在服务器上使用 telnet 或 nc 测试Agora服务端地址的特定端口(如TCP 443)是否可达。 |
| 能连接Agora,但收不到音频 | 1. 服务端加入的UID与频道内已有用户冲突。 2. 没有用户发布音频流。 3. 使用了错误的云端录制配置。 |
1. 使用一个频道内唯一的、高数字的UID(如100000+)。 2. 确认有真实用户客户端在频道内并开启了麦克风。 3. 如果使用云端录制,确保录制模式配置正确(如合流模式),并且RTMP地址配置无误,服务器能成功拉流。 |
| AssemblyAI WebSocket连接失败 | 1. API Key无效或额度不足。 2. 服务器网络无法访问 api.assemblyai.com 。 3. 采样率参数不匹配。 |
1. 在AssemblyAI控制台检查API Key状态和用量。 2. 在服务器上 curl -v https://api.assemblyai.com/v2/realtime/ws ,看是否能建立WebSocket握手。 3. 确保连接URL中的 sample_rate 参数与发送的音频数据采样率一致(通常为16000)。 |
| AssemblyAI连接成功,但不返回转录结果 | 1. 发送的音频数据格式错误。 2. 音频数据发送过快或过慢。 3. 音频数据是静音或音量过低。 |
1. 最关键的一步 :验证音频格式。确保是16kHz、单声道、16位有符号整数PCM。可以先将一段已知内容的WAV文件(符合格式)通过脚本发送测试。 2. 控制发送节奏,模拟实时。每发送一个200ms的分片后,等待约200ms再发下一个。 3. 检查音频源是否有有效声音。可以在发送前计算音频分片的RMS(均方根)值,过滤掉静音帧。 |
| 转录结果延迟非常高(>5秒) | 1. 网络延迟高。 2. 音频分片太大。 3. 服务器CPU负载高,处理不过来。 |
1. 使用 ping 和 mtr 诊断到AssemblyAI服务器的网络延迟和丢包。 2. 将分片大小从500ms调整为200ms试试。 3. 监控服务器CPU,升级配置或优化代码(如FFmpeg参数调优,使用更高效的Buffer处理)。 |
| 转录准确率低 | 1. 音频质量差(有噪音、回声)。 2. 包含大量专业术语或口音。 3. 多人同时说话。 |
1. 建议Agora客户端开启音频降噪、回声消除(AEC)等预处理。 2. 使用AssemblyAI的 word_boost 参数,提供术语列表。 3. 如果可能,为每个用户建立独立的音频流和转录连接。如果只能混合音频,确保在连接AssemblyAI时启用 speaker_labels: true 。 |
| 服务运行一段时间后内存泄漏 | 1. 未正确关闭WebSocket连接或Agora客户端。 2. 未清理事件监听器。 3. Buffer或大对象未及时释放。 |
1. 在 stop() 方法中确保调用 disconnect() 和 leaveChannel() 。 2. 使用Node.js的 --inspect flag和Chrome DevTools进行内存堆快照分析。 3. 确保 for await...of 循环中的临时变量能被正常垃圾回收。对于长时间运行的服务,定期重启(如通过PM2)也是一个简单有效的策略。 |
| 前端收不到WebSocket消息 | 1. WebSocket服务器未正确广播。 2. 前端WebSocket URL错误。 3. 浏览器跨域问题(如果前端与服务器不同域)。 |
1. 在机器人服务的 handleTranscript 方法中打印日志,确认是否执行到广播代码。 2. 检查前端连接的WebSocket URL端口和路径是否正确。 3. 如果跨域,需要在WebSocket服务器端设置正确的 Origin 验证,或者使用Nginx反向代理将前后端统一到同域。 |
4.4 成本控制与扩展建议
- 成本监控 :
- Agora :费用主要来自语音时长(服务端加入频道也会产生时长)和云端录制服务(如果使用)。在控制台设置用量告警。
- AssemblyAI :按音频处理时长计费。Universal-3 Pro是高级模型,费率可能比标准模型高。务必在代码中做好异常处理,避免在无音频时持续发送数据产生费用。可以在发送前进行静音检测。
- 扩展性设计 :
- 多频道支持 :上述架构中的
TranscriptionBotManager就是为管理多个机器人实例设计的。你需要一个机制(如数据库或配置中心)来动态管理需要转录的频道列表。 - 水平扩展 :当单个服务器无法承载更多频道时,可以考虑水平扩展。引入一个消息队列(如Redis或RabbitMQ),将“需要启动转录”的任务发布到队列,由多个工人(Worker)服务消费并执行。工人服务是无状态的,可以方便地扩容。
- 结果聚合 :如果多个工人服务处理同一个频道的不同用户流,可能需要一个聚合服务将不同说话人的文字按时间线合并,再推送给前端。
- 多频道支持 :上述架构中的
- 功能增强 :
- 实时翻译 :在收到AssemblyAI的转录文本后,可以立即调用翻译API(如Google Cloud Translation, DeepL),实现同声传译效果。
- 敏感词过滤 :对转录文本进行实时敏感词匹配,触发告警或自动屏蔽。
- 摘要生成 :定期(如每5分钟)将一段时间内的
FinalTranscript发送给AssemblyAI的摘要API,生成阶段性会议摘要。
这个项目从构想到实现,涉及了实时音视频、网络编程、音频处理、AI服务集成等多个领域。最大的挑战不在于调用某个API,而在于如何将不同服务稳定、高效、低延迟地串联起来,并处理好各种边界情况和异常。希望这份详细的拆解,能帮你避开我踩过的那些坑,顺利搭建起自己的实时语音转录系统。
更多推荐

所有评论(0)