单个 AI Agent 可以读取资料、调用工具并返回答案。

当系统进一步增加规划 Agent、代码 Agent、检索 Agent 和审核 Agent 后,问题就不再只是“模型会不会回答”,而是:

  • 任务应该发送给谁?
  • 对方是否具备所需能力?
  • 执行到一半如何取消?
  • 超时以后能否重试?
  • 多次请求会不会重复执行?
  • 出错时怎样定位责任节点?

如果各个 Agent 只是互相发送自然语言,系统很快就会变成一间所有人都在说话、却没人记录工单的办公室。

一、Agent 通信不能只有一个 prompt

最简单的调用方式可能长这样:

{
  "message": "请分析这个项目并修复登录错误"
}

它可以演示功能,却无法支撑稳定系统。

调用方不知道:

  • 当前消息属于哪个任务;
  • 接收方支持哪些操作;
  • 请求是否允许修改代码;
  • 最长执行时间是多少;
  • 返回内容是中间进度还是最终结果;
  • 请求失败后能否安全重试。

通信协议首先要把这些隐含约定变成明确字段。

二、设计统一的消息信封

可以为所有消息设计一层公共结构:

{
  "protocol": "agent-link/1.0",
  "message_id": "msg_8f12",
  "task_id": "task_20260622_001",
  "type": "request",
  "source": "planner",
  "target": "code-agent",
  "timestamp": "2026-06-22T10:30:00Z",
  "deadline": "2026-06-22T10:32:00Z",
  "payload": {},
  "metadata": {
    "trace_id": "trace_a91c",
    "attempt": 1
  }
}

公共字段负责解决追踪和路由问题,真正的业务参数放在 payload 中。

三、至少需要四种消息类型

1. 请求消息

用于创建任务或调用能力:

{
  "type": "request",
  "payload": {
    "action": "inspect_repository",
    "arguments": {
      "path": "src/auth",
      "question": "为什么刷新令牌会失效?"
    }
  }
}

2. 进度事件

长任务不应该一直沉默:

{
  "type": "event",
  "payload": {
    "event": "progress",
    "stage": "reading_dependencies",
    "percent": 45
  }
}

进度值不一定非常精确,但至少要让调用方知道任务仍在运行。

3. 最终响应

{
  "type": "response",
  "payload": {
    "status": "completed",
    "result": {
      "summary": "刷新令牌验证使用了错误的密钥",
      "changed_files": []
    }
  }
}

4. 错误消息

错误不应该只返回一句“执行失败”。

{
  "type": "error",
  "payload": {
    "code": "TOOL_PERMISSION_DENIED",
    "message": "当前 Agent 没有写入仓库的权限",
    "retryable": false,
    "details": {
      "required_permission": "repository.write"
    }
  }
}

结构化错误能帮助调度器判断应该重试、换节点,还是请求人工授权。

四、先发现能力,再分配任务

调度器不应该假设所有 Agent 都会做同样的事。

Agent 启动后可以发布能力描述:

{
  "agent_id": "code-agent-03",
  "capabilities": [
    {
      "name": "repository.inspect",
      "version": "1.2",
      "input_formats": ["text", "git-diff"],
      "streaming": true
    },
    {
      "name": "repository.patch",
      "version": "1.0",
      "requires_permission": "repository.write"
    }
  ],
  "limits": {
    "max_concurrent_tasks": 3,
    "max_task_seconds": 300
  }
}

调度器可以根据能力、负载和权限选择节点,而不是把任务广播给所有 Agent。

五、任务必须有明确状态机

一个可靠任务至少应包含这些状态:

CREATED
   ↓
ACCEPTED
   ↓
RUNNING
   ├──→ COMPLETED
   ├──→ FAILED
   ├──→ CANCELLED
   └──→ TIMED_OUT

状态转换必须受到限制。

例如,已经完成的任务不能重新变成运行中;正在执行的任务收到取消请求后,也不应该立即删除记录,而应进入取消处理中。

Python 可以简单实现为:

from enum import Enum


class TaskState(Enum):
    CREATED = "created"
    ACCEPTED = "accepted"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


ALLOWED_TRANSITIONS = {
    TaskState.CREATED: {
        TaskState.ACCEPTED,
        TaskState.CANCELLED
    },
    TaskState.ACCEPTED: {
        TaskState.RUNNING,
        TaskState.CANCELLED
    },
    TaskState.RUNNING: {
        TaskState.COMPLETED,
        TaskState.FAILED,
        TaskState.CANCELLED
    }
}


