一、引言:一次让我凌晨三点爬起来的事故

凌晨两点四十七分,手机疯狂震动。PagerDuty 告警:生产环境的 Agent 系统成功率从 92% 跌到了 31%。

我穿着拖鞋冲进办公室,打开 Kibana 日志面板。3 万个用户请求中,有超过 2 万个因为"tool_call_execution_error"超时失败。用户投诉如潮水般涌入:“机器人在无限循环”“一直说请稍等”“客服机器人疯了”。

这不是模型能力的问题。这是工程问题。

追溯根因:一个上游 API 返回了 8MB 的非结构化 JSON,直接塞进了 Agent 的 tool call 结果。LLM 尝试处理这个庞然大物,上下文塞爆,开始胡言乱语。随后重试策略没有退避,同一个请求触发了 15 次重复调用,击穿了下游服务限流,最终整个系统雪崩。

那一夜之后,我带着团队花了三个月时间,系统性地重构了 Tool Calling 的整个工程链路。本文将还原这段经历,分享我们在 LLM Agent Tool Calling 场景中踩过的坑、提炼的实践和最终的工程方案。

二、背景:Tool Calling 远比你想象的脆弱

简单回顾一下 Tool Calling 的工作机制。LLM 在生成回复时,不是直接调用外部函数,而是由模型"声明"它想调用哪个工具以及传入什么参数,由工程层去实际执行调用,然后把结果返回给模型做下一步推理。

用户请求 → LLM 推理 → 输出 tool_call
         → 工程层解析参数 → 执行外部 API
         → 返回结果给 LLM → LLM 生成最终回答

听起来简单?在实际生产环境中,每一个箭头都可能断裂。

我们的系统管理着超过 120 个工具函数,涵盖订单查询、库存检查、物流跟踪、客服转接、优惠计算等业务场景。每个工具都有不同的参数、不同的超时限制、不同的返回数据量。维护这些工具描述,并与模型保持一致,本身就是一个工程难题。

三、核心工程挑战(掉过的坑)

挑战 1:工具描述质量——“模型看不懂你写的 API”

现象: 模型频繁传错参数,或者调用错误的工具。

我最初认为这很荒谬——API 文档明明写得很清楚。直到我们统计了 1000 次 tool call 数据:

工具描述类型 参数正确率 工具选择正确率 平均推理延迟
一句话描述 47% 62% 1.2s
带参数类型说明 68% 78% 1.4s
含业务上下文 83% 91% 1.5s
含完整示例 + 边界条件 91% 96% 1.7s

教训: 模型不读文档,它理解自然语言。让工具描述像业务文档一样写。

踩坑代码——不好的实践:

# 反模式:工具描述过于简略
tools = [
    {
        "type": "function",
        "function": {
            "name": "search_orders",
            "description": "搜索订单",
            "parameters": {
                "type": "object",
                "properties": {
                    "keyword": {"type": "string", "description": "关键词"},
                    "status": {"type": "string", "description": "订单状态"},
                    "page": {"type": "integer", "description": "页码"},
                },
                "required": ["keyword"]
            }
        }
    }
]

这个描述有什么问题?"关键词"不够精确——是订单号、商品名还是用户名?"订单状态"有哪些可选值?"页码"的默认值是多少?模型每次都在猜,而且猜不对。

正确实践——详细到让人反感的描述:

# 最佳实践:包含业务上下文、枚举值和默认值
tools = [
    {
        "type": "function",
        "function": {
            "name": "search_orders",
            "description": """按关键词搜索历史订单。关键词可以是:订单号(如 ORD-202405-0001)、商品名称、买家用户名。
如果用户说"最近订单"或没有明确关键词,使用 keyword="recent"。
分页默认返回第1页(page=1),每页20条。
如果搜索不到结果,不要编造,如实告知用户。""",
            "parameters": {
                "type": "object",
                "properties": {
                    "keyword": {
                        "type": "string",
                        "description": "搜索关键词(订单号/商品名/用户名)。当用户意图模糊时传'recent'"
                    },
                    "status": {
                        "type": "string",
                        "description": "筛选订单状态。可选值:pending(待付款), paid(已付款), shipped(已发货), delivered(已签收), cancelled(已取消), refunded(已退款)。不传则查询所有状态"
                    },
                    "page": {
                        "type": "integer",
                        "description": "页码,从1开始,默认1"
                    },
                    "page_size": {
                        "type": "integer",
                        "description": "每页数量,默认20,最大100"
                    }
                },
                "required": ["keyword"]
            }
        }
    }
]

这段描述看起来啰嗦,但实测准确率从 62% 提升到了 91%。模型不需要猜了,它直接"看到"了业务规则。

实验数据(我们内部 benchmark):

我们用同样的 500 个测试 query 测试了两种描述:

# 对比实验代码
import json
import time
from openai import OpenAI

client = OpenAI()

test_queries = [
    "帮我查一下昨天买的手机到哪了",
    "看看我有哪些待付款的订单",
    "订单号 ORD-202405-0088 现在什么状态",
    "最近一个月取消的订单有哪些",
    "搜索关键词洗衣机,看已签收的",
    # ... 共 500 条
]

