1. 项目概述:为什么语音AI是下一个开发前沿

如果你最近在捣鼓AI应用,尤其是想让它能听会说,那你肯定体会过那种“拼接怪”的痛苦。传统的语音AI流水线,简直就是一场技术栈的噩梦:你得先找个服务把语音转成文字(Speech-to-Text, STT),比如用OpenAI的Whisper API;然后把文字扔给大语言模型(LLM),比如Claude或GPT,让它思考怎么回答;最后再把生成的文字答案,通过另一个服务合成语音(Text-to-Speech, TTS),比如ElevenLabs。这还没完,为了让体验流畅,你还得处理这三者之间的实时流式传输、状态管理、错误处理和成本监控。每一个环节都是一个独立的SDK,有着不同的认证方式、计费模式和错误码,光是让它们协同工作不掉链子,就足以消磨掉你所有的热情。

这就是为什么当我接触到NeuroLink这个TypeScript SDK时,感觉像是发现了一个新大陆。它把STT、LLM推理和TTS这三个核心环节,统一封装成了一个连贯的、流式优先的API。简单来说,它让你能用处理文本流的方式来处理语音流。你不再需要关心音频数据怎么喂给Whisper,LLM返回的文本流怎么实时地、逐句地送给TTS引擎。NeuroLink帮你打理好了一切,让你可以专注于构建对话逻辑和用户体验本身。这篇内容,就是基于我最近用NeuroLink构建一个实时语音助手的实战经验,我会手把手带你走通整个流程,从环境搭建、核心流水线构建,到高级功能如打断唤醒、多语言支持和生产环境优化。无论你是想做一个智能客服机器人、一个语音编程助手,还是一个实时翻译工具,这里面的思路和代码都能直接拿来用。

2. 核心架构解析:NeuroLink如何统一语音AI流水线

在深入代码之前,我们得先理解NeuroLink设计的核心理念。传统的语音AI架构是离散的、管道式的,而NeuroLink倡导的是“语音即流”的一体化思想。

2.1 传统架构的痛点与NeuroLink的解法

先看一张传统架构的思维图,虽然我们不用图表,但可以描述清楚:用户的语音通过麦克风进入,首先被送到STT服务(如Whisper),转换成文本;这段文本随后被发送到LLM服务(如Anthropic Claude),生成回复文本;最后,回复文本被送到TTS服务(如ElevenLabs),合成语音并播放。这个链条带来了几个典型问题:

  1. 延迟叠加 :每个环节(STT、LLM推理、TTS)都会引入200-500毫秒不等的延迟。用户说一句话,可能要等上1-2秒才能听到回复,这种滞后感会严重破坏对话的自然性。
  2. 供应商碎片化 :三个环节可能来自三个不同的供应商,你需要管理三套API密钥、三套SDK、三套错误处理逻辑。当Deepgram的STT服务不稳定时,如何无缝切换到AssemblyAI?这需要大量的胶水代码。
  3. 流式传输复杂 :为了实现低延迟,理想状态是流式处理。即STT一边听一边转,LLM一边生成一边回,TTS一边收文本一边合成语音。但这三个流的节奏和数据类型完全不同(音频流、文本令牌流、音频流),协调它们就像指挥三个不同步的乐队。
  4. 状态管理困难 :如何维持多轮对话的上下文?用户的当前语音可能和30秒前的话题相关,你需要一个机制来存储和检索对话历史,并确保它在STT、LLM、TTS的整个链条中保持一致。

NeuroLink的解法非常巧妙:它引入了一个“工具”(Tools)的概念,并将STT和TTS都抽象成了LLM可以调用的工具。但更重要的是,它提供了一个顶层的 stream() API。你可以把一个音频流直接丢给NeuroLink实例,并指定输出需要包含文本和音频。NeuroLink内部会帮你自动编排:调用STT工具转文本,将文本流式喂给LLM,同时将LLM产生的文本流实时喂给TTS工具,最终将合成的音频流返回给你。对你而言,你只是在消费一个统一的流。

2.2 NeuroLink核心配置与初始化

让我们看看如何初始化一个具备语音能力的AI智能体。首先,你需要安装NeuroLink:

npm install @juspay/neurolink

接下来是核心的配置。这里我强烈建议你从一开始就考虑好生产环境的需求,比如记忆(对话历史)和系统提示词(定义AI角色)。

import { NeuroLink } from "@juspay/neurolink";

interface VoiceConfig {
  sttProvider: "whisper" | "deepgram" | "assembly";
  llmProvider: "anthropic" | "openai" | "google-ai";
  ttsProvider: "elevenlabs" | "openai" | "azure";
}

const voiceAgent = new NeuroLink({
  // 核心LLM配置:选择推理大脑
  provider: "anthropic",
  model: "claude-3-5-sonnet-20241022", // 使用较新的模型版本
  apiKey: process.env.ANTHROPIC_API_KEY, // 务必从环境变量读取

  // 记忆配置:这是实现多轮对话的关键
  memory: {
    enabled: true,
    backend: "redis", // 生产环境推荐Redis,开发可用`memory`
    ttl: 3600, // 会话存活时间1小时
    config: {
      url: process.env.REDIS_URL // 连接信息
    }
  },

  // 系统提示词:塑造AI的语音人格和回答风格
  systemPrompt: `你是一个有帮助的语音助手。请使用口语化的语言进行交流。
  保持回答简洁,通常2-3句话为宜,这样听起来更自然。
  避免使用Markdown、代码块或任何特殊格式。
  如果用户的问题需要较长解释,先给出核心结论,再询问是否需要详细说明。`,

  // 工具配置:赋予AI“听”和“说”的能力
  tools: [
    {
      name: "speechToText",
      provider: "whisper", // 默认STT提供商
      config: { language: "auto" } // 自动检测语言
    },
    {
      name: "textToSpeech",
      provider: "elevenlabs", // 默认TTS提供商
      config: {
        voiceId: process.env.ELEVENLABS_DEFAULT_VOICE_ID,
        model: "eleven_multilingual_v2",
        stability: 0.5, // 语音稳定性参数
        similarity_boost: 0.8 // 声音相似度参数
      }
    }
  ],

  // 高级配置:重试与回退策略
  fallback: {
    providers: ["whisper", "deepgram"], // STT主备
    strategy: "sequential"
  },
  retry: {
    attempts: 2,
    backoff: "exponential"
  }
});

