AI Agent 编排系统:从线性流程到事件驱动架构的演进

cover

一、从单 Agent 到多 Agent 协作:复杂任务的编排困境

在 AI 产品的早期阶段,我们使用简单的单 Agent 架构就能满足需求。用户提出一个问题,Agent 调用一次 LLM 就给出答案。但随着功能不断丰富,我们发现越来越多的任务需要多个步骤协同完成:数据分析需要先查询数据库,再进行计算,最后生成图表;客户支持需要先理解意图,检索知识库,再调用 CRM 系统。

最初我们用简单的 Python 脚本串联这些步骤,但很快就遇到了问题:流程硬编码在代码中,修改需要重新部署;错误处理逻辑分散在各个步骤中,难以维护;没有可视化的流程设计工具,产品经理无法参与流程设计。

这些问题并非我们独有,几乎所有从简单对话机器人向复杂工作流演进的团队都会遇到。技术如果不服务于真实的业务灵活性,那只是僵硬的代码堆砌。我们需要一套可编排、可监控、可调试的 Agent 协作框架。

二、事件驱动的 Agent 编排架构设计:解耦与弹性的核心

flowchart TB
    subgraph 接入层
        A[用户请求] --> B[API Gateway]
        B --> C[事件总线]
    end
    
    subgraph 编排层
        C --> D[Workflow Engine]
        D --> E[状态管理器]
        E --> F[任务调度器]
    end
    
    subgraph 执行层
        F --> G[Agent 1]
        F --> H[Agent 2]
        F --> I[Agent 3]
        G --> J[工具调用]
        H --> J
        I --> J
    end
    
    subgraph 持久化层
        K[状态存储]
        L[执行日志]
        M[监控指标]
    end
    
    E --> K
    G --> L
    H --> L
    I --> L
    G --> M
    H --> M
    I --> M

2.1 工作流定义与 DSL 设计

我们设计了一套简单的领域特定语言(DSL)来定义 Agent 工作流,让产品经理和开发者都能参与流程设计。工作流由任务、条件分支、循环等基本元素组成:

from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Callable
from enum import Enum
import json

class TaskType(Enum):
    AGENT = "agent"
    TOOL = "tool"
    CONDITION = "condition"
    PARALLEL = "parallel"
    LOOP = "loop"

@dataclass
class Task:
    id: str
    type: TaskType
    name: str
    config: Dict[str, Any] = field(default_factory=dict)
    dependencies: List[str] = field(default_factory=list)
    on_error: Optional[str] = None  # 错误处理策略

@dataclass
class Workflow:
    id: str
    name: str
    tasks: List[Task]
    entry_point: str
    version: str = "1.0"

class WorkflowDefinitionLoader:
    """工作流定义加载器"""
    
    @staticmethod
    def from_dict(data: Dict[str, Any]) -> Workflow:
        """从字典加载工作流定义"""
        tasks = [
            Task(
                id=t["id"],
                type=TaskType(t["type"]),
                name=t["name"],
                config=t.get("config", {}),
                dependencies=t.get("dependencies", []),
                on_error=t.get("on_error")
            )
            for t in data["tasks"]
        ]
        
        return Workflow(
            id=data["id"],
            name=data["name"],
            tasks=tasks,
            entry_point=data["entry_point"],
            version=data.get("version", "1.0")
        )
    
    @staticmethod
    def from_json(json_str: str) -> Workflow:
        """从 JSON 字符串加载"""
        return WorkflowDefinitionLoader.from_dict(json.loads(json_str))


# 工作流定义示例
workflow_def = {
    "id": "data_analysis",
    "name": "数据分析工作流",
    "entry_point": "query_data",
    "tasks": [
        {
            "id": "query_data",
            "type": "tool",
            "name": "查询数据库",
            "config": {"tool": "database_query"},
            "dependencies": []
        },
        {
            "id": "analyze_data",
            "type": "agent",
            "name": "数据分析 Agent",
            "config": {"agent_id": "data_analyst"},
            "dependencies": ["query_data"]
        },
        {
            "id": "generate_chart",
            "type": "tool",
            "name": "生成图表",
            "config": {"tool": "chart_generator"},
            "dependencies": ["analyze_data"]
        },
        {
            "id": "generate_report",
            "type": "agent",
            "name": "报告生成 Agent",
            "config": {"agent_id": "report_writer"},
            "dependencies": ["generate_chart"]
        }
    ]
}

