AI Agent智能体工程化实战:2026年从“对话”到“干活”的技术全解
·
2026年,AI Agent相关内容阅读量增长340%,搜索热度首次超越"大模型"本身。本文深度解析Agent核心架构(规划、记忆、工具、协作),附生产级代码实现,覆盖ReAct、Plan-and-Execute、多Agent协作三大范式。
1. 为什么2026年是Agent爆发元年
1.1 从"百模大战"到"Agent落地"
2023-2024: 百模大战
→ 各家疯狂训练大模型,模型能力快速提升
→ 但实际落地场景有限,大量模型"跑在Demo里"
2025: RAG普及年
→ 检索增强生成成为标配,显著缓解(而非彻底解决)幻觉问题
→ 企业开始把AI用到知识库、客服等场景
2026: Agent爆发年 ⭐
→ 大模型本身不再稀缺,垂直场景微调+RAG+函数调用+工作流
→ AI从"被动回答"升级为"主动完成任务"
→ 多Agent协作成为主流架构
1.2 Agent vs 传统AI的核心区别
传统AI (RAG模式):
用户 → "查询订单状态" → [LLM + RAG] → 直接回答 → 结束
特点: 一次性请求,无状态,无法执行多步操作
AI Agent模式:
用户 → "帮我处理这个退货申请"
↓
[Agent] 理解意图
↓
[规划] 拆解任务: ①查询订单 → ②验证状态 → ③生成退货单 → ④通知用户
↓
[执行] 调用工具(API/代码/数据库)
↓
[反馈] 返回结果 + 自动执行后续步骤
↓
多数步骤无需人工干预,可跨系统操作(退款等高风险操作可要求人工确认)
1.3 CSDN 2026数据支撑
| 指标 | 数据 |
|---|---|
| Agent内容阅读量增长 | 340% (2025下半年) |
| Agent搜索热度 vs 大模型 | 首次超越 |
| 企业Agent部署率 | 仅11%,但成功者正"Agent-First" |
| 开发者必备技能 | RAG + Agent框架 + Prompt工程 |
2. Agent核心架构:四要素模型
2.1 四要素详解
┌─────────────────────────────────────────────────────────────┐
│ AI Agent 四要素架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ │
│ │ Planning │ ← 任务规划与分解 │
│ │ (规划器) │ │
│ └───────┬──────┘ │
│ │ │
│ ┌───────────────────┼───────────────────┐ │
│ │ │ │ │
│ ┌───────┴───────┐ ┌───────┴───────┐ ┌──────┴──────┐ │
│ │ Memory │ │ Tools │ │ Collaboration│ │
│ │ (记忆) │ │ (工具) │ │ (协作) │ │
│ │ │ │ │ │ │ │
│ │ 短期/长期记忆 │ │ API/代码/搜索 │ │ 多Agent通信 │ │
│ │ 会话/向量存储 │ │ 函数调用/Filp │ │ A2A协议/HTTP │ │
│ └───────────────┘ └───────────────┘ └───────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
2.2 Agent基类实现
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Callable
from enum import Enum
import json
import time
class AgentState(Enum):
    """Lifecycle phase of an agent; values are lowercase strings."""

    IDLE = "idle"            # constructed, not running
    PLANNING = "planning"    # producing a plan
    EXECUTING = "executing"  # executing plan steps / tool calls
    WAITING = "waiting"      # paused, presumably awaiting external input
    DONE = "done"            # finished
    ERROR = "error"          # failed
@dataclass
class Message:
    """One entry in an agent's conversation history."""

    # One of "user", "assistant", "system" or "tool".
    role: str
    content: str
    # Tool invocation attached to this message, if any.
    tool_call: Optional[Dict] = field(default=None)
    # Raw tool output attached to this message, if any.
    tool_result: Optional[str] = field(default=None)
    # Creation time in seconds since the epoch.
    timestamp: float = field(default_factory=lambda: time.time())
@dataclass
class Tool:
    """A callable the agent can invoke, plus the metadata shown to the LLM."""

    name: str
    description: str
    # Parameter description passed to the LLM (JSON Schema for function calling).
    parameters: Dict[str, Any]
    # Implementation; called with keyword arguments.
    func: Callable
    # When True, front ends should ask the user before running the tool.
    requires_confirmation: bool = field(default=False)
@dataclass
class PlanStep:
    """One step of an agent's plan, with its execution outcome."""

    step_id: str
    description: str
    # Tool to call; None means the step is not a tool call.
    tool_name: Optional[str] = field(default=None)
    # Keyword arguments for the tool.
    tool_args: Optional[Dict] = field(default=None)
    # Lifecycle: "pending" -> "running" -> "done" | "failed".
    status: str = field(default="pending")
    result: Any = field(default=None)
    # Error message when the step failed.
    error: Optional[str] = field(default=None)
class BaseAgent(ABC):
    """
    Base class for agents following the four-element architecture.

    - Planning: task planning and decomposition (``plan_action``)
    - Memory: a bounded short-term message buffer plus an optional long-term store
    - Tools: registration and invocation (``register_tool`` / ``execute_tool``)
    - Collaboration: agents are identified by ``name``

    Subclasses implement ``plan_action`` and ``execute_step``. ``run`` ties
    them together: it plans once, then executes each step in order.
    """

    # Upper bound on messages kept in short_term_memory (oldest are dropped).
    MAX_SHORT_TERM_MESSAGES = 50

    def __init__(self, name: str, model_client: Any = None):
        self.name = name
        self.model = model_client
        self.state = AgentState.IDLE
        # Core components
        self.tools: Dict[str, Tool] = {}
        self.short_term_memory: List[Message] = []
        # Long-term store; must expose add(key, value) and search(query, top_k).
        self.long_term_memory: Optional[Any] = None
        self.plan: List[PlanStep] = []
        # Run statistics
        self.stats = {
            "total_steps": 0,
            "tool_calls": 0,
            "errors": 0,
            "start_time": None,
        }

    # ====== Tool management ======
    def register_tool(self, tool: Tool):
        """Register ``tool`` under ``tool.name`` (replacing any same-named tool); return self."""
        self.tools[tool.name] = tool
        return self

    def register_tools(self, tools: List[Tool]):
        """Register every tool in ``tools``; return self for chaining."""
        for tool in tools:
            self.register_tool(tool)
        return self

    def get_tool_schemas(self) -> List[Dict]:
        """Describe the registered tools in OpenAI function-calling format.

        ``Tool.parameters`` is forwarded unchanged, so it should be a JSON
        Schema object such as ``{"type": "object", "properties": {...}}``.
        """
        return [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.parameters,
                },
            }
            for tool in self.tools.values()
        ]

    def execute_tool(self, tool_name: str, arguments: Optional[Dict]) -> str:
        """Invoke a registered tool and return its result JSON-encoded.

        Failures are returned rather than raised, so an LLM loop can observe them.
        An unknown tool yields ``"Error: Tool '<name>' not found. ..."``.
        An exception from the tool, or from encoding its result, yields
        ``"Error executing <name>: <msg>"`` and increments ``stats["errors"]``.

        Args:
            tool_name: Name the tool was registered under.
            arguments: Keyword arguments for the tool; ``None`` means no arguments.
        """
        self.stats["tool_calls"] += 1
        tool = self.tools.get(tool_name)
        if tool is None:
            return f"Error: Tool '{tool_name}' not found. Available tools: {list(self.tools.keys())}"
        try:
            result = tool.func(**(arguments or {}))
            return json.dumps(result, ensure_ascii=False)
        except Exception as e:  # deliberately broad: tool errors are reported, not raised
            self.stats["errors"] += 1
            return f"Error executing {tool_name}: {str(e)}"

    # ====== Memory management ======
    def add_message(self, role: str, content: str, **kwargs):
        """Append a Message to short-term memory, trimming to the newest entries; return it."""
        msg = Message(role=role, content=content, **kwargs)
        self.short_term_memory.append(msg)
        # Bound the buffer so long sessions do not grow without limit.
        if len(self.short_term_memory) > self.MAX_SHORT_TERM_MESSAGES:
            self.short_term_memory = self.short_term_memory[-self.MAX_SHORT_TERM_MESSAGES:]
        return msg

    def get_context(self, max_turns: int = 10) -> List[Dict]:
        """Return the last ``max_turns`` messages as ``{"role", "content"}`` dicts.

        ``max_turns <= 0`` returns an empty list. Without the guard, the
        slice ``[-0:]`` would return the whole buffer.
        """
        if max_turns <= 0:
            return []
        return [
            {"role": m.role, "content": m.content}
            for m in self.short_term_memory[-max_turns:]
        ]

    def save_to_long_term(self, key: str, value: Any):
        """Store ``value`` under ``key`` in the long-term store; no-op when none is configured."""
        # `is not None`: a store object that happens to be falsy (e.g. empty) is still usable.
        if self.long_term_memory is not None:
            self.long_term_memory.add(key, value)

    def retrieve_from_long_term(self, query: str, top_k: int = 3) -> List[Any]:
        """Search the long-term store; returns [] when none is configured."""
        if self.long_term_memory is not None:
            return self.long_term_memory.search(query, top_k)
        return []

    # ====== Core reasoning loop ======
    @abstractmethod
    def plan_action(self, user_input: str) -> List[PlanStep]:
        """Plan the next actions for ``user_input`` (implemented by subclasses)."""

    @abstractmethod
    def execute_step(self, step: PlanStep) -> Any:
        """Execute a single plan step and return its result (implemented by subclasses)."""

    def run(self, user_input: str, max_iterations: int = 10) -> str:
        """Agent main loop: plan once, then execute the plan's steps in order.

        A step that raises is marked ``failed``, counted in
        ``stats["errors"]``, and execution continues with the next step. If
        the plan has more than ``max_iterations`` steps, execution stops there
        and a partial-result message is returned.
        """
        self.stats["start_time"] = time.time()
        self.state = AgentState.PLANNING
        self.add_message("user", user_input)
        self.plan = self.plan_action(user_input)
        for i, step in enumerate(self.plan):
            if i >= max_iterations:
                # Stopped early: end in a terminal state instead of EXECUTING.
                self.state = AgentState.DONE
                return f"Max iterations ({max_iterations}) reached. Partial result: {self.get_context()}"
            self.state = AgentState.EXECUTING
            step.status = "running"
            self.stats["total_steps"] += 1
            try:
                result = self.execute_step(step)
            except Exception as e:  # one failing step must not abort the whole plan
                step.status = "failed"
                step.error = str(e)
                self.stats["errors"] += 1
                continue
            step.status = "done"
            step.result = result
            self.add_message("tool", str(result),
                             tool_call={"name": step.tool_name, "args": step.tool_args})
        self.state = AgentState.DONE
        return self._generate_final_response()

    def _generate_final_response(self) -> str:
        """Summarize which plan steps completed."""
        return "Task completed. Steps executed: " + ", ".join(
            s.description for s in self.plan if s.status == "done"
        )

    def get_stats(self) -> Dict:
        """Return the counters plus elapsed seconds and a step success rate.

        ``errors`` also counts tool failures reported by ``execute_tool``, so
        it can exceed ``total_steps``. The success rate is therefore clamped
        to 0-100%.
        """
        start = self.stats["start_time"]
        elapsed = time.time() - start if start else 0
        total = self.stats["total_steps"]
        succeeded = max(0, total - self.stats["errors"])
        return {
            **self.stats,
            "elapsed_seconds": round(elapsed, 2),
            "success_rate": f"{succeeded / max(1, total) * 100:.1f}%",
        }