def change_state(current, target):
    allowed = ALLOWED_TRANSITIONS.get(current, set())

    if target not in allowed:
        raise ValueError(
            f"invalid transition: {current} -> {target}"
        )

    return target

六、超时和重试不能混为一谈

请求超时只代表调用方没有按时收到结果,并不能证明任务没有执行。

假设支付 Agent 已经完成扣款,但响应在网络中丢失。调度器直接重试,就可能造成重复扣款。

因此,每个具有副作用的请求都应该携带幂等键:

{
  "message_id": "msg_1002",
  "idempotency_key": "order_9001_payment",
  "payload": {
    "action": "charge",
    "amount": 199
  }
}

执行方首先检查该键:

def execute_once(request, storage):
    key = request["idempotency_key"]

    previous = storage.find_result(key)
    if previous is not None:
        return previous

    result = perform_action(request["payload"])
    storage.save_result(key, result)

    return result

只有明确标记为 retryable 的错误,才应该自动重试。

七、取消任务需要协议支持

长时间运行的 Agent 应该定期检查取消信号:

async def run_task(task, cancellation):
    for step in task.steps:
        if cancellation.is_cancelled(task.id):
            await release_resources(task)
            return {
                "status": "cancelled"
            }

        await execute_step(step)

    return {
        "status": "completed"
    }

取消并不等于粗暴终止进程。

优雅取消通常需要:

  • 停止创建新子任务;
  • 撤销可以回滚的临时操作;
  • 关闭文件和网络连接;
  • 保存已有执行记录;
  • 返回最终取消状态。

八、流式输出也要能够恢复

大模型生成过程可能持续几十秒。使用事件流传输时,每个事件应拥有递增序号:

{
  "type": "event",
  "task_id": "task_001",
  "sequence": 18,
  "payload": {
    "event": "content_delta",
    "content": "正在检查认证模块……"
  }
}

连接断开后,客户端可以声明最后收到的序号:

{
  "type": "resume",
  "task_id": "task_001",
  "last_sequence": 18
}

服务端从第 19 条事件继续发送,而不是重新运行整个任务。

九、权限应该跟随任务,而不是跟随 Agent

不要因为某个 Agent“通常负责代码”,就永久授予它所有仓库的写权限。

请求中应明确声明授权范围:

{
  "authorization": {
    "permissions": [
      "repository.read",
      "tests.run"
    ],
    "resources": [
      "repo://demo-project"
    ],
    "expires_at": "2026-06-22T10:35:00Z"
  }
}

高风险操作还可以要求额外审批:

读取文件:自动允许
执行测试:自动允许
修改源码:按任务授权
部署生产:必须人工确认
删除资源:必须人工确认

十、可观测性不是附加功能

每条消息至少应携带:

  • trace_id:关联完整调用链;
  • task_id:关联同一个业务任务;
  • message_id:定位单条消息;
  • sourcetarget:确认通信双方;
  • attempt:记录第几次尝试;
  • 时间戳与耗时;
  • 请求和响应状态。

日志可以统一记录为:

{
  "trace_id": "trace_a91c",
  "task_id": "task_001",
  "source": "planner",
  "target": "search-agent",
  "action": "knowledge.search",
  "duration_ms": 842,
  "status": "completed",
  "attempt": 1
}

出现问题时,才能快速回答:

是谁发起的?
经过了哪些 Agent?
在哪一步等待?
哪次重试最终成功?
是否执行了有副作用的操作?

十一、一个实用的最小架构

中小型 Agent 系统不必一开始就设计得异常复杂。可以从以下组件开始:

调用方
   ↓
协议网关
   ├──身份与权限验证
   ├──消息格式校验
   ├──任务路由
   └──追踪记录
          ↓
      Agent 节点
          ↓
   任务状态存储

第一版只要确保以下能力即可:

  • 统一消息格式;
  • 能力发现;
  • 任务状态机;
  • 超时与取消;
  • 幂等控制;
  • 结构化错误;
  • 调用链追踪。

等系统真正出现跨组织通信、大规模节点发现或复杂流式会话后,再引入更完整的协议层。

总结

多个 Agent 能互相发送消息,并不意味着它们已经具备可靠协作能力。

真正可用的通信机制需要同时回答:

消息是什么?
任务属于谁?
当前执行到哪里?
失败后怎么办?
重复请求是否安全?
谁有权执行操作?
怎样还原完整过程?

模型决定 Agent 能做什么,协议则决定一群 Agent 能否长期、稳定而可控地一起工作。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