def test_tool_descriptions(tools, queries):
    successes = 0
    for q in queries:
        try:
            resp = client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": q}],
                tools=tools,
                tool_choice="auto"
            )
            msg = resp.choices[0].message
            if msg.tool_calls:
                # 验证参数格式是否合法
                for tc in msg.tool_calls:
                    args = json.loads(tc.function.arguments)
                    # 简单的合法性检查
                    if isinstance(args.get("keyword"), str):
                        successes += 1
                        break
        except Exception as e:
            print(f"Error: {e}")
    return successes / len(queries)

# 简单描述
simple_tools = [...]  # 简略版
# 详细描述
detailed_tools = [...]  # 详细版

print(f"简单描述成功率: {test_tool_descriptions(simple_tools, test_queries):.1%}")
print(f"详细描述成功率: {test_tool_descriptions(detailed_tools, test_queries):.1%}")

挑战 2:并发工具调用的竞态问题

现象: 当 LLM 同时调用多个工具(并行 tool calls)时,部分工具的参数被互相覆盖,或结果张冠李戴。

有一次我们遇到一个诡异 bug:用户查"我的手机和耳机",模型正确识别出两个查询意图,同时调用了 search_product("手机")search_product("耳机")。但最终对话输出说"您的手机是:耳机 A,售价 299 元……"

原因是我们在后端用一个共享的 dict 缓存了 tool call 的中间状态,两个并发的 tool call 写入了同一个键,导致了数据交叉。

问题复现代码:

import asyncio
import json
from typing import Any

# 反模式:共享的可变状态
_shared_state: dict[str, Any] = {}

async def execute_tool_call(tool_name: str, arguments: dict, call_id: str) -> str:
    global _shared_state
    
    # 竞态!两个并发调用会互相覆盖
    _shared_state["current_tool"] = tool_name
    _shared_state["current_args"] = arguments
    _shared_state["current_call_id"] = call_id
    
    # 模拟外部 API 调用延迟
    await asyncio.sleep(0.1)
    
    result = await call_external_api(tool_name, arguments)
    
    # 问题:此时 _shared_state 可能已经被另一个协程覆盖
    log_to_tracing(f"Tool {_shared_state['current_tool']} returned", _shared_state["current_call_id"])
    
    return json.dumps({"result": result, "call_id": call_id})

# 模拟并发调用
async def main():
    tasks = [
        execute_tool_call("search_product", {"keyword": "手机"}, "call_001"),
        execute_tool_call("search_product", {"keyword": "耳机"}, "call_002"),
    ]
    results = await asyncio.gather(*tasks)
    for r in results:
        print(r)

# 输出可能长这样:
# {"result": "手机搜索结果...", "call_id": "call_001"}  ✅ 正确
# {"result": "耳机搜索结果...", "call_id": "call_002"}  ❌ 日志中 current_tool 是 "search_product"(但无法区分是哪个)

解决方案——无状态设计 + 上下文隔离:

import asyncio
import json
import uuid
from dataclasses import dataclass, field
from typing import Any, Optional

@dataclass
class ToolCallContext:
    """每个 tool call 拥有独立的上下文"""
    call_id: str
    tool_name: str
    arguments: dict
    trace_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    result: Optional[str] = None
    error: Optional[str] = None
    start_time: float = 0.0
    end_time: float = 0.0

class ToolExecutor:
    """无状态的工具执行器"""
    
    def __init__(self, api_timeout: float = 5.0):
        self._semaphore = asyncio.Semaphore(5)  # 限制最大并发数
        self._api_timeout = api_timeout
        self._pending_calls: dict[str, asyncio.Task] = {}
        
    async def execute(self, ctx: ToolCallContext) -> str:
        async with self._semaphore:
            ctx.start_time = time.monotonic()
            try:
                # 每个调用完全隔离
                executor = self._get_handler(ctx.tool_name)
                if executor is None:
                    raise ValueError(f"未知工具: {ctx.tool_name}")
                
                # 带超时的 API 调用
                result = await asyncio.wait_for(
                    executor(**ctx.arguments),
                    timeout=self._api_timeout
                )
                
                ctx.result = json.dumps(result, ensure_ascii=False)
                ctx.end_time = time.monotonic()
                self._log_completion(ctx)
                return ctx.result
                
            except asyncio.TimeoutError:
                ctx.error = f"timeout after {self._api_timeout}s"
                ctx.end_time = time.monotonic()
                self._log_error(ctx)
                return json.dumps({"error": "timeout", "tool": ctx.tool_name})
                
            except Exception as e:
                ctx.error = str(e)
                ctx.end_time = time.monotonic()
                self._log_error(ctx)
                return json.dumps({"error": str(e), "tool": ctx.tool_name})
    
    async def execute_parallel(self, call_contexts: list[ToolCallContext]) -> list[str]:
        """并行执行多个工具调用,彼此完全隔离"""
        tasks = [self.execute(ctx) for ctx in call_contexts]
        return await asyncio.gather(*tasks)
    
    def _get_handler(self, tool_name: str):
        """查找工具处理函数"""
        registry = {
            "search_product": self._handler_search_product,
            "get_order_status": self._handler_get_order_status,
        }
        return registry.get(tool_name)
    
    async def _handler_search_product(self, keyword: str, **kwargs) -> dict:
        # 模拟外部 API
        await asyncio.sleep(0.2)
        return {"keyword": keyword, "results": [f"商品A({keyword}版)", f"商品B({keyword}版)"]}
    
    async def _handler_get_order_status(self, order_id: str, **kwargs) -> dict:
        await asyncio.sleep(0.1)
        return {"order_id": order_id, "status": "shipped"}
    
    def _log_completion(self, ctx: ToolCallContext):
        duration = (ctx.end_time - ctx.start_time) * 1000
        print(f"[OK] {ctx.trace_id} | {ctx.tool_name}({json.dumps(ctx.arguments, ensure_ascii=False)}) | {duration:.0f}ms")
    
    def _log_error(self, ctx: ToolCallContext):
        duration = (ctx.end_time - ctx.start_time) * 1000
        print(f"[ERR] {ctx.trace_id} | {ctx.tool_name} | {duration:.0f}ms | {ctx.error}")