3. ReAct模式:推理与行动合一
3.1 ReAct原理
ReAct (Reason + Act) 是2026年最流行的Agent推理范式,核心思想:每一步都是"思考→行动→观察"的循环。
# Core ReAct loop (illustration only; the string below is a worked example):
# Thought → Action → Observation → Thought → ...
"""
用户: 北京今天适合跑步吗?
Step 1: Thought=我需要知道北京的天气情况才能判断
Action=get_weather(city="北京")
Observation={"temp": 28, "AQI": 85, "weather": "多云"}
Step 2: Thought=温度28度偏高,AQI 85空气质量一般,不算理想
Action=search_sports_advice(temp=28, aqi=85)
Observation={"advice": "建议傍晚跑步,多云天气注意补水"}
Step 3: Final Answer=今天北京多云,28度,AQI 85
不算最佳跑步日,建议傍晚6点左右跑步,注意补水
"""
3.2 ReAct Agent实现
from typing import List, Tuple
class ReActAgent(BaseAgent):
    """
    Agent using the ReAct reasoning pattern.

    Core loop: Thought → Action → Observation. ``thought_history`` and
    ``observation_history`` are kept index-aligned (one pair per executed
    tool call) and are replayed to the model in every prompt.
    """

    MAX_REACT_STEPS = 15

    def __init__(self, name: str, model_client: Any = None):
        super().__init__(name, model_client)
        self.thought_history: List[str] = []
        self.observation_history: List[str] = []

    def plan_action(self, user_input: str) -> List[PlanStep]:
        """Ask the model for Thought/Action lines once and parse them into PlanSteps."""
        prompt = self._build_react_prompt(user_input)
        response = self.model.chat([{"role": "user", "content": prompt}])
        return self._parse_react_response(response.content)

    def _build_react_prompt(self, user_input: str) -> str:
        """Build the ReAct prompt: tool list, Thought/Observation history, question and format."""
        tool_lines = []
        for t in self.tools.values():
            params = t.parameters or {}
            # Accept both a JSON Schema object and a flat {arg_name: spec} mapping.
            if params.get("type") == "object" or "properties" in params:
                arg_names = list(params.get("properties", {}).keys())
            else:
                arg_names = list(params.keys())
            tool_lines.append(f"- {t.name}: {t.description} (args: {arg_names})")
        tool_schemas = "\n".join(tool_lines)
        history = "".join(
            f"\nThought: {t}\nObservation: {o}\n"
            for t, o in zip(self.thought_history, self.observation_history)
        )
        history_text = history or "(空)"
        # The Action format below must match what _parse_react_response accepts:
        # a tool name immediately followed by a JSON object in parentheses.
        return f"""你是一个智能助手。请通过"Thought-Action-Observation"循环回答用户问题。
可用工具:
{tool_schemas}
历史记录:
{history_text}
当前问题: {user_input}
请按以下格式回复:
Thought: 你的思考过程,为什么要做这个动作
Action: 工具名称({{"参数名": "值"}})
(每次只给出一个Action;不要自己编写Observation,系统会在执行工具后提供)
如果问题已解决,回复:
Thought: 我已经获得足够信息
Final Answer: 你的完整回答
开始:"""

    @staticmethod
    def _thought_of(step: PlanStep) -> str:
        """Recover the Thought text from a step description ``"<thought> → <tool>"``."""
        thought, sep, _ = step.description.rpartition(" → ")
        return thought if sep else ""

    def _parse_react_response(self, response: Optional[str]) -> List[PlanStep]:
        """Parse a model reply into PlanSteps without touching the histories.

        Each ``Action: tool({...json...})`` line becomes a tool step whose
        description is ``"<preceding thought> → <tool>"``. Invalid or non-object
        JSON arguments become ``{}``, and Action lines without ``(`` are ignored.
        A ``Final Answer:`` line (plus any lines after it) becomes a final step
        with ``tool_name=None`` and ``tool_args={"answer": ...}``, and parsing
        stops there. At most ``MAX_REACT_STEPS`` steps are returned.
        """
        steps: List[PlanStep] = []
        lines = (response or "").strip().split("\n")
        thought = ""  # an Action without a preceding Thought gets an empty thought
        for idx, raw in enumerate(lines):
            if len(steps) >= self.MAX_REACT_STEPS:
                break
            line = raw.strip()
            if line.startswith("Thought:"):
                thought = line[len("Thought:"):].strip()
            elif line.startswith("Action:"):
                # partition/endswith instead of split("(") so arguments may contain parentheses
                tool_part, sep, rest = line[len("Action:"):].strip().partition("(")
                if not sep:
                    continue
                args_part = rest.strip()
                if args_part.endswith(")"):
                    args_part = args_part[:-1].strip()
                try:
                    args = json.loads(args_part) if args_part else {}
                except json.JSONDecodeError:
                    args = {}
                if not isinstance(args, dict):
                    args = {}
                tool_name = tool_part.strip()
                steps.append(PlanStep(
                    step_id=f"step_{len(steps) + 1}",
                    description=f"{thought} → {tool_name}",
                    tool_name=tool_name,
                    tool_args=args,
                ))
            elif line.startswith("Final Answer:"):
                # len("Final Answer:") == 13; keep multi-line answers intact.
                first = line[len("Final Answer:"):].strip()
                answer = "\n".join([first] + [l.rstrip() for l in lines[idx + 1:]]).strip()
                steps.append(PlanStep(
                    step_id=f"step_{len(steps) + 1}",
                    description="生成最终回答",
                    tool_name=None,
                    tool_args={"answer": answer},
                ))
                break
        return steps

    def execute_step(self, step: PlanStep) -> str:
        """Execute one ReAct step, recording its Thought/Observation pair."""
        if step.tool_name is None:
            # Final Answer step
            return (step.tool_args or {}).get("answer", "")
        if step.tool_name not in self.tools:
            return f"Tool '{step.tool_name}' not found"
        thought = self._thought_of(step)
        if thought:
            self.add_message("assistant", thought)
        result = self.execute_tool(step.tool_name, step.tool_args or {})
        # Append both together so the histories stay aligned for the prompt.
        self.thought_history.append(thought)
        self.observation_history.append(result)
        return result

    def run(self, user_input: str, max_iterations: int = 15) -> str:
        """ReAct main loop: one model call and at most one tool call per iteration.

        The original question stays in every prompt; earlier steps are
        supplied through the Thought/Observation history. Returns the model's
        final answer, "无法生成有效的行动计划" when a reply has no usable step,
        or "达到最大迭代次数" after ``max_iterations`` iterations.
        """
        self.stats["start_time"] = time.time()
        self.state = AgentState.EXECUTING
        self.thought_history = []
        self.observation_history = []
        for _ in range(max_iterations):
            prompt = self._build_react_prompt(user_input)
            response = self.model.chat([
                {"role": "system", "content": "你是一个使用ReAct推理模式的智能助手。"},
                {"role": "user", "content": prompt},
            ])
            steps = self._parse_react_response(response.content)
            if not steps:
                self.state = AgentState.ERROR
                return "无法生成有效的行动计划"
            step = steps[0]  # execute only the first step; the model re-plans next turn
            if step.tool_name is None:
                self.state = AgentState.DONE
                return step.tool_args.get("answer", "")
            self.stats["total_steps"] += 1
            result = self.execute_tool(step.tool_name, step.tool_args or {})
            self.thought_history.append(self._thought_of(step))
            self.observation_history.append(result)
        self.state = AgentState.DONE
        return "达到最大迭代次数"
