最近在做一个智能客服项目,遇到了一个典型的高并发难题:平时系统运行平稳,但一到营销活动或业务高峰期,大量用户涌入,客服机器人就开始“罢工”——要么响应超时,要么对话上下文错乱,甚至直接丢失会话。传统的基于轮询或简单回调的架构,在状态管理和资源调度上显得力不从心。经过一番调研和实战,我们最终选择基于 Coze工作流 来重构核心对话引擎,成功将系统QPS提升了3倍以上。今天就来分享一下整个架构的设计思路、核心实现代码以及那些踩过的“坑”。

智能客服系统架构示意图

1. 背景与痛点:为什么传统方案在高并发下会失效?

我们最初的智能客服系统架构比较传统:用户消息通过API网关进入,由一个中心化的对话服务处理。这个服务维护着用户会话状态(通常放在Redis里),调用LLM接口生成回复,再返回给用户。听起来没问题,对吧?但在高并发场景下,问题接踵而至:

  • 会话状态竞争:大量并发请求同时读写同一个用户的会话上下文Redis键,导致状态覆盖或丢失。加锁又严重拖慢响应速度。
  • 响应链路过长且同步:从接收消息、更新状态、调用LLM、到返回结果,整个流程是同步的。任何一个环节(尤其是LLM调用)延迟,都会阻塞整个请求线程,快速耗尽服务器资源。
  • 资源伸缩不灵活:对话服务是一个整体,扩容时所有功能一起扩,无法针对耗时的LLM调用或状态管理进行独立伸缩。
  • 错误恢复困难:如果LLM调用失败或超时,整个对话流程中断,很难从中断点优雅恢复,用户体验差。

传统的轮询或简单事件驱动架构,很难精细地管理一个可能包含多轮问答、分支跳转的复杂对话状态机。

2. 技术选型:为什么是Coze工作流?

面对状态管理和异步编排的需求,我们评估了几个主流方案:

  • AWS Step Functions:功能强大,表达能力强(尤其是JSONPath),与AWS生态无缝集成。但冷启动延迟(Cold Start)在毫秒级,对于需要极低延迟的对话交互来说,有时仍显不足。此外,国内团队使用可能有网络和合规考量。
  • Azure Logic Apps:更偏向于企业应用集成,可视化设计器优秀。但对于需要深度自定义逻辑和复杂状态转换的对话引擎,其灵活度稍逊,且同样存在冷启动问题。
  • Coze工作流:这是我们最终的选择。它本质上是一个托管式的、高性能的工作流引擎。其核心优势在于:
    1. 极低延迟的状态管理:工作流实例的状态由引擎持久化,读写速度极快,避免了我们自己管理Redis状态的一致性问题。
    2. 内置的异步与容错:天然支持异步任务、重试、超时和错误处理,我们可以轻松编排“接收消息->意图识别->LLM调用->回复生成”的流程。
    3. 灵活的扩缩容:工作流引擎和任务执行节点(我们的业务代码)可以独立伸缩。我们可以为耗时的LLM调用部署更多节点,而状态引擎保持稳定。
    4. 可视化与代码结合:支持通过DAG(有向无环图)定义流程,既可以用UI拖拽,也支持代码化定义(Infrastructure as Code),便于版本管理。

简而言之,Coze工作流将复杂的对话状态机管理和异步协调逻辑,从我们的业务代码中剥离出来,交给了更专业的平台。

3. 核心实现:用DAG定义对话状态机

我们的目标是:每一个用户会话,对应一个Coze工作流实例。这个实例的生命周期就是整个对话过程。

首先,我们用Mermaid来描绘核心的对话状态机架构:

graph TD
    A[用户消息事件] --> B{工作流实例存在?}
    B -- 否 --> C[创建新工作流实例]
    B -- 是 --> D[向现有实例发送信号]
    C --> E[初始化对话上下文]
    D --> E
    E --> F[意图识别节点]
    F --> G{意图类型}
    G -- 常规问答 --> H[调用LLM生成节点]
    G -- 转人工 --> I[创建工单节点]
    G -- 查询订单 --> J[调用业务API节点]
    H --> K[格式化回复节点]
    I --> K
    J --> K
    K --> L[推送回复至用户]
    L --> M{会话是否结束?}
    M -- 否 --> N[等待下一条消息]
    M -- 是 --> O[清理资源并结束实例]
    N --> A