workflow = WorkflowDefinitionLoader.from_dict(workflow_def)

2.2 状态机与持久化设计

工作流执行是一个状态机,每个任务都有明确的生命周期:待执行、执行中、成功、失败、已取消。我们使用 Redis 缓存当前状态,使用 PostgreSQL 持久化完整的执行历史:

import uuid
from datetime import datetime
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, Any, Optional

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class TaskExecution:
    id: str
    task_id: str
    workflow_id: str
    execution_id: str
    status: TaskStatus
    input_data: Dict[str, Any]
    output_data: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

@dataclass
class WorkflowExecution:
    id: str
    workflow_id: str
    status: TaskStatus
    tasks: Dict[str, TaskExecution] = field(default_factory=dict)
    input_data: Dict[str, Any] = field(default_factory=dict)
    output_data: Optional[Dict[str, Any]] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

class StateManager:
    """状态管理器,负责工作流和任务状态的持久化"""
    
    def __init__(self, redis_client, db_connection):
        self.redis = redis_client
        self.db = db_connection
    
    def create_workflow_execution(self, workflow_id: str, 
                                  input_data: Dict[str, Any]) -> str:
        """创建工作流执行实例"""
        execution_id = str(uuid.uuid4())
        
        execution = WorkflowExecution(
            id=execution_id,
            workflow_id=workflow_id,
            status=TaskStatus.PENDING,
            input_data=input_data,
            started_at=datetime.now()
        )
        
        # 保存到 Redis 缓存
        cache_key = f"workflow:{execution_id}"
        self.redis.setex(cache_key, 3600, json.dumps({
            "id": execution.id,
            "workflow_id": execution.workflow_id,
            "status": execution.status.value,
            "started_at": execution.started_at.isoformat()
        }))
        
        return execution_id
    
    def update_task_status(self, execution_id: str, task_id: str, 
                          status: TaskStatus, output: Optional[Dict] = None,
                          error: Optional[str] = None):
        """更新任务状态"""
        # 更新缓存
        cache_key = f"workflow:{execution_id}:task:{task_id}"
        update_data = {
            "status": status.value,
            "updated_at": datetime.now().isoformat()
        }
        if output:
            update_data["output"] = output
        if error:
            update_data["error"] = error
        
        self.redis.hset(cache_key, mapping=update_data)
    
    def get_ready_tasks(self, execution_id: str, 
                       workflow: Workflow) -> List[Task]:
        """获取可以执行的任务(所有依赖已完成)"""
        ready_tasks = []
        
        for task in workflow.tasks:
            # 检查任务是否已处理过
            cache_key = f"workflow:{execution_id}:task:{task.id}"
            if self.redis.exists(cache_key):
                task_data = self.redis.hgetall(cache_key)
                status = TaskStatus(task_data.get(b"status", b"pending").decode())
                if status != TaskStatus.PENDING:
                    continue
            
            # 检查所有依赖是否已成功完成
            all_deps_ready = True
            for dep_id in task.dependencies:
                dep_key = f"workflow:{execution_id}:task:{dep_id}"
                if not self.redis.exists(dep_key):
                    all_deps_ready = False
                    break
                
                dep_data = self.redis.hgetall(dep_key)
                dep_status = TaskStatus(dep_data.get(b"status", b"pending").decode())
                if dep_status != TaskStatus.SUCCESS:
                    all_deps_ready = False
                    break
            
            if all_deps_ready:
                ready_tasks.append(task)
        
        return ready_tasks

2.3 事件总线与任务调度

我们使用事件总线来解耦工作流引擎和执行器:

import asyncio
from typing import Dict, Any, Callable
from abc import ABC, abstractmethod

class Event(ABC):
    """事件基类"""
    
    @abstractmethod
    def to_dict(self) -> Dict[str, Any]:
        pass

class TaskScheduledEvent(Event):
    """任务调度事件"""
    
    def __init__(self, execution_id: str, task: Task, 
                 input_data: Dict[str, Any]):
        self.execution_id = execution_id
        self.task = task
        self.input_data = input_data
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            "type": "task_scheduled",
            "execution_id": self.execution_id,
            "task_id": self.task.id,
            "task_type": self.task.type.value,
            "input_data": self.input_data
        }

