基于扣子构建AI智能客服的架构设计与性能优化实战
在评估阶段,我们重点对比了扣子(Coze)、Rasa(开源)和Dialogflow(Google)。NLU处理能力与准确率:这是核心。扣子内置了基于类似BERT的预训练模型进行意图识别和槽位填充。在我们的测试集(5000条客服对话)上,其意图识别准确率达到了96.5%,显著高于我们自研模型(85%)和Rasa with DIET(约92%)。Dialogflow表现也不错(约95%),但其对中文特
最近在做一个智能客服系统的重构项目,客户对响应速度和意图识别的准确性要求非常高。传统的基于规则或早期开源框架的客服系统,在应对突发流量和多轮复杂对话时,常常力不从心。经过一番技术选型和实践,我们最终基于扣子(Coze)平台构建了一套新的AI智能客服系统,在性能和成本上都取得了不错的效果。这里把整个架构设计和优化过程记录下来,希望能给有类似需求的同学一些参考。

背景痛点:传统客服系统的瓶颈在哪里?
在项目初期,我们深入分析了现有系统的痛点,主要集中在三个方面:
- 并发请求处理能力弱:在促销活动期间,客服请求量会瞬间飙升。原有系统基于同步阻塞的Web框架,数据库连接池也配置不足,导致大量请求排队,平均响应延迟(P95)从平时的2秒飙升至15秒以上,用户体验急剧下降。
- 多轮对话管理混乱:对于需要多次交互才能完成的复杂任务(如订单修改、复杂产品咨询),传统系统依赖会话ID在数据库或内存中维护上下文。这种方式在服务重启或分布式部署时,容易出现状态丢失或不同步的问题,导致对话逻辑断裂。
- 意图识别准确率天花板低:我们之前使用的是基于词向量和传统机器学习模型(如SVM)的NLU模块。对于口语化、多义词或组合意图(例如“我想退掉昨天买的那个黑色的手机壳”)的识别准确率很难突破85%,大量请求需要转人工,不仅成本高,也影响了自动化率。
这些瓶颈直接导致了高昂的人力成本和低下的服务效率,因此,寻找一个在NLU能力、并发性能和开发效率上都有优势的平台,成为了我们的核心目标。
技术选型:为什么是扣子(Coze)?
在评估阶段,我们重点对比了扣子(Coze)、Rasa(开源)和Dialogflow(Google)。以下是我们从几个关键维度进行的量化对比:
- NLU处理能力与准确率:这是核心。扣子内置了基于类似BERT的预训练模型进行意图识别和槽位填充。在我们的测试集(5000条客服对话)上,其意图识别准确率达到了96.5%,显著高于我们自研模型(85%)和Rasa with DIET(约92%)。Dialogflow表现也不错(约95%),但其对中文特定场景的适应性稍弱。
- API响应延迟:我们使用相同网络环境,对处理单轮简单查询的API端点进行压测(100并发)。扣子API的平均响应时间为120ms,P99为210ms。Rasa(启用TensorFlow Serving)平均为180ms,P99为350ms。Dialogflow平均为150ms,但存在因网络导致的偶尔高延迟(P99超过500ms)。扣子的响应稳定性和低延迟表现突出。
- 定制化与集成能力:Rasa开源,定制化能力最强,但需要投入大量机器学习工程和运维资源。Dialogflow黑盒化程度高,定制复杂逻辑和与企业内部系统(如CRM、订单库)深度集成较麻烦。扣子提供了灵活的插件(Plugin)和工作流(Workflow)机制,可以通过Python代码轻松对接内部API和数据库,在易用性和灵活性之间取得了很好的平衡。
综合来看,扣子凭借其优秀的开箱即用的中文NLU能力、稳定的低延迟API以及友好的集成方式,成为了我们项目的最优解。
核心实现:构建健壮的对话引擎
选定平台后,我们开始构建系统的核心——对话引擎。其关键在于维护连贯的会话上下文和实现高效的异步处理。
使用Python SDK构建对话状态机
扣子的Python SDK是交互的核心。我们在此基础上封装了一个对话状态管理器,负责维护会话生命周期和上下文。
import hashlib
import json
from typing import Dict, Any, Optional
from coze import CozeClient
class DialogueStateManager:
def __init__(self, coze_client: CozeClient, redis_client):
self.client = coze_client
self.redis = redis_client
# 上下文在Redis中的过期时间,设为30分钟
self.context_ttl = 1800
def _get_context_key(self, session_id: str) -> str:
"""生成上下文存储的Redis键"""
return f"coze:context:{session_id}"
def process_message(self, session_id: str, user_input: str) -> Dict[str, Any]:
"""
处理用户输入,维护并返回对话响应。
时间复杂度:O(1) 的Redis访问 + O(n)的Coze API调用(n为输入长度)。
"""
# 1. 从Redis获取历史上下文
context_key = self._get_context_key(session_id)
history_context = self.redis.get(context_key)
if history_context:
# 将历史对话列表加载回来
dialogue_history = json.loads(history_context)
else:
# 新会话,初始化空历史
dialogue_history = []
# 2. 调用Coze API,将历史上下文作为参数传入
# 扣子SDK支持在消息中携带历史,以实现多轮对话理解
response = self.client.chat(
message=user_input,
dialogue_history=dialogue_history, # 关键:传入历史
user_id=session_id # 用于平台端分析
)
# 3. 更新上下文:将本轮Q&A加入历史
dialogue_history.append({"role": "user", "content": user_input})
dialogue_history.append({"role": "assistant", "content": response['content']})
# 为防止历史过长,限制只保留最近10轮对话
if len(dialogue_history) > 20: # 10轮对话,每轮2条消息
dialogue_history = dialogue_history[-20:]
# 4. 将更新后的上下文存回Redis,并刷新TTL
self.redis.setex(context_key, self.context_ttl, json.dumps(dialogue_history))
# 5. 返回处理结果
return {
"session_id": session_id,
"response": response['content'],
"intent": response.get('intent', ''),
"slots": response.get('entities', {})
}
这个管理器的核心逻辑是将会话上下文(对话历史列表)以session_id为键持久化到Redis中。每次对话时,先取出历史,连同当前问题一起发给扣子,扣子模型能基于完整上下文给出更准确的回复。处理完成后,将本轮对话追加到历史中并写回Redis。通过设置TTL,可以自动清理不活跃的会话,节省内存。
基于Redis的异步消息队列实现
为了应对高并发并实现请求的削峰填谷,我们引入了异步处理机制。使用Redis的List结构实现了一个简单的可靠队列。
import asyncio
import uuid
import time
from redis.asyncio import Redis
class AsyncMessageQueue:
def __init__(self, redis_client: Redis, queue_name='coze:task:queue'):
self.redis = redis_client
self.queue_name = queue_name
self.processing_set = f"{queue_name}:processing" # 正在处理的任务集合
self.retry_count_key_prefix = f"{queue_name}:retry:" # 重试次数键前缀
async def add_task(self, task_data: Dict[str, Any]) -> str:
"""添加任务到队列,并返回任务ID。使用消息去重(简易版)。"""
# 生成任务唯一ID和内容指纹(用于简单去重)
task_id = str(uuid.uuid4())
content_fingerprint = hashlib.md5(
json.dumps(task_data, sort_keys=True).encode()
).hexdigest()
task_item = {
'id': task_id,
'fp': content_fingerprint, # 指纹
'data': task_data,
'timestamp': time.time()
}
# 简易去重:检查最近1分钟内是否有相同指纹的任务已被添加
dup_key = f"{self.queue_name}:dup:{content_fingerprint}"
if not await self.redis.setnx(dup_key, '1'):
# 键已存在,说明是重复任务
await self.redis.expire(dup_key, 60) # 刷新过期时间
return f"duplicate:{content_fingerprint}"
await self.redis.expire(dup_key, 60) # 设置1分钟过期
# 将任务序列化后推入队列尾部
await self.redis.rpush(self.queue_name, json.dumps(task_item))
return task_id
async def process_task(self, worker_id: str):
"""工作进程消费任务。包含超时重试机制。"""
while True:
# 1. 从队列头部取一个任务,并原子性地放入“处理中集合”
task_json = await self.redis.blpop(self.queue_name, timeout=30)
if not task_json:
continue # 队列为空,等待
_, task_json = task_json
task = json.loads(task_json)
task_id = task['id']
# 2. 将任务ID加入“处理中集合”,并设置处理超时(如30秒)
added = await self.redis.sadd(self.processing_set, task_id)
if not added:
# 理论上不应发生,如果发生说明任务被重复取出,丢弃
continue
await self.redis.expire(self.processing_set, 30)
try:
# 3. 模拟任务处理(实际应调用对话状态管理器)
print(f"Worker {worker_id} processing task {task_id}")
await asyncio.sleep(0.1) # 模拟处理耗时
# 处理成功,从“处理中集合”移除
await self.redis.srem(self.processing_set, task_id)
except Exception as e:
print(f"Worker {worker_id} failed on task {task_id}: {e}")
# 4. 处理失败,重试逻辑
retry_key = f"{self.retry_count_key_prefix}{task_id}"
retry_count = int(await self.redis.get(retry_key) or 0)
if retry_count < 3: # 最大重试3次
await self.redis.incr(retry_key)
await self.redis.expire(retry_key, 3600) # 重试计数1小时过期
# 将任务重新放回队列头部,立即重试
await self.redis.lpush(self.queue_name, task_json)
else:
# 超过重试次数,任务最终失败,记录到死信队列或日志
await self.redis.srem(self.processing_set, task_id)
await self.redis.lpush(f"{self.queue_name}:dead", task_json)
这个队列实现了两个关键机制:消息去重(通过内容指纹在1分钟内防止重复提交)和超时重试(通过processing_set跟踪处理中的任务,失败后重新入队,最多3次)。这保证了在高并发下消息不被重复处理,且临时故障不会导致任务丢失。
性能优化:从可用到高效
架构搭建完成后,我们进行了系统的性能测试与优化。
压测报告:寻找系统瓶颈
我们使用JMeter对核心对话接口进行了压力测试,模拟了从50到2000并发用户的场景。测试环境为4核8G的云服务器,连接单独的Redis和扣子API。