这个DAG清晰地定义了对话的路径。接下来,我们看如何用Python代码与Coze工作流SDK集成,并实现上下文持久化。

3.1 消息队列与工作流引擎集成

我们使用RabbitMQ作为消息入口。消费者收到消息后,并不处理业务,而是负责协调工作流实例。

import pika
from coze_sdk import WorkflowClient, WorkflowInstance
from typing import Dict, Any, Optional
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ConversationOrchestrator:
    def __init__(self, coze_client: WorkflowClient, mongo_client):
        self.coze_client = coze_client
        self.mongo_client = mongo_client
        self.workflow_definition_id = "conversation-state-machine-v1"

    def on_user_message(self, channel, method, properties, body):
        """RabbitMQ消费者回调函数"""
        try:
            message_data: Dict[str, Any] = json.loads(body)
            user_id = message_data["user_id"]
            session_id = message_data.get("session_id", user_id)
            text = message_data["text"]

            # 关键步骤:查找或创建工作流实例
            instance: Optional[WorkflowInstance] = self._find_active_instance(session_id)
            if not instance:
                logger.info(f"为会话 {session_id} 创建新的工作流实例")
                # 初始化上下文并启动实例
                initial_context = self._initialize_context(user_id, text)
                instance = self.coze_client.start_workflow(
                    definition_id=self.workflow_definition_id,
                    input_data={"initial_message": text, "context": initial_context},
                    instance_id=session_id  # 使用session_id作为实例ID,保证唯一性
                )
            else:
                logger.info(f"向现有实例 {session_id} 发送信号")
                # 向运行中的实例发送新消息信号
                self.coze_client.signal_workflow(
                    instance_id=session_id,
                    signal_name="new_message",
                    signal_data={"text": text}
                )

            channel.basic_ack(delivery_tag=method.delivery_tag)
        except json.JSONDecodeError as e:
            logger.error(f"消息JSON解析失败: {e}")
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        except KeyError as e:
            logger.error(f"消息缺少必要字段: {e}")
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        except Exception as e:
            logger.error(f"处理消息时发生未知错误: {e}", exc_info=True)
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True) # 可能临时故障,重新入队

    def _find_active_instance(self, session_id: str) -> Optional[WorkflowInstance]:
        """查询Coze引擎中该会话ID对应的活跃实例"""
        # Coze SDK通常提供按ID查询或按状态查询实例的方法
        try:
            instance = self.coze_client.get_workflow_instance(instance_id=session_id)
            if instance.status in ["RUNNING", "PAUSED"]:
                return instance
        except Exception as e:
            # 实例不存在或其他错误
            logger.debug(f"未找到活跃实例 {session_id}: {e}")
        return None

    def _initialize_context(self, user_id: str, initial_text: str) -> Dict[str, Any]:
        """初始化并持久化对话上下文到MongoDB"""
        context_doc = {
            "user_id": user_id,
            "session_start_time": datetime.utcnow(),
            "message_history": [{"role": "user", "content": initial_text}],
            "intent": None,
            "slots": {}, # 用于存储提取的实体信息,如订单号、城市等
            "metadata": {}
        }
        # 插入到MongoDB集合中
        result = self.mongo_client.conversation.contexts.insert_one(context_doc)
        context_doc["_id"] = result.inserted_id
        return context_doc

3.2 对话上下文持久化与MongoDB分片配置

工作流引擎管理流程状态,但详细的对话历史、提取的实体(槽位)等,我们选择存储在MongoDB中,便于复杂查询和分析。为了应对高并发写入,我们采用了分片集群。

from pymongo import MongoClient, ASCENDING
from pymongo.errors import DuplicateKeyError, OperationFailure

class ConversationContextManager:
    def __init__(self, mongo_uri: str):
        # 连接MongoDB分片集群的mongos路由器
        self.client = MongoClient(mongo_uri)
        self.db = self.client.conversation
        self.contexts = self.db.contexts

    def update_context(self, session_id: str, update_data: Dict[str, Any]) -> bool:
        """更新对话上下文,例如追加消息历史、更新意图槽位"""
        try:
            # 使用 $push 追加消息,$set 更新其他字段
            update_operations = {}
            if "new_message" in update_data:
                update_operations["$push"] = {"message_history": update_data["new_message"]}
            if "slots" in update_data or "intent" in update_data:
                update_operations.setdefault("$set", {}).update({k: v for k, v in update_data.items() if k in ["slots", "intent", "metadata"]})

            if not update_operations:
                return True

            result = self.contexts.update_one(
                {"_id": session_id}, # 我们使用Coze工作流实例ID作为Mongo文档的_id
                update_operations,
                upsert=False # 不创建新文档,文档应在初始化时创建
            )
            return result.modified_count > 0
        except OperationFailure as e:
            logger.error(f"更新MongoDB上下文失败: {e}")
            # 这里可以触发告警或降级逻辑
            return False

