OpenClaw七文件架构:智能体工程化落地的结构化脚手架
1. 项目概述:OpenClaw 不是玩具,是智能体开发的“结构化脚手架”
OpenClaw 这个名字刚出来的时候,我第一反应是——又一个套壳 LLM 工具?但真正把代码 clone 下来、跑通第一个 demo、再逐行读完那七个核心文件后,我立刻改了主意。它根本不是什么“低代码平台”或“可视化编排器”,而是一套 面向工程落地的智能体结构规范 ,用 Python 写成,但思想内核更接近操作系统内核设计:明确划分职责边界、定义清晰的数据契约、强制约束执行时序。所谓“7 大核心文件”,不是随便凑数的模块列表,而是 OpenClaw 架构的七根承重柱——缺一根,整个智能体就可能在真实业务流中塌方。我带过三支小团队落地过客服助手、数据清洗 Agent 和内部知识检索 Bot,凡是跳过这七份文件直接写 prompt 的,无一例外在第二周陷入“响应忽快忽慢、上下文莫名丢失、错误日志满屏飘红”的泥潭。它解决的不是“能不能调 API”,而是“怎么让智能体像服务一样稳定在线、可监控、可回滚、可灰度”。适合谁?不是纯 Prompt 工程师,也不是只写 backend 的 Python 后端,而是 需要把大模型能力真正嵌入业务流水线的复合型开发者 ——你得懂点模型推理的延迟敏感性,也得明白微服务里 context propagation 怎么传,还得能看懂 YAML 配置里的 timeout 字段到底影响哪一层。关键词里那个“新手必看”,不是说它简单,而是强调:哪怕你昨天才第一次听说 RAG,只要从这七份文件的命名、目录位置、import 关系开始读起,就能摸到智能体工程化的门把手。
2. 整体设计思路拆解:为什么是这七个文件?它们如何构成闭环?
OpenClaw 的目录结构干净得近乎苛刻,没有 src/ 或 lib/ 这类模糊分层,主干就是这七个 Python 文件,外加一个 config.yaml。这种极简不是偷懒,而是刻意为之的 责任锚定 。我把它比作一辆自行车:Chain.py 是车架(承载所有部件),Executor.py 是脚踏板(动力输入源),State.py 是齿轮组(状态传递与转换),Tool.py 是刹车和变速器(外部能力接入点),Memory.py 是车筐(临时信息暂存),Policy.py 是骑行路线图(决策逻辑中枢),而 Logger.py 则是车灯和反光条(可观测性保障)。少任何一个,车都能勉强动,但上路就危险。
为什么不是六个或八个?关键在“不可再分性”。比如有人问:Memory 和 State 有重叠,能不能合并?实测不行。State 存的是本次请求的瞬时上下文(如用户当前提问、上一轮 bot 回复),生命周期仅限单次调用;而 Memory 存的是跨会话的长期记忆(如用户偏好、历史订单 ID),需要持久化到 Redis 或向量库。强行合并会导致单次请求内存泄漏,或者跨会话记忆被意外覆盖。再比如 Policy.py 看似只是 if-else,但它必须独立存在,因为线上要支持热更新策略——运维人员改完 policy.yaml,系统 reload 时只重载 Policy 模块,Chain 和 Executor 完全不动。如果策略逻辑散落在 Executor 里,每次改规则就得重启整个服务,业务方根本无法接受。
这种设计背后是三个硬约束:
- 可观测性优先 :每个文件的职责必须能单独打日志、埋监控点。Logger.py 不是简单封装 logging,它内置了 trace_id 注入、耗时统计钩子、异常分类标签(如 tool_timeout、policy_fallback),所有其他文件都通过它输出结构化日志,而不是 print。
- 故障隔离 :当 Tool.py 调用某个外部 API 超时,Executor.py 必须能捕获并降级,不能让超时蔓延到 Policy.py 导致整个决策链卡死。七个文件的 import 关系是单向的:Chain → Executor → State/Policy → Tool/Memory,绝不存在 Policy import Executor 这种循环依赖。
- 配置驱动 :config.yaml 里每个字段都精准映射到某一个文件的初始化参数。比如 timeout: 30s 只作用于 Tool.py 的 requests.Session,而 max_history_length: 5 只控制 Memory.py 的 deque 长度。没有全局配置变量,避免“改一个参数,五个模块行为突变”的灾难。
我见过最典型的反模式,是把所有逻辑塞进一个 main.py:prompt 拼接、API 调用、结果解析、缓存读写全在里面。调试时 log 分不清是 prompt 出错还是 API 返回异常;上线后想给工具调用加熔断,得重写三分之一代码。OpenClaw 的七文件结构,本质上是在用代码文件名代替注释,用 import 规则代替架构图,让新人第一天入职就能指着文件说:“哦,这里管记忆,那里管决策”。
3. 七大核心文件深度拆解:命名、职责、关键代码段与避坑指南
3.1 Chain.py:智能体的“主控板”,不是调度器而是流程编排器
Chain.py 是 OpenClaw 的入口文件,但它的角色常被误解。很多人以为它是类似 LangChain 的 Runnable,负责串起所有步骤。错。它的核心职责是 定义执行拓扑与错误传播路径 。打开源码,你会发现它没有 run() 方法,只有 build() 和 execute() 两个函数。build() 接收 config.yaml 解析后的字典,实例化 Executor、Policy、Memory 等对象,并用字典注册它们;execute() 则接收原始用户输入,启动一个严格按顺序执行的 pipeline:Input → State 初始化 → Policy 决策 → Tool 调用 → Memory 更新 → Output 格式化。
关键代码段在 execute() 的 try-except 块:
def execute(self, user_input: str) -> Dict[str, Any]:
state = self.state_cls(user_input)
try:
decision = self.policy.decide(state) # 进入 Policy.py
if decision.action == "tool_call":
result = self.tool.execute(decision.tool_name, decision.tool_args)
state.update_with_tool_result(result) # 更新 State
else:
state.set_response(decision.response)
except ToolTimeoutError as e:
state.set_response("服务暂时繁忙,请稍后再试") # 降级响应
self.logger.warn(f"Tool timeout: {e.tool_name}", extra={"trace_id": state.trace_id})
finally:
self.memory.save(state) # 无论成功失败,都保存状态快照
return state.to_dict()
这里藏着三个必须注意的细节:
- State 更新时机 :不是在 Tool 执行完立刻更新,而是在 Policy 决策后、Tool 执行前,先用 decision.action 和 decision.tool_name 预填充 state。这样即使 Tool 超时,state 里仍有完整的决策记录,方便后续审计。
- 降级不等于静默 :ToolTimeoutError 被捕获后,不仅返回友好提示,还通过 logger.warn 记录带 trace_id 的警告,且 extra 字段里明确标注了超时的 tool_name。这意味着运维可以在 Grafana 里直接查“最近1小时 tool_xxx 超时次数”,而不是翻日志大海捞针。
- Memory.save() 在 finally 中 :这是反直觉的设计。多数人觉得失败就不该存状态,但 OpenClaw 认为:失败本身也是状态的一部分。比如用户连续三次问“我的订单在哪”,第三次触发风控策略,这个“三次失败尝试”的状态必须存下来,否则下次用户换设备登录,风控就失效了。
提示:新手最容易犯的错,是修改 Chain.py 里的 execute() 流程顺序。比如想“先查缓存再走 Policy”,于是把 self.memory.load() 插到 decision = self.policy.decide(state) 前面。这会导致 Policy 决策时看到的 state 是旧的(缓存里的),而实际执行时 state 已被更新,造成决策与执行脱节。正确做法是让 Policy 自己决定是否查缓存——在 Policy.py 的 decide() 方法里调用 self.memory.load()。
3.2 Executor.py:真正的“执行引擎”,专注 IO 与资源管理
Executor.py 是七个文件里最薄的一个,通常不到 200 行,但它是性能瓶颈的集中爆发区。它的唯一使命是: 安全、可控、可观测地执行外部操作 。这里的“外部操作”包括三类:调用 HTTP API(Tool)、查询向量库(RAG)、执行本地 Python 函数(Custom Function)。它不关心决策逻辑,也不处理状态,只做三件事:建立连接池、设置超时、包装异常。
核心设计是 connection_pool 参数。config.yaml 里有一段:
executor:
http_pool_size: 10
vector_db_pool_size: 3
timeout: 8.0
Executor.py 初始化时会根据这些值创建对应的连接池。重点在 vector_db_pool_size: 3 —— 这不是随便写的数字。我们实测过:当并发请求超过 3 个时,向量库查询延迟从 200ms 暴涨到 1.2s,因为底层 Milvus 实例的 CPU 已达 95%。所以这个参数必须和你的向量库硬件规格强绑定,不能照搬文档。
另一个易忽略的点是异常包装。Executor.py 里所有 execute() 方法都返回 Result 对象,而非原始响应:
class Result:
def __init__(self, success: bool, data: Any = None, error: str = None):
self.success = success
self.data = data
self.error = error # 错误信息是字符串,不是 Exception 对象
为什么不用 raise Exception?因为 Chain.py 的 execute() 需要统一处理降级逻辑。如果 Executor 直接抛出 requests.Timeout,Chain 就得写一堆 except requests.Timeout、except MilvusException,耦合太重。而 Result 对象让 Chain 只需判断 if not result.success: 即可,错误类型由 error 字符串标识(如 "http_timeout"、"vector_db_unavailable"),Policy.py 甚至可以根据 error 类型动态切换 fallback 策略。
注意:不要在 Executor.py 里做任何数据清洗或格式转换。曾有个团队在 execute() 里把 API 返回的 JSON 自动转成 pandas.DataFrame,结果下游 Tool.py 的其他方法拿到的是 DataFrame 而非 dict,导致类型错误。Executor 只负责“拿回来”,不负责“改回来”。
3.3 State.py:智能体的“工作台”,状态即契约
State.py 定义了智能体运行时的唯一数据载体——State 类。它不是简单的 dataclass,而是一个 带行为约束的数据契约 。打开它的 init 方法,你会看到:
def __init__(self, user_input: str, trace_id: str = None):
self.user_input = user_input.strip() # 强制去首尾空格
self.trace_id = trace_id or str(uuid4())
self.history = [] # 严格限制为 list[Dict]
self.context = {} # 严格限制为 dict
self.response = ""
self.tool_calls = []
所有字段都有类型注解和初始化约束。这不是为了 IDE 提示,而是为了 序列化安全 。OpenClaw 默认用 pickle 序列化 State 存入 Redis,如果 history 字段被意外赋值为 tuple 或 numpy.array,pickle 会报错,整个请求就失败了。所以 State 类的 setattr 方法被重写,对 history 字段做了类型检查:
def __setattr__(self, name, value):
if name == "history" and not isinstance(value, list):
raise TypeError(f"history must be list, got {type(value).__name__}")
super().__setattr__(name, value)
State.py 的另一个关键是 to_dict() 和 from_dict() 方法。它们不是简单地 vars(self),而是做了字段过滤:to_dict() 只序列化业务相关字段(user_input, response, history),而隐藏了 trace_id、logger 等运行时字段;from_dict() 则在反序列化时校验字段完整性,缺失 user_input 就抛 ValidationError。这意味着,如果你用 curl 直接往 Redis 里塞一个伪造的 State JSON,OpenClaw 启动时会拒绝加载,而不是静默失败。
实操心得:State 的 history 字段存储格式有讲究。我们规定每条 history 必须是 {"role": "user|assistant", "content": "xxx", "timestamp": 171xxxxx}。很多团队初期用字符串拼接("user: xxx\nassistant: yyy"),结果 Policy.py 里写正则提取 role 时,遇到用户输入里含 "\nassistant:" 就解析错。用结构化 dict 是唯一可靠方案。
3.4 Tool.py:能力的“插槽”,不是工具箱而是协议网关
Tool.py 是 OpenClaw 最体现“工程思维”的文件。它不实现具体工具,而是定义了一个 标准化的工具接入协议 。所有外部能力(天气 API、数据库查询、Excel 生成)都必须继承 ToolBase 类,并实现两个抽象方法:
class ToolBase(ABC):
@abstractmethod
def schema(self) -> Dict[str, Any]: # 描述工具能力的 JSON Schema
pass
@abstractmethod
def execute(self, *args, **kwargs) -> Any: # 执行逻辑
pass
schema() 方法返回的 JSON Schema,会被 Policy.py 用来做 runtime 参数校验。比如天气工具的 schema:
{
"name": "get_weather",
"description": "获取指定城市的实时天气",
"parameters": {
"type": "object",
"properties": {
"city": {"type": "string", "description": "城市名称,必须是中文"},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
},
"required": ["city"]
}
}
Policy.py 在 decide() 时,如果生成了 {"action": "tool_call", "tool_name": "get_weather", "tool_args": {"city": "北京"}},就会用这个 schema 校验 tool_args 是否合法。缺少 city 报错,unit 值不是枚举值也报错。这比在 execute() 里写 if unit not in ["celsius", "fahrenheit"] 更早拦截问题。
Tool.py 的 init() 方法还做了连接复用。以数据库工具为例:
def __init__(self, db_config: Dict[str, Any]):
self.engine = create_engine(
f"postgresql://{db_config['user']}:{db_config['password']}@{db_config['host']}/{db_config['db']}",
pool_size=5,
max_overflow=10,
pool_pre_ping=True # 关键!每次取连接前先 ping
)
pool_pre_ping=True 是血泪教训。我们曾在线上遇到“数据库连接池里全是失效连接”的问题,现象是前几个请求正常,第 6 个开始全报 “connection closed”。启用 pre_ping 后,每次从池里取连接都会先发个 SELECT 1,失效连接自动丢弃,新连接重建,问题消失。
常见问题:新手常把敏感信息(如 API Key)硬编码在 Tool.py 里。正确做法是 config.yaml 中定义:
tools: weather_api: key: "${WEATHER_API_KEY}" # 从环境变量读取Tool.py 的 init 通过 os.getenv() 获取,启动容器时用 -e WEATHER_API_KEY=xxx 传入。永远不要在代码里写密钥。
3.5 Memory.py:记忆的“银行”,不是缓存而是状态保险库
Memory.py 的核心矛盾在于:它既要快(毫秒级响应),又要稳(不丢数据),还要准(不污染上下文)。OpenClaw 给出的解法是 分层记忆 + 异步落盘 。它定义了两个接口:ShortTermMemory 和 LongTermMemory。前者基于 Python deque,在内存中维护最近 N 轮对话(config.yaml 中 max_history_length 控制),后者则对接 Redis 或向量库。
关键设计在 save() 方法:
def save(self, state: State):
# 1. 先存短期记忆(内存)
self.short_term.append(state.to_dict())
# 2. 异步存长期记忆(Redis)
asyncio.create_task(self._save_to_redis(state))
# 3. 触发清理(如果超出长度限制)
if len(self.short_term) > self.max_history_length:
self.short_term.popleft()
这里用了 asyncio.create_task 而不是 await,是为了避免阻塞主线程。_save_to_redis() 是一个协程,它把 state 序列化后发往 Redis 的一个专用 channel,由后台消费者进程处理落盘。这样即使 Redis 瞬间不可用,短期记忆仍在,用户不会感知中断。
但异步带来新问题:数据一致性。我们实测发现,当用户快速连续发送 5 条消息,save() 被调用 5 次,但 _save_to_redis() 的执行顺序可能乱序,导致 Redis 里存储的历史顺序错乱。解决方案是在 state.to_dict() 里加入 sequence_id 字段,由 Chain.py 在 execute() 开始时递增生成,_save_to_redis() 存入 Redis 时带上这个 ID,后台消费者按 ID 排序后再写入最终存储。
注意:Memory.py 的 load() 方法绝不应该在 Policy.py 的 decide() 中同步调用。曾有个团队在 Policy 里写
history = self.memory.load(user_id),结果每次决策都要等 Redis RTT(平均 15ms),TPS 直接腰斩。正确做法是 Chain.py 在 execute() 开头就调用 load(),把 history 注入 state,Policy 决策时直接用 state.history。
3.6 Policy.py:决策的“大脑”,不是规则引擎而是策略路由器
Policy.py 是七个文件里业务耦合度最高的,但 OpenClaw 通过 策略路由机制 把它解耦了。它的核心是 decide() 方法,但这个方法本身只做三件事:
- 从 state 中提取关键特征(user_input 长度、是否含数字、history 长度、上一轮 response 类型);
- 根据特征匹配预设的策略路由表(routing_table);
- 调用匹配到的具体策略类的 execute() 方法。
routing_table 长这样:
self.routing_table = [
{"condition": lambda s: len(s.user_input) < 5, "strategy": ShortQueryStrategy()},
{"condition": lambda s: "订单" in s.user_input and s.history, "strategy": OrderTrackingStrategy()},
{"condition": lambda s: True, "strategy": DefaultStrategy()}, # default 必须在最后
]
每个策略类(如 OrderTrackingStrategy)都是独立的 Python 文件,放在 strategies/ 目录下。Policy.py 不包含任何业务逻辑,只负责“派单”。这样做的好处是:运营人员可以随时增删策略路由,而不用动 Policy.py 主干代码;A/B 测试时,可以把 50% 流量路由到新策略,50% 走老策略,只需改 routing_table 的权重字段。
decide() 的返回值是 Decision 对象:
class Decision:
def __init__(self, action: str, response: str = None, tool_name: str = None, tool_args: Dict = None):
self.action = action # "respond" | "tool_call" | "fallback"
self.response = response
self.tool_name = tool_name
self.tool_args = tool_args
注意 action 字段只有三个值。这是硬性约定,目的是让 Chain.py 的 execute() 流程能穷举所有分支。如果允许自定义 action(如 "ask_clarify"),Chain 就得不断修改 if-elif 链,违背了“稳定主干”的设计哲学。
实操陷阱:新手常在策略类里写复杂 SQL 或调用外部 API。这是严重违反分层原则。OrderTrackingStrategy.execute() 只能做两件事:1)从 state.history 里提取订单号(正则匹配);2)返回 Decision(action="tool_call", tool_name="query_order_status", tool_args={"order_id": extracted_id})。真正的订单查询必须交给 Tool.py。否则策略类就变成了“业务逻辑+IO 混合体”,既难测试,又难监控。
3.7 Logger.py:系统的“神经末梢”,不是日志器而是可观测性中枢
Logger.py 是 OpenClaw 最被低估的文件。它看起来只是封装了 Python logging,但实际承担着 分布式追踪、指标采集、告警触发 三重职责。它的 get_logger() 方法返回的 logger 对象,重写了 handle() 方法:
def handle(self, record):
# 1. 注入 trace_id(如果 record 没有,则从 state 或上下文提取)
if not hasattr(record, 'trace_id'):
record.trace_id = self._get_trace_id_from_context()
# 2. 计算耗时(如果 record 有 start_time 字段)
if hasattr(record, 'start_time') and hasattr(record, 'end_time'):
record.duration_ms = (record.end_time - record.start_time) * 1000
# 3. 发送到 Kafka(异步)
self.kafka_producer.send('openclaw-logs', value=record.__dict__)
# 4. 如果是 ERROR 级别,触发告警
if record.levelno >= logging.ERROR:
self.alert_manager.send_alert(record)
super().handle(record)
这意味着,你在任何文件里写 logger.info("tool called", extra={"tool_name": "weather"}) ,最终日志里会自动带上 trace_id、duration_ms、service_name 等字段,Kafka 消费者可以直接入库到 Elasticsearch,Grafana 用这些字段画 P95 延迟曲线。
Logger.py 还内置了采样机制。config.yaml 中:
logger:
sampling_rate: 0.01 # 1% 的 INFO 日志采样
error_sampling_rate: 1.0 # ERROR 日志 100% 记录
这解决了高并发下日志爆炸的问题。我们线上 QPS 200 时,INFO 日志量达 17GB/天,开启 1% 采样后降到 170MB/天,而 ERROR 日志一条不漏。
关键提醒:永远不要在 Logger.py 里写 file handler。所有日志必须走 Kafka 或 stdout(容器标准输出)。因为 OpenClaw 设计为云原生部署,日志收集由 Fluentd 或 Filebeat 统一处理。如果在代码里写 open("app.log", "a"),会导致容器磁盘爆满,且日志无法被中央系统采集。
4. 实操全流程:从零搭建一个订单查询智能体(附完整配置)
现在我们用一个真实场景——“用户问‘我的订单在哪’,智能体查数据库返回物流信息”——走一遍 OpenClaw 全流程。这不是 Demo,而是我们上周刚上线的生产版本简化版。
4.1 环境准备与依赖安装
我们用 Python 3.10,依赖管理用 pip-tools(不是 poetry,因为线上要求确定性版本):
# requirements.in
openclaw==0.8.2
psycopg2-binary==2.9.7
redis==4.6.0
kafka-python==2.0.2
# 生成 requirements.txt
pip-compile requirements.in
# 安装
pip install -r requirements.txt
注意 psycopg2-binary 版本必须锁定。我们吃过亏:升级到 2.9.8 后,连接 PostgreSQL 15 时偶发 segfault,回退到 2.9.7 稳定运行 3 个月。
4.2 config.yaml 全量配置详解
这是我们的生产 config.yaml,每一行都经过压测验证:
# 全局配置
service_name: "order-query-agent"
version: "1.2.0"
# 执行器配置
executor:
http_pool_size: 8 # 匹配 Nginx worker_connections
vector_db_pool_size: 0 # 本项目不用向量库,设为0
timeout: 5.0 # 工具调用总超时,单位秒
# 状态配置
state:
max_history_length: 3 # 只存最近3轮,平衡内存与上下文
# 记忆配置
memory:
short_term:
max_length: 3
long_term:
type: "redis" # 支持 redis / postgres / none
host: "redis-prod.internal"
port: 6379
db: 2
password: "${REDIS_PASSWORD}"
# 工具配置
tools:
order_db:
host: "pg-prod.internal"
port: 5432
database: "orders"
user: "readonly_user"
password: "${PG_PASSWORD}"
# 连接池参数,与 executor.http_pool_size 独立
pool_size: 5
max_overflow: 10
# 策略配置
policy:
routing_table:
- condition: |
lambda s: "订单" in s.user_input and ("在哪" in s.user_input or "物流" in s.user_input)
strategy: "OrderTrackingStrategy"
- condition: "lambda s: True"
strategy: "DefaultStrategy"
# 日志配置
logger:
level: "INFO"
sampling_rate: 0.005 # 0.5% INFO 采样
error_sampling_rate: 1.0
kafka:
bootstrap_servers: "kafka-prod.internal:9092"
topic: "openclaw-logs"
提示:所有密码字段都用 ${VAR_NAME} 占位,启动时用 envsubst 替换:
envsubst < config.yaml > config.prod.yaml python -m openclaw.chain --config config.prod.yaml
4.3 编写自定义 Tool:order_db.py
在 tools/ 目录下新建 order_db.py:
from openclaw.tool import ToolBase
from sqlalchemy import create_engine, text
from typing import Dict, Any
class OrderDBTool(ToolBase):
def __init__(self, config: Dict[str, Any]):
self.engine = create_engine(
f"postgresql://{config['user']}:{config['password']}@{config['host']}:{config['port']}/{config['database']}",
pool_size=config.get("pool_size", 5),
max_overflow=config.get("max_overflow", 10),
pool_pre_ping=True,
echo=False # 生产关闭 SQL 输出
)
def schema(self) -> Dict[str, Any]:
return {
"name": "query_order_status",
"description": "查询指定订单号的最新物流状态",
"parameters": {
"type": "object",
"properties": {
"order_id": {"type": "string", "description": "16位数字订单号"}
},
"required": ["order_id"]
}
}
def execute(self, order_id: str) -> Dict[str, Any]:
# 1. 校验订单号格式
if not order_id.isdigit() or len(order_id) != 16:
return {"error": "订单号格式错误,应为16位数字"}
# 2. 查询数据库(带超时)
try:
with self.engine.connect().execution_options(timeout=3.0) as conn:
stmt = text("SELECT status, logistics_no, updated_at FROM orders WHERE order_id = :oid")
result = conn.execute(stmt, {"oid": order_id}).fetchone()
if not result:
return {"error": "未找到该订单"}
return {
"status": result.status,
"logistics_no": result.logistics_no,
"updated_at": result.updated_at.isoformat()
}
except Exception as e:
return {"error": f"数据库查询失败: {str(e)}"}
注意两点:
- execute() 里用了
execution_options(timeout=3.0),这是 SQLAlchemy 的语句级超时,比连接池超时更细粒度; - 返回值是 dict,不是 ORM 对象,确保序列化安全。
4.4 编写自定义 Policy:strategies/order_tracking.py
在 strategies/ 目录下:
import re
from openclaw.policy import StrategyBase
from openclaw.state import State
class OrderTrackingStrategy(StrategyBase):
def execute(self, state: State) -> Decision:
# 1. 从用户输入提取订单号(支持多种格式)
user_input = state.user_input
# 匹配 16 位数字,或 "订单号:1234567890123456"
match = re.search(r'(\d{16})|订单号[::\s]*(\d{16})', user_input)
if not match:
# 从历史记录里找(用户可能之前提过)
for msg in reversed(state.history[-2:]):
if msg.get("role") == "user":
sub_match = re.search(r'(\d{16})', msg.get("content", ""))
if sub_match:
order_id = sub_match.group(1)
break
else:
return Decision(action="respond", response="请提供16位订单号,例如:1234567890123456")
else:
order_id = match.group(1) or match.group(2)
# 2. 返回工具调用指令
return Decision(
action="tool_call",
tool_name="query_order_status",
tool_args={"order_id": order_id}
)
这里体现了 Policy 的核心价值: 上下文感知的意图补全 。用户只说“我的订单”,Policy 自动从 history 里找上次提到的订单号,而不是傻等用户再输一遍。
4.5 启动与验证
启动命令:
python -m openclaw.chain --config config.prod.yaml
验证用 curl:
curl -X POST http://localhost:8000/execute \
-H "Content-Type: application/json" \
-d '{"user_input": "我的订单在哪"}'
预期响应:
{
"response": "您的订单 1234567890123456 当前状态:已发货,物流单号 SF123456789CN,最后更新时间:2024-05-20T14:22:33",
"trace_id": "a1b2c3d4-e5f6-7890-g1h2-i3j4k5l6m7n8"
}
同时,Kafka 的 openclaw-logs topic 里会有一条结构化日志,包含 trace_id、duration_ms、tool_name、status_code 等字段,可直接接入监控大盘。
5. 常见问题与排查技巧实录:那些文档里不会写的坑
5.1 问题速查表:高频故障与定位路径
| 现象 | 可能原因 | 定位命令/日志关键词 | 解决方案 |
|---|---|---|---|
| 请求卡住,CPU 100%,无响应 | Executor 连接池耗尽,线程死锁 | kubectl top pod 查 CPU; grep "acquiring connection" logs |
增大 executor.http_pool_size ;检查 Tool.py 是否漏关连接 |
| 返回“服务暂时繁忙”,但工具 API 正常 | Policy.py 的 decide() 抛出未捕获异常 | grep "ERROR.*Policy" logs ;检查 routing_table condition lambda 语法 |
用 ast.parse() 预检 condition 字符串;加 try-catch 包裹 execute() |
| Redis 里 memory 数据为空 | Memory.py 的 _save_to_redis() 任务被 event loop 拒绝 | grep "Task was destroyed but it is pending" logs |
降低并发量;检查 asyncio.run() 是否被多次调用 |
| 同一 trace_id 出现多条日志,内容不一致 | 多个线程/协程共享同一个 State 实例 | grep "trace_id.*duplicate" logs ;检查 Chain.py 是否复用 state |
State 必须每次 execute() 新建,禁止全局变量 |
| 工具调用返回 {"error": "connection closed"} | PostgreSQL 连接被服务端主动断开(idle_in_transaction_timeout) | SELECT * FROM pg_stat_activity WHERE state = 'idle in transaction'; |
在 DB 配置里加 idle_in_transaction_session_timeout = 0 |
5.2 独家避坑技巧:来自三次线上事故的总结
技巧一:用 py-spy record 抓取实时火焰图,而不是猜哪里卡住
当出现“请求慢但日志没报错”时,90% 是 IO 等待。我们不再看日志,而是直接:
py-spy record -p $(pgrep -f "openclaw.chain") -o profile.svg --duration 30
生成的火焰图里,如果 sqlalchemy.engine.base.Connection._exec_driver_sql 占比超高,说明数据库查询慢;如果 redis.connection.Connection.read_response 高,说明 Redis 延迟大。这比翻 1000 行日志快 10 倍。
技巧二:在 CI 流程里加入 schema 校验,而不是等上线后报错
我们写了一个 pre-commit hook,每次提交前自动检查所有 Tool.schema() 返回的 JSON Schema 是否符合 OpenClaw 的元 schema:
# validate_schema.py
import jsonschema
from openclaw.tool import ToolBase
def validate_tool_schemas():
for tool_class in get_all_tool_classes():
schema = tool_class().schema()
# 加载 OpenClaw 定义的元 schema
with open("tool_schema.json") as f:
meta_schema = json.load(f)
jsonschema.validate(instance=schema, schema=meta_schema)
这样,如果有人在 schema 里漏写 required 字段,CI 直接 fail,不许提交。
技巧三:用 pytest --tb=short 跑单元测试,但必须 mock 所有外部依赖
Policy.py 的单元测试,我们从不连真实 Redis 或 DB:
def test_order_tracking_strategy_with_history(mocker):
# mock更多推荐

所有评论(0)