# 使用示例
async def demo():
    executor = ToolExecutor(api_timeout=3.0)
    contexts = [
        ToolCallContext(call_id="c1", tool_name="search_product", arguments={"keyword": "手机"}),
        ToolCallContext(call_id="c2", tool_name="search_product", arguments={"keyword": "耳机"}),
        ToolCallContext(call_id="c3", tool_name="get_order_status", arguments={"order_id": "ORD-001"}),
    ]
    results = await executor.execute_parallel(contexts)
    for ctx, result in zip(contexts, results):
        if ctx.error:
            print(f"  ❌ {ctx.tool_name}({ctx.arguments}): {ctx.error}")
        else:
            print(f"  ✅ {ctx.tool_name}({ctx.arguments}): {result[:60]}...")

# asyncio.run(demo())

挑战 3:工具返回内容过长导致的上下文溢出

现象: LLM 开始胡言乱语、遗漏信息、或者输出空的 tool call。

这就是开篇事故的根因。当工具返回大量数据时(比如一次拉取 500 条订单记录,或者一个日志搜索返回 2MB 的文本),这些数据全部塞进 assistant message 的 content 里,直接撑爆上下文窗口。

问题分析代码:

import json
from typing import Any

# 反模式:无裁剪返回
async def naive_execute(tool_name: str, args: dict) -> dict:
    raw_result = await call_api(tool_name, args)
    
    # 直接返回,完全不控制数据量
    return {
        "tool_call_id": args.get("call_id"),
        "result": raw_result  # 可能是 10MB 的数据
    }


# 经过实验,我们发现问题呈现明显的阈值效应:

def simulate_context_overflow(content_length: int) -> str:
    """模拟不同长度下模型的回复质量"""
    if content_length < 2000:
        return "回复准确,信息完整"
    elif content_length < 5000:
        return "回复基本正确,偶尔遗漏细节"
    elif content_length < 10000:
        return "回复质量下降明显,开始遗漏关键信息"
    elif content_length < 20000:
        return "经常遗漏核心内容,需要多次补充提问"
    else:
        return "胡言乱语,产生幻觉,或直接超时"

# 我们在 GPT-4o 上实测的阈值数据
# Context: 128K tokens
# 当 tool result 超过 ~8K tokens 时,开始出现显著质量下降
# 当 tool result 超过 ~20K tokens 时,几乎不可用

解决方案——智能结果压缩:

import json
import tiktoken
from typing import Any, Optional

class ToolResultCompressor:
    """智能压缩工具返回结果,保持信息密度最大化"""
    
    def __init__(self, max_tokens: int = 4000, model: str = "gpt-4o"):
        self.max_tokens = max_tokens
        self.encoder = tiktoken.encoding_for_model(model)
        
    def compress(self, raw_result: Any, tool_name: str) -> str:
        """根据工具类型选择合适的压缩策略"""
        
        # 检查原始大小
        serialized = json.dumps(raw_result, ensure_ascii=False, default=str)
        token_count = len(self.encoder.encode(serialized))
        
        if token_count <= self.max_tokens:
            return serialized
        
        # 需要压缩
        strategy = self._select_strategy(tool_name, raw_result)
        compressed = strategy(raw_result)
        
        final = json.dumps(compressed, ensure_ascii=False, default=str)
        final_tokens = len(self.encoder.encode(final))
        print(f"[压缩] {tool_name}: {token_count} tokens → {final_tokens} tokens ({(1 - final_tokens/token_count)*100:.0f}% 压缩率)")
        
        # 如果压缩后仍然超限,做截断
        if final_tokens > self.max_tokens:
            final = self._truncate(final)
            
        return final
    
    def _select_strategy(self, tool_name: str, data: Any):
        """为每种工具选择最优压缩策略"""
        strategies = {
            "search_orders": self._compress_list,
            "search_product": self._compress_product_list,
            "get_logs": self._compress_logs,
            "get_customer_info": self._compress_customer,
        }
        return strategies.get(tool_name, self._compress_generic)
    
    def _compress_list(self, data: list) -> list:
        """列表数据:保留关键字段,丢弃冗余信息"""
        if not isinstance(data, list):
            return data
        
        # 如果列表太长,采样并告诉模型总数
        if len(data) > 20:
            sampled = data[:15]  # 取前 15 条
            for item in sampled:
                if isinstance(item, dict):
                    # 只保留关键字段
                    item.pop("internal_note", None)
                    item.pop("raw_response", None)
                    item.pop("debug_info", None)
            
            return {
                "total_count": len(data),
                "showing": len(sampled),
                "sample": sampled,
                "_note": f"共 {len(data)} 条结果,这里仅展示 {len(sampled)} 条示例"
            }
        
        for item in data:
            if isinstance(item, dict):
                item.pop("internal_note", None)
                item.pop("raw_response", None)
        return data
    
    def _compress_product_list(self, data: list) -> list:
        if not isinstance(data, list):
            return data
        # 商品列表:保留标题、价格、库存等关键信息
        compressed = []
        for item in data[:20]:
            compressed.append({
                "id": item.get("id"),
                "name": item.get("name"),
                "price": item.get("price"),
                "stock": item.get("stock"),
                "rating": item.get("rating"),
            })
        
        return {
            "total": len(data),
            "items": compressed,
            "_note": f"共 {len(data)} 个商品,这里展示 {len(compressed)} 个关键信息"
        }
    
    def _compress_logs(self, data: list) -> str:
        """日志数据:聚合同类日志"""
        if not isinstance(data, list):
            return str(data)
        
        from collections import Counter
        levels = Counter()
        patterns = Counter()
        
        for log in data[:500]:  # 最多处理 500 条
            if isinstance(log, dict):
                levels[log.get("level", "INFO")] += 1
                msg = log.get("message", "")
                # 提取日志模式
                pattern = msg[:50] + "..." if len(msg) > 50 else msg
                patterns[pattern] += 1
            else:
                patterns[str(log)[:50]] += 1
        
        return json.dumps({
            "total_logs": len(data),
            "level_distribution": dict(levels),
            "common_patterns": dict(patterns.most_common(10)),
        }, ensure_ascii=False)
    
    def _compress_customer(self, data: dict) -> dict:
        """客户信息:按需筛选字段"""
        safe_fields = {"name", "phone", "email", "level", "total_orders", "register_date"}
        return {k: v for k, v in data.items() if k in safe_fields}
    
    def _compress_generic(self, data: Any) -> Any:
        if isinstance(data, dict):
            # 只保留前 15 个键
            keys = list(data.keys())[:15]
            return {k: data[k] for k in keys}
        if isinstance(data, list):
            return data[:30]
        return str(data)[:2000]
    
    def _truncate(self, text: str) -> str:
        tokens = self.encoder.encode(text)
        truncated = self.encoder.decode(tokens[:self.max_tokens - 100])  # 留一些余量
        return truncated + "\n\n[注意:结果已被截断,完整数据需要分页查询]"