注意 :系统提示词( systemPrompt )对语音助手至关重要。因为TTS合成的语音是线性的、瞬时的,用户无法“回看”。过于复杂、冗长或包含视觉格式(如列表符号、代码段)的回答,听起来会非常糟糕。务必指示AI使用简短、连贯的口语句子。

这个配置对象是NeuroLink能力的基石。 memory 的配置确保了不同会话之间的隔离和同一会话内的上下文连贯。 tools 的配置声明了我们将要使用的STT和TTS服务及其参数。这里我选择了Whisper和ElevenLabs,因为它们分别在准确性和音质上表现突出,且NeuroLink对其集成良好。

3. 构建端到端语音流水线:从麦克风到扬声器

有了配置好的 voiceAgent ,我们现在来搭建一个完整的、可运行的语音循环。这个循环包括五个核心步骤:音频采集、语音转文字、LLM处理、文字转语音、音频播放。

3.1 音频采集与语音活动检测

在Node.js环境下,我们可以使用 node-record-lpcm16 库来捕获麦克风的原始PCM数据。但更优的做法是集成语音活动检测,只在检测到人声时才进行录制和处理,这能节省大量的计算资源和API调用成本。

import { Recorder } from 'node-record-lpcm16';
import { MicVAD } from '@ricky0123/vad-web'; // 注意:这是Web端库,Node端需用其他方案如`snowboy`或`vad`

// 方案一:基础持续录音(适用于始终在线的场景,如客服坐席)
async function* captureAudioContinuous(): AsyncIterable<Buffer> {
  const recorder = new Recorder({
    sampleRate: 16000, // Whisper推荐的采样率
    channels: 1, // 单声道
    audioType: 'wav',
    threshold: 0.5, // 录音阈值,可过滤环境噪音
    silence: '1.0' // 持续1秒静音后停止,但对我们流式场景可能不适用
  });

  const stream = recorder.record().stream();
  for await (const chunk of stream) {
    yield chunk;
  }
}

// 方案二:集成VAD的智能录音(推荐,节省资源)
// 由于Node.js没有完美的VAD库,这里演示一个简化逻辑:使用能量阈值模拟VAD
class SimpleVAD {
  private energyThreshold: number;
  private silenceFrames: number;
  private maxSilenceFrames: number;

  constructor(threshold = 0.01, maxSilenceSec = 1.5, sampleRate = 16000) {
    this.energyThreshold = threshold;
    this.maxSilenceFrames = maxSilenceSec * sampleRate / 1024; // 假设帧大小1024
    this.silenceFrames = 0;
  }

  isSpeech(audioBuffer: Buffer): boolean {
    // 计算音频帧的能量(均方根)
    let sum = 0;
    for (let i = 0; i < audioBuffer.length; i += 2) {
      const sample = audioBuffer.readInt16LE(i);
      sum += sample * sample;
    }
    const rms = Math.sqrt(sum / (audioBuffer.length / 2));
    const normalizedEnergy = rms / 32768; // 16位有符号整数归一化

    if (normalizedEnergy > this.energyThreshold) {
      this.silenceFrames = 0;
      return true;
    } else {
      this.silenceFrames++;
      return this.silenceFrames < this.maxSilenceFrames; // 静音未超时,仍算在语句内
    }
  }

  reset(): void {
    this.silenceFrames = 0;
  }
}

async function* captureAudioWithVAD(): AsyncIterable<Buffer> {
  const recorder = new Recorder({ sampleRate: 16000, channels: 1 });
  const vad = new SimpleVAD();
  const stream = recorder.record().stream();
  let speechBuffer: Buffer[] = [];
  let isSpeechActive = false;

  for await (const chunk of stream) {
    if (vad.isSpeech(chunk)) {
      if (!isSpeechActive) {
        console.log('🎙️ 检测到语音开始');
        isSpeechActive = true;
      }
      speechBuffer.push(chunk);
      yield chunk; // 实时产出音频块
    } else {
      if (isSpeechActive) {
        // 静音超时,判定语音结束
        console.log('⏹️ 检测到语音结束');
        isSpeechActive = false;
        speechBuffer = []; // 清空缓冲区
        vad.reset();
        // 这里可以抛出一个特殊事件或标记,通知流水线当前语句结束
      }
      // 非语音时段,不产出数据,节省处理
    }
  }
}

VAD的引入是生产级应用的关键。它能有效过滤背景噪音,只在用户真正说话时触发后续昂贵的STT和LLM调用。上述的 SimpleVAD 是一个简化实现,生产环境可以考虑使用更专业的库或服务端的VAD方案。

3.2 语音转文字与LLM流式响应

这是流水线的核心环节。我们将捕获的音频流送入NeuroLink,并利用其集成的STT工具进行转写,同时将转写结果流式地发送给LLM。

