AI Agent 编排系统:从线性流程到事件驱动架构的演进
我们设计了一套简单的领域特定语言(DSL)来定义 Agent 工作流,让产品经理和开发者都能参与流程设计。@dataclassid: strname: stron_error: Optional[str] = None # 错误处理策略@dataclassid: strname: str"""工作流定义加载器""""""从字典加载工作流定义"""tasks = [Task("""从 JSON 字符
AI Agent 编排系统:从线性流程到事件驱动架构的演进

一、从单 Agent 到多 Agent 协作:复杂任务的编排困境
在 AI 产品的早期阶段,我们使用简单的单 Agent 架构就能满足需求。用户提出一个问题,Agent 调用一次 LLM 就给出答案。但随着功能不断丰富,我们发现越来越多的任务需要多个步骤协同完成:数据分析需要先查询数据库,再进行计算,最后生成图表;客户支持需要先理解意图,检索知识库,再调用 CRM 系统。
最初我们用简单的 Python 脚本串联这些步骤,但很快就遇到了问题:流程硬编码在代码中,修改需要重新部署;错误处理逻辑分散在各个步骤中,难以维护;没有可视化的流程设计工具,产品经理无法参与流程设计。
这些问题并非我们独有,几乎所有从简单对话机器人向复杂工作流演进的团队都会遇到。技术如果不服务于真实的业务灵活性,那只是僵硬的代码堆砌。我们需要一套可编排、可监控、可调试的 Agent 协作框架。
二、事件驱动的 Agent 编排架构设计:解耦与弹性的核心
flowchart TB
subgraph 接入层
A[用户请求] --> B[API Gateway]
B --> C[事件总线]
end
subgraph 编排层
C --> D[Workflow Engine]
D --> E[状态管理器]
E --> F[任务调度器]
end
subgraph 执行层
F --> G[Agent 1]
F --> H[Agent 2]
F --> I[Agent 3]
G --> J[工具调用]
H --> J
I --> J
end
subgraph 持久化层
K[状态存储]
L[执行日志]
M[监控指标]
end
E --> K
G --> L
H --> L
I --> L
G --> M
H --> M
I --> M
2.1 工作流定义与 DSL 设计
我们设计了一套简单的领域特定语言(DSL)来定义 Agent 工作流,让产品经理和开发者都能参与流程设计。工作流由任务、条件分支、循环等基本元素组成:
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Callable
from enum import Enum
import json
class TaskType(Enum):
AGENT = "agent"
TOOL = "tool"
CONDITION = "condition"
PARALLEL = "parallel"
LOOP = "loop"
@dataclass
class Task:
id: str
type: TaskType
name: str
config: Dict[str, Any] = field(default_factory=dict)
dependencies: List[str] = field(default_factory=list)
on_error: Optional[str] = None # 错误处理策略
@dataclass
class Workflow:
id: str
name: str
tasks: List[Task]
entry_point: str
version: str = "1.0"
class WorkflowDefinitionLoader:
"""工作流定义加载器"""
@staticmethod
def from_dict(data: Dict[str, Any]) -> Workflow:
"""从字典加载工作流定义"""
tasks = [
Task(
id=t["id"],
type=TaskType(t["type"]),
name=t["name"],
config=t.get("config", {}),
dependencies=t.get("dependencies", []),
on_error=t.get("on_error")
)
for t in data["tasks"]
]
return Workflow(
id=data["id"],
name=data["name"],
tasks=tasks,
entry_point=data["entry_point"],
version=data.get("version", "1.0")
)
@staticmethod
def from_json(json_str: str) -> Workflow:
"""从 JSON 字符串加载"""
return WorkflowDefinitionLoader.from_dict(json.loads(json_str))
# 工作流定义示例
workflow_def = {
"id": "data_analysis",
"name": "数据分析工作流",
"entry_point": "query_data",
"tasks": [
{
"id": "query_data",
"type": "tool",
"name": "查询数据库",
"config": {"tool": "database_query"},
"dependencies": []
},
{
"id": "analyze_data",
"type": "agent",
"name": "数据分析 Agent",
"config": {"agent_id": "data_analyst"},
"dependencies": ["query_data"]
},
{
"id": "generate_chart",
"type": "tool",
"name": "生成图表",
"config": {"tool": "chart_generator"},
"dependencies": ["analyze_data"]
},
{
"id": "generate_report",
"type": "agent",
"name": "报告生成 Agent",
"config": {"agent_id": "report_writer"},
"dependencies": ["generate_chart"]
}
]
}
workflow = WorkflowDefinitionLoader.from_dict(workflow_def)
2.2 状态机与持久化设计
工作流执行是一个状态机,每个任务都有明确的生命周期:待执行、执行中、成功、失败、已取消。我们使用 Redis 缓存当前状态,使用 PostgreSQL 持久化完整的执行历史:
import uuid
from datetime import datetime
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, Any, Optional
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class TaskExecution:
id: str
task_id: str
workflow_id: str
execution_id: str
status: TaskStatus
input_data: Dict[str, Any]
output_data: Optional[Dict[str, Any]] = None
error: Optional[str] = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
@dataclass
class WorkflowExecution:
id: str
workflow_id: str
status: TaskStatus
tasks: Dict[str, TaskExecution] = field(default_factory=dict)
input_data: Dict[str, Any] = field(default_factory=dict)
output_data: Optional[Dict[str, Any]] = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
class StateManager:
"""状态管理器,负责工作流和任务状态的持久化"""
def __init__(self, redis_client, db_connection):
self.redis = redis_client
self.db = db_connection
def create_workflow_execution(self, workflow_id: str,
input_data: Dict[str, Any]) -> str:
"""创建工作流执行实例"""
execution_id = str(uuid.uuid4())
execution = WorkflowExecution(
id=execution_id,
workflow_id=workflow_id,
status=TaskStatus.PENDING,
input_data=input_data,
started_at=datetime.now()
)
# 保存到 Redis 缓存
cache_key = f"workflow:{execution_id}"
self.redis.setex(cache_key, 3600, json.dumps({
"id": execution.id,
"workflow_id": execution.workflow_id,
"status": execution.status.value,
"started_at": execution.started_at.isoformat()
}))
return execution_id
def update_task_status(self, execution_id: str, task_id: str,
status: TaskStatus, output: Optional[Dict] = None,
error: Optional[str] = None):
"""更新任务状态"""
# 更新缓存
cache_key = f"workflow:{execution_id}:task:{task_id}"
update_data = {
"status": status.value,
"updated_at": datetime.now().isoformat()
}
if output:
update_data["output"] = output
if error:
update_data["error"] = error
self.redis.hset(cache_key, mapping=update_data)
def get_ready_tasks(self, execution_id: str,
workflow: Workflow) -> List[Task]:
"""获取可以执行的任务(所有依赖已完成)"""
ready_tasks = []
for task in workflow.tasks:
# 检查任务是否已处理过
cache_key = f"workflow:{execution_id}:task:{task.id}"
if self.redis.exists(cache_key):
task_data = self.redis.hgetall(cache_key)
status = TaskStatus(task_data.get(b"status", b"pending").decode())
if status != TaskStatus.PENDING:
continue
# 检查所有依赖是否已成功完成
all_deps_ready = True
for dep_id in task.dependencies:
dep_key = f"workflow:{execution_id}:task:{dep_id}"
if not self.redis.exists(dep_key):
all_deps_ready = False
break
dep_data = self.redis.hgetall(dep_key)
dep_status = TaskStatus(dep_data.get(b"status", b"pending").decode())
if dep_status != TaskStatus.SUCCESS:
all_deps_ready = False
break
if all_deps_ready:
ready_tasks.append(task)
return ready_tasks
2.3 事件总线与任务调度
我们使用事件总线来解耦工作流引擎和执行器:
import asyncio
from typing import Dict, Any, Callable
from abc import ABC, abstractmethod
class Event(ABC):
"""事件基类"""
@abstractmethod
def to_dict(self) -> Dict[str, Any]:
pass
class TaskScheduledEvent(Event):
"""任务调度事件"""
def __init__(self, execution_id: str, task: Task,
input_data: Dict[str, Any]):
self.execution_id = execution_id
self.task = task
self.input_data = input_data
def to_dict(self) -> Dict[str, Any]:
return {
"type": "task_scheduled",
"execution_id": self.execution_id,
"task_id": self.task.id,
"task_type": self.task.type.value,
"input_data": self.input_data
}
class TaskCompletedEvent(Event):
"""任务完成事件"""
def __init__(self, execution_id: str, task_id: str,
output_data: Dict[str, Any]):
self.execution_id = execution_id
self.task_id = task_id
self.output_data = output_data
def to_dict(self) -> Dict[str, Any]:
return {
"type": "task_completed",
"execution_id": self.execution_id,
"task_id": self.task_id,
"output_data": self.output_data
}
class EventBus:
"""事件总线"""
def __init__(self):
self.subscribers: Dict[str, List[Callable]] = {}
def subscribe(self, event_type: str, handler: Callable):
"""订阅事件"""
if event_type not in self.subscribers:
self.subscribers[event_type] = []
self.subscribers[event_type].append(handler)
async def publish(self, event: Event):
"""发布事件"""
event_dict = event.to_dict()
event_type = event_dict["type"]
if event_type in self.subscribers:
for handler in self.subscribers[event_type]:
await handler(event_dict)
class WorkflowEngine:
"""工作流引擎"""
def __init__(self, event_bus: EventBus, state_manager: StateManager):
self.event_bus = event_bus
self.state_manager = state_manager
# 订阅任务完成事件
self.event_bus.subscribe("task_completed", self.on_task_completed)
async def start_workflow(self, workflow: Workflow,
input_data: Dict[str, Any]) -> str:
"""启动工作流"""
execution_id = self.state_manager.create_workflow_execution(
workflow.id, input_data
)
# 调度初始任务
await self.schedule_ready_tasks(execution_id, workflow, input_data)
return execution_id
async def schedule_ready_tasks(self, execution_id: str,
workflow: Workflow,
context: Dict[str, Any]):
"""调度可执行的任务"""
ready_tasks = self.state_manager.get_ready_tasks(execution_id, workflow)
for task in ready_tasks:
# 更新任务状态为运行中
self.state_manager.update_task_status(
execution_id, task.id, TaskStatus.RUNNING
)
# 发布任务调度事件
event = TaskScheduledEvent(execution_id, task, context)
await self.event_bus.publish(event)
async def on_task_completed(self, event_dict: Dict[str, Any]):
"""处理任务完成事件"""
execution_id = event_dict["execution_id"]
task_id = event_dict["task_id"]
output_data = event_dict["output_data"]
# 更新任务状态
self.state_manager.update_task_status(
execution_id, task_id, TaskStatus.SUCCESS, output=output_data
)
# 继续调度后续任务
# 这里需要加载工作流定义并获取上下文
# (省略具体实现)
三、条件分支与并行执行:复杂流程的灵活控制
3.1 条件分支实现
条件分支让工作流可以根据中间结果选择不同的执行路径:
class ConditionEvaluator:
"""条件评估器"""
@staticmethod
def evaluate(condition: str, context: Dict[str, Any]) -> bool:
"""
评估条件表达式
支持简单的比较运算: ==, !=, >, <, >=, <=
支持逻辑运算: and, or, not
"""
# 这里使用安全的表达式评估
# 生产环境中应该使用更严格的解析器
try:
# 创建安全的全局命名空间
safe_globals = {
"__builtins__": {},
"True": True,
"False": False,
"None": None
}
# 将上下文注入局部命名空间
safe_locals = context.copy()
# 安全评估
result = eval(condition, safe_globals, safe_locals)
return bool(result)
except Exception as e:
print(f"条件评估失败: {e}")
return False
# 使用示例
context = {"data_count": 1500, "has_error": False}
condition = "data_count > 1000 and not has_error"
result = ConditionEvaluator.evaluate(condition, context)
print(f"条件 '{condition}' 评估结果: {result}")
3.2 并行任务执行
对于可以并行处理的任务,我们支持并行执行以提高效率:
class ParallelTaskExecutor:
"""并行任务执行器"""
def __init__(self, max_concurrency: int = 5):
self.max_concurrency = max_concurrency
self.semaphore = asyncio.Semaphore(max_concurrency)
async def execute_task(self, task: Task, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""执行单个任务(带并发控制)"""
async with self.semaphore:
# 实际任务执行逻辑
print(f"执行任务: {task.name}")
await asyncio.sleep(1) # 模拟执行时间
return {"result": f"{task.name} 执行完成"}
async def execute_parallel(self, tasks: List[Task],
input_data: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
"""并行执行多个任务"""
coroutines = [
self.execute_task(task, input_data)
for task in tasks
]
results = await asyncio.gather(*coroutines, return_exceptions=True)
# 整理结果
output = {}
for task, result in zip(tasks, results):
if isinstance(result, Exception):
output[task.id] = {"error": str(result)}
else:
output[task.id] = result
return output
四、可观测性与调试工具:生产环境的必备设施
4.1 完整的执行日志与追踪
每个工作流执行都会生成详细的日志,包括任务的输入输出、执行时间、错误信息等:
import logging
from datetime import datetime
class WorkflowLogger:
"""工作流执行日志记录器"""
def __init__(self, log_directory: str = "./workflow_logs"):
self.log_directory = log_directory
import os
os.makedirs(log_directory, exist_ok=True)
# 配置日志
self.logger = logging.getLogger("workflow_engine")
self.logger.setLevel(logging.DEBUG)
# 文件处理器
file_handler = logging.FileHandler(
f"{log_directory}/workflow_{datetime.now().strftime('%Y%m%d')}.log"
)
file_handler.setLevel(logging.DEBUG)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# 格式化
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def log_task_start(self, execution_id: str, task_id: str,
task_name: str, input_data: Dict):
"""记录任务开始"""
self.logger.info(
f"[Execution: {execution_id}] Task '{task_name}' ({task_id}) started"
)
self.logger.debug(f"Input data: {json.dumps(input_data, ensure_ascii=False)}")
def log_task_complete(self, execution_id: str, task_id: str,
task_name: str, output_data: Dict,
duration_seconds: float):
"""记录任务完成"""
self.logger.info(
f"[Execution: {execution_id}] Task '{task_name}' ({task_id}) "
f"completed in {duration_seconds:.2f}s"
)
self.logger.debug(f"Output data: {json.dumps(output_data, ensure_ascii=False)}")
def log_task_error(self, execution_id: str, task_id: str,
task_name: str, error: Exception):
"""记录任务错误"""
self.logger.error(
f"[Execution: {execution_id}] Task '{task_name}' ({task_id}) "
f"failed: {str(error)}"
)
4.2 可视化的执行追踪界面
为了帮助开发者调试工作流,我们构建了可视化界面,可以:
- 查看工作流的执行进度
- 检查每个任务的输入输出
- 回放执行过程
- 分析性能瓶颈
五、架构权衡与适用边界:编排系统的选型考量
| 架构模式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 线性脚本 | 简单直接 | 难以维护、扩展性差 | 简单、固定流程 |
| 事件驱动 | 解耦、高弹性 | 复杂度高 | 复杂、动态流程 |
| 状态机 | 清晰、可控 | 状态定义繁琐 | 有明确状态转换的场景 |
4.1 何时需要编排系统
构建编排系统有一定的复杂度,不是所有场景都需要。建议在以下情况下考虑:
- 工作流步骤超过 5 个
- 需要支持条件分支和并行执行
- 工作流需要频繁变更,希望非技术人员能参与
- 需要完整的执行历史和监控
如果只是简单的 2-3 个步骤,用普通的函数调用可能更合适。
4.2 与现有框架的对比
目前有一些开源的 Agent 编排框架,如 LangGraph、AutoGen 等。我们最终选择自己构建,主要是因为:
- 需要与现有的基础设施深度集成
- 对性能和延迟有较高要求
- 需要支持特定的业务规则
但对于大多数团队来说,先评估现有框架是否满足需求是更高效的选择。
五、总结
AI Agent 编排系统是复杂业务流程的核心基础设施。从简单的线性脚本到事件驱动架构,我们经历了多次迭代,最终构建了一套灵活、可靠、可观测的编排系统。
这套系统的核心要素包括:声明式的工作流定义 DSL、基于状态机的执行引擎、事件驱动的任务调度、完整的可观测性工具。在选择技术方案时,要根据业务复杂度和团队能力做出合适的权衡,不要过度设计。
对于 AI 创业公司来说,灵活的编排系统是快速迭代产品的关键。它让我们可以将复杂的 AI 能力封装成可重用的模块,通过可视化的方式组合出新的业务流程,大大加速了产品创新的速度。
更多推荐



所有评论(0)