# 使用示例
async def safe_execute_with_compress(executor, ctx, compressor):
    raw_result = await executor.execute(ctx)
    compressed = compressor.compress(
        json.loads(raw_result),
        ctx.tool_name
    )
    return compressed

挑战 4:错误处理和重试策略——不能简单重试

现象: 工具执行失败后,把错误简单返回给 LLM,导致模型陷入无限重试循环。

这是我们最惨痛的教训。初始的"重试"逻辑:

# 反模式:无限制重试
MAX_RETRIES = 5
attempt = 0
while attempt < MAX_RETRIES:
    try:
        result = await execute_tool(tool_name, args)
        return result
    except Exception as e:
        attempt += 1
        # 直接把错误丢给 LLM 处理
        return {"error": str(e), "retry_count": attempt}

问题是 LLM 没有足够的上下文理解为什么失败,经常重新发起同一个 tool call,参数一模一样,结果再次失败。我们见过一个 API 被同一个参数调用了 37 次的案例。

正确方案——分级重试策略:

import asyncio
import json
import time
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Callable, Awaitable

class ErrorCategory(Enum):
    TRANSIENT = "transient"    # 可重试的临时错误
    FATAL = "fatal"           # 不可恢复的错误
    RATE_LIMIT = "rate_limit" # 限流,需要等待
    TIMEOUT = "timeout"       # 超时,需要调整参数

@dataclass
class RetryPolicy:
    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 30.0
    backoff_factor: float = 2.0
    jitter: bool = True
    
    # 各类错误的专门策略
    timeout_retries: int = 2
    timeout_multiplier: float = 1.5  # 超时后,每次放宽超时时间
    
    rate_limit_retries: int = 3
    rate_limit_wait: float = 5.0  # 初始等待时间

def classify_error(error: Exception) -> ErrorCategory:
    """根据异常类型分类"""
    if isinstance(error, asyncio.TimeoutError):
        return ErrorCategory.TIMEOUT
    if isinstance(error, ConnectionError):
        return ErrorCategory.TRANSIENT
    if "rate limit" in str(error).lower() or "429" in str(error):
        return ErrorCategory.RATE_LIMIT
    if "invalid" in str(error).lower() or "permission" in str(error).lower():
        return ErrorCategory.FATAL
    return ErrorCategory.TRANSIENT