// 核心处理函数:将音频流转为LLM的文本响应流
async function* processAudioStream(
  audioStream: AsyncIterable<Buffer>,
  sessionId: string
): AsyncIterable<{ type: 'text' | 'tool'; content: any }> {
  // 关键:使用NeuroLink的stream API,并指定需要音频输入和文本输出
  const streamResponse = await voiceAgent.stream({
    input: {
      audio: audioStream, // 直接传入音频流
      // 可选的初始文本指令,引导STT行为,例如:
      // text: '请将以下语音转写成简体中文文本。'
    },
    session: {
      id: sessionId // 关联会话ID,用于记忆回溯
    },
    output: {
      formats: ['text'], // 我们暂时只关心文本流输出
      streaming: true // 启用流式输出
    },
    // 可以在这里覆盖默认的工具配置,例如指定使用某个特定的STT引擎
    tools: [{
      name: 'speechToText',
      provider: 'whisper',
      config: {
        model: 'whisper-1',
        response_format: 'verbose_json', // 获取带时间戳的详细结果
        temperature: 0.0 // 确定性转写
      }
    }]
  });

  // 消费流式结果
  for await (const chunk of streamResponse.stream) {
    // 类型守卫:检查chunk的类型
    if ('content' in chunk && chunk.content) {
      // 这是LLM产生的文本内容块
      yield {
        type: 'text',
        content: chunk.content
      };
      // 可以实时打印到控制台,模拟逐字显示效果
      process.stdout.write(chunk.content);
    }
    // 注意:在这个流程中,STT工具的结果被NeuroLink内部消化并直接喂给了LLM。
    // 如果我们想获取原始的转写文本,需要在工具调用结果中寻找。
    if ('toolResults' in chunk) {
      // 处理工具调用结果(如果有)
      yield {
        type: 'tool',
        content: chunk.toolResults
      };
    }
  }
}

这段代码的精髓在于 voiceAgent.stream() 调用。我们直接把 audioStream 丢给了它。NeuroLink内部会:

  1. 自动调用配置的 speechToText 工具(Whisper),将音频流转换为文本流。
  2. 将转换出的文本流,作为用户输入,实时地喂给LLM(Claude)。
  3. LLM开始流式生成思考后的回复文本。
  4. 我们通过循环 streamResponse.stream 来消费这些文本块。

实操心得 sessionId 是维持对话连续性的生命线。务必为每个独立的对话会话(例如一个用户的一次完整咨询)生成一个唯一的ID。NeuroLink会利用这个ID在配置的 memory 后端(如Redis)中存储和检索该会话的历史消息。这样,当用户说“你刚才说的是什么意思?”时,AI才能正确理解“刚才”指的是什么。

3.3 文本转语音与流式音频播放

LLM的文本流已经生成,下一步是将其转化为听起来自然的语音。这里有一个关键优化点: 逐句合成 。如果等LLM生成完一整段话再合成,用户会经历漫长的等待。我们应该在LLM生成完一个完整的句子(以句号、问号、感叹号分隔)时,就立即触发该句子的TTS合成。

import { Speaker } from 'speaker'; // 用于Node.js播放音频
import { Readable } from 'stream';

// 核心函数:将文本流转为音频流,并逐句合成以降低延迟
async function* synthesizeSpeech(
  textStream: AsyncIterable<string>,
  voiceId: string = process.env.ELEVENLABS_VOICE_ID || 'default'
): AsyncIterable<Buffer> {
  let sentenceBuffer = '';

  for await (const textChunk of textStream) {
    sentenceBuffer += textChunk;

    // 使用正则表达式尝试从缓冲区中提取完整的句子
    // 匹配以 . ! ? 结尾的序列,尽可能多地匹配
    const sentenceRegex = /[^.!?]+[.!?]+/g;
    const sentences: string[] = [];
    let match;
    while ((match = sentenceRegex.exec(sentenceBuffer)) !== null) {
      sentences.push(match[0]);
    }

    if (sentences.length > 0) {
      // 从缓冲区中移除已处理的句子
      const processedText = sentences.join('');
      sentenceBuffer = sentenceBuffer.slice(processedText.length);

      // 并行合成所有提取出的句子(进一步优化延迟)
      const synthesisPromises = sentences.map(sentence =>
        voiceAgent.generate({
          input: { text: sentence.trim() },
          tools: [{
            name: 'textToSpeech',
            provider: 'elevenlabs',
            config: {
              voiceId: voiceId,
              model: 'eleven_multilingual_v2',
              stability: 0.5,
              similarity_boost: 0.8,
              streaming: true // 请求流式音频(如果提供商支持)
            }
          }]
        }).then(result => {
          const audioData = result.toolResults?.textToSpeech?.audio;
          if (!audioData) {
            throw new Error('TTS合成失败,未返回音频数据');
          }
          // 返回音频Buffer和句子长度(可用于更精细的播放控制)
          return {
            buffer: Buffer.from(audioData, 'base64'),
            sentence: sentence
          };
        })
      );

      // 按顺序产出音频Buffer
      const audioResults = await Promise.all(synthesisPromises);
      for (const result of audioResults) {
        console.log(` 🔊 合成句子: "${result.sentence.trim()}"`);
        yield result.buffer;
      }
    }
  }

  // 处理缓冲区中剩余的、不构成完整句子的文本(例如最后一段)
  if (sentenceBuffer.trim()) {
    console.log(` 🔊 合成剩余文本: "${sentenceBuffer.trim()}"`);
    const finalResult = await voiceAgent.generate({
      input: { text: sentenceBuffer.trim() },
      tools: [{
        name: 'textToSpeech',
        provider: 'elevenlabs',
        config: { voiceId, model: 'eleven_multilingual_v2' }
      }]
    });
    const finalAudio = finalResult.toolResults?.textToSpeech?.audio;
    if (finalAudio) {
      yield Buffer.from(finalAudio, 'base64');
    }
  }
}

// 音频播放器函数
function playAudioStream(audioStream: AsyncIterable<Buffer>): void {
  const speaker = new Speaker({
    channels: 1,
    bitDepth: 16,
    sampleRate: 24000, // 需与TTS输出采样率匹配,ElevenLabs通常是24000
  });

  // 将异步迭代器转换为Readable Stream以便播放
  const readable = Readable.from(audioStream);
  readable.pipe(speaker);

  speaker.on('close', () => {
    console.log('音频播放结束。');
  });
}

synthesizeSpeech 函数是降低感知延迟的关键。它不会傻等所有文本生成完毕,而是利用一个缓冲区 sentenceBuffer 来积累文本块,一旦通过正则表达式检测到完整的句子,就立即触发该句子的TTS请求。 Promise.all 用于并行合成多个句子,这比顺序合成更快。最后, playAudioStream 函数使用 speaker 库将得到的PCM音频流实时播放出来。