# MongoDB分片配置关键步骤(在Mongo Shell中执行):
# 1. 启用分片数据库:sh.enableSharding("conversation")
# 2. 选择分片键:我们选择 `user_id` 作为分片键,将同一用户的数据分布到同一分片,有利于查询。
#    db.contexts.createIndex({ "user_id": 1 })
# 3. 对集合进行分片:sh.shardCollection("conversation.contexts", { "user_id": 1 })
# 这样,数据会根据user_id的哈希值分布到不同分片上。

4. 性能优化:负载测试与实例回收

架构搭好了,性能如何?我们使用Locust进行了压力测试。

4.1 负载测试方案(Locust脚本示例)

# locustfile.py
from locust import HttpUser, task, between
import random
import string

class ChatbotUser(HttpUser):
    wait_time = between(0.5, 2) # 模拟用户思考时间

    def on_start(self):
        """用户会话开始,初始化一个唯一的用户ID和会话ID"""
        self.user_id = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))
        self.session_id = self.user_id # 简化模型,会话ID即用户ID
        self.message_count = 0

    @task
    def send_message(self):
        """模拟发送一条消息"""
        self.message_count += 1
        payload = {
            "user_id": self.user_id,
            "session_id": self.session_id,
            "text": f"测试消息{self.message_count},请问有什么可以帮助您的?"
        }
        headers = {'Content-Type': 'application/json'}
        # 我们的API网关接收消息,并投递到RabbitMQ
        with self.client.post("/api/message", json=payload, headers=headers, catch_response=True) as response:
            if response.status_code == 202: # 202 Accepted 表示消息已接收,进入异步处理
                response.success()
            else:
                response.failure(f"Unexpected status code: {response.status_code}")

通过Locust,我们模拟了从每秒几百到上千的并发用户请求。监控指标包括:API网关延迟、RabbitMQ堆积、Coze工作流实例创建/执行延迟、MongoDB操作延迟。

4.2 工作流实例的自动回收策略

一个对话可能永远不结束(用户不说了),我们不能让工作流实例无限期运行,占用资源。Coze工作流支持设置实例超时。

  • 执行超时:在创建工作流定义时,设置整个工作流的最大执行时长(例如30分钟)。
  • 任务心跳与超时:对于“等待下一条消息”这种等待状态的任务节点,我们设置一个“等待超时”(例如10分钟)。如果超时后仍未收到用户新消息,工作流会触发超时分支,执行清理逻辑并结束实例。
  • 定期清理僵尸实例:我们额外部署了一个定时任务,定期扫描状态为RUNNING但最后活动时间超过阈值的实例,强制将其终止(Terminate)并记录日志。

5. 避坑指南:幂等、重试与死信

在实际生产环境中,以下几个坑我们踩得最深。

5.1 对话幂等性的实现

网络可能抖动,客户端可能重发同一条消息。我们必须保证同一消息被处理多次的效果和处理一次相同。关键在于“用户消息ID”和“分布式锁”。

import redis
import hashlib
import time

