基于agents-flex构建高并发智能客服系统的实战指南
通过这次基于 agents-flex 的智能客服系统重构,我们成功解决了老系统在并发、准确率和弹性方面的核心痛点。agents-flex 在分布式和异步处理上的原生支持,让我们能够更专注于业务逻辑的实现,而非底层基础设施的搭建。目前系统已稳定运行了数月,经历了多次营销活动的流量考验。当然,没有完美的方案。agents-flex 作为一个较新的框架,其社区生态和工具链相比 Rasa 还有差距,有些高
最近在做一个智能客服系统的重构项目,之前的老系统是基于规则匹配的,一到业务高峰期就卡顿、响应慢,意图识别也经常出错,用户体验很不好。经过一番技术选型,我们最终决定采用 agents-flex 框架来构建新一代的高并发智能客服系统。这篇文章就来分享一下我们的实战经验,从架构设计到代码实现,再到性能调优和踩过的坑,希望能给有类似需求的开发者一些参考。

1. 为什么选择 agents-flex?先聊聊传统方案的痛点
我们之前的客服系统,问题主要集中在三个方面:
1.1 并发处理能力弱 规则引擎在处理大量并发请求时,由于大量字符串匹配和逻辑判断,CPU消耗巨大,响应时间(RT)会线性增长。当QPS超过500时,平均响应延迟就从几十毫秒飙升到秒级,用户体验急剧下降。
1.2 意图识别准确率低 基于关键词和正则的规则,无法理解用户问句的真实意图。比如用户问“我怎么退不了款?”,规则可能只匹配到“退款”关键词,但无法区分这是“咨询退款流程”还是“投诉退款失败”。这导致大量问题需要转人工,客服成本居高不下。
1.3 动态扩缩容困难 老系统是单体架构,状态(如用户会话上下文)存在本地内存。想要水平扩展,就得引入复杂的会话同步机制,或者改成无状态设计,改动成本非常高,几乎无法应对“双十一”这类流量突增的场景。
基于这些痛点,我们开始寻找新的解决方案。我们横向对比了几个主流框架:
- Rasa: 开源,NLU和对话管理功能强大,但部署和运维相对复杂,分布式会话管理需要自己基于Redis等中间件实现,在高并发场景下的性能调优有一定门槛。
- Dialogflow (Google): 云服务,开箱即用,意图识别准确率高,但定制化能力受限,数据隐私性要求高的场景不适合,且成本随调用量增长。
- agents-flex: 一个新兴的、专注于高并发和分布式场景的智能体框架。它的核心优势在于原生的分布式设计和高性能的异步推理管道。其上下文管理模块天生支持与Redis等分布式存储集成,NLU模块可以方便地集成BERT等预训练模型并进行异步批处理,非常适合我们构建高并发、可弹性伸缩的客服系统。
综合考虑定制化需求、性能、成本和部署灵活性,我们最终选择了 agents-flex。
2. 核心实现:用 agents-flex 搭建智能客服骨架
我们的新系统主要分为两大模块:意图识别模块和会话状态管理模块。
2.1 意图识别模块实现
这是智能客服的“大脑”。我们利用 agents-flex 的 NLUProcessor 来构建一个异步的意图分类流水线。
import asyncio
from typing import Dict, List
import numpy as np
from agents_flex.nlu import NLUProcessor, Intent
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import torch.nn.functional as F
class CustomerServiceIntentRecognizer(NLUProcessor):
"""
自定义的客服意图识别处理器。
继承自 agents-flex 的 NLUProcessor,实现异步推理。
"""
def __init__(self, model_path: str, intent_labels: List[str]):
super().__init__()
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 加载预训练的BERT分类模型和分词器
self.tokenizer = AutoTokenizer.from_pretrained(model_path)
self.model = AutoModelForSequenceClassification.from_pretrained(model_path).to(self.device)
self.model.eval() # 设置为评估模式
self.intent_labels = intent_labels # 意图标签列表,如 ['查询订单', '申请退款', '投诉建议']
async def preprocess(self, text: str) -> Dict:
"""异步文本预处理:分词、编码"""
# 使用tokenizer将文本转换为模型输入
inputs = self.tokenizer(text,
truncation=True,
padding='max_length',
max_length=128,
return_tensors="pt")
# 将数据转移到指定设备(GPU/CPU)
return {k: v.to(self.device) for k, v in inputs.items()}
async def inference(self, preprocessed_data: Dict) -> np.ndarray:
"""异步模型推理"""
with torch.no_grad(): # 禁用梯度计算,提升推理速度
outputs = self.model(**preprocessed_data)
logits = outputs.logits
# 使用softmax获取概率分布
probabilities = F.softmax(logits, dim=-1)
return probabilities.cpu().numpy()
async def postprocess(self, inference_result: np.ndarray) -> Intent:
"""后处理:将模型输出转换为Intent对象"""
# 获取概率最高的意图索引
predicted_idx = np.argmax(inference_result, axis=-1)[0]
confidence = inference_result[0][predicted_idx]
intent_name = self.intent_labels[predicted_idx]
# 返回 agents-flex 定义的 Intent 对象
return Intent(
name=intent_name,
confidence=float(confidence),
entities={} # 本例暂不处理实体抽取
)
# 使用示例
async def main():
recognizer = CustomerServiceIntentRecognizer(
model_path="./models/bert_customer_service",
intent_labels=["查询订单", "申请退款", "产品咨询", "投诉建议", "其他"]
)
user_query = "我昨天买的手机什么时候能发货?"
intent = await recognizer.process(user_query)
print(f"识别意图: {intent.name}, 置信度: {intent.confidence:.4f}")
if __name__ == "__main__":
asyncio.run(main())
这段代码的核心是继承并实现了 NLUProcessor 的三个异步方法。preprocess 负责文本向量化,inference 调用模型,postprocess 将模型输出转化为框架能理解的 Intent 对象。这种设计使得模型加载、推理可以完全异步化,轻松融入 agents-flex 的异步处理管道,为高并发打下基础。
2.2 会话状态管理设计
对于高并发客服系统,会话状态(如对话历史、用户信息、当前处理进度)必须持久化到外部存储,并保证在多实例环境下的数据一致性。我们采用 Redis Cluster + 分布式锁 的方案。
- 存储结构:每个用户会话用一个唯一的
session_id作为 Redis Key,Value 是一个 Hash 结构,存储对话轮次、历史消息、提取的槽位(Slots)信息等。 - 分布式锁:当需要更新某个会话的状态时(例如,填充一个用户提供的订单号),使用 Redlock 算法在 Redis 上获取一个针对该
session_id的分布式锁,防止多个请求同时修改导致状态错乱。
import json
import aioredis
from redis.asyncio import RedisCluster
from redis.asyncio.lock import Lock
class DistributedSessionManager:
def __init__(self, redis_nodes):
# 连接 Redis 集群
self.redis_client = RedisCluster.from_nodes(redis_nodes)
async def get_session(self, session_id: str) -> Dict:
"""获取会话状态"""
data = await self.redis_client.hgetall(f"cs_session:{session_id}")
return {k.decode(): json.loads(v.decode()) for k, v in data.items()} if data else {}
async def update_session(self, session_id: str, updates: Dict, ttl: int = 1800):
"""更新会话状态(使用分布式锁保证原子性)"""
lock_key = f"cs_session_lock:{session_id}"
# 获取分布式锁,超时时间5秒
lock = Lock(self.redis_client, lock_key, timeout=5)
try:
if await lock.acquire():
# 获取当前状态
current_state = await self.get_session(session_id)
# 合并更新
current_state.update(updates)
# 写回Redis,并设置TTL(30分钟无活动则过期)
pipe = self.redis_client.pipeline()
for key, value in current_state.items():
pipe.hset(f"cs_session:{session_id}", key, json.dumps(value))
pipe.expire(f"cs_session:{session_id}", ttl)
await pipe.execute()
else:
raise Exception(f"Acquire lock failed for session: {session_id}")
finally:
await lock.release() # 释放锁
async def add_message(self, session_id: str, role: str, content: str):
"""向会话中添加一条消息记录"""
message = {"role": role, "content": content, "timestamp": time.time()}
# 使用列表存储历史消息,只保留最近50条
await self.redis_client.lpush(f"cs_session:{session_id}:messages", json.dumps(message))
await self.redis_client.ltrim(f"cs_session:{session_id}:messages", 0, 49)
通过这套设计,我们的客服系统实现了无状态化,任何一个服务实例都可以处理任何用户的请求,只需从 Redis 集群中读取/写入对应的会话上下文即可,为水平扩容扫清了障碍。
3. 性能优化:从压测数据到冷启动
架构搭好了,性能如何呢?我们进行了详细的压力测试。
3.1 压测数据对比
我们使用 Locust 模拟了从 100 到 2000 的并发用户,对“意图识别+会话更新”这个核心链路进行压测。
| 并发用户数 | 平均响应时间 (ms) | QPS (每秒查询率) | 错误率 |
|---|---|---|---|
| 100 | 45 | 2200 | 0% |
| 500 | 68 | 7350 | 0% |
| 1000 | 120 | 8330 | 0.1% |
| 2000 | 250 | 8000 | 0.5% |
(测试环境:4台 8核16G 的云服务器,Redis Cluster 3主3从,模型为裁剪后的BERT-base)
可以看到,在并发1000以内时,系统表现非常稳定,QPS线性增长。超过1000后,响应时间有所上升,主要瓶颈出现在模型推理的GPU资源上。但整体QPS维持在8000+,完全满足我们日常万级并发的需求,峰值时通过快速扩容实例也能应对。
3.2 冷启动优化
NLU模型通常比较大(几百MB到几GB),如果每次服务启动或实例扩容时才加载,会导致前几分钟的请求全部超时失败。我们采用了 模型预热加载 和 健康检查隔离 的策略。
- 预热加载:在服务启动的初始化阶段,在
__init__方法中同步加载模型和分词器到内存/显存。虽然这会稍微增加启动时间,但避免了第一个请求的“冷启动惩罚”。 - 就绪探针 (Readiness Probe):在 Kubernetes 的 Deployment 配置中,设置一个就绪探针。该探针会在服务初始化完成后(即模型加载完毕),才返回成功。K8s 只有在就绪探针通过后,才会将流量导入该 Pod。这样就确保了所有对外服务的实例都是“热”的。
# Kubernetes Deployment 配置片段示例
spec:
containers:
- name: cs-nlu-service
image: my-registry/cs-nlu:v1.0
readinessProbe:
httpGet:
path: /health/ready # 服务内部实现的就绪检查端点
port: 8080
initialDelaySeconds: 30 # 给予足够的模型加载时间
periodSeconds: 5
4. 避坑指南:生产环境中的那些“坑”
在实际上线和运行过程中,我们遇到了几个典型问题,这里分享下解决方案。
4.1 对话超时处理
用户可能中途离开,导致会话长时间挂起。如果不处理,会浪费大量Redis存储和会话锁资源。 我们的最佳实践是双层超时机制:
- 会话级TTL:如上文代码所示,每次更新会话时,刷新 Redis Key 的 TTL(例如30分钟)。超过30分钟无互动,会话自动过期被清理。
- 轮次级超时:在对话逻辑内部,对于每一个等待用户回复的“槽位填充”步骤,设置一个更短的超时(如3分钟)。如果超时,则触发超时处理流程,例如发送提示“您还在吗?”,或者将会话状态重置,并释放分布式锁。
4.2 敏感词过滤的合规实现
客服对话必须符合监管要求。我们并没有在NLU模型层面处理,而是在预处理和后处理之间插入了一个异步的过滤组件。
- 异步过滤:在
preprocess之后,inference之前,调用一个高效的异步敏感词过滤服务(例如基于DFA算法)。如果命中,则直接返回一个特定的“内容违规”意图,并终止后续的模型推理流程。 - 词库热更新:过滤词库存储在 Redis 中,后台管理页面更新词库后,通过 Pub/Sub 通知所有服务实例实时 reload,无需重启服务。
class SensitiveFilterMiddleware:
async def filter(self, text: str) -> bool:
"""返回True表示包含敏感词"""
# 这里调用内部的敏感词检测服务
# 可以是基于本地DFA树的快速检查
pass
# 在 IntentRecognizer 的 process 方法中集成
async def process(self, text: str) -> Intent:
# 1. 敏感词检查
if await self.filter_middleware.filter(text):
return Intent(name="content_violation", confidence=1.0)
# 2. 正常流程:预处理、推理、后处理
preprocessed = await self.preprocess(text)
# ... 后续流程
5. 互动体验:欢迎来测压我们的Demo
纸上得来终觉浅。我们部署了一个简化版的演示系统,并开放了测试 API 端点,你可以体验不同并发下的系统表现。
API 端点:POST https://demo.custservice.com/v1/recognize 请求体:
{
"session_id": "your_test_session",
"query": "我要投诉物流太慢了!"
}
你可以使用 wrk 或 JMeter 等工具,对这个端点进行压测,观察响应时间和成功率的变化。我们配置了自动扩缩容策略,当 CPU 利用率持续超过70%时,会自动增加实例,你可以看到响应时间曲线如何随着实例数增加而变得平缓。

总结
通过这次基于 agents-flex 的智能客服系统重构,我们成功解决了老系统在并发、准确率和弹性方面的核心痛点。agents-flex 在分布式和异步处理上的原生支持,让我们能够更专注于业务逻辑的实现,而非底层基础设施的搭建。目前系统已稳定运行了数月,经历了多次营销活动的流量考验。
当然,没有完美的方案。agents-flex 作为一个较新的框架,其社区生态和工具链相比 Rasa 还有差距,有些高级功能需要自己动手实现。但它在性能和高并发场景下的表现,确实令人印象深刻。如果你也在为构建高性能、可扩展的对话系统而烦恼,agents-flex 绝对值得你花时间深入了解一下。
更多推荐

所有评论(0)