3.4 整合完整循环与错误处理

现在,我们把所有部分组装起来,并加上必要的错误处理和用户交互。

async function runVoiceAssistant() {
  // 生成一个唯一的会话ID,例如使用用户ID+时间戳
  const sessionId = `user_${process.env.USER_ID || 'anonymous'}_${Date.now()}`;
  console.log(`🚀 启动语音助手,会话ID: ${sessionId}`);
  console.log('🎤 请开始说话...(按Ctrl+C退出)');

  try {
    // 1. 捕获带VAD的音频流
    const audioStream = captureAudioWithVAD();

    // 2. & 3. 将音频流送入核心处理管道,得到LLM文本响应流
    // 我们需要一个适配器,将 processAudioStream 返回的复合流转换为纯文本流
    async function* extractTextStream(
      audioStream: AsyncIterable<Buffer>,
      sessionId: string
    ): AsyncIterable<string> {
      for await (const item of processAudioStream(audioStream, sessionId)) {
        if (item.type === 'text') {
          yield item.content;
        }
        // 可以在这里处理工具调用结果,例如记录STT的原始转写
      }
    }

    const llmTextStream = extractTextStream(audioStream, sessionId);

    // 4. 将LLM文本流合成语音流
    const speechAudioStream = synthesizeSpeech(llmTextStream);

    // 5. 播放合成的语音流
    playAudioStream(speechAudioStream);

    // 由于播放是异步且阻塞的(直到音频流结束),我们需要处理退出信号
    process.on('SIGINT', () => {
      console.log('\n👋 收到退出信号,关闭语音助手。');
      process.exit(0);
    });

  } catch (error) {
    console.error('❌ 语音助手运行出错:', error);
    // 根据错误类型进行恢复操作,例如网络错误可以尝试重连
    if (error.code === 'NETWORK_ERROR') {
      console.log('网络不稳定,5秒后重试...');
      await new Promise(resolve => setTimeout(resolve, 5000));
      return runVoiceAssistant(); // 谨慎使用递归重试,最好有次数限制
    }
    process.exit(1);
  }
}

// 启动助手
runVoiceAssistant();

这个 runVoiceAssistant 函数勾勒出了最基本的语音交互循环。它从麦克风捕获声音,通过NeuroLink进行STT和LLM处理,再TTS合成并播放,形成了一个闭环。错误处理部分捕捉了可能的异常,比如网络中断,并尝试进行简单的重试。

4. 高级模式与生产环境考量

一个基础的语音循环跑通后,接下来我们要让它变得更智能、更健壮,能够应对真实场景中的各种挑战。

4.1 打断唤醒与上下文管理

在真实对话中,用户可能会随时打断AI的发言。一个优秀的语音助手必须能处理这种中断。

class InterruptibleVoiceAgent {
  private currentProcessingStream: AbortController | null = null;
  private isSpeaking: boolean = false;
  private currentSessionId: string;

  constructor(sessionId: string) {
    this.currentSessionId = sessionId;
  }

  async handleUserSpeech(audioStream: AsyncIterable<Buffer>): Promise<void> {
    // 如果AI正在说话,立即中止当前的TTS和LLM流
    if (this.isSpeaking) {
      console.log('⏸️ 检测到用户打断,停止当前响应。');
      this.currentProcessingStream?.abort();
      this.isSpeaking = false;
      // 可以在这里播放一个简短的提示音,如“嘟”
    }

    // 为新的处理流程创建中止控制器
    const abortController = new AbortController();
    this.currentProcessingStream = abortController;

    try {
      // 1. 转录用户语音
      const userTranscript = await this.transcribeAudio(audioStream, abortController.signal);
      if (!userTranscript || abortController.signal.aborted) return;

      console.log(`👤 用户说: "${userTranscript}"`);

      // 2. 检查是否为打断指令(如“停一下”、“等等”)
      if (this.isInterruptionCommand(userTranscript)) {
        await this.playInterruptionAcknowledgment();
        return; // 不进行LLM处理
      }

      // 3. 处理用户查询并生成响应
      this.isSpeaking = true;
      await this.generateAndSpeakResponse(userTranscript, abortController.signal);
    } catch (error: any) {
      if (error.name === 'AbortError') {
        // 流被正常中止,忽略错误
        console.log('处理流程被用户中断。');
      } else {
        console.error('处理用户语音时出错:', error);
        await this.playErrorSound();
      }
    } finally {
      if (!abortController.signal.aborted) {
        this.isSpeaking = false;
      }
    }
  }

  private async transcribeAudio(
    stream: AsyncIterable<Buffer>,
    signal: AbortSignal
  ): Promise<string | null> {
    // 这里调用NeuroLink的STT功能,并传递abort signal
    const result = await voiceAgent.generate({
      input: { audio: stream },
      tools: [{ name: 'speechToText' }],
      signal // 传递中止信号,以便在外部中止时取消请求
    });
    return result.toolResults?.speechToText?.text || null;
  }

  private isInterruptionCommand(transcript: string): boolean {
    const interruptionKeywords = ['停一下', '等等', '先别说了', '打断一下', 'stop', 'wait', 'hold on'];
    const lowerTranscript = transcript.toLowerCase();
    return interruptionKeywords.some(keyword => lowerTranscript.includes(keyword));
  }

  private async playInterruptionAcknowledgment(): Promise<void> {
    // 播放一个简短的确认音效,或者合成一句“好的,请说”
    console.log('🆗 已确认用户打断。');
    // 可以调用一个预先生成的简短音频Buffer,或者用TTS快速合成
  }