3.3 实战:退货处理Agent
# Return-processing tools (mock implementations returning fixed demo data)
def get_order(order_id: str) -> dict:
    """Look up an order (mock): echoes ``order_id`` with fixed, returnable demo fields."""
    order = dict(
        order_id=order_id,
        status="已发货",
        amount=299.00,
        create_time="2026-06-25",
        can_return=True,
        return_deadline="2026-07-05",
    )
    return order
def validate_return_eligibility(order: dict, reason: str) -> dict:
    """Decide whether ``order`` may be returned (mock rules).

    Not eligible if ``order["can_return"]`` is falsy, or if ``reason``
    mentions "已过退货期限"; otherwise eligible.
    """
    if not order["can_return"]:
        eligible, why = False, "订单状态不允许退货"
    elif "已过退货期限" in reason:
        eligible, why = False, "已过退货期限"
    else:
        eligible, why = True, "可以退货"
    return {"eligible": eligible, "reason": why}
def create_return_order(order_id: str, reason: str) -> dict:
    """Create a return order (mock) with fixed pickup details.

    The return id is "RO" followed by ``order_id`` minus its first two
    characters. ``reason`` is accepted but not used by this mock.
    """
    return_id = f"RO{order_id[2:]}"
    return dict(
        return_id=return_id,
        order_id=order_id,
        status="待取件",
        pickup_address="北京市朝阳区xxx",
        logistics_company="顺丰速运",
        pickup_code="SF12345678",
    )
def send_notification(user_id: str, message: str, channel: str = "wechat") -> dict:
    """Send a notification (mock): nothing is delivered and success is reported.

    ``user_id`` is accepted but not used by this mock.
    """
    payload = {"sent": True}
    payload["channel"] = channel
    payload["message"] = message
    return payload
