AI Agent 的 Tool Calling 工程陷阱:从幂等性到失败重试的 6 个生产踩坑

我们的 AI 客服 Agent 上线三周后,有一天收到用户投诉,说同一张工单被创建了三次。查日志才发现:Agent 调用 create_ticket 工具时,因为网络抖动触发了重试,结果三次调用全部成功,数据库里躺着三条一模一样的记录。

这是我遇到的第一个 Tool Calling 生产 bug,但不是最后一个。

过去一年多,我在两个 AI Agent 项目里踩了很多坑:幂等性、超时、并发、返回值解析、工具选择混乱……每一个坑单独看不起眼,但放在生产环境里,加上用户量和并发,都能变成故障。

这篇文章把 6 个最常见的 Tool Calling 工程陷阱整理出来,每个坑都附实际代码,以及我们最终用了什么方式绕开它。


陷阱一:写操作工具不幂等,重试就出问题

这是最经典的坑,也是最容易被忽视的。

Tool Calling 的调用链里有好几个地方可能触发重试:

  • LLM API 超时后 SDK 自动重试
  • Agent 框架检测到工具返回错误后主动重试
  • 网络层的透明重传

大部分读操作(search_webget_order_status)天然幂等,重试无害。但写操作不一样:

# 危险的写操作工具
def create_support_ticket(user_id: str, issue: str, priority: str) -> dict:
    ticket_id = db.insert("tickets", {
        "user_id": user_id,
        "issue": issue,
        "priority": priority,
        "created_at": datetime.now()
    })
    return {"ticket_id": ticket_id, "status": "created"}

每次调用都会在数据库插入一条新记录,没有任何去重逻辑。如果 Agent 重试,用户就会看到重复工单。

解法:给每次工具调用生成一个 idempotency key,写入端校验并去重。

import hashlib
import json

def create_support_ticket(
    user_id: str,
    issue: str,
    priority: str,
    idempotency_key: str | None = None
) -> dict:
    # 如果没传 key,基于参数生成一个确定性 key
    if not idempotency_key:
        payload = json.dumps(
            {"user_id": user_id, "issue": issue, "priority": priority},
            sort_keys=True
        )
        idempotency_key = hashlib.sha256(payload.encode()).hexdigest()[:16]

    # 先查是否已经执行过
    existing = db.query_one(
        "SELECT ticket_id FROM tickets WHERE idempotency_key = %s",
        [idempotency_key]
    )
    if existing:
        return {
            "ticket_id": existing["ticket_id"],
            "status": "already_created",
            "idempotent": True
        }

    # 插入时带上 idempotency_key
    ticket_id = db.insert("tickets", {
        "user_id": user_id,
        "issue": issue,
        "priority": priority,
        "idempotency_key": idempotency_key,
        "created_at": datetime.now()
    })
    return {"ticket_id": ticket_id, "status": "created", "idempotent": False}

在工具定义里把 idempotency_key 作为可选参数暴露出去,Agent 调用时由框架层注入(或者在工具描述里说明 LLM 可以自己传一个唯一标识符)。

另一个方案是在 Agent 框架层做调用去重:

class ToolCallDeduplicator:
    def __init__(self, ttl_seconds: int = 300):
        self.cache = {}  # 生产中换成 Redis
        self.ttl = ttl_seconds

    def is_duplicate(self, tool_name: str, args: dict) -> bool:
        key = self._make_key(tool_name, args)
        now = time.time()
        if key in self.cache:
            ts, _ = self.cache[key]
            if now - ts < self.ttl:
                return True
        return False

    def record(self, tool_name: str, args: dict, result: dict):
        key = self._make_key(tool_name, args)
        self.cache[key] = (time.time(), result)

    def get_cached_result(self, tool_name: str, args: dict) -> dict | None:
        key = self._make_key(tool_name, args)
        if key in self.cache:
            _, result = self.cache[key]
            return result
        return None

    def _make_key(self, tool_name: str, args: dict) -> str:
        payload = json.dumps({"tool": tool_name, "args": args}, sort_keys=True)
        return hashlib.md5(payload.encode()).hexdigest()

陷阱二:工具超时没有硬限制,Agent 卡住等死

我们有一个工具叫 run_sql_query,允许 Agent 对数据库执行只读查询。某次 Agent 生成了一个没有 LIMIT 的查询:

SELECT * FROM user_events WHERE event_type = 'click';

这张表有两千万行。查询跑了 40 秒,占满了数据库连接,把整个 Agent 服务拖慢了。