class TaskCompletedEvent(Event):
    """任务完成事件"""
    
    def __init__(self, execution_id: str, task_id: str,
                 output_data: Dict[str, Any]):
        self.execution_id = execution_id
        self.task_id = task_id
        self.output_data = output_data
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            "type": "task_completed",
            "execution_id": self.execution_id,
            "task_id": self.task_id,
            "output_data": self.output_data
        }

class EventBus:
    """事件总线"""
    
    def __init__(self):
        self.subscribers: Dict[str, List[Callable]] = {}
    
    def subscribe(self, event_type: str, handler: Callable):
        """订阅事件"""
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)
    
    async def publish(self, event: Event):
        """发布事件"""
        event_dict = event.to_dict()
        event_type = event_dict["type"]
        
        if event_type in self.subscribers:
            for handler in self.subscribers[event_type]:
                await handler(event_dict)

class WorkflowEngine:
    """工作流引擎"""
    
    def __init__(self, event_bus: EventBus, state_manager: StateManager):
        self.event_bus = event_bus
        self.state_manager = state_manager
        
        # 订阅任务完成事件
        self.event_bus.subscribe("task_completed", self.on_task_completed)
    
    async def start_workflow(self, workflow: Workflow, 
                            input_data: Dict[str, Any]) -> str:
        """启动工作流"""
        execution_id = self.state_manager.create_workflow_execution(
            workflow.id, input_data
        )
        
        # 调度初始任务
        await self.schedule_ready_tasks(execution_id, workflow, input_data)
        
        return execution_id
    
    async def schedule_ready_tasks(self, execution_id: str, 
                                  workflow: Workflow, 
                                  context: Dict[str, Any]):
        """调度可执行的任务"""
        ready_tasks = self.state_manager.get_ready_tasks(execution_id, workflow)
        
        for task in ready_tasks:
            # 更新任务状态为运行中
            self.state_manager.update_task_status(
                execution_id, task.id, TaskStatus.RUNNING
            )
            
            # 发布任务调度事件
            event = TaskScheduledEvent(execution_id, task, context)
            await self.event_bus.publish(event)
    
    async def on_task_completed(self, event_dict: Dict[str, Any]):
        """处理任务完成事件"""
        execution_id = event_dict["execution_id"]
        task_id = event_dict["task_id"]
        output_data = event_dict["output_data"]
        
        # 更新任务状态
        self.state_manager.update_task_status(
            execution_id, task_id, TaskStatus.SUCCESS, output=output_data
        )
        
        # 继续调度后续任务
        # 这里需要加载工作流定义并获取上下文
        # (省略具体实现)

三、条件分支与并行执行:复杂流程的灵活控制

3.1 条件分支实现

条件分支让工作流可以根据中间结果选择不同的执行路径:

class ConditionEvaluator:
    """条件评估器"""
    
    @staticmethod
    def evaluate(condition: str, context: Dict[str, Any]) -> bool:
        """
        评估条件表达式
        支持简单的比较运算: ==, !=, >, <, >=, <=
        支持逻辑运算: and, or, not
        """
        # 这里使用安全的表达式评估
        # 生产环境中应该使用更严格的解析器
        try:
            # 创建安全的全局命名空间
            safe_globals = {
                "__builtins__": {},
                "True": True,
                "False": False,
                "None": None
            }
            
            # 将上下文注入局部命名空间
            safe_locals = context.copy()
            
            # 安全评估
            result = eval(condition, safe_globals, safe_locals)
            return bool(result)
        except Exception as e:
            print(f"条件评估失败: {e}")
            return False


# 使用示例
context = {"data_count": 1500, "has_error": False}
condition = "data_count > 1000 and not has_error"
result = ConditionEvaluator.evaluate(condition, context)
print(f"条件 '{condition}' 评估结果: {result}")

3.2 并行任务执行

对于可以并行处理的任务,我们支持并行执行以提高效率:

class ParallelTaskExecutor:
    """并行任务执行器"""
    
    def __init__(self, max_concurrency: int = 5):
        self.max_concurrency = max_concurrency
        self.semaphore = asyncio.Semaphore(max_concurrency)
    
    async def execute_task(self, task: Task, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """执行单个任务(带并发控制)"""
        async with self.semaphore:
            # 实际任务执行逻辑
            print(f"执行任务: {task.name}")
            await asyncio.sleep(1)  # 模拟执行时间
            return {"result": f"{task.name} 执行完成"}
    
    async def execute_parallel(self, tasks: List[Task], 
                              input_data: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
        """并行执行多个任务"""
        coroutines = [
            self.execute_task(task, input_data)
            for task in tasks
        ]
        
        results = await asyncio.gather(*coroutines, return_exceptions=True)
        
        # 整理结果
        output = {}
        for task, result in zip(tasks, results):
            if isinstance(result, Exception):
                output[task.id] = {"error": str(result)}
            else:
                output[task.id] = result
        
        return output

四、可观测性与调试工具:生产环境的必备设施

4.1 完整的执行日志与追踪

每个工作流执行都会生成详细的日志,包括任务的输入输出、执行时间、错误信息等:

import logging
from datetime import datetime

class WorkflowLogger:
    """工作流执行日志记录器"""
    
    def __init__(self, log_directory: str = "./workflow_logs"):
        self.log_directory = log_directory
        import os
        os.makedirs(log_directory, exist_ok=True)
        
        # 配置日志
        self.logger = logging.getLogger("workflow_engine")
        self.logger.setLevel(logging.DEBUG)
        
        # 文件处理器
        file_handler = logging.FileHandler(
            f"{log_directory}/workflow_{datetime.now().strftime('%Y%m%d')}.log"
        )
        file_handler.setLevel(logging.DEBUG)
        
        # 控制台处理器
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        
        # 格式化
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)
        
        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)
    
    def log_task_start(self, execution_id: str, task_id: str, 
                      task_name: str, input_data: Dict):
        """记录任务开始"""
        self.logger.info(
            f"[Execution: {execution_id}] Task '{task_name}' ({task_id}) started"
        )
        self.logger.debug(f"Input data: {json.dumps(input_data, ensure_ascii=False)}")
    
    def log_task_complete(self, execution_id: str, task_id: str,
                         task_name: str, output_data: Dict,
                         duration_seconds: float):
        """记录任务完成"""
        self.logger.info(
            f"[Execution: {execution_id}] Task '{task_name}' ({task_id}) "
            f"completed in {duration_seconds:.2f}s"
        )
        self.logger.debug(f"Output data: {json.dumps(output_data, ensure_ascii=False)}")
    
    def log_task_error(self, execution_id: str, task_id: str,
                      task_name: str, error: Exception):
        """记录任务错误"""
        self.logger.error(
            f"[Execution: {execution_id}] Task '{task_name}' ({task_id}) "
            f"failed: {str(error)}"
        )

4.2 可视化的执行追踪界面

为了帮助开发者调试工作流,我们构建了可视化界面,可以:

  • 查看工作流的执行进度
  • 检查每个任务的输入输出
  • 回放执行过程
  • 分析性能瓶颈

五、架构权衡与适用边界:编排系统的选型考量

架构模式 优点 缺点 适用场景
线性脚本 简单直接 难以维护、扩展性差 简单、固定流程
事件驱动 解耦、高弹性 复杂度高 复杂、动态流程
状态机 清晰、可控 状态定义繁琐 有明确状态转换的场景

4.1 何时需要编排系统

构建编排系统有一定的复杂度,不是所有场景都需要。建议在以下情况下考虑:

  • 工作流步骤超过 5 个
  • 需要支持条件分支和并行执行
  • 工作流需要频繁变更,希望非技术人员能参与
  • 需要完整的执行历史和监控

如果只是简单的 2-3 个步骤,用普通的函数调用可能更合适。

4.2 与现有框架的对比

目前有一些开源的 Agent 编排框架,如 LangGraph、AutoGen 等。我们最终选择自己构建,主要是因为:

  • 需要与现有的基础设施深度集成
  • 对性能和延迟有较高要求
  • 需要支持特定的业务规则

但对于大多数团队来说,先评估现有框架是否满足需求是更高效的选择。

五、总结

AI Agent 编排系统是复杂业务流程的核心基础设施。从简单的线性脚本到事件驱动架构,我们经历了多次迭代,最终构建了一套灵活、可靠、可观测的编排系统。

这套系统的核心要素包括:声明式的工作流定义 DSL、基于状态机的执行引擎、事件驱动的任务调度、完整的可观测性工具。在选择技术方案时,要根据业务复杂度和团队能力做出合适的权衡,不要过度设计。

对于 AI 创业公司来说,灵活的编排系统是快速迭代产品的关键。它让我们可以将复杂的 AI 能力封装成可重用的模块,通过可视化的方式组合出新的业务流程,大大加速了产品创新的速度。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