class IdempotencyHandler:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.lock_timeout = 5  # 锁超时时间,秒

    def ensure_idempotent(self, session_id: str, message_id: str, message_hash: str, ttl_seconds: int = 300) -> bool:
        """
        确保消息处理的幂等性。
        返回True表示可以处理(首次见到此消息),False表示已处理过(重复消息)。
        """
        # 构造Redis键
        dedupe_key = f"msg_dedup:{session_id}:{message_id}"
        lock_key = f"lock:{dedupe_key}"

        # 1. 快速检查:消息哈希是否已存在
        existing_hash = self.redis.get(dedupe_key)
        if existing_hash and existing_hash.decode() == message_hash:
            logger.info(f"消息 {message_id} 已处理过,跳过")
            return False

        # 2. 获取分布式锁,防止并发处理同一条“新”消息
        lock_acquired = self._acquire_lock(lock_key)
        if not lock_acquired:
            # 获取锁失败,可能其他进程正在处理,稍后重试或直接认为正在处理(根据业务决定)
            logger.warning(f"无法获取锁处理消息 {message_id},可能正在被其他进程处理")
            # 这里可以选择等待重试或返回False。我们选择返回False,让消息在MQ中稍后重试。
            return False

        try:
            # 3. 再次检查(Double-check),防止在获取锁期间消息已被处理
            existing_hash = self.redis.get(dedupe_key)
            if existing_hash and existing_hash.decode() == message_hash:
                return False

            # 4. 标记消息已处理
            self.redis.setex(dedupe_key, ttl_seconds, message_hash)
            logger.info(f"消息 {message_id} 标记为已处理")
            return True
        finally:
            # 5. 释放锁
            self._release_lock(lock_key)

    def _acquire_lock(self, lock_key: str) -> bool:
        """使用SETNX实现简单的分布式锁"""
        # 使用随机值作为锁的值,防止误删其他进程的锁
        lock_value = str(time.time())
        # SET key value NX EX timeout
        acquired = self.redis.set(lock_key, lock_value, nx=True, ex=self.lock_timeout)
        return acquired is not None

    def _release_lock(self, lock_key: str):
        """释放锁,使用Lua脚本保证原子性"""
        lua_script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        """
        # 这里简化,实际需要传递lock_value。我们在类内部存储它。
        self.redis.delete(lock_key) # 简化版,生产环境应用Lua脚本

在消息消费者中,在处理前先调用ensure_idempotent

5.2 错误重试与死信队列配置

不是所有失败都值得无限重试。我们为RabbitMQ队列配置了重试和死信交换器(DLX)。

  • 重试队列:主业务队列chatbot.messages设置最大重试次数(例如3次),并配置死信交换器chatbot.dlx
  • 死信队列:消息在重试多次失败后,会被路由到死信队列chatbot.dead.letters
  • 告警与处理:监控死信队列的消息堆积,触发告警。我们可以有单独的服务消费死信队列,进行人工干预、日志分析和数据修复。

在Coze工作流任务节点中,我们也为可能失败的操作(如调用外部LLM API、写入数据库)配置了指数退避的重试策略。

6. 延伸思考:接入LLM意图识别模块

目前我们的工作流中,“意图识别节点”可能还是基于规则或简单的分类模型。一个很自然的演进是接入一个更强大的LLM(如GPT-4、Claude或本地部署的大模型)来进行意图识别和槽位填充。

我们可以将这个LLM服务封装成一个独立的决策节点接入工作流:

  1. 节点输入:当前对话上下文(精简的历史消息)、用户最新话语。
  2. 节点逻辑:调用LLM API,使用精心设计的Prompt,要求其输出结构化的JSON,包含:intent(意图分类)、slots(提取的键值对)、confidence(置信度)。
  3. 节点输出:将结构化的意图和槽位数据输出到工作流上下文中,驱动后续分支判断(是调用问答LLM,还是转人工,或是查询订单API)。

这样,工作流就成为了一个协调者,负责流程控制和状态管理,而具体的智能任务(意图识别、回复生成)则由专业的、可独立伸缩的AI服务节点完成,架构更加清晰和健壮。

工作流与AI服务协同示意图

总结

通过引入Coze工作流,我们将智能客服系统从“一个笨重的同步服务”改造为“一个由状态机驱动的异步协作网络”。这个架构带来的好处是显而易见的:响应更快、更稳定、更容易扩展和运维。状态管理交给工作流引擎,我们只需关心单个节点的业务逻辑实现。虽然引入工作流增加了一定的架构复杂度,但它在高并发、复杂流程的场景下带来的收益是巨大的。

当然,没有银弹。这种架构更适合对话逻辑复杂、有状态、需要多步异步处理的场景。对于极其简单的问答,可能有点“杀鸡用牛刀”。建议大家在评估时,一定要结合自身的业务复杂度和流量规模。

希望这篇笔记能为你构建高并发、可靠的智能对话系统提供一些切实可行的思路。如果你也尝试了Coze工作流或者其他工作流引擎,欢迎一起交流踩坑经验!

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