工具超时在 Agent 场景里有两层:

  1. 网络/IO 超时:工具调用的外部服务响应时间
  2. 整体任务超时:整个 Agent run 不能无限跑下去

两层超时都要设:

import asyncio
from functools import wraps

def with_timeout(seconds: float, error_message: str = "Tool execution timed out"):
    """给工具调用加超时装饰器"""
    def decorator(func):
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            try:
                return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds)
            except asyncio.TimeoutError:
                return {
                    "error": error_message,
                    "timeout_seconds": seconds,
                    "status": "timeout"
                }
        
        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            import signal
            def handler(signum, frame):
                raise TimeoutError(error_message)
            signal.signal(signal.SIGALRM, handler)
            signal.alarm(int(seconds))
            try:
                result = func(*args, **kwargs)
                signal.alarm(0)
                return result
            except TimeoutError:
                return {
                    "error": error_message,
                    "timeout_seconds": seconds,
                    "status": "timeout"
                }
            finally:
                signal.alarm(0)
        
        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
    return decorator


# 用法
@with_timeout(seconds=10, error_message="数据库查询超时,请优化查询条件")
async def run_sql_query(query: str, params: list | None = None) -> dict:
    # 额外加 statement_timeout 防止数据库端慢查询
    async with db.acquire() as conn:
        await conn.execute("SET statement_timeout = '8s'")
        rows = await conn.fetch(query, *(params or []))
        return {"rows": [dict(r) for r in rows], "count": len(rows)}

对于整个 Agent run,要在 orchestrator 层加一个全局超时:

class AgentOrchestrator:
    def __init__(self, max_run_seconds: int = 120, max_tool_calls: int = 30):
        self.max_run_seconds = max_run_seconds
        self.max_tool_calls = max_tool_calls

    async def run(self, user_message: str) -> str:
        start_time = time.time()
        tool_call_count = 0
        messages = [{"role": "user", "content": user_message}]

        while True:
            # 全局超时检查
            elapsed = time.time() - start_time
            if elapsed > self.max_run_seconds:
                return f"任务超时(已运行 {elapsed:.0f}s),请拆解成更小的子任务"

            # 工具调用次数上限
            if tool_call_count >= self.max_tool_calls:
                return f"工具调用次数已达上限({self.max_tool_calls}次),任务终止"

            # 调用 LLM
            response = await llm.chat(messages)
            
            if response.stop_reason == "end_turn":
                return response.content
            
            # 执行工具调用
            for tool_call in response.tool_calls:
                tool_call_count += 1
                result = await self.execute_tool(tool_call)
                messages.append({"role": "tool", "tool_call_id": tool_call.id, "content": str(result)})

陷阱三:并发工具调用没有资源隔离,互相干扰

现代 LLM(GPT-4o、Claude 3.5 Sonnet)在一次响应里可以返回多个工具调用,框架通常会并发执行。

这本来是好事——并发执行多个独立工具比串行快得多。

问题出在"独立"这个假设上。如果两个工具调用都在操作同一个资源,没有并发控制,就会出现竞态条件:

# Agent 同时触发了两个工具调用:
# 1. update_user_plan(user_id="123", plan="pro")
# 2. apply_promo_code(user_id="123", code="SUMMER30")
# 两个都在修改用户账户,没有锁保护

async def parallel_tool_execution(tool_calls: list) -> list:
    # 简单的并发执行——危险!
    tasks = [execute_tool(tc) for tc in tool_calls]
    return await asyncio.gather(*tasks)

解法:按资源维度分组,组内串行,组间并行。

from collections import defaultdict

async def safe_parallel_tool_execution(tool_calls: list) -> list:
    """
    对工具调用按资源分组,同一资源的操作串行执行,
    不同资源的操作并行执行
    """
    # 每个工具需要声明自己的 resource_key
    # 例如: create_ticket -> None (无状态)
    #       update_user  -> f"user:{user_id}"
    
    groups = defaultdict(list)
    orderless = []  # 无资源竞争的工具
    
    for i, tc in enumerate(tool_calls):
        resource_key = get_tool_resource_key(tc)
        if resource_key is None:
            orderless.append((i, tc))
        else:
            groups[resource_key].append((i, tc))
    
    results = [None] * len(tool_calls)
    
    async def run_group(tool_call_group):
        for idx, tc in tool_call_group:
            results[idx] = await execute_tool(tc)
    
    # 无竞争的工具直接并发
    tasks = [execute_tool(tc) for i, tc in orderless]
    orderless_results = await asyncio.gather(*tasks)
    for (i, _), r in zip(orderless, orderless_results):
        results[i] = r
    
    # 有资源竞争的组串行执行,组间并发
    group_tasks = [run_group(g) for g in groups.values()]
    await asyncio.gather(*group_tasks)
    
    return results