  private async generateAndSpeakResponse(
    userMessage: string,
    signal: AbortSignal
  ): Promise<void> {
    const llmStream = await voiceAgent.stream({
      input: { text: userMessage },
      session: { id: this.currentSessionId },
      output: { formats: ['text'], streaming: true },
      signal // 传递中止信号
    });

    const textStream = (async function* () {
      for await (const chunk of llmStream.stream) {
        if ('content' in chunk) {
          yield chunk.content;
        }
        if (signal.aborted) break;
      }
    })();

    const audioStream = synthesizeSpeech(textStream);
    for await (const audioChunk of audioStream) {
      if (signal.aborted) break;
      // 这里需要将audioChunk写入一个可中断的播放器
      // 例如使用Web Audio API或一个可以停止的speaker实例
      await this.playAudioChunk(audioChunk);
    }
  }

  private async playAudioChunk(chunk: Buffer): Promise<void> {
    // 实现具体的、可中断的音频播放逻辑
    // 例如,将chunk推入一个队列,由另一个可控制的播放线程消费
  }
}

这个 InterruptibleVoiceAgent 类引入了状态管理( isSpeaking )和中止控制器( AbortController )。当检测到用户的新语音输入时,如果AI正在说话,它会立即中止当前的TTS合成和播放流程。 isInterruptionCommand 方法用于识别用户是否发出了明确的打断指令,如果是,则直接停止并播放确认音,不再进行LLM处理,这提升了交互的智能性和响应速度。

4.2 多语言支持与动态配置

对于国际化的应用,语音助手需要能识别和响应多种语言。NeuroLink的统一API让这变得相对简单。

// 语言配置映射表
const languageProfiles: Record<string, {
  stt: { provider: string; model?: string; languageCode?: string };
  tts: { provider: string; voiceId: string; model?: string };
  llm: { systemPrompt: string }; // 可针对不同语言优化系统提示
}> = {
  'en': {
    stt: { provider: 'whisper', model: 'whisper-1' },
    tts: { provider: 'elevenlabs', voiceId: 'Rachel', model: 'eleven_multilingual_v2' },
    llm: { systemPrompt: 'You are a helpful English assistant. Respond concisely.' }
  },
  'zh-CN': {
    stt: { provider: 'whisper', model: 'whisper-1' }, // Whisper支持中文
    tts: { provider: 'elevenlabs', voiceId: 'Alice', model: 'eleven_multilingual_v2' }, // 需选择支持中文的语音
    llm: { systemPrompt: '你是一个有用的中文助手。请用口语化的中文回答,保持简洁。' }
  },
  'ja': {
    stt: { provider: 'deepgram', languageCode: 'ja' }, // 可能对日语有更好支持
    tts: { provider: 'openai', voiceId: 'alloy', model: 'tts-1' }, // OpenAI TTS
    llm: { systemPrompt: 'あなたは親切な日本語アシスタントです。簡潔にお答えください。' }
  },
  'hi': {
    stt: { provider: 'google', languageCode: 'hi-IN' }, // 假设使用Google STT
    tts: { provider: 'azure', voiceId: 'hi-IN-SwaraNeural' }, // Azure Neural TTS
    llm: { systemPrompt: 'आप एक सहायक हिंदी सहायक हैं। संक्षिप्त में उत्तर दें।' }
  }
};

async function detectAndProcessLanguage(audioStream: AsyncIterable<Buffer>): Promise<void> {
  // 第一步:语言检测。可以使用专门的检测服务,或利用支持自动检测的STT(如Whisper)
  const detectionResult = await voiceAgent.generate({
    input: { audio: audioStream },
    tools: [{
      name: 'speechToText',
      provider: 'whisper',
      config: { 
        task: 'detect_language' // 假设Whisper支持此参数,或使用其返回的语言字段
      }
    }]
  });

  const detectedLang = detectionResult.toolResults?.speechToText?.language || 'en';
  console.log(`🌐 检测到语言: ${detectedLang}`);

  const profile = languageProfiles[detectedLang] || languageProfiles['en']; // 回退到英语

  // 第二步:使用检测到的语言配置创建或更新Agent
  const multilingualAgent = new NeuroLink({
    provider: 'anthropic',
    model: 'claude-3-5-sonnet',
    systemPrompt: profile.llm.systemPrompt,
    tools: [
      { name: 'speechToText', provider: profile.stt.provider, config: profile.stt },
      { name: 'textToSpeech', provider: profile.tts.provider, config: profile.tts }
    ]
  });

  // 第三步:用配置好的Agent处理完整的语音请求(这里需要重新获取音频流,演示逻辑)
  // 实际应用中,可能需要将音频流缓存或复制一份用于检测和处理。
  console.log(`🗣️ 将使用 ${profile.tts.voiceId} 的声音进行回复。`);
  // ... 后续使用 multilingualAgent 进行流式处理 ...
}

多语言处理的核心在于动态配置。我们根据检测到的语言代码,选择对应的STT引擎(可能对特定语言优化更好)、TTS语音(需要匹配语言的发音和语调)以及LLM的系统提示词(引导AI用该语言思考和回答)。这确保了从识别、思考到回复,整个流水线都适配目标语言。

4.3 生产环境优化:延迟、成本与监控

将语音助手上线到生产环境,必须考虑性能、成本和可观测性。

延迟优化策略

延迟是语音交互体验的死敌。以下是几个关键的优化点及其效果估算:

优化策略 实现方法 预计延迟降低 注意事项
流式STT 使用支持流式识别的STT服务(如Deepgram Streaming),边录边转。 200-500ms 需要处理中间结果和修正。
LLM流式响应 使用LLM的流式API,首个令牌生成时间(TTFT)是关键。 300-1000ms 选择TTFT低的模型,如Claude Haiku。
逐句TTS 如本文所述,在LLM生成完整句子后立即合成,无需等待全文。 500-1500ms 句子切分算法影响体验,避免切分不当。
内存缓存 使用Redis等缓存对话历史,避免每次重新构建长上下文。 50-200ms 节省LLM上下文窗口的令牌数,间接降低处理时间。
网络优化 使用WebSocket代替HTTP,减少连接开销;服务部署靠近用户。 50-200ms 全球部署或使用CDN。
TTS预加载 在LLM生成第一个句子时,预加载TTS引擎或连接。 100-300ms 增加资源开销。

成本跟踪与预算控制