class ResilientToolExecutor:
    
    def __init__(self, tool_handlers: dict[str, Callable], default_timeout: float = 10.0):
        self.handlers = tool_handlers
        self.default_timeout = default_timeout
        self.retry_policy = RetryPolicy()
        self._metrics = {"retries": 0, "timeouts": 0, "fatals": 0, "successes": 0}
        
    async def execute_with_retry(
        self,
        tool_name: str,
        arguments: dict,
        timeout: Optional[float] = None
    ) -> tuple[bool, dict]:
        """
        返回 (success, result_or_error)
        如果在多次重试后仍然失败,返回给 LLM 一个善意的提示,而不是错误堆栈
        """
        current_timeout = timeout or self.default_timeout
        delay = self.retry_policy.base_delay
        last_error = None
        
        for attempt in range(self.retry_policy.max_retries + 1):
            try:
                result = await asyncio.wait_for(
                    self.handlers[tool_name](**arguments),
                    timeout=current_timeout
                )
                self._metrics["successes"] += 1
                return True, result
                
            except asyncio.TimeoutError:
                self._metrics["timeouts"] += 1
                last_error = asyncio.TimeoutError(f"Command timed out after {current_timeout}s")
                
                if attempt < self.retry_policy.timeout_retries:
                    # 超时:增加超时时间 + 提示可能是数据量大
                    current_timeout *= self.retry_policy.timeout_multiplier
                    print(f"[重试] {tool_name} attempt {attempt + 1}: 超时,延长至 {current_timeout:.0f}s")
                    await asyncio.sleep(delay)
                    delay = min(delay * self.retry_policy.backoff_factor, self.retry_policy.max_delay)
                    continue
                break
                
            except Exception as e:
                category = classify_error(e)
                last_error = e
                
                if category == ErrorCategory.FATAL:
                    # 致命错误,不重试
                    self._metrics["fatals"] += 1
                    print(f"[致命] {tool_name}: {e} — 不重试")
                    return False, {"error_type": "fatal", "message": str(e)}
                
                if category == ErrorCategory.RATE_LIMIT:
                    self._metrics["retries"] += 1
                    # 限流:等待更长时间
                    wait_time = self.retry_policy.rate_limit_wait * (attempt + 1)
                    print(f"[限流] {tool_name} attempt {attempt + 1}: 等待 {wait_time:.0f}s")
                    await asyncio.sleep(wait_time)
                    continue
                
                # TRANSIENT: 指数退避重试
                self._metrics["retries"] += 1
                if attempt < self.retry_policy.max_retries:
                    jitter = (time.monotonic() % 0.5) if self.retry_policy.jitter else 0
                    actual_delay = delay + jitter
                    print(f"[重试] {tool_name} attempt {attempt + 1}: 等待 {actual_delay:.2f}s")
                    await asyncio.sleep(actual_delay)
                    delay = min(delay * self.retry_policy.backoff_factor, self.retry_policy.max_delay)
                    continue
                break
        
        # 所有重试用完,返回友好的错误消息
        self._metrics["fatals"] += 1
        error_summary = self._get_error_summary(tool_name, last_error)
        return False, {"error_type": "unavailable", "message": error_summary}
    
    def _get_error_summary(self, tool_name: str, error: Exception) -> str:
        """把技术错误转成 LLM 能理解的自然语言"""
        summaries = {
            "search_orders": "订单查询服务暂时不可用,请稍后再试",
            "get_order_status": "订单状态暂时无法获取,可能是系统维护中",
            "search_product": "商品搜索接口异常,请尝试使用其他关键词重试",
            "get_logs": "日志查询超时,请缩小时间范围后重试",
        }
        return summaries.get(tool_name, f"该功能暂时不可用,原因:{str(error)[:100]}")
    
    def get_metrics(self) -> dict:
        return self._metrics


# 使用示例
async def demo_resilient():
    async def mock_api(**kwargs):
        # 模拟偶尔失败
        import random
        await asyncio.sleep(0.1)
        if random.random() < 0.3:
            raise ConnectionError("Connection reset by peer")
        return {"result": f"success for {kwargs}"}
    
    executor = ResilientToolExecutor(
        tool_handlers={"test_api": mock_api},
        default_timeout=5.0
    )
    
    results = []
    for i in range(20):
        success, result = await executor.execute_with_retry(
            "test_api", {"index": i}
        )
        results.append(("OK" if success else "FAIL", str(result)[:50]))
    
    # 统计
    ok_count = sum(1 for r in results if r[0] == "OK")
    print(f"\n=== 结果: {ok_count}/20 成功 ===")
    print(f"指标: {executor.get_metrics()}")

四、稳定性工程实践

1. 工具描述模板体系

我们最终建立了一套规范化的工具描述编写模板:

def make_tool_spec(name: str, spec: dict) -> dict:
    """
    统一的工具 spec 生成器
    spec 结构:
    {
        "description": str,          # 一句话说明
        "business_context": str,     # 业务上下文(谁在用、什么场景)
        "examples": [str],           # 用户请求示例
        "edge_cases": [str],         # 边界情况说明
        "params": [                  # 参数定义
            {
                "name": str,
                "type": str,
                "description": str,
                "enum": list,        # 可选
                "default": any,      # 可选
                "required": bool
            }
        ]
    }
    """
    param_properties = {}
    required_params = []
    
    for p in spec["params"]:
        # 构建参数 schema
        prop = {
            "type": p.get("type", "string"),
            "description": p["description"]
        }
        if "enum" in p:
            prop["enum"] = p["enum"]
        if p.get("default") is not None:
            prop["default"] = p["default"]
        
        param_properties[p["name"]] = prop
        if p.get("required", False):
            required_params.append(p["name"])
    
    # 构建完整的 description,包含业务上下文和示例
    full_description = f"""{spec["description"]}

业务场景:{spec["business_context"]}"""

    if spec.get("examples"):
        full_description += "\n\n用户常见表达示例:\n" + "\n".join(
            f"- {ex}" for ex in spec["examples"]
        )
    
    if spec.get("edge_cases"):
        full_description += "\n\n注意:\n" + "\n".join(
            f"- {ec}" for ec in spec["edge_cases"]
        )
    
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": full_description,
            "parameters": {
                "type": "object",
                "properties": param_properties,
                "required": required_params
            }
        }
    }