# Register the tools and run the agent.
# NOTE: `your_model_client` is a placeholder. ReActAgent expects an object whose
# chat(messages) returns something with a `.content` string.
agent = ReActAgent(
    name="退货助手",
    model_client=your_model_client,
)
# Tool.parameters must be a JSON Schema object: get_tool_schemas() forwards it
# to function-calling APIs as-is, and the ReAct prompt reads its "properties".
agent.register_tools([
    Tool("get_order", "查询订单信息", {
        "type": "object",
        "properties": {"order_id": {"type": "string"}},
        "required": ["order_id"],
    }, get_order),
    Tool("validate_return", "验证退货资格", {
        "type": "object",
        "properties": {
            "order": {"type": "object"},
            "reason": {"type": "string"},
        },
        "required": ["order", "reason"],
    }, validate_return_eligibility),
    Tool("create_return_order", "创建退货单", {
        "type": "object",
        "properties": {
            "order_id": {"type": "string"},
            "reason": {"type": "string"},
        },
        "required": ["order_id", "reason"],
    }, create_return_order),
    Tool("send_notification", "发送通知", {
        "type": "object",
        "properties": {
            "user_id": {"type": "string"},
            "message": {"type": "string"},
            "channel": {"type": "string"},
        },
        "required": ["user_id", "message"],
    }, send_notification),
])
result = agent.run("用户ID 12345,想退订单 ORD20260625001,原因是收到商品破损")
print(result)
4. Plan-and-Execute:规划-执行分离
4.1 原理
Plan-and-Execute 与 ReAct 的核心区别:先规划全部步骤,再逐个执行。适合复杂长任务,避免每步都调用LLM造成的高成本和延迟。
ReAct: 思考→行动→观察→思考→行动→观察... (边想边做)
问题: 每个步骤都要调用LLM,成本高,延迟大
Plan-and-Execute:
Phase 1 - Plan (一次LLM调用):
用户: "帮我做一份季度销售报告,包含同比环比分析"
LLM规划:
Step 1: 查询Q2销售数据
Step 2: 查询Q1销售数据(环比)及去年Q2销售数据(同比)
Step 3: 计算同比/环比增长率
Step 4: 生成图表
Step 5: 导出PDF报告
Phase 2 - Execute (执行器逐个执行):
执行器 → 查询数据库 → 计算数据 → 生成图表 → 导出PDF
(无需每次调用LLM,执行器按计划行动)
4.2 Plan-and-Execute实现
class PlanAndExecuteAgent(BaseAgent):
    """
    Plan-and-Execute agent.

    Advantages: one LLM call produces the whole plan, and execution needs no
    further LLM calls. Suited to complex, long, multi-step workflows. If more
    than ``replan_threshold`` of the steps fail, the task is re-planned with
    the failures as extra context.
    """

    # Description of the sentinel step returned when planning fails.
    PLAN_FAILED = "无法生成有效计划"
    # Keys an LLM-produced step may set; status/result/error remain under our control.
    _PLAN_FIELDS = ("step_id", "description", "tool_name", "tool_args")

    def __init__(self, name: str, model_client: Any = None,
                 replan_threshold: float = 0.3):
        super().__init__(name, model_client)
        self.replan_threshold = replan_threshold  # replan when the failure ratio exceeds this

    def plan_action(self, user_input: str) -> List[PlanStep]:
        """Planning phase: ask the model for the complete plan as a JSON array.

        On an unparsable reply, returns a single sentinel step
        (``description == PLAN_FAILED``, ``tool_name=None``).
        """
        tool_schemas = "\n".join([
            f"- {t.name}: {t.description}"
            for t in self.tools.values()
        ])
        prompt = f"""用户需求: {user_input}
可用工具:
{tool_schemas}
请将这个任务分解为精确的执行步骤。每个步骤必须:
1. 有明确的执行目标
2. 指定要使用的工具
3. 工具参数具体明确
请以JSON数组格式返回:
[
{{
"step_id": "step_1",
"description": "步骤描述",
"tool_name": "工具名",
"tool_args": {{"参数": "值"}}
}},
...
]
只返回JSON,不要其他文字:"""
        response = self.model.chat([
            {"role": "user", "content": prompt}
        ])
        try:
            steps_data = json.loads(self._extract_json(response.content))
            return self._to_plan_steps(steps_data)
        except (ValueError, TypeError, AttributeError):  # JSONDecodeError is a ValueError
            return [PlanStep(
                step_id="step_1",
                description=self.PLAN_FAILED,
                tool_name=None
            )]

    @staticmethod
    def _extract_json(content: str) -> str:
        """Strip an optional ```json / ``` fence around the model's reply."""
        if "```json" in content:
            content = content.split("```json")[1].split("```")[0]
        elif "```" in content:
            content = content.split("```")[1].split("```")[0]
        return content.strip()

    @classmethod
    def _to_plan_steps(cls, steps_data: Any) -> List[PlanStep]:
        """Convert parsed JSON into PlanSteps, ignoring unknown keys.

        Raises ValueError if the data is not a list of objects.
        """
        if not isinstance(steps_data, list):
            raise ValueError("plan must be a JSON array")
        steps = []
        for i, raw in enumerate(steps_data, start=1):
            if not isinstance(raw, dict):
                raise ValueError(f"plan step {i} is not an object")
            fields = {k: raw[k] for k in cls._PLAN_FIELDS if k in raw}
            fields.setdefault("step_id", f"step_{i}")
            fields.setdefault("description", "")
            steps.append(PlanStep(**fields))
        return steps

    def execute_step(self, step: PlanStep) -> str:
        """Execute one step. A step without a tool just returns its description."""
        if step.tool_name is None:
            return step.description
        if step.tool_name not in self.tools:
            return f"Tool '{step.tool_name}' not found"
        return self.execute_tool(step.tool_name, step.tool_args or {})

    @staticmethod
    def _is_error_result(result: Any) -> bool:
        """True for the error strings produced by execute_step / execute_tool.

        Successful tool results are JSON-encoded by execute_tool, so they never
        begin with these bare prefixes.
        """
        return isinstance(result, str) and result.startswith(
            ("Error: Tool '", "Error executing ", "Tool '")
        )

    def _is_plan_failure(self, plan: List[PlanStep]) -> bool:
        """True when ``plan`` is the sentinel returned by a failed plan_action."""
        return (len(plan) == 1 and plan[0].tool_name is None
                and plan[0].description == self.PLAN_FAILED)

    def should_replan(self) -> bool:
        """Return True when the share of failed steps exceeds ``replan_threshold``."""
        if not self.plan:
            return False
        failed = sum(1 for s in self.plan if s.status == "failed")
        return (failed / len(self.plan)) > self.replan_threshold

    def run(self, user_input: str, max_replan: int = 2,
            max_iterations: Optional[int] = None) -> str:
        """Plan-and-Execute main loop.

        Args:
            user_input: The task description.
            max_replan: Extra planning attempts allowed after the first one.
            max_iterations: Optional cap on steps executed per attempt. Accepted
                so this agent can be driven through the same ``run(task,
                max_iterations=...)`` call as the other agents.

        Returns:
            A per-step summary of the last attempt, or ``PLAN_FAILED`` when no
            plan could be produced.
        """
        self.stats["start_time"] = time.time()
        self.add_message("user", user_input)
        results: List[Dict] = []
        for attempt in range(max_replan + 1):
            # ===== Phase 1: plan =====
            self.state = AgentState.PLANNING
            self.plan = self.plan_action(user_input)
            if not self.plan or self._is_plan_failure(self.plan):
                self.state = AgentState.ERROR
                return self.PLAN_FAILED
            # ===== Phase 2: execute =====
            self.state = AgentState.EXECUTING
            results = []
            to_run = self.plan if max_iterations is None else self.plan[:max(0, max_iterations)]
            for step in to_run:
                step.status = "running"
                self.stats["total_steps"] += 1
                try:
                    result = self.execute_step(step)
                except Exception as e:  # record and continue with the remaining steps
                    step.status = "failed"
                    step.error = str(e)
                    self.stats["errors"] += 1
                    results.append({"step": step.description, "error": str(e)})
                    continue
                if step.tool_name is not None and self._is_error_result(result):
                    # Tool errors come back as strings, not exceptions; without this
                    # check the replan mechanism below could never trigger.
                    step.status = "failed"
                    step.error = result
                    results.append({"step": step.description, "error": result})
                    continue
                step.status = "done"
                step.result = result
                results.append({"step": step.description, "result": result})
            # ===== Phase 3: replan if too many steps failed =====
            if not self.should_replan():
                break
            failed_steps = [s for s in self.plan if s.status == "failed"]
            context = f"上轮计划中以下步骤失败: {[s.description for s in failed_steps]}"
            user_input = f"{user_input}\n\n上下文: {context}"
        self.state = AgentState.DONE
        return self._summarize_results(results)

    def _summarize_results(self, results: List[Dict]) -> str:
        """Render one line per executed step plus the overall success rate."""
        summary = "执行完成:\n"
        for r in results:
            if "error" in r:
                summary += f"- {r['step']}: ❌ {r['error']}\n"
            else:
                summary += f"- {r['step']}: ✅\n"
        summary += f"\n成功率: {self.get_stats()['success_rate']}"
        return summary
4.3 实战:季度报告自动生成
# Quarterly report agent.
# NOTE: `your_model` and the tool functions (query_sales_data, calculate_growth,
# generate_chart, export_pdf, send_email) are placeholders you must provide.
report_agent = PlanAndExecuteAgent(
    name="报告生成助手",
    model_client=your_model,
)
# Parameters are JSON Schema objects so get_tool_schemas() yields valid function specs.
report_agent.register_tools([
    Tool("query_sales_data", "查询销售数据", {
        "type": "object",
        "properties": {
            "quarter": {"type": "string", "description": "季度,如Q2"},
            "year": {"type": "integer"},
        },
        "required": ["quarter", "year"],
    }, query_sales_data),
    Tool("calculate_growth", "计算增长率", {
        "type": "object",
        "properties": {
            "current": {"type": "number"},
            "previous": {"type": "number"},
        },
        "required": ["current", "previous"],
    }, calculate_growth),
    Tool("generate_chart", "生成图表", {
        "type": "object",
        "properties": {
            "data": {"type": "object"},
            "chart_type": {"type": "string"},
        },
        "required": ["data", "chart_type"],
    }, generate_chart),
    Tool("export_pdf", "导出PDF", {
        "type": "object",
        "properties": {
            "content": {"type": "string"},
            "filename": {"type": "string"},
        },
        "required": ["content", "filename"],
    }, export_pdf),
    Tool("send_email", "发送邮件", {
        "type": "object",
        "properties": {
            "to": {"type": "string"},
            "subject": {"type": "string"},
            "body": {"type": "string"},
        },
        "required": ["to", "subject", "body"],
    }, send_email),
])
# 同比 = year-over-year (2025 Q2); 环比 = quarter-over-quarter (2026 Q1).
result = report_agent.run(
    "生成2026年Q2季度销售报告,包含同比(2025年Q2)、环比(2026年Q1)分析,"
    "并发送给ceo@company.com"
)
5. 多Agent协作架构
5.1 协作模式对比
┌────────────────────────────────────────────────────────────────┐
│ 多Agent协作模式对比 │
├────────────────────────────────────────────────────────────────┤
│ │
│ 模式1: 层级式 (Hierarchical) │
│ ┌───────────┐ │
│ │ Supervisor │ → 分析任务 → 分发给下属Agent │
│ │ (主管) │ ← 收集结果 ← │
│ └─────┬─────┘ │
│ │ │
│ ┌───┴───┬───────────┐ │
│ ▼ ▼ ▼ │
│ ┌────┐ ┌────┐ ┌────┐ │
│ │Data│ │Code│ │Doc │ │
│ │Agent│ │Agent│ │Agent│ │
│ └────┘ └────┘ └────┘ │
│ │
│ 模式2: 同级协作 (Peer-to-Peer) │
│ ┌────┐ ┌────┐ ┌────┐ │
│ │Research│ ←→ │ Coder│ ←→ │Review│ │
│ │ Agent │ │Agent │ │Agent │ │
│ └────┘ └────┘ └────┘ │
│ ↑ ↑ ↑ │
│ └────────────┴────────────┘ │
│ 消息总线 (Message Bus) │
│ │
│ 模式3: 辩论式 (Debate) │
│ ┌────┐ ┌────┐ │
│ │Agent A│ VS │Agent B│ → 裁判LLM仲裁 │
│ │(正方) │ │(反方) │ │
│ └────┘ └────┘ │
│ 裁判: 综合双方论点,输出最终结论 │
│ │
└────────────────────────────────────────────────────────────────┘
5.2 A2A协议实现
import asyncio
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from enum import Enum
import uuid
import json
class MessageType(Enum):
    """Kind of an inter-agent message; values are lowercase strings."""

    TASK = "task"            # a task assignment
    RESULT = "result"        # a task result
    ERROR = "error"          # an error report
    HEARTBEAT = "heartbeat"  # liveness signal
    QUERY = "query"          # a status query