def get_tool_resource_key(tool_call) -> str | None:
    """根据工具名和参数推断资源 key"""
    resource_tools = {
        "update_user": lambda args: f"user:{args.get('user_id')}",
        "apply_promo_code": lambda args: f"user:{args.get('user_id')}",
        "update_order": lambda args: f"order:{args.get('order_id')}",
    }
    tool_name = tool_call.function.name
    if tool_name in resource_tools:
        args = json.loads(tool_call.function.arguments)
        return resource_tools[tool_name](args)
    return None  # 无状态工具

陷阱四:工具返回值太大,把 context 撑爆

Agent 让工具返回数据,LLM 消化数据后继续推理。这个模式很自然,但有个隐患:工具返回的内容直接进 context,很快就把 token 限制撑满。

我们有一个 search_documents 工具,初版直接返回搜索到的全文:

# 危险版本:返回完整文档内容
def search_documents(query: str, top_k: int = 5) -> dict:
    docs = vector_db.search(query, top_k=top_k)
    return {
        "results": [
            {
                "title": doc.title,
                "content": doc.content,  # 可能几千字
                "score": doc.score
            }
            for doc in docs
        ]
    }

5 篇文档,每篇 2000 字,直接往 context 里塞进去 10000 token。多几轮工具调用,context 就爆了。

解法:工具只返回摘要和引用 ID,需要全文时再按需获取。

# 分层返回:先给摘要,按需给全文
def search_documents(query: str, top_k: int = 5, include_full_content: bool = False) -> dict:
    docs = vector_db.search(query, top_k=top_k)
    results = []
    for doc in docs:
        item = {
            "doc_id": doc.id,
            "title": doc.title,
            "score": round(doc.score, 3),
            # 只返回前 300 字的摘要
            "snippet": doc.content[:300] + ("..." if len(doc.content) > 300 else ""),
        }
        if include_full_content:
            item["content"] = doc.content
        results.append(item)
    
    return {
        "results": results,
        "total": len(results),
        "hint": "Use get_document(doc_id) to fetch full content of a specific document"
    }


def get_document(doc_id: str) -> dict:
    """按需获取单篇文档全文"""
    doc = vector_db.get_by_id(doc_id)
    if not doc:
        return {"error": f"Document {doc_id} not found"}
    return {
        "doc_id": doc.id,
        "title": doc.title,
        "content": doc.content,
        "metadata": doc.metadata
    }

工具描述里明确告诉 LLM 这两个工具的配合使用方式,它会自己判断什么时候需要拿全文。

另一个常见的压缩技巧:对于列表类工具,返回结果时做截断,并附上分页信息:

def list_orders(user_id: str, status: str | None = None, page: int = 1) -> dict:
    PAGE_SIZE = 10  # 每次最多返回 10 条
    orders = db.query_orders(user_id, status, offset=(page-1)*PAGE_SIZE, limit=PAGE_SIZE+1)
    
    has_more = len(orders) > PAGE_SIZE
    if has_more:
        orders = orders[:PAGE_SIZE]
    
    return {
        "orders": [order.to_summary_dict() for order in orders],  # summary 不含详情字段
        "page": page,
        "has_more": has_more,
        "total_hint": "Use page parameter to get more results"
    }

陷阱五:工具描述写得含糊,LLM 选错工具

这个问题不是代码 bug,而是工程设计问题。工具描述写得不够清晰,LLM 就会选错工具,或者用错参数。

我们有两个工具,最初描述是这样的:

tools = [
    {
        "name": "search_products",
        "description": "搜索产品",  # 太简单
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string"}
            }
        }
    },
    {
        "name": "get_product_details",
        "description": "获取产品详情",  # 太简单
        "parameters": {
            "type": "object",
            "properties": {
                "product_id": {"type": "string"}
            }
        }
    }
]

LLM 经常混用这两个工具——明明有 product_id,还是会去调 search_products;明明要搜索,却把用户问题当成 product_id 传给 get_product_details

解法:工具描述要写清楚触发条件、不要用的场景、参数格式要求。