# 使用示例
order_search_spec = make_tool_spec("search_orders", {
    "description": "按多种条件搜索历史订单",
    "business_context": "用户查询自己的购买记录。常用于售后、物流追踪、账单核对。",
    "examples": [
        "帮我看看昨天买的手机到哪了",
        "查一下最近的订单",
        "我想看看去年买的那个电脑的订单号",
    ],
    "edge_cases": [
        "如果没有找到任何结果,不要假设订单存在,直接告知用户",
        "如果是查物流状态,需要同时调用 get_order_tracking 获取详细信息",
        "用户说'昨天'指的是自然日,不是 24 小时内",
    ],
    "params": [
        {"name": "keyword", "type": "string", "description": "搜索关键词:订单号、商品名、用户名。意图模糊时传'recent'", "required": True},
        {"name": "status", "type": "string", "description": "筛选状态", "enum": ["pending", "paid", "shipped", "delivered", "cancelled", "refunded"]},
        {"name": "start_date", "type": "string", "description": "开始日期,YYYY-MM-DD 格式"},
        {"name": "end_date", "type": "string", "description": "结束日期,YYYY-MM-DD 格式"},
        {"name": "page", "type": "integer", "description": "页码,从1开始", "default": 1},
        {"name": "page_size", "type": "integer", "description": "每页数量,默认20,最大100", "default": 20},
    ]
})

这套模板引入后,工具描述的编写时间从开发者自己写(平均 10 分钟/个,质量参差不齐)变成了业务方填空(15 分钟/个,质量有保障)。最重要的是,模型的工具调用准确率从平均 73% 提升到了 94%。

2. 限流和超时控制体系

我们设计了一个三层限流架构:

import asyncio
import time
import json
from collections import defaultdict

class ToolCallRateLimiter:
    """
    三层限流器:
    1. 全局 QPS 限制
    2. 每个工具单独的限制
    3. 每个用户的限制
    """
    
    def __init__(self):
        self.global_qps = 50
        self.tool_limits = {
            "search_orders": 10,
            "get_order_status": 30,
            "search_product": 15,
            "get_logs": 5,  # 日志查询消耗大
        }
        self.user_limits = 5  # 每个用户每秒最多5次
        
        # 时间窗口追踪
        self._global_window = _SlidingWindow(1.0, self.global_qps)
        self._tool_windows: dict[str, _SlidingWindow] = {}
        self._user_windows: dict[str, _SlidingWindow] = {}
        
    async def acquire(self, tool_name: str, user_id: str) -> bool:
        """获取执行许可,返回 True=允许,False=拒绝"""
        # 1. 全局限流
        if not self._global_window.acquire():
            return False
        
        # 2. 工具级限流
        tool_limit = self.tool_limits.get(tool_name, 10)
        tool_key = f"tool:{tool_name}"
        if tool_key not in self._tool_windows:
            self._tool_windows[tool_key] = _SlidingWindow(1.0, tool_limit)
        if not self._tool_windows[tool_key].acquire():
            return False
        
        # 3. 用户级限流
        user_key = f"user:{user_id}"
        if user_key not in self._user_windows:
            self._user_windows[user_key] = _SlidingWindow(1.0, self.user_limits)
        if not self._user_windows[user_key].acquire():
            return False
        
        return True


class _SlidingWindow:
    """滑动窗口计数器,线程安全"""
    def __init__(self, window_seconds: float, max_requests: int):
        self.window = window_seconds
        self.max = max_requests
        self._timestamps: list[float] = []
        
    def acquire(self) -> bool:
        now = time.monotonic()
        cutoff = now - self.window
        
        # 清理过期记录
        while self._timestamps and self._timestamps[0] < cutoff:
            self._timestamps.pop(0)
        
        if len(self._timestamps) >= self.max:
            return False
        
        self._timestamps.append(now)
        return True


async def limited_execute(executor: ResilientToolExecutor, tool_name: str, args: dict, user_id: str) -> tuple[bool, dict]:
    """带限流的工具执行包装"""
    limiter = ToolCallRateLimiter()
    
    if not await limiter.acquire(tool_name, user_id):
        return False, {
            "error_type": "rate_limited",
            "message": "当前请求频率过高,请稍后再试"
        }
    
    return await executor.execute_with_retry(tool_name, args)

3. 结构化日志追踪

Tool Calling 的调试比普通 API 难得多。一次用户请求可能包含:1 次 LLM 调用 → 3 个并行 tool call → 每个 tool call 可能多次重试 → 结果汇总后再 1 次 LLM 调用。没有结构化日志根本没法排查问题。

import json
import uuid
import time
from datetime import datetime
from dataclasses import dataclass, field, asdict
from typing import Optional, Any

@dataclass
class ToolCallSpan:
    """一次工具调用的完整追踪跨度"""
    session_id: str
    turn_id: int
    call_id: str
    tool_name: str
    arguments: dict
    status: str = "pending"  # pending | running | success | failed | rate_limited
    retry_count: int = 0
    duration_ms: Optional[float] = None
    result_preview: Optional[str] = None
    error: Optional[str] = None
    token_usage: Optional[dict] = None
    
    def to_log(self) -> str:
        return json.dumps({
            "type": "tool_call",
            "session_id": self.session_id,
            "turn_id": self.turn_id,
            "call_id": self.call_id,
            "tool_name": self.tool_name,
            "status": self.status,
            "retry": self.retry_count,
            "duration_ms": self.duration_ms,
            "error": self.error,
        }, ensure_ascii=False)