@dataclass
class AgentMessage:
    """Envelope for one message exchanged between agents over the bus."""

    msg_id: str
    sender: str       # sending agent's name
    receiver: str     # receiving agent's name (the bus's inbox key)
    msg_type: MessageType
    content: Any
    # When set, the bus records this task's delivery status under this id.
    task_id: Optional[str] = field(default=None)
    # Presumably the msg_id this message answers; not read anywhere visible here.
    reply_to: Optional[str] = field(default=None)
    timestamp: float = field(default_factory=lambda: time.time())
    metadata: Dict = field(default_factory=lambda: {})
class AgentMessageBus:
    """
    In-process message bus for agents (an A2A-like protocol sketch).

    Supports point-to-point send, broadcast (to every agent or to a topic's
    subscribers) and simple task-status tracking. Each registered agent has an
    in-memory FIFO inbox; nothing is persisted.
    """

    def __init__(self):
        self.agents: Dict[str, 'BaseAgent'] = {}
        self.inbox: Dict[str, List['AgentMessage']] = {}  # agent name -> FIFO message list
        self.pending_tasks: Dict[str, Dict] = {}  # task_id -> last recorded status
        self.subscribers: Dict[str, List[str]] = {}  # topic -> subscribed agent names

    def register(self, agent: 'BaseAgent'):
        """Register ``agent`` under ``agent.name`` with an empty inbox."""
        self.agents[agent.name] = agent
        self.inbox[agent.name] = []

    async def send(self, message: 'AgentMessage'):
        """Deliver ``message`` to its receiver's inbox.

        Raises:
            ValueError: if the receiver has no inbox (is not registered).
        """
        if message.receiver not in self.inbox:
            raise ValueError(f"Unknown receiver: {message.receiver}")
        self.inbox[message.receiver].append(message)
        # Track task status
        if message.task_id:
            self.pending_tasks[message.task_id] = {
                "status": "sent",
                "receiver": message.receiver,
                "msg_id": message.msg_id
            }

    async def broadcast(self, sender: str, content: Any,
                        topic: Optional[str] = None):
        """Send ``content`` as a TASK message to several agents.

        With a (non-empty) ``topic``, only that topic's subscribers receive it.
        A topic nobody subscribed to reaches no one, instead of silently
        falling back to everyone. Without a topic, every registered agent
        receives it. The sender never receives its own broadcast.
        """
        if topic:
            targets = [a for a in self.subscribers.get(topic, []) if a != sender]
        else:
            targets = [a for a in self.agents if a != sender]
        for target in targets:
            msg = AgentMessage(
                msg_id=str(uuid.uuid4()),
                sender=sender,
                receiver=target,
                msg_type=MessageType.TASK,
                content=content
            )
            await self.send(msg)

    def subscribe(self, agent_id: str, topic: str):
        """Subscribe ``agent_id`` to ``topic``; repeated calls do not duplicate it."""
        members = self.subscribers.setdefault(topic, [])
        if agent_id not in members:
            members.append(agent_id)

    async def receive(self, agent_id: str) -> Optional['AgentMessage']:
        """Pop the oldest message for ``agent_id`` (non-blocking); None if the inbox is empty."""
        if not self.inbox.get(agent_id):
            return None
        return self.inbox[agent_id].pop(0)

    def get_task_status(self, task_id: str) -> Optional[Dict]:
        """Return the last recorded status for ``task_id``, or None."""
        return self.pending_tasks.get(task_id)
class MultiAgentOrchestrator:
    """
    Hierarchical multi-agent orchestrator.

    The supervisor plans the task, and each plan step is routed to a worker
    by tool-name keywords. Workers run concurrently (each worker's own steps
    run in order), and the supervisor's model aggregates their results into
    the final answer.
    """

    # worker name -> tool-name substrings routed to it. Rules are checked in
    # this order, and a rule only applies when a worker with that name exists.
    ROUTING_RULES = {
        "data": ["query", "search", "fetch", "get", "calculate"],
        "code": ["write", "execute", "run", "compile", "test"],
        "doc": ["write", "format", "export", "generate_report"],
        "review": ["check", "validate", "review", "audit"],
    }

    def __init__(self, message_bus: 'AgentMessageBus'):
        self.bus = message_bus
        self.supervisor: Optional['BaseAgent'] = None
        self.worker_agents: Dict[str, 'BaseAgent'] = {}
        # Results of the most recent run_hierarchical_task call, keyed by worker name.
        self.task_results: Dict[str, Any] = {}

    def set_supervisor(self, agent: 'BaseAgent'):
        """Set the agent that plans tasks and aggregates results."""
        self.supervisor = agent

    def add_worker(self, name: str, agent: 'BaseAgent'):
        """Add a worker under ``name`` (used for routing) and register it on the bus."""
        self.worker_agents[name] = agent
        self.bus.register(agent)

    async def run_hierarchical_task(self, task: str) -> str:
        """
        Run ``task`` hierarchically and return the supervisor's final answer.

        1. The supervisor plans the task.
        2. Steps are grouped per worker and the workers run concurrently.
        3. The supervisor's model aggregates all results.

        Raises:
            RuntimeError: if no supervisor is set, or a step needs routing
                while no workers are registered.
        """
        if self.supervisor is None:
            raise RuntimeError("No supervisor set; call set_supervisor() first")

        # ===== Step 1: supervisor plans =====
        plan = self.supervisor.plan_action(task)

        # ===== Step 2: group steps per worker and run workers concurrently =====
        worker_tasks: Dict[str, List['PlanStep']] = {}
        for step in plan:
            worker_tasks.setdefault(self._route_to_worker(step), []).append(step)

        loop = asyncio.get_running_loop()
        results: Dict[str, Any] = {}  # fresh per call so earlier runs do not leak in

        async def run_worker_tasks(name: str, steps: List['PlanStep']):
            agent = self.worker_agents[name]
            worker_results = []
            for step in steps:
                # execute_step is synchronous; running it in the default thread pool
                # lets different workers overlap instead of blocking the event loop.
                result = await loop.run_in_executor(None, agent.execute_step, step)
                worker_results.append({"step": step.description, "result": result})
            results[name] = worker_results

        await asyncio.gather(*[
            run_worker_tasks(name, steps)
            for name, steps in worker_tasks.items()
        ])
        self.task_results = results

        # ===== Step 3: supervisor aggregates =====
        all_results = json.dumps(results, ensure_ascii=False)
        final_prompt = f"原始任务: {task}\n\n各Worker结果:\n{all_results}\n\n请综合所有结果,给出最终回答。"
        final_response = self.supervisor.model.chat([
            {"role": "user", "content": final_prompt}
        ])
        return final_response.content

    def _route_to_worker(self, step: 'PlanStep') -> str:
        """Pick the registered worker for ``step`` by tool-name keywords.

        Falls back to the first worker added when no registered rule matches.
        Raises RuntimeError when no workers are registered.
        """
        if not self.worker_agents:
            raise RuntimeError("No worker agents registered; call add_worker() first")
        tool_name = (step.tool_name or "").lower()
        for worker, keywords in self.ROUTING_RULES.items():
            if worker in self.worker_agents and any(kw in tool_name for kw in keywords):
                return worker
        return next(iter(self.worker_agents))  # default: first worker added
