Google开源A2A(Agent to Agent)协议:开启智能体协作模式
A2A协议旨在解决智能代理系统间的互操作性问题,其核心思想是通过标准化的通信协议,使不同代理能在不共享内存、思想或工具的情况下完成协作。
·
A2A协议:智能代理协作
A2A协议旨在解决智能代理系统间的互操作性问题,其核心思想是通过标准化的通信协议,使不同代理能在不共享内存、思想或工具的情况下完成协作。这种去中心化的设计特别适用于云计算、物联网和微服务架构场景。
核心架构解析
三层角色模型
- 用户(User):发起任务请求的人类或自动化系统
- 客户端(Client):解析用户需求并构造A2A请求的代理
- 远程代理(Server):执行具体任务并返回结果的智能服务
HTTP+SSE传输层
import requests
from sseclient import SSEClient as EventSource
# 标准HTTP请求示例
response = requests.post('https://agent.example/rpc', json={
"jsonrpc": "2.0",
"method": "tasks/send",
"params": {"task_id": "task123", "instructions": "处理图像数据"},
"id": 1
})
# SSE流式更新监听
messages = EventSource('https://agent.example/stream?task_id=task123')
for msg in messages:
print(f"实时更新: {msg.data}")
JSON-RPC 2.0数据格式
{
"jsonrpc": "2.0",
"method": "tasks/get",
"params": {"task_id": "task123"},
"id": 2
}
任务全生命周期管理
A2A协议通过标准化的任务对象实现状态跟踪和协作,我们来看完整的任务处理流程:
任务创建与初始化
def create_task(client, task_id, instructions):
response = client.post('/tasks/send', json={
"task_id": task_id,
"instructions": instructions,
"context": {"user_id": "alice", "priority": "high"}
})
return response.json()
# 使用示例
new_task = create_task(
client=requests.Session(),
task_id="img_process_001",
instructions="识别图片中的交通标志"
)
状态轮询与流式更新
def monitor_task(task_id):
# 轮询模式
while True:
response = requests.post('https://agent.example/rpc', json={
"method": "tasks/get",
"params": {"task_id": task_id}
})
status = response.json().get('result', {}).get('status')
if status == 'completed':
break
time.sleep(5)
# SSE流式模式
with EventSource(f'https://agent.example/stream?task_id={task_id}') as events:
for event in events:
if event.event == 'progress':
print(f"处理进度: {event.data}%")
错误处理机制
A2A协议定义了标准化的错误代码体系:
错误代码 | 描述 | 处理策略 |
---|---|---|
-32600 | 无效请求 | 检查JSON格式和必填字段 |
-32601 | 方法不存在 | 验证API端点和方法名称 |
-32602 | 参数无效 | 添加参数校验逻辑 |
-32001 | 任务不存在 | 检查任务ID有效性 |
def safe_request(method, params):
try:
response = requests.post('https://agent.example/rpc', json={
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": uuid.uuid4()
})
return response.json()
except requests.exceptions.RequestException as e:
print(f"网络错误: {str(e)}")
except json.JSONDecodeError:
print("响应格式错误")
# 错误处理示例
try:
result = safe_request("tasks/invalid_method", {})
if 'error' in result:
error = result['error']
print(f"错误 {error['code']}: {error['message']}")
except Exception as e:
print(f"未捕获异常: {str(e)}")
高级功能
流式处理与实时更新
class TaskMonitor:
def __init__(self, task_id):
self.task_id = task_id
self.event_source = SSEClient(f'https://agent.example/stream?task_id={task_id}')
def start(self):
for event in self.event_source:
if event.event == 'artifact':
self.handle_artifact(event.data)
elif event.event == 'status':
self.update_status(event.data)
def handle_artifact(self, artifact_data):
with open(f'result_{self.task_id}.jpg', 'wb') as f:
f.write(base64.b64decode(artifact_data['content']))
# 使用示例
monitor = TaskMonitor("img_process_001")
monitor.start()
多轮对话与上下文传递
def multi_turn_dialog(task_id):
context = {}
while True:
response = requests.post('https://agent.example/rpc', json={
"method": "tasks/get",
"params": {"task_id": task_id}
})
status = response.json()['result']['status']
if status == 'input-required':
required_input = response.json()['result']['required_input']
user_input = input(f"需要补充信息: {required_input['prompt']}\n")
context.update({
"user_input": user_input,
"timestamp": datetime.now().isoformat()
})
requests.post('https://agent.example/rpc', json={
"method": "tasks/update",
"params": {"task_id": task_id, "context": context}
})
elif status == 'completed':
break
非文本媒体处理
def process_image(image_path):
with open(image_path, 'rb') as f:
image_data = base64.b64encode(f.read()).decode('utf-8')
response = requests.post('https://agent.example/rpc', json={
"method": "tasks/send",
"params": {
"task_id": "image_task_001",
"instructions": "识别图片内容",
"artifacts": [{"type": "image/jpeg", "content": image_data}]
}
})
return response.json()
# 使用示例
result = process_image("traffic_sign.jpg")
print(f"识别结果: {result['result']['artifacts'][0]['content']}")
实践
Agent Card标准化
每个代理应提供标准化的能力描述文件(Agent Card),示例:
{
"name": "Image Recognition Agent",
"version": "1.0",
"methods": ["image/process", "image/analyze"],
"authentication": {"type": "api_key", "endpoint": "/auth"}
}
幂等性设计
def idempotent_request(task_id, action):
idempotency_key = uuid.uuid4().hex
response = requests.post('https://agent.example/rpc', json={
"method": action,
"params": {"task_id": task_id},
"idempotency_key": idempotency_key
})
return response.json()
更多推荐
所有评论(0)