tools = [
    {
        "name": "search_products",
        "description": (
            "根据自然语言查询搜索产品目录。"
            "用于:用户描述了产品需求但没有具体产品 ID 时。"
            "不要用于:已知产品 ID 时(改用 get_product_details)。"
            "返回:产品列表,每项含 product_id、名称、简介和价格。"
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "用户的产品需求描述,例如:'蓝牙降噪耳机 500元以内'"
                },
                "max_results": {
                    "type": "integer",
                    "description": "返回结果数量,默认 5,最大 20",
                    "default": 5
                }
            },
            "required": ["query"]
        }
    },
    {
        "name": "get_product_details",
        "description": (
            "通过产品 ID 获取单个产品的完整信息,包括规格、库存、用户评价。"
            "用于:已知确切产品 ID,需要获取详细信息时。"
            "不要用于:搜索或浏览产品(改用 search_products)。"
            "产品 ID 格式:'PROD-' 开头的字母数字串,如 'PROD-ABC123'。"
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "product_id": {
                    "type": "string",
                    "description": "产品唯一 ID,格式为 PROD-XXXXXXX",
                    "pattern": "^PROD-[A-Z0-9]{6,10}$"
                }
            },
            "required": ["product_id"]
        }
    }
]

几个提升工具选择准确率的技巧:

  1. 用"用于/不要用于"格式:明确说明边界,比"可以用来"更清晰
  2. 描述输出内容:LLM 在选工具时会考虑哪个工具能提供它需要的信息
  3. 指定参数格式:用 pattern 或者在 description 里举例,减少参数格式错误
  4. 工具数量控制:一次性给 LLM 超过 20 个工具,选择准确率明显下降;优先按需注入工具,而不是全量暴露

陷阱六:工具错误处理不一致,Agent 不知道该如何继续

工具调用失败时,返回什么给 LLM 很关键。我见过三种典型的错误返回方式:

# 方式 A:抛异常(最差)
def get_user(user_id: str) -> dict:
    user = db.find_user(user_id)
    if not user:
        raise ValueError(f"User {user_id} not found")  # 框架层会把异常转成字符串返回 LLM,信息混乱
    return user

# 方式 B:返回空/None(也不好)
def get_user(user_id: str) -> dict | None:
    user = db.find_user(user_id)
    return user  # None 序列化成 "null",LLM 不知道是"未找到"还是"系统错误"

# 方式 C:结构化错误(正确)
def get_user(user_id: str) -> dict:
    try:
        user = db.find_user(user_id)
        if not user:
            return {
                "success": False,
                "error_code": "USER_NOT_FOUND",
                "error_message": f"用户 {user_id} 不存在",
                "suggestion": "请确认用户 ID 是否正确,或使用 search_users 工具按姓名搜索"
            }
        return {
            "success": True,
            "data": user.to_dict()
        }
    except DatabaseConnectionError as e:
        return {
            "success": False,
            "error_code": "DB_CONNECTION_ERROR",
            "error_message": "数据库暂时不可用,请稍后重试",
            "retryable": True,
            "detail": str(e) if DEBUG else None
        }
    except Exception as e:
        logger.exception(f"Unexpected error in get_user({user_id})")
        return {
            "success": False,
            "error_code": "INTERNAL_ERROR",
            "error_message": "系统内部错误",
            "retryable": False
        }

结构化错误返回有几个要素:

  • success 字段:LLM 能快速判断是否成功,不需要解析错误文本
  • error_code:机器可读的错误分类
  • error_message:人类可读的错误描述(中文友好)
  • suggestion:告诉 LLM 下一步怎么做(这很重要——等于给 Agent 内嵌了错误恢复提示)
  • retryable:标明是否可以重试

有了 retryable 字段,Agent 框架就可以做自动重试决策:

async def execute_tool_with_retry(
    tool_name: str,
    tool_args: dict,
    max_retries: int = 2,
    retry_delay: float = 1.0
) -> dict:
    last_result = None
    for attempt in range(max_retries + 1):
        result = await execute_tool(tool_name, tool_args)
        
        # 成功直接返回
        if result.get("success", True):
            return result
        
        # 不可重试的错误直接返回
        if not result.get("retryable", False):
            return result
        
        # 可重试的错误,等待后重试
        if attempt < max_retries:
            await asyncio.sleep(retry_delay * (2 ** attempt))  # 指数退避
            logger.info(f"Retrying {tool_name} (attempt {attempt + 2}/{max_retries + 1})")
        
        last_result = result
    
    return last_result

一个完整的工具执行器示例

把上面几个机制整合在一起,一个生产级的工具执行器大概长这样:

import asyncio
import hashlib
import json
import logging
import time
from typing import Any

logger = logging.getLogger(__name__)