class MultiAgentDebate:
    """
    Multi-agent debate framework.

    Agent A (affirmative) vs Agent B (negative) over several rounds, then a
    judge model rules. Each speaker sees the transcript of earlier rounds, and
    the negative side also sees the affirmative's argument for the current round.
    """

    def __init__(self, judge_model: Any, num_rounds: int = 3):
        self.judge = judge_model
        self.num_rounds = num_rounds
        self.positive_agent: Optional['BaseAgent'] = None
        self.negative_agent: Optional['BaseAgent'] = None

    def set_agents(self, positive: 'BaseAgent', negative: 'BaseAgent'):
        """Set the affirmative and negative debaters."""
        self.positive_agent = positive
        self.negative_agent = negative

    @staticmethod
    def _format_rounds(rounds: List[Dict]) -> str:
        """Render recorded rounds as a readable transcript."""
        return "\n\n".join(
            f"第{r['round']}轮:\n正方: {r['positive']}\n反方: {r['negative']}"
            for r in rounds
        )

    @staticmethod
    def _parse_verdict(content: Optional[str]) -> Dict:
        """Parse the judge's JSON (optionally fenced in ``` blocks) into a dict.

        Anything unparsable, or not a JSON object, yields a neutral fallback verdict.
        """
        text = (content or "").strip()
        if "```json" in text:
            text = text.split("```json")[1].split("```")[0]
        elif "```" in text:
            text = text.split("```")[1].split("```")[0]
        try:
            verdict = json.loads(text)
        except json.JSONDecodeError:
            verdict = None
        if not isinstance(verdict, dict):
            return {"winner": "neutral", "reasoning": "解析失败"}
        return verdict

    def run_debate(self, topic: str) -> Dict:
        """Run the debate and return ``{"topic", "rounds", "verdict"}``.

        Raises:
            RuntimeError: if set_agents() has not been called.
        """
        if self.positive_agent is None or self.negative_agent is None:
            raise RuntimeError("Call set_agents() before run_debate()")
        context = {
            "topic": topic,
            "rounds": [],
            "verdict": ""
        }
        for round_num in range(1, self.num_rounds + 1):
            transcript = self._format_rounds(context["rounds"])
            history = f"之前的辩论记录:\n{transcript}\n" if transcript else ""
            # Affirmative speaks, seeing earlier rounds.
            pos_response = self.positive_agent.run(
                f"辩论主题: {topic}\n"
                f"{history}"
                f"你是正方,请提出支持该观点的最强论据。\n"
                f"这是第{round_num}轮,请针对之前的讨论深化论点。"
            )
            # Negative rebuts, seeing earlier rounds and this round's affirmative argument.
            neg_response = self.negative_agent.run(
                f"辩论主题: {topic}\n"
                f"{history}"
                f"本轮正方论点: {pos_response}\n"
                f"你是反方,请提出反对该观点的最强论据。\n"
                f"这是第{round_num}轮,请针对正方的论点进行反驳。"
            )
            context["rounds"].append({
                "round": round_num,
                "positive": pos_response,
                "negative": neg_response
            })
        # Judge's ruling
        debate_summary = self._format_rounds(context["rounds"])
        judge_prompt = f"""辩论主题: {topic}
辩论内容:
{debate_summary}
作为裁判,请:
1. 总结正方和反方的核心论点
2. 指出哪方的论据更有说服力
3. 给出你的最终裁决和理由
请用JSON格式返回:
{{
"positive_summary": "正方核心论点",
"negative_summary": "反方核心论点",
"winner": "positive/negative/neutral",
"reasoning": "裁决理由"
}}"""
        verdict_response = self.judge.chat([{"role": "user", "content": judge_prompt}])
        context["verdict"] = self._parse_verdict(verdict_response.content)
        return context
6. 工具生态:Function Calling实战
6.1 主流框架对比
| 框架 | 定位 | 特点 | 适用场景 |
|---|---|---|---|
| LangChain | 全栈框架 | 组件丰富,但较重 | 快速原型 |
| LangGraph | 图编排 | 状态机式的Agent流程 | 复杂工作流 |
| AutoGen | 多Agent | 微软出品,对话式协作 | 多Agent系统 |
| CrewAI | 多Agent | 角色分工清晰 | 企业场景 |
| 自研 | 定制 | 完全可控,无依赖 | 生产环境 |
6.2 OpenAI Function Calling
import openai
from typing import List, Optional
class OpenAIFunctionAgent(BaseAgent):
    """Agent driven by OpenAI function calling (chat.completions with ``tools``)."""

    def __init__(self, name: str, api_key: str, model: str = "gpt-4o"):
        super().__init__(name)
        self.client = openai.OpenAI(api_key=api_key)
        # NOTE: BaseAgent uses `self.model` for a model client; here it holds the
        # model *name* passed to chat.completions.create.
        self.model = model

    def plan_action(self, user_input: str) -> List[PlanStep]:
        """Return an empty plan.

        With function calling, the model chooses each tool call turn by turn
        inside ``run``, so there is no upfront plan. This method exists because
        BaseAgent declares it abstract; without it the class cannot be
        instantiated.
        """
        return []

    def execute_step(self, step: PlanStep) -> Any:
        """Run the step's tool, or return its description when it has none."""
        if step.tool_name is None:
            return step.description
        return self.execute_tool(step.tool_name, step.tool_args or {})

    def _tool_kwargs(self) -> Dict[str, Any]:
        """Extra create() arguments. Tools are only sent when at least one is registered."""
        schemas = self.get_tool_schemas()
        return {"tools": schemas, "tool_choice": "auto"} if schemas else {}

    @staticmethod
    def _parse_arguments(raw: Optional[str]) -> Optional[Dict]:
        """Decode a tool call's JSON arguments; None if malformed or not an object."""
        try:
            args = json.loads(raw or "{}")
        except json.JSONDecodeError:
            return None
        return args if isinstance(args, dict) else None

    def run(self, user_input: str, max_turns: int = 10,
            max_iterations: Optional[int] = None) -> str:
        """Function-calling main loop.

        Each turn sends the conversation (plus tool schemas). Requested tool
        calls are executed and their results appended as ``tool`` messages,
        until the model answers without tool calls or ``max_turns`` is reached.
        ``max_iterations``, when given, overrides ``max_turns``; it lets this
        agent be driven through the same interface as the other agents.
        """
        if max_iterations is not None:
            max_turns = max_iterations
        self.stats["start_time"] = time.time()
        messages = [
            {"role": "system", "content": f"你是{self.name},一个AI助手。"},
            {"role": "user", "content": user_input},
        ]
        for _ in range(max_turns):
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                **self._tool_kwargs()
            )
            response_message = response.choices[0].message
            if not response_message.tool_calls:
                # Direct answer
                return response_message.content or ""
            # The assistant turn carrying tool_calls must precede the tool results.
            messages.append({
                "role": "assistant",
                "content": response_message.content or "",
                "tool_calls": [
                    {
                        "id": tc.id,
                        "type": "function",
                        "function": {
                            "name": tc.function.name,
                            "arguments": tc.function.arguments
                        }
                    }
                    for tc in response_message.tool_calls
                ]
            })
            for tc in response_message.tool_calls:
                args = self._parse_arguments(tc.function.arguments)
                if args is None:
                    # Report bad JSON back to the model instead of crashing the loop.
                    result = f"Error: invalid JSON arguments for {tc.function.name}: {tc.function.arguments}"
                else:
                    result = self.execute_tool(tc.function.name, args)
                messages.append({
                    "role": "tool",
                    "tool_call_id": tc.id,
                    "content": result
                })
        return "达到最大对话轮次"
6.3 工具调用最佳实践
# ===== Practice 1: tool calls that require user confirmation =====
class ConfirmedFunctionAgent(OpenAIFunctionAgent):
    """Function-calling agent that asks the user before running any tool marked
    ``requires_confirmation``."""

    def _ask_user_confirmation(self, prompt: str) -> bool:
        """Ask the user to approve a tool call; True only for "y"/"yes".

        Console default using ``input()``; override it for other front ends
        (web UI, chat, ...).
        """
        answer = input(prompt + " ")
        return answer.strip().lower() in ("y", "yes")

    def run(self, user_input: str, max_turns: int = 10,
            max_iterations: Optional[int] = None) -> str:
        """Function-calling loop with a confirmation gate on sensitive tools.

        Stops after ``max_turns`` model calls (``max_iterations`` overrides it
        when given) and returns "达到最大对话轮次" in that case.
        """
        if max_iterations is not None:
            max_turns = max_iterations
        messages = [{"role": "user", "content": user_input}]
        schemas = self.get_tool_schemas()
        tool_kwargs = {"tools": schemas} if schemas else {}  # the API rejects an empty tools list
        for _ in range(max_turns):
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                **tool_kwargs
            )
            msg = response.choices[0].message
            if not msg.tool_calls:
                return msg.content
            # The assistant turn carrying tool_calls must precede its tool results.
            messages.append({
                "role": "assistant",
                "content": msg.content or "",
                "tool_calls": [
                    {
                        "id": tc.id,
                        "type": "function",
                        "function": {"name": tc.function.name, "arguments": tc.function.arguments},
                    }
                    for tc in msg.tool_calls
                ],
            })
            for tc in msg.tool_calls:
                tool = self.tools.get(tc.function.name)
                if tool and tool.requires_confirmation:
                    confirmation_prompt = (
                        f"即将执行: {tool.name}\n"
                        f"参数: {tc.function.arguments}\n"
                        f"确认执行? (yes/no)"
                    )
                    if not self._ask_user_confirmation(confirmation_prompt):
                        # Every tool_call_id still needs a tool message, so the
                        # cancellation is reported as that call's result.
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tc.id,
                            "content": f"用户取消了工具调用: {tool.name}"
                        })
                        continue
                try:
                    args = json.loads(tc.function.arguments or "{}")
                except json.JSONDecodeError:
                    args = None
                if isinstance(args, dict):
                    result = self.execute_tool(tc.function.name, args)
                else:
                    result = f"Error: invalid JSON arguments for {tc.function.name}"
                messages.append({
                    "role": "tool",
                    "tool_call_id": tc.id,
                    "content": result
                })
        return "达到最大对话轮次"