class ToolCallTracer:
    """工具调用追踪器,输出结构化的 JSON 日志"""
    
    def __init__(self):
        self._spans: dict[str, ToolCallSpan] = {}
        self._session_spans: dict[str, list[str]] = {}
    
    def start_call(self, session_id: str, turn_id: int, call_id: str, 
                   tool_name: str, arguments: dict) -> ToolCallSpan:
        span = ToolCallSpan(
            session_id=session_id,
            turn_id=turn_id,
            call_id=call_id,
            tool_name=tool_name,
            arguments=arguments
        )
        self._spans[call_id] = span
        
        if session_id not in self._session_spans:
            self._session_spans[session_id] = []
        self._session_spans[session_id].append(call_id)
        
        # 打印结构化日志行,便于 ELK 等收集
        print(span.to_log())
        return span
    
    def end_call(self, call_id: str, status: str, result: Any = None, 
                 error: Optional[str] = None, retry_count: int = 0,
                 duration_ms: Optional[float] = None):
        span = self._spans.get(call_id)
        if not span:
            return
        span.status = status
        span.retry_count = retry_count
        span.duration_ms = duration_ms
        if result:
            span.result_preview = str(result)[:200]
        span.error = error
        print(span.to_log())
    
    def get_session_trace(self, session_id: str) -> list[dict]:
        """获取一次会话的完整调用链"""
        call_ids = self._session_spans.get(session_id, [])
        return [asdict(self._spans[cid]) for cid in call_ids if cid in self._spans]
    
    def get_summary(self, session_id: str) -> dict:
        """生成会话摘要"""
        spans = self.get_session_trace(session_id)
        total = len(spans)
        successes = sum(1 for s in spans if s["status"] == "success")
        failures = sum(1 for s in spans if s["status"] == "failed")
        retries = sum(s.get("retry_count", 0) for s in spans)
        avg_duration = sum(s.get("duration_ms", 0) or 0 for s in spans) / max(total, 1)
        
        return {
            "session_id": session_id,
            "total_calls": total,
            "success_rate": f"{successes/max(total,1)*100:.1f}%",
            "total_retries": retries,
            "avg_duration_ms": round(avg_duration, 1),
            "tool_breakdown": {
                tool: sum(1 for s in spans if s["tool_name"] == tool)
                for tool in set(s["tool_name"] for s in spans)
            }
        }


# 使用示例
tracer = ToolCallTracer()

async def traced_execute(session_id: str, turn_id: int, executor, 
                         tool_name: str, args: dict) -> tuple[bool, dict]:
    call_id = f"{session_id}-{turn_id}-{uuid.uuid4().hex[:6]}"
    start = time.monotonic()
    
    tracer.start_call(session_id, turn_id, call_id, tool_name, args)
    
    success, result = await executor.execute_with_retry(tool_name, args)
    
    duration_ms = (time.monotonic() - start) * 1000
    status = "success" if success else "failed"
    error = result.get("message") if not success else None
    
    tracer.end_call(call_id, status, result, error, duration_ms=duration_ms)
    
    return success, result

有了这个追踪体系后,我们每个 tool call 都会输出如下格式的 JSON 行:

{"type":"tool_call","session_id":"sess_abc123","turn_id":1,"call_id":"sess_abc123-1-a1b2c3","tool_name":"search_orders","status":"success","retry":0,"duration_ms":235,"error":null}

直接用 jq 就能做数据分析:

# 查看失败率最高的工具
cat tool_traces.jsonl | jq 'select(.status == "failed") | .tool_name' | sort | uniq -c | sort -rn

# 查看平均执行时间
cat tool_traces.jsonl | jq 'select(.status == "success") | .duration_ms' | awk '{s+=$1;c++} END {print s/c}'

五、评估体系:如何量化 Tool Calling 成功率

没有度量就没有改进。我们建立了一套完整的评估体系来量化工具调用的效果。

评估指标

from dataclasses import dataclass, field
from typing import Optional
from collections import Counter

@dataclass
class TurnEvaluation:
    """单轮对话的 tool calling 评估"""
    query: str
    expected_tool: str
    expected_params: dict  # 关键参数期望值
    actual_tool: Optional[str] = None
    actual_params: Optional[dict] = None
    tool_success: bool = False  # 工具执行是否成功
    param_correctness: float = 0.0  # 参数正确率 0-1
    final_answer_quality: int = 0  # 人工评分 1-5