(示意图:横轴为并发用户数,纵轴为平均响应时间(ms)。曲线显示在1000并发前响应时间平缓上升,之后斜率增大。)
测试结果摘要:
- 并发<500:平均响应时间稳定在150ms以内,系统资源利用率低。
- 并发500~1500:响应时间线性缓慢增长至约400ms(P95)。此时Web应用服务器CPU成为首个瓶颈点。
- 并发>1500:响应时间开始显著上升,超过800ms,错误率(非200响应)也开始增加。瓶颈转移到外部扣子API的调用延迟上(因其有自身的QPS限制)。
结论:单节点应用在1500 TPS左右达到性能拐点。要支持更高的并发,需要进行水平扩展和依赖服务治理。
冷启动优化方案
我们的服务部署在Kubernetes中,会面临实例扩缩容带来的冷启动问题。冷启动延迟主要来自两方面:Python Web应用加载(相对快)和与扣子API建立连接/模型预热(慢)。
我们采取了以下措施:
- 连接池与模型预加载:在应用启动后、接收流量前,主动向扣子API发送一个轻量级的预热请求(例如问候语“你好”)。这能促使扣子后端为我们的Bot预热模型,并且初始化我们HTTP客户端的连接池。我们使用Kubernetes的
readinessProbe,在预热完成后再将Pod标记为就绪。 - 水平扩展与负载均衡:如前所述,单实例有瓶颈。我们使用Nginx Ingress作为负载均衡器,配置了基于最少连接的负载均衡算法,将请求分发到多个应用实例。同时,将对话状态管理器中的Redis配置为集群模式,以承载更高的状态存储压力。
- 扣子API调用优化:与扣子团队沟通后,针对高并发场景,他们建议我们:
- 适当批量处理请求(对于可稍延迟的请求)。
- 使用异步SDK(如
aiohttp)来避免网络IO阻塞。 - 申请调整了QPS限制,以满足业务峰值需求。
经过优化,系统成功支持了2000+ TPS的稳定并发处理,且P99响应时间控制在500ms以内。
避坑指南:实践中遇到的挑战
在开发过程中,我们踩过一些坑,值得分享。
-
对话流设计中的状态一致性问题: 最初,我们将一些业务逻辑状态(如“用户正在选择产品型号”)也放在Redis的对话上下文中。但当两个请求几乎同时处理同一会话时(虽然不常见),可能出现状态覆盖。解决方案:对于需要强一致性的业务状态,我们改用Redis的
WATCH/MULTI/EXEC事务,或者使用分布式锁(如Redlock)来确保关键状态更新的原子性。对于大多数对话流,扣子自身的工作流状态管理更可靠,应尽量将复杂逻辑编排在扣子平台内。 -
敏感词过滤的正则表达式最佳实践: 客服回复必须过滤敏感词。最初我们使用一个巨大的正则表达式
(敏感词1|敏感词2|...),性能很差,且容易漏掉变体。解决方案:采用多级过滤策略。- 第一级:使用高效的字符串匹配算法(如Aho-Corasick自动机)的库(如
flashtext或ahocorasick)进行精确词匹配,时间复杂度接近O(n)。 - 第二级:对于需要模糊匹配的(如谐音、拆字),使用经过优化的、范围精确的正则表达式,并预编译。
- 第三级:将过滤服务独立部署,避免影响主对话线程。
import ahocorasick # 构建自动机 A = ahocorasick.Automaton() for idx, word in enumerate(sensitive_words_list): A.add_word(word, (idx, word)) A.make_automaton() # 进行匹配 for end_index, (idx, original_word) in A.iter(text): start_index = end_index - len(original_word) + 1 print(f"Found '{original_word}' at {start_index}:{end_index}") # 进行替换或标记 - 第一级:使用高效的字符串匹配算法(如Aho-Corasick自动机)的库(如
-
GPU资源分配策略(针对自行部署NLU模型的情况): 如果我们未来需要将部分NLU模型(如用于二次分类)自行部署,GPU分配是关键。经验:不要将所有模型堆在一个GPU上。使用
CUDA_VISIBLE_DEVICES环境变量隔离不同服务。对于推理服务,使用TensorRT或ONNX Runtime进行模型优化和量化,可以大幅减少GPU内存占用和提升推理速度,从而在单卡上部署更多模型实例。
延伸思考:从文本到语音与业务联动
当前系统主要处理文本客服。未来的扩展方向可以很有趣:
-
语音客服集成:架构上可以增加一个“语音处理层”。用户语音流通过WebSocket接入,先由语音识别(ASR)服务(如阿里云、腾讯云的实时语音识别)转为文本。文本进入我们现有的扣子对话引擎。引擎返回的文本回复,再经由语音合成(TTS)服务转为语音,通过WebSocket返回给用户。扣子强大的对话能力可以无缝复用,我们只需处理好音频流的实时接收、发送和编解码。
-
与工单系统联动:当扣子识别到用户意图为“投诉”、“故障申报”或经过多轮尝试仍无法解决时,可以自动触发创建工单。这可以通过扣子的“插件”功能实现。我们开发一个“工单创建插件”,当对话工作流到达特定节点时,调用该插件。插件内部通过RPC或API调用企业的工单系统(如Jira Service Desk、自研系统),将对话历史、用户信息、识别出的问题分类自动填充为工单内容,并返回工单号给用户。这样实现了从智能应答到人工服务的平滑过渡,全程自动化,提升了问题解决效率。
通过这个项目,我们深刻体会到,选择一个合适的AI平台能极大降低智能客服系统的开发门槛和运维成本。扣子平台在NLU核心能力上的优势,让我们能更专注于业务逻辑和系统架构的优化,最终实现了响应速度、识别准确率和运营成本的多重提升。希望这篇笔记能为你构建自己的AI客服系统提供一条清晰的路径。
更多推荐


所有评论(0)