# ===== Practice 2: run a batch of tool calls in parallel =====
class ParallelFunctionAgent(OpenAIFunctionAgent):
    """Function-calling agent that executes all tool calls of one turn concurrently."""

    # Upper bound on threads used for one batch of tool calls.
    MAX_PARALLEL_TOOLS = 8

    def run(self, user_input: str, max_turns: int = 10,
            max_iterations: Optional[int] = None) -> str:
        """Function-calling loop that runs each turn's tool calls in a thread pool.

        ``execute_tool`` is synchronous, so ``asyncio.gather`` cannot
        parallelize it. A thread pool overlaps I/O-bound tools and also works
        when called from inside a running event loop, where ``asyncio.run``
        would fail. Stops after ``max_turns`` model calls (``max_iterations``
        overrides it when given).
        """
        from concurrent.futures import ThreadPoolExecutor

        if max_iterations is not None:
            max_turns = max_iterations
        messages = [{"role": "user", "content": user_input}]
        schemas = self.get_tool_schemas()
        tool_kwargs = {"tools": schemas} if schemas else {}  # the API rejects an empty tools list

        def call(tc) -> str:
            # Bad JSON is reported back to the model rather than raised.
            try:
                args = json.loads(tc.function.arguments or "{}")
            except json.JSONDecodeError:
                args = None
            if not isinstance(args, dict):
                return f"Error: invalid JSON arguments for {tc.function.name}"
            return self.execute_tool(tc.function.name, args)

        for _ in range(max_turns):
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                **tool_kwargs
            )
            msg = response.choices[0].message
            if not msg.tool_calls:
                return msg.content
            tool_calls = msg.tool_calls
            messages.append({
                "role": "assistant",
                "content": msg.content or "",
                "tool_calls": [
                    {
                        "id": tc.id,
                        "type": "function",
                        "function": {"name": tc.function.name, "arguments": tc.function.arguments},
                    }
                    for tc in tool_calls
                ],
            })
            workers = min(self.MAX_PARALLEL_TOOLS, len(tool_calls))
            with ThreadPoolExecutor(max_workers=workers) as pool:
                results = list(pool.map(call, tool_calls))  # map preserves input order
            for tc, result in zip(tool_calls, results):
                messages.append({
                    "role": "tool",
                    "tool_call_id": tc.id,
                    "content": result
                })
        return "达到最大对话轮次"
7. 记忆系统深度实现
7.1 混合记忆架构
import numpy as np
from typing import List, Tuple
from datetime import datetime, timedelta
class VectorMemory:
    """
    Vector-based long-term memory kept in process memory.

    A demonstration store; for production use a dedicated vector database
    (Milvus / Pinecone / FAISS) and a real embedding API.
    """

    def __init__(self, embedding_model: str = "text-embedding-3-small",
                 similarity_threshold: float = 0.7, dim: int = 1536):
        self.embedding_model = embedding_model
        self.similarity_threshold = similarity_threshold
        self.dim = dim  # embedding length produced by _get_embedding
        self.memories: List[Dict] = []  # records, index-aligned with self.embeddings
        self.embeddings: List[np.ndarray] = []

    def add(self, content: str, metadata: Optional[Dict] = None) -> str:
        """Store ``content`` with optional ``metadata``; return the new memory id."""
        memory_id = str(uuid.uuid4())
        embedding = self._get_embedding(content)
        self.memories.append({
            "id": memory_id,
            "content": content,
            "metadata": metadata or {},
            "created_at": datetime.now().isoformat(),
            "access_count": 0,
            "last_accessed": None
        })
        self.embeddings.append(embedding)
        return memory_id

    def search(self, query: str, top_k: int = 5) -> List[Dict]:
        """Return up to ``top_k`` memories whose cosine similarity to ``query``
        is at least ``similarity_threshold``, best first.

        Each hit is a copy of the record plus a ``score`` key. The stored
        record's ``access_count``/``last_accessed`` are updated for each hit.
        """
        if not self.embeddings or top_k <= 0:
            return []
        query_embedding = self._get_embedding(query)
        matrix = np.vstack(self.embeddings)
        # Vectorized cosine similarity; the epsilon avoids division by zero.
        denom = np.linalg.norm(matrix, axis=1) * np.linalg.norm(query_embedding) + 1e-8
        scores = matrix @ query_embedding / denom
        results = []
        for idx in np.argsort(-scores, kind="stable")[:top_k]:
            if scores[idx] >= self.similarity_threshold:
                self.memories[idx]["access_count"] += 1
                self.memories[idx]["last_accessed"] = datetime.now().isoformat()
                results.append({
                    **self.memories[idx],
                    "score": float(scores[idx])
                })
        return results

    def _get_embedding(self, text: str) -> np.ndarray:
        """Return a ``self.dim``-length embedding for ``text``.

        Production code should call an embedding API here, e.g.
        openai_client.embeddings.create(input=text, model=self.embedding_model).
        This stand-in hashes character bigrams (which also works for Chinese)
        into a count vector. Identical or overlapping texts therefore score
        high and results are reproducible; the previous random vectors made
        every search arbitrary.
        """
        import zlib  # stable across processes, unlike the salted built-in hash()

        vec = np.zeros(self.dim)
        if not text:
            return vec
        grams = [text[i:i + 2] for i in range(len(text) - 1)] or [text]
        for gram in grams:
            vec[zlib.crc32(gram.encode("utf-8")) % self.dim] += 1.0
        return vec

    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        """Cosine similarity of two vectors (epsilon-guarded)."""
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-8))

    def forget_old(self, days: int = 30, access_threshold: int = 2):
        """Drop memories older than ``days`` that were accessed fewer than
        ``access_threshold`` times."""
        cutoff = datetime.now() - timedelta(days=days)
        kept = [
            (mem, emb)
            for mem, emb in zip(self.memories, self.embeddings)
            if not (datetime.fromisoformat(mem["created_at"]) < cutoff
                    and mem["access_count"] < access_threshold)
        ]
        self.memories = [mem for mem, _ in kept]
        self.embeddings = [emb for _, emb in kept]