语音AI的成本主要来自三部分:STT(按音频时长)、LLM(按输入/输出令牌数)、TTS(按合成字符数)。NeuroLink提供了钩子来监控这些指标。

// 成本监控中间件
function createCostTrackingMiddleware(agent: NeuroLink) {
  let totalCost = 0;
  const costConfig = {
    stt: { 'whisper': 0.006, 'deepgram': 0.004 }, // 美元/分钟
    llm: { 'claude-3-5-sonnet': { input: 3.0, output: 15.0 } }, // 美元/百万令牌
    tts: { 'elevenlabs': 0.00018, 'openai': 0.015 } // 美元/千字符
  };

  agent.on('usage', (event) => {
    const { sttProvider, sttSeconds, llmProvider, inputTokens, outputTokens, ttsProvider, ttsCharacters } = event;

    let cost = 0;
    if (sttSeconds && sttProvider) {
      cost += (sttSeconds / 60) * (costConfig.stt[sttProvider] || 0);
    }
    if (inputTokens && llmProvider) {
      cost += (inputTokens / 1_000_000) * (costConfig.llm[llmProvider]?.input || 0);
      cost += (outputTokens / 1_000_000) * (costConfig.llm[llmProvider]?.output || 0);
    }
    if (ttsCharacters && ttsProvider) {
      cost += (ttsCharacters / 1000) * (costConfig.tts[ttsProvider] || 0);
    }

    totalCost += cost;
    console.log(`💰 本次请求估算成本: $${cost.toFixed(6)}`);
    console.log(`📊 累计会话成本: $${totalCost.toFixed(4)}`);

    // 可以设置预算警报
    if (totalCost > 5.0) { // 例如,单会话预算5美元
      console.warn('⚠️ 会话成本已超过$5,考虑结束会话或切换至低成本模型。');
    }
  });

  return { getTotalCost: () => totalCost };
}

健壮的错误处理

生产环境必须能优雅地处理各种故障。

const resilientAgent = new NeuroLink({
  provider: 'anthropic',
  model: 'claude-3-haiku-20240307', // 生产环境可考虑使用更快、更便宜的模型
  fallback: {
    providers: [
      { provider: 'whisper', priority: 1 },
      { provider: 'deepgram', priority: 2 },
      { provider: 'assembly', priority: 3 }
    ],
    strategy: 'sequential', // 顺序回退,也可用'best-of-n'
    onFallback: (from, to) => console.warn(`STT从 ${from} 回退至 ${to}`)
  },
  retry: {
    attempts: 2,
    backoff: 'exponential',
    maxDelay: 3000,
    retryableErrors: ['NETWORK_ERROR', 'TIMEOUT', 'RATE_LIMIT']
  },
  timeout: 10000, // 全局超时10秒
});

// 包装调用,增加应用层重试和降级逻辑
async function robustGenerateWithFallback(input, options) {
  try {
    return await resilientAgent.generate(input, options);
  } catch (error) {
    console.error('主流程失败:', error);

    // 降级策略1:如果TTS失败,尝试换一个提供商
    if (error.message.includes('TTS')) {
      console.log('尝试降级到备用TTS...');
      return await resilientAgent.generate({
        ...input,
        tools: input.tools.map(tool =>
          tool.name === 'textToSpeech' ? { ...tool, provider: 'openai' } : tool
        )
      });
    }

    // 降级策略2:如果LLM失败,返回一个预设的兜底回复
    if (error.message.includes('LLM') || error.message.includes('provider')) {
      console.log('LLM服务异常,使用兜底回复。');
      return {
        toolResults: {
          textToSpeech: {
            audio: await getFallbackAudio('抱歉,我现在有点忙,请稍后再试。')
          }
        }
      };
    }

    throw error; // 其他错误向上抛出
  }
}

5. Web集成与客户端实现

将语音AI能力集成到Web浏览器中,可以构建出无需安装的实时语音应用。这里涉及服务器端的WebSocket服务和客户端的音频采集/播放。

5.1 服务器端WebSocket网关

服务器端负责接收客户端的音频数据流,调用NeuroLink处理,并将合成的音频流返回。

// server/voice-websocket.ts
import { WebSocketServer, WebSocket } from 'ws';
import { NeuroLink } from '@juspay/neurolink';
import { IncomingMessage } from 'http';

interface VoiceSession {
  ws: WebSocket;
  agent: NeuroLink;
  sessionId: string;
}

const wss = new WebSocketServer({ port: 8080 });
const activeSessions = new Map<string, VoiceSession>();

