A2A协议:智能代理协作

A2A协议旨在解决智能代理系统间的互操作性问题,其核心思想是通过标准化的通信协议,使不同代理能在不共享内存、思想或工具的情况下完成协作。这种去中心化的设计特别适用于云计算、物联网和微服务架构场景。

核心架构解析

三层角色模型

  • 用户(User):发起任务请求的人类或自动化系统
  • 客户端(Client):解析用户需求并构造A2A请求的代理
  • 远程代理(Server):执行具体任务并返回结果的智能服务

HTTP+SSE传输层

import requests
from sseclient import SSEClient as EventSource

# 标准HTTP请求示例
response = requests.post('https://agent.example/rpc', json={
    "jsonrpc": "2.0",
    "method": "tasks/send",
    "params": {"task_id": "task123", "instructions": "处理图像数据"},
    "id": 1
})

# SSE流式更新监听
messages = EventSource('https://agent.example/stream?task_id=task123')
for msg in messages:
    print(f"实时更新: {msg.data}")

JSON-RPC 2.0数据格式

{
  "jsonrpc": "2.0",
  "method": "tasks/get",
  "params": {"task_id": "task123"},
  "id": 2
}

任务全生命周期管理

A2A协议通过标准化的任务对象实现状态跟踪和协作,我们来看完整的任务处理流程:

任务创建与初始化

def create_task(client, task_id, instructions):
    response = client.post('/tasks/send', json={
        "task_id": task_id,
        "instructions": instructions,
        "context": {"user_id": "alice", "priority": "high"}
    })
    return response.json()

# 使用示例
new_task = create_task(
    client=requests.Session(),
    task_id="img_process_001",
    instructions="识别图片中的交通标志"
)

状态轮询与流式更新

def monitor_task(task_id):
    # 轮询模式
    while True:
        response = requests.post('https://agent.example/rpc', json={
            "method": "tasks/get",
            "params": {"task_id": task_id}
        })
        status = response.json().get('result', {}).get('status')
        if status == 'completed':
            break
        time.sleep(5)

    # SSE流式模式
    with EventSource(f'https://agent.example/stream?task_id={task_id}') as events:
        for event in events:
            if event.event == 'progress':
                print(f"处理进度: {event.data}%")

错误处理机制

A2A协议定义了标准化的错误代码体系:

错误代码 描述 处理策略
-32600 无效请求 检查JSON格式和必填字段
-32601 方法不存在 验证API端点和方法名称
-32602 参数无效 添加参数校验逻辑
-32001 任务不存在 检查任务ID有效性
def safe_request(method, params):
    try:
        response = requests.post('https://agent.example/rpc', json={
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
            "id": uuid.uuid4()
        })
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"网络错误: {str(e)}")
    except json.JSONDecodeError:
        print("响应格式错误")

# 错误处理示例
try:
    result = safe_request("tasks/invalid_method", {})
    if 'error' in result:
        error = result['error']
        print(f"错误 {error['code']}: {error['message']}")
except Exception as e:
    print(f"未捕获异常: {str(e)}")

高级功能

流式处理与实时更新

class TaskMonitor:
    def __init__(self, task_id):
        self.task_id = task_id
        self.event_source = SSEClient(f'https://agent.example/stream?task_id={task_id}')

    def start(self):
        for event in self.event_source:
            if event.event == 'artifact':
                self.handle_artifact(event.data)
            elif event.event == 'status':
                self.update_status(event.data)

    def handle_artifact(self, artifact_data):
        with open(f'result_{self.task_id}.jpg', 'wb') as f:
            f.write(base64.b64decode(artifact_data['content']))

# 使用示例
monitor = TaskMonitor("img_process_001")
monitor.start()

多轮对话与上下文传递

def multi_turn_dialog(task_id):
    context = {}
    while True:
        response = requests.post('https://agent.example/rpc', json={
            "method": "tasks/get",
            "params": {"task_id": task_id}
        })
        
        status = response.json()['result']['status']
        if status == 'input-required':
            required_input = response.json()['result']['required_input']
            user_input = input(f"需要补充信息: {required_input['prompt']}\n")
            
            context.update({
                "user_input": user_input,
                "timestamp": datetime.now().isoformat()
            })
            
            requests.post('https://agent.example/rpc', json={
                "method": "tasks/update",
                "params": {"task_id": task_id, "context": context}
            })
        elif status == 'completed':
            break

非文本媒体处理

def process_image(image_path):
    with open(image_path, 'rb') as f:
        image_data = base64.b64encode(f.read()).decode('utf-8')
    
    response = requests.post('https://agent.example/rpc', json={
        "method": "tasks/send",
        "params": {
            "task_id": "image_task_001",
            "instructions": "识别图片内容",
            "artifacts": [{"type": "image/jpeg", "content": image_data}]
        }
    })
    
    return response.json()

# 使用示例
result = process_image("traffic_sign.jpg")
print(f"识别结果: {result['result']['artifacts'][0]['content']}")

实践

Agent Card标准化
每个代理应提供标准化的能力描述文件(Agent Card),示例:

{
  "name": "Image Recognition Agent",
  "version": "1.0",
  "methods": ["image/process", "image/analyze"],
  "authentication": {"type": "api_key", "endpoint": "/auth"}
}

幂等性设计

def idempotent_request(task_id, action):
    idempotency_key = uuid.uuid4().hex
    response = requests.post('https://agent.example/rpc', json={
        "method": action,
        "params": {"task_id": task_id},
        "idempotency_key": idempotency_key
    })
    return response.json()
Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