class ProductionToolExecutor:
    def __init__(
        self,
        tools: dict,
        dedup_ttl: int = 300,
        default_timeout: float = 15.0,
        max_retries: int = 2,
    ):
        self.tools = tools
        self.dedup_cache: dict[str, tuple[float, Any]] = {}
        self.dedup_ttl = dedup_ttl
        self.default_timeout = default_timeout
        self.max_retries = max_retries

    async def execute(self, tool_name: str, tool_args: dict) -> dict:
        """执行工具调用,带幂等性、超时、重试、错误处理"""
        if tool_name not in self.tools:
            return {
                "success": False,
                "error_code": "TOOL_NOT_FOUND",
                "error_message": f"工具 {tool_name} 不存在"
            }

        tool_fn = self.tools[tool_name]
        tool_config = getattr(tool_fn, "_config", {})
        is_write_op = tool_config.get("is_write", False)
        timeout = tool_config.get("timeout", self.default_timeout)
        max_retries = tool_config.get("max_retries", self.max_retries if not is_write_op else 0)

        # 幂等性检查(仅写操作)
        if is_write_op:
            dedup_key = self._make_dedup_key(tool_name, tool_args)
            cached = self._get_cached(dedup_key)
            if cached is not None:
                logger.info(f"Returning cached result for {tool_name} (idempotent)")
                return {**cached, "idempotent": True}

        # 执行(带超时和重试)
        last_result = None
        for attempt in range(max_retries + 1):
            try:
                result = await asyncio.wait_for(
                    self._call_tool(tool_fn, tool_args),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                result = {
                    "success": False,
                    "error_code": "TIMEOUT",
                    "error_message": f"工具执行超时({timeout}s)",
                    "retryable": False  # 超时不重试,避免雪崩
                }

            last_result = result

            if result.get("success", True):
                break  # 成功,跳出重试
            if not result.get("retryable", False):
                break  # 不可重试
            if attempt < max_retries:
                delay = 1.0 * (2 ** attempt)
                logger.info(f"Retry {tool_name} in {delay}s (attempt {attempt + 2})")
                await asyncio.sleep(delay)

        # 写操作成功后记录幂等缓存
        if is_write_op and last_result and last_result.get("success", True):
            dedup_key = self._make_dedup_key(tool_name, tool_args)
            self._set_cached(dedup_key, last_result)

        return last_result

    async def _call_tool(self, tool_fn, args: dict) -> dict:
        if asyncio.iscoroutinefunction(tool_fn):
            return await tool_fn(**args)
        else:
            loop = asyncio.get_event_loop()
            return await loop.run_in_executor(None, lambda: tool_fn(**args))

    def _make_dedup_key(self, tool_name: str, args: dict) -> str:
        payload = json.dumps({"tool": tool_name, "args": args}, sort_keys=True)
        return hashlib.md5(payload.encode()).hexdigest()

    def _get_cached(self, key: str) -> dict | None:
        if key in self.dedup_cache:
            ts, val = self.dedup_cache[key]
            if time.time() - ts < self.dedup_ttl:
                return val
            del self.dedup_cache[key]
        return None

    def _set_cached(self, key: str, value: dict):
        self.dedup_cache[key] = (time.time(), value)


# 工具注册方式
def tool_config(is_write: bool = False, timeout: float = 15.0, max_retries: int = 2):
    """工具配置装饰器"""
    def decorator(fn):
        fn._config = {"is_write": is_write, "timeout": timeout, "max_retries": max_retries}
        return fn
    return decorator


@tool_config(is_write=True, timeout=10.0, max_retries=0)
async def create_order(user_id: str, product_id: str, quantity: int) -> dict:
    # 实际的订单创建逻辑
    ...

@tool_config(is_write=False, timeout=5.0, max_retries=2)
async def search_products(query: str, max_results: int = 5) -> dict:
    # 实际的搜索逻辑
    ...

总结:Tool Calling 工程检查清单

问题域 检查项
幂等性 写操作是否有幂等性保护?
超时 每个工具是否有单独超时?整个 Agent run 是否有全局超时?
并发 并发工具调用是否有资源隔离?
Context 大小 工具返回值是否做了截断和分层?
工具描述 每个工具是否有清晰的触发条件和禁止场景?
错误处理 是否返回结构化错误?是否有 retryable 标志?

这些不是什么神秘的技术,但每一条都是生产环境出过问题之后才补上的。

Agent 系统的可靠性不是靠更好的 prompt 换来的,是靠工程设施一点一点堆出来的。LLM 负责推理,工程层负责可靠性——这两件事要分得很清楚。


如果你在 Agent 工程上踩过其他坑,欢迎在评论区分享。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