wss.on('connection', (ws: WebSocket, request: IncomingMessage) => {
  const sessionId = `ws_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
  console.log(`🔗 新WebSocket连接建立,会话ID: ${sessionId}`);

  // 为每个连接创建独立的NeuroLink实例,隔离会话状态
  const agent = new NeuroLink({
    provider: 'anthropic',
    model: 'claude-3-haiku',
    memory: { enabled: true, backend: 'redis', ttl: 1800 },
    systemPrompt: '你是一个在浏览器中运行的语音助手。回答要非常简短。',
  });

  const session: VoiceSession = { ws, agent, sessionId };
  activeSessions.set(sessionId, session);

  ws.on('message', async (data: Buffer, isBinary) => {
    if (!isBinary) {
      ws.send(JSON.stringify({ error: '仅支持二进制音频数据' }));
      return;
    }

    try {
      // 将收到的二进制数据视为音频流(这里简化处理,实际可能是WebM/Opus格式)
      // 创建一个模拟的异步迭代器来适配NeuroLink的音频输入
      const audioIterable = (async function* () {
        yield data; // 这里简单处理,实际应处理分块传输
      })();

      const result = await agent.generate({
        input: {
          audio: audioIterable,
          mimeType: 'audio/webm; codecs=opus', // 告知服务器音频格式
        },
        session: { id: sessionId },
        tools: ['speechToText', 'textToSpeech'],
        output: { formats: ['audio'] }, // 我们只需要返回音频
      });

      const audioBase64 = result.toolResults?.textToSpeech?.audio;
      if (audioBase64) {
        // 将Base64音频数据以二进制形式发送回客户端
        const audioBuffer = Buffer.from(audioBase64, 'base64');
        ws.send(audioBuffer, { binary: true });
      } else {
        ws.send(JSON.stringify({ error: 'TTS合成失败' }));
      }
    } catch (error) {
      console.error(`处理会话 ${sessionId} 的音频时出错:`, error);
      ws.send(JSON.stringify({ error: '内部服务器错误' }));
    }
  });

  ws.on('close', () => {
    console.log(`连接关闭,会话ID: ${sessionId}`);
    activeSessions.delete(sessionId);
    // 可选:清理该会话在Redis中的记忆
  });

  ws.on('error', (error) => {
    console.error(`会话 ${sessionId} 的WebSocket错误:`, error);
  });
});

console.log('🚀 WebSocket语音服务器运行在 ws://localhost:8080');

5.2 浏览器端音频采集与播放

客户端使用Web Audio API和MediaRecorder来捕获麦克风音频,并通过WebSocket发送。

<!-- client/index.html -->
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <title>网页语音助手</title>
</head>
<body>
    <button id="startBtn">开始对话</button>
    <button id="stopBtn" disabled>停止</button>
    <p id="status">准备就绪</p>
    <script type="module">
        const startBtn = document.getElementById('startBtn');
        const stopBtn = document.getElementById('stopBtn');
        const statusEl = document.getElementById('status');

        let mediaRecorder;
        let audioChunks = [];
        let ws;
        let audioContext;
        let isRecording = false;

        startBtn.onclick = async () => {
            statusEl.textContent = '正在请求麦克风权限...';
            try {
                const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
                statusEl.textContent = '正在连接服务器...';

                // 连接WebSocket服务器
                ws = new WebSocket('ws://localhost:8080');
                setupWebSocket(ws);

                // 配置MediaRecorder,使用Opus编码以减小数据量
                const options = { mimeType: 'audio/webm; codecs=opus' };
                mediaRecorder = new MediaRecorder(stream, options);

                mediaRecorder.ondataavailable = (event) => {
                    if (event.data.size > 0) {
                        audioChunks.push(event.data);
                        // 将音频数据发送到服务器
                        if (ws.readyState === WebSocket.OPEN) {
                            ws.send(event.data);
                        }
                    }
                };

                mediaRecorder.onstart = () => {
                    isRecording = true;
                    startBtn.disabled = true;
                    stopBtn.disabled = false;
                    statusEl.textContent = '🎤 正在聆听...';
                    audioChunks = [];
                };

                mediaRecorder.onstop = () => {
                    isRecording = false;
                    startBtn.disabled = false;
                    stopBtn.disabled = true;
                    statusEl.textContent = '准备就绪';
                };

                // 每500毫秒发送一个数据块,平衡实时性和网络负载
                mediaRecorder.start(500);
                statusEl.textContent = '已连接,请开始说话。';

            } catch (err) {
                console.error('初始化失败:', err);
                statusEl.textContent = `错误: ${err.message}`;
            }
        };

        stopBtn.onclick = () => {
            if (mediaRecorder && isRecording) {
                mediaRecorder.stop();
                // 停止所有音频轨道
                mediaRecorder.stream.getTracks().forEach(track => track.stop());
            }
            if (ws) {
                ws.close();
            }
        };

        function setupWebSocket(socket) {
            socket.binaryType = 'arraybuffer'; // 接收二进制音频数据

            socket.onopen = () => {
                console.log('WebSocket连接已打开');
            };

            socket.onmessage = async (event) => {
                if (event.data instanceof ArrayBuffer) {
                    // 接收到服务器返回的音频数据
                    statusEl.textContent = '🔊 正在播放回复...';
                    await playAudioBuffer(event.data);
                    statusEl.textContent = isRecording ? '🎤 正在聆听...' : '准备就绪';
                } else {
                    // 可能是文本格式的错误信息
                    try {
                        const msg = JSON.parse(event.data);
                        if (msg.error) {
                            statusEl.textContent = `服务器错误: ${msg.error}`;
                        }
                    } catch {
                        console.log('收到非预期消息:', event.data);
                    }
                }
            };

            socket.onerror = (error) => {
                console.error('WebSocket错误:', error);
                statusEl.textContent = '连接出错';
            };

            socket.onclose = () => {
                console.log('WebSocket连接关闭');
                if (isRecording) {
                    mediaRecorder.stop();
                }
            };
        }

        async function playAudioBuffer(arrayBuffer) {
            if (!audioContext) {
                audioContext = new (window.AudioContext || window.webkitAudioContext)();
            }
            // 解码接收到的音频数据(假设服务器返回的是WAV或MP3等格式,需与服务器端TTS输出格式匹配)
            // 更简单的方案:服务器直接返回Base64,客户端使用Audio元素播放
            // 这里演示更高效的Web Audio API方式
            try {
                const audioBuffer = await audioContext.decodeAudioData(arrayBuffer);
                const source = audioContext.createBufferSource();
                source.buffer = audioBuffer;
                source.connect(audioContext.destination);
                source.start();
                return new Promise(resolve => {
                    source.onended = resolve;
                });
            } catch (decodeError) {
                console.error('音频解码失败:', decodeError);
                // 降级方案:假设是Base64字符串
                const base64String = new TextDecoder().decode(arrayBuffer);
                if (base64String.startsWith('data:audio')) {
                    const audio = new Audio(base64String);
                    await audio.play();
                }
            }
        }
    </script>
</body>
</html>

这个Web示例展示了基本的双向音频流。客户端录制音频并分块发送,服务器处理并返回合成音频,客户端实时播放。实际生产中,需要考虑音频编码格式的统一(如前后端都使用Opus)、错误重连、心跳保活、以及更复杂的会话管理。

6. 常见问题与调试技巧

在开发和部署语音AI应用的过程中,我踩过不少坑。这里总结一些典型问题和解决方法,希望能帮你节省时间。

6.1 音频流处理与格式问题

问题1:NeuroLink报错,提示音频格式不支持。 排查 :首先确认你传递给 input.audio 的数据到底是什么。它应该是一个能产出 Buffer AsyncIterable 。如果你从麦克风直接获取的是 Buffer ,需要将其包装成异步生成器。其次,检查音频参数(采样率、声道数、位深)是否与STT服务的要求匹配。Whisper通常期望16kHz、单声道、16位的PCM数据。 解决 :在录音时明确指定参数,并在送入NeuroLink前进行必要的重采样或格式转换。

// 确保录音配置正确
const recorder = new Recorder({
  sampleRate: 16000, // 必须
  channels: 1,       // 必须
  bitDepth: 16,      // 推荐
  audioType: 'wav'
});

// 如果来源格式不符,需要进行转换(例如使用`sox`或`ffmpeg`库)

问题2:TTS合成的语音听起来不连贯,有奇怪的停顿或语调。 排查 :这通常是由于文本切分不当造成的。我们的逐句合成逻辑依赖于标点符号来切分句子。如果LLM生成的文本包含大量的逗号、破折号,或者没有正确使用句号,切分就会出错。 解决 :优化句子切分算法。可以结合标点、句子长度、甚至简单的自然语言处理(NLP)规则来更智能地切分。或者,考虑使用支持SSML(语音合成标记语言)的TTS服务,通过插入 <break> 等标签来精确控制停顿。

// 更健壮的句子切分函数
function splitIntoSentences(text: string): string[] {
  // 简单规则:按句号、问号、感叹号切分,但排除缩写中的点(如Mr.)
  const regex = /(?<!\b(?:Mr|Mrs|Ms|Dr|Prof|etc|vs)\.)(?<=[.!?])\s+/;
  return text.split(regex).filter(s => s.trim().length > 0);
}

6.2 延迟与性能优化

问题3:从用户说完到听到回复,延迟感觉非常长(>3秒)。 排查 :需要分段测量延迟。使用 console.time 在关键节点打点。

  1. STT延迟 :从发送音频到收到完整转写文本的时间。
  2. LLM首次令牌时间 :从发送文本到收到LLM第一个响应块的时间。
  3. TTS首字节时间 :从发送文本到收到第一段音频数据的时间。
  4. 网络往返时间 解决
  • STT :确保使用流式识别,并考虑在用户说话中途就开始发送音频,而不是等说完。
  • LLM :选择TTFT更低的模型(如Claude Haiku vs. Sonnet)。精简系统提示词和上下文长度。
  • TTS :使用逐句合成,并考虑在LLM生成第一个句子时,就预建立TTS连接。
  • 网络 :将所有服务(STT、LLM、TTS)部署在同一个云区域,或使用全球加速网络。

问题4:在高并发下,应用响应变慢或出错。 排查 :检查服务器资源(CPU、内存、网络IO)。查看NeuroLink客户端或后端服务是否有连接池限制。 解决

  • 实现连接池或复用NeuroLink实例(注意会话隔离)。
  • 对昂贵的操作(如TTS合成)引入本地缓存,对相同的文本直接返回缓存音频。
  • 考虑使用消息队列(如RabbitMQ)将音频处理任务异步化,避免阻塞实时请求。

6.3 记忆与上下文管理

问题5:AI似乎忘记了对话刚开始的内容。 排查 :确认 sessionId 在同一个对话中是否保持不变。检查配置的 memory.backend (如Redis)是否正常运行,且TTL设置合理。确认在每次调用 stream() generate() 时都传入了相同的 session 对象。 解决

// 确保为每个用户/对话使用稳定且唯一的sessionId
function getSessionId(userId: string): string {
  // 可以从数据库或缓存中获取持久化的sessionId
  // 或者使用 userId + 时间窗口 的组合
  return `session_${userId}_${Math.floor(Date.now() / (1000 * 60 * 30))}`; // 每30分钟一个新会话
}

// 在每次调用时传入
const response = await voiceAgent.stream({
  input: { audio: stream },
  session: { id: getSessionId(currentUserId) }, // 关键!
  // ...
});

问题6:上下文太长导致LLM调用昂贵且速度慢。 排查 :NeuroLink的memory后端会存储完整的对话历史。如果对话轮数很多,每次请求携带的上下文令牌数会暴涨。 解决

  • 在NeuroLink配置中设置 maxContextTokens ,自动截断过长的历史。
  • 实现自定义的记忆摘要功能:定期让LLM自己总结之前的对话要点,然后用摘要替换掉原始的长历史。
  • 对于客服等场景,可以主动在合适时机(如问题解决后)清空或重置会话。

6.4 成本控制与监控

问题7:账单出乎意料地高。 排查 :使用前面提到的成本监控钩子,分析是STT、LLM还是TTS占了大头。检查是否有意外的循环调用或调试代码在大量生成请求。 解决

  • STT :务必集成VAD,避免在静音时持续调用STT。
  • LLM :使用更便宜的模型处理简单查询(设置路由逻辑)。优化提示词,减少不必要的输出。
  • TTS :缓存常用回复的音频。对于长文本,考虑是否真的需要全部合成,或许可以提示用户查看文字版。
  • 设置硬性预算和告警,在成本超支时自动切换至降级模式或停止服务。

构建语音AI应用是一个涉及音频处理、实时流、AI推理和用户体验的复杂工程。NeuroLink通过提供一个统一的TypeScript SDK,极大地简化了核心流水线的搭建。然而,要打造一个真正流畅、智能、健壮的生产级应用,你仍然需要在延迟优化、错误处理、成本控制和用户体验细节上投入大量精力。希望这篇结合了原理、代码和实战经验的内容,能为你提供一个坚实的起点。记住,从最简单的“回声”测试开始(录制语音,转文字,再原样合成播放),逐步添加LLM、记忆、打断等高级功能,是迭代开发这类复杂系统的最佳路径。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