class AgentMemoryManager:
    """
    Agent memory manager that combines short-term, working and long-term memory.

    - short-term: messages from the current session (compressed when too long)
    - working: free-form scratch dict for the current task
    - long-term: a vector store exposing ``add`` and ``search``
    """

    # Messages kept verbatim when the short-term buffer is compressed.
    KEEP_RECENT = 10

    def __init__(self, vector_memory: 'VectorMemory'):
        self.short_term: List['Message'] = []  # current session
        self.working: Dict[str, Any] = {}  # current task's workspace
        self.long_term: 'VectorMemory' = vector_memory
        self.max_short_term = 50

    def remember(self, query: str) -> Dict[str, Any]:
        """Gather memories relevant to ``query``.

        Returns a dict with three keys:
        - ``long_term``: the top-3 semantic hits.
        - ``short_term``: up to the 5 most recent short-term messages that
          contain any of the first three whitespace-separated words of ``query``.
        - ``working``: the working-memory dict itself, not a copy.
        """
        # 1. Long-term memory (semantic search)
        relevant = self.long_term.search(query, top_k=3)
        # 2. Short-term memory (keyword match)
        keywords = query.split()[:3]
        short_matches = [
            m for m in self.short_term
            if any(word in m.content for word in keywords)
        ]
        return {
            "long_term": relevant,
            "short_term": short_matches[-5:],  # the 5 most recent matches
            "working": self.working
        }

    def memorize(self, content: str, importance: str = "normal"):
        """Record ``content`` in short-term memory; ``importance="high"`` also stores it long-term."""
        self.short_term.append(Message(role="system", content=content))
        # Compress once the buffer exceeds its limit
        if len(self.short_term) > self.max_short_term:
            self._compress_short_term()
        # High-importance content also goes to long-term memory
        if importance == "high":
            self.long_term.add(content, metadata={"importance": "high"})

    def _compress_short_term(self):
        """Keep the newest ``KEEP_RECENT`` messages. Older ones are folded into a
        summary (first 100 chars of up to 5 of them) stored in long-term memory."""
        if len(self.short_term) <= self.KEEP_RECENT:
            return  # nothing older to summarize; avoid storing an empty summary
        recent = self.short_term[-self.KEEP_RECENT:]
        old = self.short_term[:-self.KEEP_RECENT]
        summary = f"[会话摘要,{len(old)}条消息]: " + " ".join(
            m.content[:100] for m in old[:5]
        )
        self.long_term.add(summary, metadata={"type": "session_summary"})
        self.short_term = recent
8. 生产级部署与监控
8.1 Agent服务化部署
# FastAPI Agent服务
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, List
import uvicorn
app = FastAPI(title="AI Agent Service", version="1.0")

# Global agent instances keyed by agent type (use a proper object pool in production).
agent_pool: Dict[str, BaseAgent] = {}
# Agents keep per-run state (memory, plan, stats), so each pooled instance
# serves one request at a time; one asyncio lock per agent type enforces that.
agent_locks: Dict[str, asyncio.Lock] = {}


class TaskRequest(BaseModel):
    task: str
    agent_type: str = "react"
    max_iterations: int = 10
    user_id: Optional[str] = None


class TaskResponse(BaseModel):
    result: str
    stats: Dict
    task_id: str


def _get_or_create_agent(agent_type: str) -> BaseAgent:
    """Return the pooled agent for ``agent_type``, creating it on first use.

    ``create_agent`` is an application-provided factory (not defined here).
    """
    agent = agent_pool.get(agent_type)
    if agent is None:
        agent = create_agent(agent_type)
        agent_pool[agent_type] = agent
    return agent


def _lock_for(agent_type: str) -> asyncio.Lock:
    """Return the lock serializing access to the pooled agent of ``agent_type``."""
    lock = agent_locks.get(agent_type)
    if lock is None:
        lock = agent_locks[agent_type] = asyncio.Lock()
    return lock


@app.post("/agent/run", response_model=TaskResponse)
async def run_agent_task(request: TaskRequest):
    """Run a task on the pooled agent for ``request.agent_type``.

    ``agent.run`` is blocking (model and tool calls), so it runs in the
    default thread pool instead of stalling the event loop.
    """
    task_id = str(uuid.uuid4())
    loop = asyncio.get_running_loop()
    async with _lock_for(request.agent_type):
        agent = _get_or_create_agent(request.agent_type)
        try:
            result = await loop.run_in_executor(
                None, lambda: agent.run(request.task, max_iterations=request.max_iterations)
            )
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e)) from e
        stats = agent.get_stats()
    return TaskResponse(
        result=result,
        stats=stats,
        task_id=task_id
    )


@app.get("/agent/{agent_type}/stats")
async def get_agent_stats(agent_type: str):
    """Return run statistics of the pooled agent for ``agent_type`` (404 if none)."""
    agent = agent_pool.get(agent_type)
    if agent is None:
        raise HTTPException(status_code=404, detail="Agent not found")
    return agent.get_stats()


@app.post("/agent/{agent_type}/tool/{tool_name}")
async def call_tool(agent_type: str, tool_name: str, args: dict):
    """Call a tool directly, bypassing the model.

    Tools flagged ``requires_confirmation`` are refused with 403, because this
    endpoint has no way to obtain the user confirmation they require.
    """
    agent = agent_pool.get(agent_type)
    if agent is None:
        raise HTTPException(status_code=404, detail="Agent not found")
    tool = agent.tools.get(tool_name)
    if tool is None:
        raise HTTPException(status_code=404, detail=f"Tool '{tool_name}' not found")
    if tool.requires_confirmation:
        raise HTTPException(status_code=403, detail=f"Tool '{tool_name}' requires user confirmation")
    loop = asyncio.get_running_loop()
    async with _lock_for(agent_type):
        result = await loop.run_in_executor(None, agent.execute_tool, tool_name, args)
    return {"result": result}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
8.2 Agent监控指标
# Prometheus监控指标
from prometheus_client import Counter, Histogram, Gauge
# Counters: monotonically increasing totals
agent_requests_total = Counter(
    name='agent_requests_total',
    documentation='Total agent requests',
    labelnames=('agent_type', 'status'),
)
tool_calls_total = Counter(
    name='agent_tool_calls_total',
    documentation='Total tool calls',
    labelnames=('tool_name', 'status'),
)
# Histogram: request latency in seconds
agent_duration = Histogram(
    name='agent_request_duration_seconds',
    documentation='Agent request duration',
    labelnames=('agent_type',),
    buckets=(0.1, 0.5, 1, 2, 5, 10, 30, 60),
)
# Gauges: values that can go up and down
active_agents = Gauge(
    name='agent_active_count',
    documentation='Number of active agents',
)
token_usage = Gauge(
    name='agent_token_usage',
    documentation='Token usage by agent',
    labelnames=('agent_type', 'token_type'),  # token_type: "input" or "output"
)
class AgentMetrics:
    """Records Prometheus metrics for one agent's requests and tool calls.

    Call ``track_request()`` when a request starts and ``track_completion()``
    when it ends. Each instance tracks one in-flight request at a time.
    """

    def __init__(self, agent_name: str):
        self.agent_name = agent_name
        self.start_time = None  # perf_counter() value while a request is in flight

    def track_request(self):
        """Mark a request as started: bump the active gauge and start the timer."""
        if self.start_time is None:
            # Only count once per in-flight request so the gauge cannot leak upward.
            active_agents.inc()
        self.start_time = time.perf_counter()  # monotonic; immune to wall-clock jumps

    def track_completion(self, status: str = "success"):
        """Count the finished request and, if it was started, record its duration."""
        agent_requests_total.labels(
            agent_type=self.agent_name,
            status=status
        ).inc()
        if self.start_time is None:
            # No matching track_request(): nothing to time, and the gauge was
            # never incremented, so decrementing would drive it negative.
            return
        duration = time.perf_counter() - self.start_time
        agent_duration.labels(self.agent_name).observe(duration)
        active_agents.dec()
        self.start_time = None  # a repeated completion must not be counted twice

    def track_tool_call(self, tool_name: str, status: str):
        """Count one tool call with its outcome ``status``."""
        tool_calls_total.labels(tool_name=tool_name, status=status).inc()
9. 总结
Agent开发路线图
Level 1: 基础Agent
- 单Agent,ReAct模式
- 少量工具,固定工作流
→ 适合: 简单问答、客服机器人
Level 2: 规划型Agent
- Plan-and-Execute
- 多步骤工作流
- 短期+长期记忆
→ 适合: 复杂业务自动化、数据处理
Level 3: 多Agent协作
- A2A协议通信
- 层级/同级/辩论模式
- Agent注册与发现
→ 适合: 企业级应用、大型工作流
Level 4: 生产级Agent系统
- 服务化部署
- 完善的监控与可观测性
- 限流/熔断/重试
→ 适合: 商业化产品
2026年开发者行动清单
| 优先级 | 技能 | 工具 |
|---|---|---|
| ⭐⭐⭐ | RAG工程化 | Milvus/Pinecone |
| ⭐⭐⭐ | Agent框架 | LangChain/LangGraph/CrewAI |
| ⭐⭐ | 多Agent协作 | A2A协议、消息总线 |
| ⭐⭐ | Prompt工程 | 结构化输出、Few-shot |
| ⭐ | 模型微调 | LoRA/QLoRA |
更多推荐


所有评论(0)