别让多个 AI Agent 靠“口头约定”协作:通信协议设计实战

单个 AI Agent 可以读取资料、调用工具并返回答案。
当系统进一步增加规划 Agent、代码 Agent、检索 Agent 和审核 Agent 后,问题就不再只是“模型会不会回答”,而是:
- 任务应该发送给谁?
- 对方是否具备所需能力?
- 执行到一半如何取消?
- 超时以后能否重试?
- 多次请求会不会重复执行?
- 出错时怎样定位责任节点?
如果各个 Agent 只是互相发送自然语言,系统很快就会变成一间所有人都在说话、却没人记录工单的办公室。
一、Agent 通信不能只有一个 prompt
最简单的调用方式可能长这样:
{
"message": "请分析这个项目并修复登录错误"
}
它可以演示功能,却无法支撑稳定系统。
调用方不知道:
- 当前消息属于哪个任务;
- 接收方支持哪些操作;
- 请求是否允许修改代码;
- 最长执行时间是多少;
- 返回内容是中间进度还是最终结果;
- 请求失败后能否安全重试。
通信协议首先要把这些隐含约定变成明确字段。
二、设计统一的消息信封
可以为所有消息设计一层公共结构:
{
"protocol": "agent-link/1.0",
"message_id": "msg_8f12",
"task_id": "task_20260622_001",
"type": "request",
"source": "planner",
"target": "code-agent",
"timestamp": "2026-06-22T10:30:00Z",
"deadline": "2026-06-22T10:32:00Z",
"payload": {},
"metadata": {
"trace_id": "trace_a91c",
"attempt": 1
}
}
公共字段负责解决追踪和路由问题,真正的业务参数放在 payload 中。
三、至少需要四种消息类型
1. 请求消息
用于创建任务或调用能力:
{
"type": "request",
"payload": {
"action": "inspect_repository",
"arguments": {
"path": "src/auth",
"question": "为什么刷新令牌会失效?"
}
}
}
2. 进度事件
长任务不应该一直沉默:
{
"type": "event",
"payload": {
"event": "progress",
"stage": "reading_dependencies",
"percent": 45
}
}
进度值不一定非常精确,但至少要让调用方知道任务仍在运行。
3. 最终响应
{
"type": "response",
"payload": {
"status": "completed",
"result": {
"summary": "刷新令牌验证使用了错误的密钥",
"changed_files": []
}
}
}
4. 错误消息
错误不应该只返回一句“执行失败”。
{
"type": "error",
"payload": {
"code": "TOOL_PERMISSION_DENIED",
"message": "当前 Agent 没有写入仓库的权限",
"retryable": false,
"details": {
"required_permission": "repository.write"
}
}
}
结构化错误能帮助调度器判断应该重试、换节点,还是请求人工授权。
四、先发现能力,再分配任务
调度器不应该假设所有 Agent 都会做同样的事。
Agent 启动后可以发布能力描述:
{
"agent_id": "code-agent-03",
"capabilities": [
{
"name": "repository.inspect",
"version": "1.2",
"input_formats": ["text", "git-diff"],
"streaming": true
},
{
"name": "repository.patch",
"version": "1.0",
"requires_permission": "repository.write"
}
],
"limits": {
"max_concurrent_tasks": 3,
"max_task_seconds": 300
}
}
调度器可以根据能力、负载和权限选择节点,而不是把任务广播给所有 Agent。
五、任务必须有明确状态机
一个可靠任务至少应包含这些状态:
CREATED
↓
ACCEPTED
↓
RUNNING
├──→ COMPLETED
├──→ FAILED
├──→ CANCELLED
└──→ TIMED_OUT
状态转换必须受到限制。
例如,已经完成的任务不能重新变成运行中;正在执行的任务收到取消请求后,也不应该立即删除记录,而应进入取消处理中。
Python 可以简单实现为:
from enum import Enum
class TaskState(Enum):
CREATED = "created"
ACCEPTED = "accepted"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
ALLOWED_TRANSITIONS = {
TaskState.CREATED: {
TaskState.ACCEPTED,
TaskState.CANCELLED
},
TaskState.ACCEPTED: {
TaskState.RUNNING,
TaskState.CANCELLED
},
TaskState.RUNNING: {
TaskState.COMPLETED,
TaskState.FAILED,
TaskState.CANCELLED
}
}
def change_state(current, target):
allowed = ALLOWED_TRANSITIONS.get(current, set())
if target not in allowed:
raise ValueError(
f"invalid transition: {current} -> {target}"
)
return target
六、超时和重试不能混为一谈
请求超时只代表调用方没有按时收到结果,并不能证明任务没有执行。
假设支付 Agent 已经完成扣款,但响应在网络中丢失。调度器直接重试,就可能造成重复扣款。
因此,每个具有副作用的请求都应该携带幂等键:
{
"message_id": "msg_1002",
"idempotency_key": "order_9001_payment",
"payload": {
"action": "charge",
"amount": 199
}
}
执行方首先检查该键:
def execute_once(request, storage):
key = request["idempotency_key"]
previous = storage.find_result(key)
if previous is not None:
return previous
result = perform_action(request["payload"])
storage.save_result(key, result)
return result
只有明确标记为 retryable 的错误,才应该自动重试。
七、取消任务需要协议支持
长时间运行的 Agent 应该定期检查取消信号:
async def run_task(task, cancellation):
for step in task.steps:
if cancellation.is_cancelled(task.id):
await release_resources(task)
return {
"status": "cancelled"
}
await execute_step(step)
return {
"status": "completed"
}
取消并不等于粗暴终止进程。
优雅取消通常需要:
- 停止创建新子任务;
- 撤销可以回滚的临时操作;
- 关闭文件和网络连接;
- 保存已有执行记录;
- 返回最终取消状态。
八、流式输出也要能够恢复
大模型生成过程可能持续几十秒。使用事件流传输时,每个事件应拥有递增序号:
{
"type": "event",
"task_id": "task_001",
"sequence": 18,
"payload": {
"event": "content_delta",
"content": "正在检查认证模块……"
}
}
连接断开后,客户端可以声明最后收到的序号:
{
"type": "resume",
"task_id": "task_001",
"last_sequence": 18
}
服务端从第 19 条事件继续发送,而不是重新运行整个任务。
九、权限应该跟随任务,而不是跟随 Agent
不要因为某个 Agent“通常负责代码”,就永久授予它所有仓库的写权限。
请求中应明确声明授权范围:
{
"authorization": {
"permissions": [
"repository.read",
"tests.run"
],
"resources": [
"repo://demo-project"
],
"expires_at": "2026-06-22T10:35:00Z"
}
}
高风险操作还可以要求额外审批:
读取文件:自动允许
执行测试:自动允许
修改源码:按任务授权
部署生产:必须人工确认
删除资源:必须人工确认
十、可观测性不是附加功能
每条消息至少应携带:
trace_id:关联完整调用链;task_id:关联同一个业务任务;message_id:定位单条消息;source与target:确认通信双方;attempt:记录第几次尝试;- 时间戳与耗时;
- 请求和响应状态。
日志可以统一记录为:
{
"trace_id": "trace_a91c",
"task_id": "task_001",
"source": "planner",
"target": "search-agent",
"action": "knowledge.search",
"duration_ms": 842,
"status": "completed",
"attempt": 1
}
出现问题时,才能快速回答:
是谁发起的?
经过了哪些 Agent?
在哪一步等待?
哪次重试最终成功?
是否执行了有副作用的操作?
十一、一个实用的最小架构
中小型 Agent 系统不必一开始就设计得异常复杂。可以从以下组件开始:
调用方
↓
协议网关
├──身份与权限验证
├──消息格式校验
├──任务路由
└──追踪记录
↓
Agent 节点
↓
任务状态存储
第一版只要确保以下能力即可:
- 统一消息格式;
- 能力发现;
- 任务状态机;
- 超时与取消;
- 幂等控制;
- 结构化错误;
- 调用链追踪。
等系统真正出现跨组织通信、大规模节点发现或复杂流式会话后,再引入更完整的协议层。
总结
多个 Agent 能互相发送消息,并不意味着它们已经具备可靠协作能力。
真正可用的通信机制需要同时回答:
消息是什么?
任务属于谁?
当前执行到哪里?
失败后怎么办?
重复请求是否安全?
谁有权执行操作?
怎样还原完整过程?
模型决定 Agent 能做什么,协议则决定一群 Agent 能否长期、稳定而可控地一起工作。
更多推荐

所有评论(0)