AI Agent 的Tool Calling 工程陷阱:从幂等性到失败重试的6 个生产踩坑
我们的 AI 客服 Agent 上线三周后,有一天收到用户投诉,说同一张工单被创建了三次。查日志才发现:Agent 调用 create_ticket 工具时,因为网络抖动触发了重试,结果三次调用全部成功,数据库里躺着三条一模一样的记录。

我们的 AI 客服 Agent 上线三周后,有一天收到用户投诉,说同一张工单被创建了三次。查日志才发现:Agent 调用 create_ticket 工具时,因为网络抖动触发了重试,结果三次调用全部成功,数据库里躺着三条一模一样的记录。
这是我遇到的第一个 Tool Calling 生产 bug,但不是最后一个。
过去一年多,我在两个 AI Agent 项目里踩了很多坑:幂等性、超时、并发、返回值解析、工具选择混乱……每一个坑单独看不起眼,但放在生产环境里,加上用户量和并发,都能变成故障。
这篇文章把 6 个最常见的 Tool Calling 工程陷阱整理出来,每个坑都附实际代码,以及我们最终用了什么方式绕开它。
陷阱一:写操作工具不幂等,重试就出问题
这是最经典的坑,也是最容易被忽视的。
Tool Calling 的调用链里有好几个地方可能触发重试:
- LLM API 超时后 SDK 自动重试
- Agent 框架检测到工具返回错误后主动重试
- 网络层的透明重传
大部分读操作(search_web、get_order_status)天然幂等,重试无害。但写操作不一样:
# 危险的写操作工具
def create_support_ticket(user_id: str, issue: str, priority: str) -> dict:
ticket_id = db.insert("tickets", {
"user_id": user_id,
"issue": issue,
"priority": priority,
"created_at": datetime.now()
})
return {"ticket_id": ticket_id, "status": "created"}
每次调用都会在数据库插入一条新记录,没有任何去重逻辑。如果 Agent 重试,用户就会看到重复工单。
解法:给每次工具调用生成一个 idempotency key,写入端校验并去重。
import hashlib
import json
def create_support_ticket(
user_id: str,
issue: str,
priority: str,
idempotency_key: str | None = None
) -> dict:
# 如果没传 key,基于参数生成一个确定性 key
if not idempotency_key:
payload = json.dumps(
{"user_id": user_id, "issue": issue, "priority": priority},
sort_keys=True
)
idempotency_key = hashlib.sha256(payload.encode()).hexdigest()[:16]
# 先查是否已经执行过
existing = db.query_one(
"SELECT ticket_id FROM tickets WHERE idempotency_key = %s",
[idempotency_key]
)
if existing:
return {
"ticket_id": existing["ticket_id"],
"status": "already_created",
"idempotent": True
}
# 插入时带上 idempotency_key
ticket_id = db.insert("tickets", {
"user_id": user_id,
"issue": issue,
"priority": priority,
"idempotency_key": idempotency_key,
"created_at": datetime.now()
})
return {"ticket_id": ticket_id, "status": "created", "idempotent": False}
在工具定义里把 idempotency_key 作为可选参数暴露出去,Agent 调用时由框架层注入(或者在工具描述里说明 LLM 可以自己传一个唯一标识符)。
另一个方案是在 Agent 框架层做调用去重:
class ToolCallDeduplicator:
def __init__(self, ttl_seconds: int = 300):
self.cache = {} # 生产中换成 Redis
self.ttl = ttl_seconds
def is_duplicate(self, tool_name: str, args: dict) -> bool:
key = self._make_key(tool_name, args)
now = time.time()
if key in self.cache:
ts, _ = self.cache[key]
if now - ts < self.ttl:
return True
return False
def record(self, tool_name: str, args: dict, result: dict):
key = self._make_key(tool_name, args)
self.cache[key] = (time.time(), result)
def get_cached_result(self, tool_name: str, args: dict) -> dict | None:
key = self._make_key(tool_name, args)
if key in self.cache:
_, result = self.cache[key]
return result
return None
def _make_key(self, tool_name: str, args: dict) -> str:
payload = json.dumps({"tool": tool_name, "args": args}, sort_keys=True)
return hashlib.md5(payload.encode()).hexdigest()
陷阱二:工具超时没有硬限制,Agent 卡住等死
我们有一个工具叫 run_sql_query,允许 Agent 对数据库执行只读查询。某次 Agent 生成了一个没有 LIMIT 的查询:
SELECT * FROM user_events WHERE event_type = 'click';
这张表有两千万行。查询跑了 40 秒,占满了数据库连接,把整个 Agent 服务拖慢了。
工具超时在 Agent 场景里有两层:
- 网络/IO 超时:工具调用的外部服务响应时间
- 整体任务超时:整个 Agent run 不能无限跑下去
两层超时都要设:
import asyncio
from functools import wraps
def with_timeout(seconds: float, error_message: str = "Tool execution timed out"):
"""给工具调用加超时装饰器"""
def decorator(func):
@wraps(func)
async def async_wrapper(*args, **kwargs):
try:
return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds)
except asyncio.TimeoutError:
return {
"error": error_message,
"timeout_seconds": seconds,
"status": "timeout"
}
@wraps(func)
def sync_wrapper(*args, **kwargs):
import signal
def handler(signum, frame):
raise TimeoutError(error_message)
signal.signal(signal.SIGALRM, handler)
signal.alarm(int(seconds))
try:
result = func(*args, **kwargs)
signal.alarm(0)
return result
except TimeoutError:
return {
"error": error_message,
"timeout_seconds": seconds,
"status": "timeout"
}
finally:
signal.alarm(0)
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
return decorator
# 用法
@with_timeout(seconds=10, error_message="数据库查询超时,请优化查询条件")
async def run_sql_query(query: str, params: list | None = None) -> dict:
# 额外加 statement_timeout 防止数据库端慢查询
async with db.acquire() as conn:
await conn.execute("SET statement_timeout = '8s'")
rows = await conn.fetch(query, *(params or []))
return {"rows": [dict(r) for r in rows], "count": len(rows)}
对于整个 Agent run,要在 orchestrator 层加一个全局超时:
class AgentOrchestrator:
def __init__(self, max_run_seconds: int = 120, max_tool_calls: int = 30):
self.max_run_seconds = max_run_seconds
self.max_tool_calls = max_tool_calls
async def run(self, user_message: str) -> str:
start_time = time.time()
tool_call_count = 0
messages = [{"role": "user", "content": user_message}]
while True:
# 全局超时检查
elapsed = time.time() - start_time
if elapsed > self.max_run_seconds:
return f"任务超时(已运行 {elapsed:.0f}s),请拆解成更小的子任务"
# 工具调用次数上限
if tool_call_count >= self.max_tool_calls:
return f"工具调用次数已达上限({self.max_tool_calls}次),任务终止"
# 调用 LLM
response = await llm.chat(messages)
if response.stop_reason == "end_turn":
return response.content
# 执行工具调用
for tool_call in response.tool_calls:
tool_call_count += 1
result = await self.execute_tool(tool_call)
messages.append({"role": "tool", "tool_call_id": tool_call.id, "content": str(result)})
陷阱三:并发工具调用没有资源隔离,互相干扰
现代 LLM(GPT-4o、Claude 3.5 Sonnet)在一次响应里可以返回多个工具调用,框架通常会并发执行。
这本来是好事——并发执行多个独立工具比串行快得多。
问题出在"独立"这个假设上。如果两个工具调用都在操作同一个资源,没有并发控制,就会出现竞态条件:
# Agent 同时触发了两个工具调用:
# 1. update_user_plan(user_id="123", plan="pro")
# 2. apply_promo_code(user_id="123", code="SUMMER30")
# 两个都在修改用户账户,没有锁保护
async def parallel_tool_execution(tool_calls: list) -> list:
# 简单的并发执行——危险!
tasks = [execute_tool(tc) for tc in tool_calls]
return await asyncio.gather(*tasks)
解法:按资源维度分组,组内串行,组间并行。
from collections import defaultdict
async def safe_parallel_tool_execution(tool_calls: list) -> list:
"""
对工具调用按资源分组,同一资源的操作串行执行,
不同资源的操作并行执行
"""
# 每个工具需要声明自己的 resource_key
# 例如: create_ticket -> None (无状态)
# update_user -> f"user:{user_id}"
groups = defaultdict(list)
orderless = [] # 无资源竞争的工具
for i, tc in enumerate(tool_calls):
resource_key = get_tool_resource_key(tc)
if resource_key is None:
orderless.append((i, tc))
else:
groups[resource_key].append((i, tc))
results = [None] * len(tool_calls)
async def run_group(tool_call_group):
for idx, tc in tool_call_group:
results[idx] = await execute_tool(tc)
# 无竞争的工具直接并发
tasks = [execute_tool(tc) for i, tc in orderless]
orderless_results = await asyncio.gather(*tasks)
for (i, _), r in zip(orderless, orderless_results):
results[i] = r
# 有资源竞争的组串行执行,组间并发
group_tasks = [run_group(g) for g in groups.values()]
await asyncio.gather(*group_tasks)
return results
def get_tool_resource_key(tool_call) -> str | None:
"""根据工具名和参数推断资源 key"""
resource_tools = {
"update_user": lambda args: f"user:{args.get('user_id')}",
"apply_promo_code": lambda args: f"user:{args.get('user_id')}",
"update_order": lambda args: f"order:{args.get('order_id')}",
}
tool_name = tool_call.function.name
if tool_name in resource_tools:
args = json.loads(tool_call.function.arguments)
return resource_tools[tool_name](args)
return None # 无状态工具
陷阱四:工具返回值太大,把 context 撑爆
Agent 让工具返回数据,LLM 消化数据后继续推理。这个模式很自然,但有个隐患:工具返回的内容直接进 context,很快就把 token 限制撑满。
我们有一个 search_documents 工具,初版直接返回搜索到的全文:
# 危险版本:返回完整文档内容
def search_documents(query: str, top_k: int = 5) -> dict:
docs = vector_db.search(query, top_k=top_k)
return {
"results": [
{
"title": doc.title,
"content": doc.content, # 可能几千字
"score": doc.score
}
for doc in docs
]
}
5 篇文档,每篇 2000 字,直接往 context 里塞进去 10000 token。多几轮工具调用,context 就爆了。
解法:工具只返回摘要和引用 ID,需要全文时再按需获取。
# 分层返回:先给摘要,按需给全文
def search_documents(query: str, top_k: int = 5, include_full_content: bool = False) -> dict:
docs = vector_db.search(query, top_k=top_k)
results = []
for doc in docs:
item = {
"doc_id": doc.id,
"title": doc.title,
"score": round(doc.score, 3),
# 只返回前 300 字的摘要
"snippet": doc.content[:300] + ("..." if len(doc.content) > 300 else ""),
}
if include_full_content:
item["content"] = doc.content
results.append(item)
return {
"results": results,
"total": len(results),
"hint": "Use get_document(doc_id) to fetch full content of a specific document"
}
def get_document(doc_id: str) -> dict:
"""按需获取单篇文档全文"""
doc = vector_db.get_by_id(doc_id)
if not doc:
return {"error": f"Document {doc_id} not found"}
return {
"doc_id": doc.id,
"title": doc.title,
"content": doc.content,
"metadata": doc.metadata
}
工具描述里明确告诉 LLM 这两个工具的配合使用方式,它会自己判断什么时候需要拿全文。
另一个常见的压缩技巧:对于列表类工具,返回结果时做截断,并附上分页信息:
def list_orders(user_id: str, status: str | None = None, page: int = 1) -> dict:
PAGE_SIZE = 10 # 每次最多返回 10 条
orders = db.query_orders(user_id, status, offset=(page-1)*PAGE_SIZE, limit=PAGE_SIZE+1)
has_more = len(orders) > PAGE_SIZE
if has_more:
orders = orders[:PAGE_SIZE]
return {
"orders": [order.to_summary_dict() for order in orders], # summary 不含详情字段
"page": page,
"has_more": has_more,
"total_hint": "Use page parameter to get more results"
}
陷阱五:工具描述写得含糊,LLM 选错工具
这个问题不是代码 bug,而是工程设计问题。工具描述写得不够清晰,LLM 就会选错工具,或者用错参数。
我们有两个工具,最初描述是这样的:
tools = [
{
"name": "search_products",
"description": "搜索产品", # 太简单
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string"}
}
}
},
{
"name": "get_product_details",
"description": "获取产品详情", # 太简单
"parameters": {
"type": "object",
"properties": {
"product_id": {"type": "string"}
}
}
}
]
LLM 经常混用这两个工具——明明有 product_id,还是会去调 search_products;明明要搜索,却把用户问题当成 product_id 传给 get_product_details。
解法:工具描述要写清楚触发条件、不要用的场景、参数格式要求。
tools = [
{
"name": "search_products",
"description": (
"根据自然语言查询搜索产品目录。"
"用于:用户描述了产品需求但没有具体产品 ID 时。"
"不要用于:已知产品 ID 时(改用 get_product_details)。"
"返回:产品列表,每项含 product_id、名称、简介和价格。"
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "用户的产品需求描述,例如:'蓝牙降噪耳机 500元以内'"
},
"max_results": {
"type": "integer",
"description": "返回结果数量,默认 5,最大 20",
"default": 5
}
},
"required": ["query"]
}
},
{
"name": "get_product_details",
"description": (
"通过产品 ID 获取单个产品的完整信息,包括规格、库存、用户评价。"
"用于:已知确切产品 ID,需要获取详细信息时。"
"不要用于:搜索或浏览产品(改用 search_products)。"
"产品 ID 格式:'PROD-' 开头的字母数字串,如 'PROD-ABC123'。"
),
"parameters": {
"type": "object",
"properties": {
"product_id": {
"type": "string",
"description": "产品唯一 ID,格式为 PROD-XXXXXXX",
"pattern": "^PROD-[A-Z0-9]{6,10}$"
}
},
"required": ["product_id"]
}
}
]
几个提升工具选择准确率的技巧:
- 用"用于/不要用于"格式:明确说明边界,比"可以用来"更清晰
- 描述输出内容:LLM 在选工具时会考虑哪个工具能提供它需要的信息
- 指定参数格式:用
pattern或者在 description 里举例,减少参数格式错误 - 工具数量控制:一次性给 LLM 超过 20 个工具,选择准确率明显下降;优先按需注入工具,而不是全量暴露
陷阱六:工具错误处理不一致,Agent 不知道该如何继续
工具调用失败时,返回什么给 LLM 很关键。我见过三种典型的错误返回方式:
# 方式 A:抛异常(最差)
def get_user(user_id: str) -> dict:
user = db.find_user(user_id)
if not user:
raise ValueError(f"User {user_id} not found") # 框架层会把异常转成字符串返回 LLM,信息混乱
return user
# 方式 B:返回空/None(也不好)
def get_user(user_id: str) -> dict | None:
user = db.find_user(user_id)
return user # None 序列化成 "null",LLM 不知道是"未找到"还是"系统错误"
# 方式 C:结构化错误(正确)
def get_user(user_id: str) -> dict:
try:
user = db.find_user(user_id)
if not user:
return {
"success": False,
"error_code": "USER_NOT_FOUND",
"error_message": f"用户 {user_id} 不存在",
"suggestion": "请确认用户 ID 是否正确,或使用 search_users 工具按姓名搜索"
}
return {
"success": True,
"data": user.to_dict()
}
except DatabaseConnectionError as e:
return {
"success": False,
"error_code": "DB_CONNECTION_ERROR",
"error_message": "数据库暂时不可用,请稍后重试",
"retryable": True,
"detail": str(e) if DEBUG else None
}
except Exception as e:
logger.exception(f"Unexpected error in get_user({user_id})")
return {
"success": False,
"error_code": "INTERNAL_ERROR",
"error_message": "系统内部错误",
"retryable": False
}
结构化错误返回有几个要素:
success字段:LLM 能快速判断是否成功,不需要解析错误文本error_code:机器可读的错误分类error_message:人类可读的错误描述(中文友好)suggestion:告诉 LLM 下一步怎么做(这很重要——等于给 Agent 内嵌了错误恢复提示)retryable:标明是否可以重试
有了 retryable 字段,Agent 框架就可以做自动重试决策:
async def execute_tool_with_retry(
tool_name: str,
tool_args: dict,
max_retries: int = 2,
retry_delay: float = 1.0
) -> dict:
last_result = None
for attempt in range(max_retries + 1):
result = await execute_tool(tool_name, tool_args)
# 成功直接返回
if result.get("success", True):
return result
# 不可重试的错误直接返回
if not result.get("retryable", False):
return result
# 可重试的错误,等待后重试
if attempt < max_retries:
await asyncio.sleep(retry_delay * (2 ** attempt)) # 指数退避
logger.info(f"Retrying {tool_name} (attempt {attempt + 2}/{max_retries + 1})")
last_result = result
return last_result
一个完整的工具执行器示例
把上面几个机制整合在一起,一个生产级的工具执行器大概长这样:
import asyncio
import hashlib
import json
import logging
import time
from typing import Any
logger = logging.getLogger(__name__)
class ProductionToolExecutor:
def __init__(
self,
tools: dict,
dedup_ttl: int = 300,
default_timeout: float = 15.0,
max_retries: int = 2,
):
self.tools = tools
self.dedup_cache: dict[str, tuple[float, Any]] = {}
self.dedup_ttl = dedup_ttl
self.default_timeout = default_timeout
self.max_retries = max_retries
async def execute(self, tool_name: str, tool_args: dict) -> dict:
"""执行工具调用,带幂等性、超时、重试、错误处理"""
if tool_name not in self.tools:
return {
"success": False,
"error_code": "TOOL_NOT_FOUND",
"error_message": f"工具 {tool_name} 不存在"
}
tool_fn = self.tools[tool_name]
tool_config = getattr(tool_fn, "_config", {})
is_write_op = tool_config.get("is_write", False)
timeout = tool_config.get("timeout", self.default_timeout)
max_retries = tool_config.get("max_retries", self.max_retries if not is_write_op else 0)
# 幂等性检查(仅写操作)
if is_write_op:
dedup_key = self._make_dedup_key(tool_name, tool_args)
cached = self._get_cached(dedup_key)
if cached is not None:
logger.info(f"Returning cached result for {tool_name} (idempotent)")
return {**cached, "idempotent": True}
# 执行(带超时和重试)
last_result = None
for attempt in range(max_retries + 1):
try:
result = await asyncio.wait_for(
self._call_tool(tool_fn, tool_args),
timeout=timeout
)
except asyncio.TimeoutError:
result = {
"success": False,
"error_code": "TIMEOUT",
"error_message": f"工具执行超时({timeout}s)",
"retryable": False # 超时不重试,避免雪崩
}
last_result = result
if result.get("success", True):
break # 成功,跳出重试
if not result.get("retryable", False):
break # 不可重试
if attempt < max_retries:
delay = 1.0 * (2 ** attempt)
logger.info(f"Retry {tool_name} in {delay}s (attempt {attempt + 2})")
await asyncio.sleep(delay)
# 写操作成功后记录幂等缓存
if is_write_op and last_result and last_result.get("success", True):
dedup_key = self._make_dedup_key(tool_name, tool_args)
self._set_cached(dedup_key, last_result)
return last_result
async def _call_tool(self, tool_fn, args: dict) -> dict:
if asyncio.iscoroutinefunction(tool_fn):
return await tool_fn(**args)
else:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, lambda: tool_fn(**args))
def _make_dedup_key(self, tool_name: str, args: dict) -> str:
payload = json.dumps({"tool": tool_name, "args": args}, sort_keys=True)
return hashlib.md5(payload.encode()).hexdigest()
def _get_cached(self, key: str) -> dict | None:
if key in self.dedup_cache:
ts, val = self.dedup_cache[key]
if time.time() - ts < self.dedup_ttl:
return val
del self.dedup_cache[key]
return None
def _set_cached(self, key: str, value: dict):
self.dedup_cache[key] = (time.time(), value)
# 工具注册方式
def tool_config(is_write: bool = False, timeout: float = 15.0, max_retries: int = 2):
"""工具配置装饰器"""
def decorator(fn):
fn._config = {"is_write": is_write, "timeout": timeout, "max_retries": max_retries}
return fn
return decorator
@tool_config(is_write=True, timeout=10.0, max_retries=0)
async def create_order(user_id: str, product_id: str, quantity: int) -> dict:
# 实际的订单创建逻辑
...
@tool_config(is_write=False, timeout=5.0, max_retries=2)
async def search_products(query: str, max_results: int = 5) -> dict:
# 实际的搜索逻辑
...
总结:Tool Calling 工程检查清单
| 问题域 | 检查项 |
|---|---|
| 幂等性 | 写操作是否有幂等性保护? |
| 超时 | 每个工具是否有单独超时?整个 Agent run 是否有全局超时? |
| 并发 | 并发工具调用是否有资源隔离? |
| Context 大小 | 工具返回值是否做了截断和分层? |
| 工具描述 | 每个工具是否有清晰的触发条件和禁止场景? |
| 错误处理 | 是否返回结构化错误?是否有 retryable 标志? |
这些不是什么神秘的技术,但每一条都是生产环境出过问题之后才补上的。
Agent 系统的可靠性不是靠更好的 prompt 换来的,是靠工程设施一点一点堆出来的。LLM 负责推理,工程层负责可靠性——这两件事要分得很清楚。
如果你在 Agent 工程上踩过其他坑,欢迎在评论区分享。
更多推荐


所有评论(0)