我在生产环境踩的那些LLM Tool Calling 坑——从崩溃到 99% 稳定的工程实践
凌晨两点四十七分,手机疯狂震动。PagerDuty 告警:生产环境的 Agent 系统成功率从 92% 跌到了 31%。
一、引言:一次让我凌晨三点爬起来的事故
凌晨两点四十七分,手机疯狂震动。PagerDuty 告警:生产环境的 Agent 系统成功率从 92% 跌到了 31%。
我穿着拖鞋冲进办公室,打开 Kibana 日志面板。3 万个用户请求中,有超过 2 万个因为"tool_call_execution_error"超时失败。用户投诉如潮水般涌入:“机器人在无限循环”“一直说请稍等”“客服机器人疯了”。
这不是模型能力的问题。这是工程问题。
追溯根因:一个上游 API 返回了 8MB 的非结构化 JSON,直接塞进了 Agent 的 tool call 结果。LLM 尝试处理这个庞然大物,上下文塞爆,开始胡言乱语。随后重试策略没有退避,同一个请求触发了 15 次重复调用,击穿了下游服务限流,最终整个系统雪崩。
那一夜之后,我带着团队花了三个月时间,系统性地重构了 Tool Calling 的整个工程链路。本文将还原这段经历,分享我们在 LLM Agent Tool Calling 场景中踩过的坑、提炼的实践和最终的工程方案。
二、背景:Tool Calling 远比你想象的脆弱
简单回顾一下 Tool Calling 的工作机制。LLM 在生成回复时,不是直接调用外部函数,而是由模型"声明"它想调用哪个工具以及传入什么参数,由工程层去实际执行调用,然后把结果返回给模型做下一步推理。
用户请求 → LLM 推理 → 输出 tool_call
→ 工程层解析参数 → 执行外部 API
→ 返回结果给 LLM → LLM 生成最终回答
听起来简单?在实际生产环境中,每一个箭头都可能断裂。
我们的系统管理着超过 120 个工具函数,涵盖订单查询、库存检查、物流跟踪、客服转接、优惠计算等业务场景。每个工具都有不同的参数、不同的超时限制、不同的返回数据量。维护这些工具描述,并与模型保持一致,本身就是一个工程难题。
三、核心工程挑战(掉过的坑)
挑战 1:工具描述质量——“模型看不懂你写的 API”
现象: 模型频繁传错参数,或者调用错误的工具。
我最初认为这很荒谬——API 文档明明写得很清楚。直到我们统计了 1000 次 tool call 数据:
| 工具描述类型 | 参数正确率 | 工具选择正确率 | 平均推理延迟 |
|---|---|---|---|
| 一句话描述 | 47% | 62% | 1.2s |
| 带参数类型说明 | 68% | 78% | 1.4s |
| 含业务上下文 | 83% | 91% | 1.5s |
| 含完整示例 + 边界条件 | 91% | 96% | 1.7s |
教训: 模型不读文档,它理解自然语言。让工具描述像业务文档一样写。
踩坑代码——不好的实践:
# 反模式:工具描述过于简略
tools = [
{
"type": "function",
"function": {
"name": "search_orders",
"description": "搜索订单",
"parameters": {
"type": "object",
"properties": {
"keyword": {"type": "string", "description": "关键词"},
"status": {"type": "string", "description": "订单状态"},
"page": {"type": "integer", "description": "页码"},
},
"required": ["keyword"]
}
}
}
]
这个描述有什么问题?"关键词"不够精确——是订单号、商品名还是用户名?"订单状态"有哪些可选值?"页码"的默认值是多少?模型每次都在猜,而且猜不对。
正确实践——详细到让人反感的描述:
# 最佳实践:包含业务上下文、枚举值和默认值
tools = [
{
"type": "function",
"function": {
"name": "search_orders",
"description": """按关键词搜索历史订单。关键词可以是:订单号(如 ORD-202405-0001)、商品名称、买家用户名。
如果用户说"最近订单"或没有明确关键词,使用 keyword="recent"。
分页默认返回第1页(page=1),每页20条。
如果搜索不到结果,不要编造,如实告知用户。""",
"parameters": {
"type": "object",
"properties": {
"keyword": {
"type": "string",
"description": "搜索关键词(订单号/商品名/用户名)。当用户意图模糊时传'recent'"
},
"status": {
"type": "string",
"description": "筛选订单状态。可选值:pending(待付款), paid(已付款), shipped(已发货), delivered(已签收), cancelled(已取消), refunded(已退款)。不传则查询所有状态"
},
"page": {
"type": "integer",
"description": "页码,从1开始,默认1"
},
"page_size": {
"type": "integer",
"description": "每页数量,默认20,最大100"
}
},
"required": ["keyword"]
}
}
}
]
这段描述看起来啰嗦,但实测准确率从 62% 提升到了 91%。模型不需要猜了,它直接"看到"了业务规则。
实验数据(我们内部 benchmark):
我们用同样的 500 个测试 query 测试了两种描述:
# 对比实验代码
import json
import time
from openai import OpenAI
client = OpenAI()
test_queries = [
"帮我查一下昨天买的手机到哪了",
"看看我有哪些待付款的订单",
"订单号 ORD-202405-0088 现在什么状态",
"最近一个月取消的订单有哪些",
"搜索关键词洗衣机,看已签收的",
# ... 共 500 条
]
def test_tool_descriptions(tools, queries):
successes = 0
for q in queries:
try:
resp = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": q}],
tools=tools,
tool_choice="auto"
)
msg = resp.choices[0].message
if msg.tool_calls:
# 验证参数格式是否合法
for tc in msg.tool_calls:
args = json.loads(tc.function.arguments)
# 简单的合法性检查
if isinstance(args.get("keyword"), str):
successes += 1
break
except Exception as e:
print(f"Error: {e}")
return successes / len(queries)
# 简单描述
simple_tools = [...] # 简略版
# 详细描述
detailed_tools = [...] # 详细版
print(f"简单描述成功率: {test_tool_descriptions(simple_tools, test_queries):.1%}")
print(f"详细描述成功率: {test_tool_descriptions(detailed_tools, test_queries):.1%}")
挑战 2:并发工具调用的竞态问题
现象: 当 LLM 同时调用多个工具(并行 tool calls)时,部分工具的参数被互相覆盖,或结果张冠李戴。
有一次我们遇到一个诡异 bug:用户查"我的手机和耳机",模型正确识别出两个查询意图,同时调用了 search_product("手机") 和 search_product("耳机")。但最终对话输出说"您的手机是:耳机 A,售价 299 元……"
原因是我们在后端用一个共享的 dict 缓存了 tool call 的中间状态,两个并发的 tool call 写入了同一个键,导致了数据交叉。
问题复现代码:
import asyncio
import json
from typing import Any
# 反模式:共享的可变状态
_shared_state: dict[str, Any] = {}
async def execute_tool_call(tool_name: str, arguments: dict, call_id: str) -> str:
global _shared_state
# 竞态!两个并发调用会互相覆盖
_shared_state["current_tool"] = tool_name
_shared_state["current_args"] = arguments
_shared_state["current_call_id"] = call_id
# 模拟外部 API 调用延迟
await asyncio.sleep(0.1)
result = await call_external_api(tool_name, arguments)
# 问题:此时 _shared_state 可能已经被另一个协程覆盖
log_to_tracing(f"Tool {_shared_state['current_tool']} returned", _shared_state["current_call_id"])
return json.dumps({"result": result, "call_id": call_id})
# 模拟并发调用
async def main():
tasks = [
execute_tool_call("search_product", {"keyword": "手机"}, "call_001"),
execute_tool_call("search_product", {"keyword": "耳机"}, "call_002"),
]
results = await asyncio.gather(*tasks)
for r in results:
print(r)
# 输出可能长这样:
# {"result": "手机搜索结果...", "call_id": "call_001"} ✅ 正确
# {"result": "耳机搜索结果...", "call_id": "call_002"} ❌ 日志中 current_tool 是 "search_product"(但无法区分是哪个)
解决方案——无状态设计 + 上下文隔离:
import asyncio
import json
import uuid
from dataclasses import dataclass, field
from typing import Any, Optional
@dataclass
class ToolCallContext:
"""每个 tool call 拥有独立的上下文"""
call_id: str
tool_name: str
arguments: dict
trace_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
result: Optional[str] = None
error: Optional[str] = None
start_time: float = 0.0
end_time: float = 0.0
class ToolExecutor:
"""无状态的工具执行器"""
def __init__(self, api_timeout: float = 5.0):
self._semaphore = asyncio.Semaphore(5) # 限制最大并发数
self._api_timeout = api_timeout
self._pending_calls: dict[str, asyncio.Task] = {}
async def execute(self, ctx: ToolCallContext) -> str:
async with self._semaphore:
ctx.start_time = time.monotonic()
try:
# 每个调用完全隔离
executor = self._get_handler(ctx.tool_name)
if executor is None:
raise ValueError(f"未知工具: {ctx.tool_name}")
# 带超时的 API 调用
result = await asyncio.wait_for(
executor(**ctx.arguments),
timeout=self._api_timeout
)
ctx.result = json.dumps(result, ensure_ascii=False)
ctx.end_time = time.monotonic()
self._log_completion(ctx)
return ctx.result
except asyncio.TimeoutError:
ctx.error = f"timeout after {self._api_timeout}s"
ctx.end_time = time.monotonic()
self._log_error(ctx)
return json.dumps({"error": "timeout", "tool": ctx.tool_name})
except Exception as e:
ctx.error = str(e)
ctx.end_time = time.monotonic()
self._log_error(ctx)
return json.dumps({"error": str(e), "tool": ctx.tool_name})
async def execute_parallel(self, call_contexts: list[ToolCallContext]) -> list[str]:
"""并行执行多个工具调用,彼此完全隔离"""
tasks = [self.execute(ctx) for ctx in call_contexts]
return await asyncio.gather(*tasks)
def _get_handler(self, tool_name: str):
"""查找工具处理函数"""
registry = {
"search_product": self._handler_search_product,
"get_order_status": self._handler_get_order_status,
}
return registry.get(tool_name)
async def _handler_search_product(self, keyword: str, **kwargs) -> dict:
# 模拟外部 API
await asyncio.sleep(0.2)
return {"keyword": keyword, "results": [f"商品A({keyword}版)", f"商品B({keyword}版)"]}
async def _handler_get_order_status(self, order_id: str, **kwargs) -> dict:
await asyncio.sleep(0.1)
return {"order_id": order_id, "status": "shipped"}
def _log_completion(self, ctx: ToolCallContext):
duration = (ctx.end_time - ctx.start_time) * 1000
print(f"[OK] {ctx.trace_id} | {ctx.tool_name}({json.dumps(ctx.arguments, ensure_ascii=False)}) | {duration:.0f}ms")
def _log_error(self, ctx: ToolCallContext):
duration = (ctx.end_time - ctx.start_time) * 1000
print(f"[ERR] {ctx.trace_id} | {ctx.tool_name} | {duration:.0f}ms | {ctx.error}")
# 使用示例
async def demo():
executor = ToolExecutor(api_timeout=3.0)
contexts = [
ToolCallContext(call_id="c1", tool_name="search_product", arguments={"keyword": "手机"}),
ToolCallContext(call_id="c2", tool_name="search_product", arguments={"keyword": "耳机"}),
ToolCallContext(call_id="c3", tool_name="get_order_status", arguments={"order_id": "ORD-001"}),
]
results = await executor.execute_parallel(contexts)
for ctx, result in zip(contexts, results):
if ctx.error:
print(f" ❌ {ctx.tool_name}({ctx.arguments}): {ctx.error}")
else:
print(f" ✅ {ctx.tool_name}({ctx.arguments}): {result[:60]}...")
# asyncio.run(demo())
挑战 3:工具返回内容过长导致的上下文溢出
现象: LLM 开始胡言乱语、遗漏信息、或者输出空的 tool call。
这就是开篇事故的根因。当工具返回大量数据时(比如一次拉取 500 条订单记录,或者一个日志搜索返回 2MB 的文本),这些数据全部塞进 assistant message 的 content 里,直接撑爆上下文窗口。
问题分析代码:
import json
from typing import Any
# 反模式:无裁剪返回
async def naive_execute(tool_name: str, args: dict) -> dict:
raw_result = await call_api(tool_name, args)
# 直接返回,完全不控制数据量
return {
"tool_call_id": args.get("call_id"),
"result": raw_result # 可能是 10MB 的数据
}
# 经过实验,我们发现问题呈现明显的阈值效应:
def simulate_context_overflow(content_length: int) -> str:
"""模拟不同长度下模型的回复质量"""
if content_length < 2000:
return "回复准确,信息完整"
elif content_length < 5000:
return "回复基本正确,偶尔遗漏细节"
elif content_length < 10000:
return "回复质量下降明显,开始遗漏关键信息"
elif content_length < 20000:
return "经常遗漏核心内容,需要多次补充提问"
else:
return "胡言乱语,产生幻觉,或直接超时"
# 我们在 GPT-4o 上实测的阈值数据
# Context: 128K tokens
# 当 tool result 超过 ~8K tokens 时,开始出现显著质量下降
# 当 tool result 超过 ~20K tokens 时,几乎不可用
解决方案——智能结果压缩:
import json
import tiktoken
from typing import Any, Optional
class ToolResultCompressor:
"""智能压缩工具返回结果,保持信息密度最大化"""
def __init__(self, max_tokens: int = 4000, model: str = "gpt-4o"):
self.max_tokens = max_tokens
self.encoder = tiktoken.encoding_for_model(model)
def compress(self, raw_result: Any, tool_name: str) -> str:
"""根据工具类型选择合适的压缩策略"""
# 检查原始大小
serialized = json.dumps(raw_result, ensure_ascii=False, default=str)
token_count = len(self.encoder.encode(serialized))
if token_count <= self.max_tokens:
return serialized
# 需要压缩
strategy = self._select_strategy(tool_name, raw_result)
compressed = strategy(raw_result)
final = json.dumps(compressed, ensure_ascii=False, default=str)
final_tokens = len(self.encoder.encode(final))
print(f"[压缩] {tool_name}: {token_count} tokens → {final_tokens} tokens ({(1 - final_tokens/token_count)*100:.0f}% 压缩率)")
# 如果压缩后仍然超限,做截断
if final_tokens > self.max_tokens:
final = self._truncate(final)
return final
def _select_strategy(self, tool_name: str, data: Any):
"""为每种工具选择最优压缩策略"""
strategies = {
"search_orders": self._compress_list,
"search_product": self._compress_product_list,
"get_logs": self._compress_logs,
"get_customer_info": self._compress_customer,
}
return strategies.get(tool_name, self._compress_generic)
def _compress_list(self, data: list) -> list:
"""列表数据:保留关键字段,丢弃冗余信息"""
if not isinstance(data, list):
return data
# 如果列表太长,采样并告诉模型总数
if len(data) > 20:
sampled = data[:15] # 取前 15 条
for item in sampled:
if isinstance(item, dict):
# 只保留关键字段
item.pop("internal_note", None)
item.pop("raw_response", None)
item.pop("debug_info", None)
return {
"total_count": len(data),
"showing": len(sampled),
"sample": sampled,
"_note": f"共 {len(data)} 条结果,这里仅展示 {len(sampled)} 条示例"
}
for item in data:
if isinstance(item, dict):
item.pop("internal_note", None)
item.pop("raw_response", None)
return data
def _compress_product_list(self, data: list) -> list:
if not isinstance(data, list):
return data
# 商品列表:保留标题、价格、库存等关键信息
compressed = []
for item in data[:20]:
compressed.append({
"id": item.get("id"),
"name": item.get("name"),
"price": item.get("price"),
"stock": item.get("stock"),
"rating": item.get("rating"),
})
return {
"total": len(data),
"items": compressed,
"_note": f"共 {len(data)} 个商品,这里展示 {len(compressed)} 个关键信息"
}
def _compress_logs(self, data: list) -> str:
"""日志数据:聚合同类日志"""
if not isinstance(data, list):
return str(data)
from collections import Counter
levels = Counter()
patterns = Counter()
for log in data[:500]: # 最多处理 500 条
if isinstance(log, dict):
levels[log.get("level", "INFO")] += 1
msg = log.get("message", "")
# 提取日志模式
pattern = msg[:50] + "..." if len(msg) > 50 else msg
patterns[pattern] += 1
else:
patterns[str(log)[:50]] += 1
return json.dumps({
"total_logs": len(data),
"level_distribution": dict(levels),
"common_patterns": dict(patterns.most_common(10)),
}, ensure_ascii=False)
def _compress_customer(self, data: dict) -> dict:
"""客户信息:按需筛选字段"""
safe_fields = {"name", "phone", "email", "level", "total_orders", "register_date"}
return {k: v for k, v in data.items() if k in safe_fields}
def _compress_generic(self, data: Any) -> Any:
if isinstance(data, dict):
# 只保留前 15 个键
keys = list(data.keys())[:15]
return {k: data[k] for k in keys}
if isinstance(data, list):
return data[:30]
return str(data)[:2000]
def _truncate(self, text: str) -> str:
tokens = self.encoder.encode(text)
truncated = self.encoder.decode(tokens[:self.max_tokens - 100]) # 留一些余量
return truncated + "\n\n[注意:结果已被截断,完整数据需要分页查询]"
# 使用示例
async def safe_execute_with_compress(executor, ctx, compressor):
raw_result = await executor.execute(ctx)
compressed = compressor.compress(
json.loads(raw_result),
ctx.tool_name
)
return compressed
挑战 4:错误处理和重试策略——不能简单重试
现象: 工具执行失败后,把错误简单返回给 LLM,导致模型陷入无限重试循环。
这是我们最惨痛的教训。初始的"重试"逻辑:
# 反模式:无限制重试
MAX_RETRIES = 5
attempt = 0
while attempt < MAX_RETRIES:
try:
result = await execute_tool(tool_name, args)
return result
except Exception as e:
attempt += 1
# 直接把错误丢给 LLM 处理
return {"error": str(e), "retry_count": attempt}
问题是 LLM 没有足够的上下文理解为什么失败,经常重新发起同一个 tool call,参数一模一样,结果再次失败。我们见过一个 API 被同一个参数调用了 37 次的案例。
正确方案——分级重试策略:
import asyncio
import json
import time
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Callable, Awaitable
class ErrorCategory(Enum):
TRANSIENT = "transient" # 可重试的临时错误
FATAL = "fatal" # 不可恢复的错误
RATE_LIMIT = "rate_limit" # 限流,需要等待
TIMEOUT = "timeout" # 超时,需要调整参数
@dataclass
class RetryPolicy:
max_retries: int = 3
base_delay: float = 1.0
max_delay: float = 30.0
backoff_factor: float = 2.0
jitter: bool = True
# 各类错误的专门策略
timeout_retries: int = 2
timeout_multiplier: float = 1.5 # 超时后,每次放宽超时时间
rate_limit_retries: int = 3
rate_limit_wait: float = 5.0 # 初始等待时间
def classify_error(error: Exception) -> ErrorCategory:
"""根据异常类型分类"""
if isinstance(error, asyncio.TimeoutError):
return ErrorCategory.TIMEOUT
if isinstance(error, ConnectionError):
return ErrorCategory.TRANSIENT
if "rate limit" in str(error).lower() or "429" in str(error):
return ErrorCategory.RATE_LIMIT
if "invalid" in str(error).lower() or "permission" in str(error).lower():
return ErrorCategory.FATAL
return ErrorCategory.TRANSIENT
class ResilientToolExecutor:
def __init__(self, tool_handlers: dict[str, Callable], default_timeout: float = 10.0):
self.handlers = tool_handlers
self.default_timeout = default_timeout
self.retry_policy = RetryPolicy()
self._metrics = {"retries": 0, "timeouts": 0, "fatals": 0, "successes": 0}
async def execute_with_retry(
self,
tool_name: str,
arguments: dict,
timeout: Optional[float] = None
) -> tuple[bool, dict]:
"""
返回 (success, result_or_error)
如果在多次重试后仍然失败,返回给 LLM 一个善意的提示,而不是错误堆栈
"""
current_timeout = timeout or self.default_timeout
delay = self.retry_policy.base_delay
last_error = None
for attempt in range(self.retry_policy.max_retries + 1):
try:
result = await asyncio.wait_for(
self.handlers[tool_name](**arguments),
timeout=current_timeout
)
self._metrics["successes"] += 1
return True, result
except asyncio.TimeoutError:
self._metrics["timeouts"] += 1
last_error = asyncio.TimeoutError(f"Command timed out after {current_timeout}s")
if attempt < self.retry_policy.timeout_retries:
# 超时:增加超时时间 + 提示可能是数据量大
current_timeout *= self.retry_policy.timeout_multiplier
print(f"[重试] {tool_name} attempt {attempt + 1}: 超时,延长至 {current_timeout:.0f}s")
await asyncio.sleep(delay)
delay = min(delay * self.retry_policy.backoff_factor, self.retry_policy.max_delay)
continue
break
except Exception as e:
category = classify_error(e)
last_error = e
if category == ErrorCategory.FATAL:
# 致命错误,不重试
self._metrics["fatals"] += 1
print(f"[致命] {tool_name}: {e} — 不重试")
return False, {"error_type": "fatal", "message": str(e)}
if category == ErrorCategory.RATE_LIMIT:
self._metrics["retries"] += 1
# 限流:等待更长时间
wait_time = self.retry_policy.rate_limit_wait * (attempt + 1)
print(f"[限流] {tool_name} attempt {attempt + 1}: 等待 {wait_time:.0f}s")
await asyncio.sleep(wait_time)
continue
# TRANSIENT: 指数退避重试
self._metrics["retries"] += 1
if attempt < self.retry_policy.max_retries:
jitter = (time.monotonic() % 0.5) if self.retry_policy.jitter else 0
actual_delay = delay + jitter
print(f"[重试] {tool_name} attempt {attempt + 1}: 等待 {actual_delay:.2f}s")
await asyncio.sleep(actual_delay)
delay = min(delay * self.retry_policy.backoff_factor, self.retry_policy.max_delay)
continue
break
# 所有重试用完,返回友好的错误消息
self._metrics["fatals"] += 1
error_summary = self._get_error_summary(tool_name, last_error)
return False, {"error_type": "unavailable", "message": error_summary}
def _get_error_summary(self, tool_name: str, error: Exception) -> str:
"""把技术错误转成 LLM 能理解的自然语言"""
summaries = {
"search_orders": "订单查询服务暂时不可用,请稍后再试",
"get_order_status": "订单状态暂时无法获取,可能是系统维护中",
"search_product": "商品搜索接口异常,请尝试使用其他关键词重试",
"get_logs": "日志查询超时,请缩小时间范围后重试",
}
return summaries.get(tool_name, f"该功能暂时不可用,原因:{str(error)[:100]}")
def get_metrics(self) -> dict:
return self._metrics
# 使用示例
async def demo_resilient():
async def mock_api(**kwargs):
# 模拟偶尔失败
import random
await asyncio.sleep(0.1)
if random.random() < 0.3:
raise ConnectionError("Connection reset by peer")
return {"result": f"success for {kwargs}"}
executor = ResilientToolExecutor(
tool_handlers={"test_api": mock_api},
default_timeout=5.0
)
results = []
for i in range(20):
success, result = await executor.execute_with_retry(
"test_api", {"index": i}
)
results.append(("OK" if success else "FAIL", str(result)[:50]))
# 统计
ok_count = sum(1 for r in results if r[0] == "OK")
print(f"\n=== 结果: {ok_count}/20 成功 ===")
print(f"指标: {executor.get_metrics()}")
四、稳定性工程实践
1. 工具描述模板体系
我们最终建立了一套规范化的工具描述编写模板:
def make_tool_spec(name: str, spec: dict) -> dict:
"""
统一的工具 spec 生成器
spec 结构:
{
"description": str, # 一句话说明
"business_context": str, # 业务上下文(谁在用、什么场景)
"examples": [str], # 用户请求示例
"edge_cases": [str], # 边界情况说明
"params": [ # 参数定义
{
"name": str,
"type": str,
"description": str,
"enum": list, # 可选
"default": any, # 可选
"required": bool
}
]
}
"""
param_properties = {}
required_params = []
for p in spec["params"]:
# 构建参数 schema
prop = {
"type": p.get("type", "string"),
"description": p["description"]
}
if "enum" in p:
prop["enum"] = p["enum"]
if p.get("default") is not None:
prop["default"] = p["default"]
param_properties[p["name"]] = prop
if p.get("required", False):
required_params.append(p["name"])
# 构建完整的 description,包含业务上下文和示例
full_description = f"""{spec["description"]}
业务场景:{spec["business_context"]}"""
if spec.get("examples"):
full_description += "\n\n用户常见表达示例:\n" + "\n".join(
f"- {ex}" for ex in spec["examples"]
)
if spec.get("edge_cases"):
full_description += "\n\n注意:\n" + "\n".join(
f"- {ec}" for ec in spec["edge_cases"]
)
return {
"type": "function",
"function": {
"name": name,
"description": full_description,
"parameters": {
"type": "object",
"properties": param_properties,
"required": required_params
}
}
}
# 使用示例
order_search_spec = make_tool_spec("search_orders", {
"description": "按多种条件搜索历史订单",
"business_context": "用户查询自己的购买记录。常用于售后、物流追踪、账单核对。",
"examples": [
"帮我看看昨天买的手机到哪了",
"查一下最近的订单",
"我想看看去年买的那个电脑的订单号",
],
"edge_cases": [
"如果没有找到任何结果,不要假设订单存在,直接告知用户",
"如果是查物流状态,需要同时调用 get_order_tracking 获取详细信息",
"用户说'昨天'指的是自然日,不是 24 小时内",
],
"params": [
{"name": "keyword", "type": "string", "description": "搜索关键词:订单号、商品名、用户名。意图模糊时传'recent'", "required": True},
{"name": "status", "type": "string", "description": "筛选状态", "enum": ["pending", "paid", "shipped", "delivered", "cancelled", "refunded"]},
{"name": "start_date", "type": "string", "description": "开始日期,YYYY-MM-DD 格式"},
{"name": "end_date", "type": "string", "description": "结束日期,YYYY-MM-DD 格式"},
{"name": "page", "type": "integer", "description": "页码,从1开始", "default": 1},
{"name": "page_size", "type": "integer", "description": "每页数量,默认20,最大100", "default": 20},
]
})
这套模板引入后,工具描述的编写时间从开发者自己写(平均 10 分钟/个,质量参差不齐)变成了业务方填空(15 分钟/个,质量有保障)。最重要的是,模型的工具调用准确率从平均 73% 提升到了 94%。
2. 限流和超时控制体系
我们设计了一个三层限流架构:
import asyncio
import time
import json
from collections import defaultdict
class ToolCallRateLimiter:
"""
三层限流器:
1. 全局 QPS 限制
2. 每个工具单独的限制
3. 每个用户的限制
"""
def __init__(self):
self.global_qps = 50
self.tool_limits = {
"search_orders": 10,
"get_order_status": 30,
"search_product": 15,
"get_logs": 5, # 日志查询消耗大
}
self.user_limits = 5 # 每个用户每秒最多5次
# 时间窗口追踪
self._global_window = _SlidingWindow(1.0, self.global_qps)
self._tool_windows: dict[str, _SlidingWindow] = {}
self._user_windows: dict[str, _SlidingWindow] = {}
async def acquire(self, tool_name: str, user_id: str) -> bool:
"""获取执行许可,返回 True=允许,False=拒绝"""
# 1. 全局限流
if not self._global_window.acquire():
return False
# 2. 工具级限流
tool_limit = self.tool_limits.get(tool_name, 10)
tool_key = f"tool:{tool_name}"
if tool_key not in self._tool_windows:
self._tool_windows[tool_key] = _SlidingWindow(1.0, tool_limit)
if not self._tool_windows[tool_key].acquire():
return False
# 3. 用户级限流
user_key = f"user:{user_id}"
if user_key not in self._user_windows:
self._user_windows[user_key] = _SlidingWindow(1.0, self.user_limits)
if not self._user_windows[user_key].acquire():
return False
return True
class _SlidingWindow:
"""滑动窗口计数器,线程安全"""
def __init__(self, window_seconds: float, max_requests: int):
self.window = window_seconds
self.max = max_requests
self._timestamps: list[float] = []
def acquire(self) -> bool:
now = time.monotonic()
cutoff = now - self.window
# 清理过期记录
while self._timestamps and self._timestamps[0] < cutoff:
self._timestamps.pop(0)
if len(self._timestamps) >= self.max:
return False
self._timestamps.append(now)
return True
async def limited_execute(executor: ResilientToolExecutor, tool_name: str, args: dict, user_id: str) -> tuple[bool, dict]:
"""带限流的工具执行包装"""
limiter = ToolCallRateLimiter()
if not await limiter.acquire(tool_name, user_id):
return False, {
"error_type": "rate_limited",
"message": "当前请求频率过高,请稍后再试"
}
return await executor.execute_with_retry(tool_name, args)
3. 结构化日志追踪
Tool Calling 的调试比普通 API 难得多。一次用户请求可能包含:1 次 LLM 调用 → 3 个并行 tool call → 每个 tool call 可能多次重试 → 结果汇总后再 1 次 LLM 调用。没有结构化日志根本没法排查问题。
import json
import uuid
import time
from datetime import datetime
from dataclasses import dataclass, field, asdict
from typing import Optional, Any
@dataclass
class ToolCallSpan:
"""一次工具调用的完整追踪跨度"""
session_id: str
turn_id: int
call_id: str
tool_name: str
arguments: dict
status: str = "pending" # pending | running | success | failed | rate_limited
retry_count: int = 0
duration_ms: Optional[float] = None
result_preview: Optional[str] = None
error: Optional[str] = None
token_usage: Optional[dict] = None
def to_log(self) -> str:
return json.dumps({
"type": "tool_call",
"session_id": self.session_id,
"turn_id": self.turn_id,
"call_id": self.call_id,
"tool_name": self.tool_name,
"status": self.status,
"retry": self.retry_count,
"duration_ms": self.duration_ms,
"error": self.error,
}, ensure_ascii=False)
class ToolCallTracer:
"""工具调用追踪器,输出结构化的 JSON 日志"""
def __init__(self):
self._spans: dict[str, ToolCallSpan] = {}
self._session_spans: dict[str, list[str]] = {}
def start_call(self, session_id: str, turn_id: int, call_id: str,
tool_name: str, arguments: dict) -> ToolCallSpan:
span = ToolCallSpan(
session_id=session_id,
turn_id=turn_id,
call_id=call_id,
tool_name=tool_name,
arguments=arguments
)
self._spans[call_id] = span
if session_id not in self._session_spans:
self._session_spans[session_id] = []
self._session_spans[session_id].append(call_id)
# 打印结构化日志行,便于 ELK 等收集
print(span.to_log())
return span
def end_call(self, call_id: str, status: str, result: Any = None,
error: Optional[str] = None, retry_count: int = 0,
duration_ms: Optional[float] = None):
span = self._spans.get(call_id)
if not span:
return
span.status = status
span.retry_count = retry_count
span.duration_ms = duration_ms
if result:
span.result_preview = str(result)[:200]
span.error = error
print(span.to_log())
def get_session_trace(self, session_id: str) -> list[dict]:
"""获取一次会话的完整调用链"""
call_ids = self._session_spans.get(session_id, [])
return [asdict(self._spans[cid]) for cid in call_ids if cid in self._spans]
def get_summary(self, session_id: str) -> dict:
"""生成会话摘要"""
spans = self.get_session_trace(session_id)
total = len(spans)
successes = sum(1 for s in spans if s["status"] == "success")
failures = sum(1 for s in spans if s["status"] == "failed")
retries = sum(s.get("retry_count", 0) for s in spans)
avg_duration = sum(s.get("duration_ms", 0) or 0 for s in spans) / max(total, 1)
return {
"session_id": session_id,
"total_calls": total,
"success_rate": f"{successes/max(total,1)*100:.1f}%",
"total_retries": retries,
"avg_duration_ms": round(avg_duration, 1),
"tool_breakdown": {
tool: sum(1 for s in spans if s["tool_name"] == tool)
for tool in set(s["tool_name"] for s in spans)
}
}
# 使用示例
tracer = ToolCallTracer()
async def traced_execute(session_id: str, turn_id: int, executor,
tool_name: str, args: dict) -> tuple[bool, dict]:
call_id = f"{session_id}-{turn_id}-{uuid.uuid4().hex[:6]}"
start = time.monotonic()
tracer.start_call(session_id, turn_id, call_id, tool_name, args)
success, result = await executor.execute_with_retry(tool_name, args)
duration_ms = (time.monotonic() - start) * 1000
status = "success" if success else "failed"
error = result.get("message") if not success else None
tracer.end_call(call_id, status, result, error, duration_ms=duration_ms)
return success, result
有了这个追踪体系后,我们每个 tool call 都会输出如下格式的 JSON 行:
{"type":"tool_call","session_id":"sess_abc123","turn_id":1,"call_id":"sess_abc123-1-a1b2c3","tool_name":"search_orders","status":"success","retry":0,"duration_ms":235,"error":null}
直接用 jq 就能做数据分析:
# 查看失败率最高的工具
cat tool_traces.jsonl | jq 'select(.status == "failed") | .tool_name' | sort | uniq -c | sort -rn
# 查看平均执行时间
cat tool_traces.jsonl | jq 'select(.status == "success") | .duration_ms' | awk '{s+=$1;c++} END {print s/c}'
五、评估体系:如何量化 Tool Calling 成功率
没有度量就没有改进。我们建立了一套完整的评估体系来量化工具调用的效果。
评估指标
from dataclasses import dataclass, field
from typing import Optional
from collections import Counter
@dataclass
class TurnEvaluation:
"""单轮对话的 tool calling 评估"""
query: str
expected_tool: str
expected_params: dict # 关键参数期望值
actual_tool: Optional[str] = None
actual_params: Optional[dict] = None
tool_success: bool = False # 工具执行是否成功
param_correctness: float = 0.0 # 参数正确率 0-1
final_answer_quality: int = 0 # 人工评分 1-5
def evaluate_tool_call_accuracy(dataset_path: str) -> dict:
"""工具调用准确率评估"""
import json
with open(dataset_path) as f:
cases = json.load(f)
results = {
"total": len(cases),
"tool_match": 0, # 工具选择正确
"param_exact": 0, # 参数完全正确
"param_partial": 0, # 参数部分正确
"execution_success": 0, # 执行成功
}
parameter_errors = Counter()
for case in cases:
query = case["query"]
expected = case["expected"]
# 模拟调用
actual = simulate_tool_call(query)
# 1. 工具选择是否正确
if actual["tool_name"] == expected["tool"]:
results["tool_match"] += 1
else:
continue # 工具都选错了,后面的不用测了
# 2. 参数正确率
expected_params = expected.get("params", {})
actual_params = actual.get("params", {})
if not expected_params:
results["param_exact"] += 1
results["execution_success"] += 1
continue
param_score = 0
total_params = len(expected_params)
for key, expected_val in expected_params.items():
actual_val = actual_params.get(key)
if actual_val == expected_val:
param_score += 1
else:
parameter_errors[f"{key}: {expected_val} → {actual_val}"] += 1
if param_score == total_params:
results["param_exact"] += 1
elif param_score > 0:
results["param_partial"] += 1
# 3. 模拟执行(只用参数正确的)
if actual_params:
results["execution_success"] += 1
return {
"total": results["total"],
"tool_selection_rate": f"{results['tool_match'] / results['total']:.1%}",
"param_exact_rate": f"{results['param_exact'] / results['total']:.1%}",
"param_partial_rate": f"{results['param_partial'] / results['total']:.1%}",
"execution_success_rate": f"{results['execution_success'] / results['total']:.1%}",
"top_parameter_errors": parameter_errors.most_common(10),
}
# 生成报告
def generate_report(metrics: dict, tracer_summary: dict) -> str:
"""生成可读的评估报告"""
lines = [
"## Tool Calling 质量报告\n",
f"生成时间: {datetime.now().isoformat()}",
"",
"### 核心指标",
f"- 工具选择准确率: {metrics['tool_selection_rate']}",
f"- 参数完全正确率: {metrics['param_exact_rate']}",
f"- 执行成功率: {metrics['execution_success_rate']}",
f"- 平均调用耗时: {tracer_summary.get('avg_duration_ms', 'N/A')}ms",
f"- 总重试次数: {tracer_summary.get('total_retries', 'N/A')}",
"",
"### 各工具调用频次",
]
for tool, count in tracer_summary.get("tool_breakdown", {}).items():
lines.append(f"- {tool}: {count} 次")
lines.extend([
"",
"### Top 参数错误",
])
for err, count in metrics.get("top_parameter_errors", []):
lines.append(f"- {err} (出现 {count} 次)")
return "\n".join(lines)
我们的实测数据
在我们的生产环境(120+ 工具,日均 50 万次调用),经过上述改造后,各项指标变化如下:
| 指标 | 改造前 | 改造后 |
|---|---|---|
| 工具选择准确率 | 72% | 97% |
| 参数完全正确率 | 51% | 89% |
| 执行成功率(含重试) | 78% | 99.3% |
| 平均调用耗时 | 1.2s | 0.8s |
| 用户满意度(NPS) | 32 | 68 |
特别说明:99.3% 的执行成功率意味着我们仍然有 0.7% 的失败率。对一个日均 50 万次调用的系统来说,这就是每天 3500 次失败。这 3500 次失败大部分来自下游服务的真实故障(第三方 API 不稳定、数据库超时等),而不是工程框架的问题。我们接受这个现实,但通过优雅降级(友好提示 + 转接人工)确保用户体验不受影响。
六、总结
回到开篇那个凌晨三点的事故。在完成了上述所有改造后,同样的场景再次出现——某上游 API 返回了 10MB 的数据。这一次,压缩器成功将结果缩小到 1200 tokens,限流器阻止了雪崩,重试器在两次超时后返回了
友好的降级提示。整个系统安然无恙。
我凌晨三点被叫醒的故事再也没有发生过。
核心经验总结
如果你也在构建 LLM Agent 的 Tool Calling 系统,这是我的核心建议:
- 工具描述是工程问题,不是文档问题:花时间写好每条描述,包含业务上下文、示例和边界条件。让产品经理参与描述编写。
- 无状态是并发安全的基础:每个 tool call 拥有独立的上下文对象,使用
asyncio.Semaphore控制并发度。 - 永远不要信任工具返回的数据量:所有返回结果都要经过压缩/截断,设置硬性上限。
- 分级重试,优雅降级:把 HTTP 500 和 HTTP 429 分别处理,指数退避+抖动。最终失败要有友好的用户提示。
- 三层限流:全局、工具级、用户级,任何一层都不能少。
- 没有日志等于没有系统:结构化 JSON 日志行,让 ELK 或任何日志平台可以轻松分析。
- 量化评估:没有指标就没有改进。建立持续评估 pipeline,追踪工具选择率、参数正确率、执行成功率。
最后一点感悟
很多团队把 LLM Agent 看作一个"黑盒魔法盒子"——扔进去一个 Prompt,就能自动完成任务。但真实的生产环境告诉我们,Agent 系统的可靠性不是模型给出的,而是工程构建的。
Tool Calling 是 Agent 与外部世界交互的桥梁。这座桥梁的设计质量,直接决定了整个系统的可用性。那些看似琐碎的工程细节——工具描述怎么写、超时怎么设、错误怎么处理——恰恰是区分 Demo 级产品和生产级系统的关键。
如果这篇文章能帮你避免哪怕一次凌晨三点的告警电话,那就值了。
附录:文中涉及的完整代码
所有代码示例可以在以下 GitHub Gist 中找到:https://gist.github.com/your-username/tool-calling-engineering
文中代码已经过简化以便阅读,生产环境的版本增加了更完善的类型检查、重试持久化和监控集成,欢迎 fork 和 PR。
更多推荐


所有评论(0)