def evaluate_tool_call_accuracy(dataset_path: str) -> dict:
    """工具调用准确率评估"""
    import json
    
    with open(dataset_path) as f:
        cases = json.load(f)
    
    results = {
        "total": len(cases),
        "tool_match": 0,  # 工具选择正确
        "param_exact": 0,  # 参数完全正确
        "param_partial": 0,  # 参数部分正确
        "execution_success": 0,  # 执行成功
    }
    
    parameter_errors = Counter()
    
    for case in cases:
        query = case["query"]
        expected = case["expected"]
        
        # 模拟调用
        actual = simulate_tool_call(query)
        
        # 1. 工具选择是否正确
        if actual["tool_name"] == expected["tool"]:
            results["tool_match"] += 1
        else:
            continue  # 工具都选错了,后面的不用测了
        
        # 2. 参数正确率
        expected_params = expected.get("params", {})
        actual_params = actual.get("params", {})
        
        if not expected_params:
            results["param_exact"] += 1
            results["execution_success"] += 1
            continue
        
        param_score = 0
        total_params = len(expected_params)
        for key, expected_val in expected_params.items():
            actual_val = actual_params.get(key)
            if actual_val == expected_val:
                param_score += 1
            else:
                parameter_errors[f"{key}: {expected_val}{actual_val}"] += 1
        
        if param_score == total_params:
            results["param_exact"] += 1
        elif param_score > 0:
            results["param_partial"] += 1
        
        # 3. 模拟执行(只用参数正确的)
        if actual_params:
            results["execution_success"] += 1
    
    return {
        "total": results["total"],
        "tool_selection_rate": f"{results['tool_match'] / results['total']:.1%}",
        "param_exact_rate": f"{results['param_exact'] / results['total']:.1%}",
        "param_partial_rate": f"{results['param_partial'] / results['total']:.1%}",
        "execution_success_rate": f"{results['execution_success'] / results['total']:.1%}",
        "top_parameter_errors": parameter_errors.most_common(10),
    }


# 生成报告
def generate_report(metrics: dict, tracer_summary: dict) -> str:
    """生成可读的评估报告"""
    lines = [
        "## Tool Calling 质量报告\n",
        f"生成时间: {datetime.now().isoformat()}",
        "",
        "### 核心指标",
        f"- 工具选择准确率: {metrics['tool_selection_rate']}",
        f"- 参数完全正确率: {metrics['param_exact_rate']}",
        f"- 执行成功率: {metrics['execution_success_rate']}",
        f"- 平均调用耗时: {tracer_summary.get('avg_duration_ms', 'N/A')}ms",
        f"- 总重试次数: {tracer_summary.get('total_retries', 'N/A')}",
        "",
        "### 各工具调用频次",
    ]
    
    for tool, count in tracer_summary.get("tool_breakdown", {}).items():
        lines.append(f"- {tool}: {count} 次")
    
    lines.extend([
        "",
        "### Top 参数错误",
    ])
    
    for err, count in metrics.get("top_parameter_errors", []):
        lines.append(f"- {err} (出现 {count} 次)")
    
    return "\n".join(lines)

我们的实测数据

在我们的生产环境(120+ 工具,日均 50 万次调用),经过上述改造后,各项指标变化如下:

指标 改造前 改造后
工具选择准确率 72% 97%
参数完全正确率 51% 89%
执行成功率(含重试) 78% 99.3%
平均调用耗时 1.2s 0.8s
用户满意度(NPS) 32 68

特别说明:99.3% 的执行成功率意味着我们仍然有 0.7% 的失败率。对一个日均 50 万次调用的系统来说,这就是每天 3500 次失败。这 3500 次失败大部分来自下游服务的真实故障(第三方 API 不稳定、数据库超时等),而不是工程框架的问题。我们接受这个现实,但通过优雅降级(友好提示 + 转接人工)确保用户体验不受影响。

六、总结

回到开篇那个凌晨三点的事故。在完成了上述所有改造后,同样的场景再次出现——某上游 API 返回了 10MB 的数据。这一次,压缩器成功将结果缩小到 1200 tokens,限流器阻止了雪崩,重试器在两次超时后返回了
友好的降级提示。整个系统安然无恙。

我凌晨三点被叫醒的故事再也没有发生过。

核心经验总结

如果你也在构建 LLM Agent 的 Tool Calling 系统,这是我的核心建议:

  1. 工具描述是工程问题,不是文档问题:花时间写好每条描述,包含业务上下文、示例和边界条件。让产品经理参与描述编写。
  2. 无状态是并发安全的基础:每个 tool call 拥有独立的上下文对象,使用 asyncio.Semaphore 控制并发度。
  3. 永远不要信任工具返回的数据量:所有返回结果都要经过压缩/截断,设置硬性上限。
  4. 分级重试,优雅降级:把 HTTP 500 和 HTTP 429 分别处理,指数退避+抖动。最终失败要有友好的用户提示。
  5. 三层限流:全局、工具级、用户级,任何一层都不能少。
  6. 没有日志等于没有系统:结构化 JSON 日志行,让 ELK 或任何日志平台可以轻松分析。
  7. 量化评估:没有指标就没有改进。建立持续评估 pipeline,追踪工具选择率、参数正确率、执行成功率。

最后一点感悟

很多团队把 LLM Agent 看作一个"黑盒魔法盒子"——扔进去一个 Prompt,就能自动完成任务。但真实的生产环境告诉我们,Agent 系统的可靠性不是模型给出的,而是工程构建的

Tool Calling 是 Agent 与外部世界交互的桥梁。这座桥梁的设计质量,直接决定了整个系统的可用性。那些看似琐碎的工程细节——工具描述怎么写、超时怎么设、错误怎么处理——恰恰是区分 Demo 级产品和生产级系统的关键。

如果这篇文章能帮你避免哪怕一次凌晨三点的告警电话,那就值了。

附录:文中涉及的完整代码

所有代码示例可以在以下 GitHub Gist 中找到:https://gist.github.com/your-username/tool-calling-engineering

文中代码已经过简化以便阅读,生产环境的版本增加了更完善的类型检查、重试持久化和监控集成,欢迎 fork 和 PR。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