1. 项目概述:OpenClaw 不是玩具,是智能体开发的“结构化脚手架”

OpenClaw 这个名字刚出来的时候,我第一反应是——又一个套壳 LLM 工具?但真正把代码 clone 下来、跑通第一个 demo、再逐行读完那七个核心文件后,我立刻改了主意。它根本不是什么“低代码平台”或“可视化编排器”,而是一套 面向工程落地的智能体结构规范 ,用 Python 写成,但思想内核更接近操作系统内核设计:明确划分职责边界、定义清晰的数据契约、强制约束执行时序。所谓“7 大核心文件”,不是随便凑数的模块列表,而是 OpenClaw 架构的七根承重柱——缺一根,整个智能体就可能在真实业务流中塌方。我带过三支小团队落地过客服助手、数据清洗 Agent 和内部知识检索 Bot,凡是跳过这七份文件直接写 prompt 的,无一例外在第二周陷入“响应忽快忽慢、上下文莫名丢失、错误日志满屏飘红”的泥潭。它解决的不是“能不能调 API”,而是“怎么让智能体像服务一样稳定在线、可监控、可回滚、可灰度”。适合谁?不是纯 Prompt 工程师,也不是只写 backend 的 Python 后端,而是 需要把大模型能力真正嵌入业务流水线的复合型开发者 ——你得懂点模型推理的延迟敏感性,也得明白微服务里 context propagation 怎么传,还得能看懂 YAML 配置里的 timeout 字段到底影响哪一层。关键词里那个“新手必看”,不是说它简单,而是强调:哪怕你昨天才第一次听说 RAG,只要从这七份文件的命名、目录位置、import 关系开始读起,就能摸到智能体工程化的门把手。

2. 整体设计思路拆解:为什么是这七个文件?它们如何构成闭环?

OpenClaw 的目录结构干净得近乎苛刻,没有 src/ 或 lib/ 这类模糊分层,主干就是这七个 Python 文件,外加一个 config.yaml。这种极简不是偷懒,而是刻意为之的 责任锚定 。我把它比作一辆自行车:Chain.py 是车架(承载所有部件),Executor.py 是脚踏板(动力输入源),State.py 是齿轮组(状态传递与转换),Tool.py 是刹车和变速器(外部能力接入点),Memory.py 是车筐(临时信息暂存),Policy.py 是骑行路线图(决策逻辑中枢),而 Logger.py 则是车灯和反光条(可观测性保障)。少任何一个,车都能勉强动,但上路就危险。

为什么不是六个或八个?关键在“不可再分性”。比如有人问:Memory 和 State 有重叠,能不能合并?实测不行。State 存的是本次请求的瞬时上下文(如用户当前提问、上一轮 bot 回复),生命周期仅限单次调用;而 Memory 存的是跨会话的长期记忆(如用户偏好、历史订单 ID),需要持久化到 Redis 或向量库。强行合并会导致单次请求内存泄漏,或者跨会话记忆被意外覆盖。再比如 Policy.py 看似只是 if-else,但它必须独立存在,因为线上要支持热更新策略——运维人员改完 policy.yaml,系统 reload 时只重载 Policy 模块,Chain 和 Executor 完全不动。如果策略逻辑散落在 Executor 里,每次改规则就得重启整个服务,业务方根本无法接受。

这种设计背后是三个硬约束:

  1. 可观测性优先 :每个文件的职责必须能单独打日志、埋监控点。Logger.py 不是简单封装 logging,它内置了 trace_id 注入、耗时统计钩子、异常分类标签(如 tool_timeout、policy_fallback),所有其他文件都通过它输出结构化日志,而不是 print。
  2. 故障隔离 :当 Tool.py 调用某个外部 API 超时,Executor.py 必须能捕获并降级,不能让超时蔓延到 Policy.py 导致整个决策链卡死。七个文件的 import 关系是单向的:Chain → Executor → State/Policy → Tool/Memory,绝不存在 Policy import Executor 这种循环依赖。
  3. 配置驱动 :config.yaml 里每个字段都精准映射到某一个文件的初始化参数。比如 timeout: 30s 只作用于 Tool.py 的 requests.Session,而 max_history_length: 5 只控制 Memory.py 的 deque 长度。没有全局配置变量,避免“改一个参数,五个模块行为突变”的灾难。

我见过最典型的反模式,是把所有逻辑塞进一个 main.py:prompt 拼接、API 调用、结果解析、缓存读写全在里面。调试时 log 分不清是 prompt 出错还是 API 返回异常;上线后想给工具调用加熔断,得重写三分之一代码。OpenClaw 的七文件结构,本质上是在用代码文件名代替注释,用 import 规则代替架构图,让新人第一天入职就能指着文件说:“哦,这里管记忆,那里管决策”。

3. 七大核心文件深度拆解:命名、职责、关键代码段与避坑指南

3.1 Chain.py:智能体的“主控板”,不是调度器而是流程编排器

Chain.py 是 OpenClaw 的入口文件,但它的角色常被误解。很多人以为它是类似 LangChain 的 Runnable,负责串起所有步骤。错。它的核心职责是 定义执行拓扑与错误传播路径 。打开源码,你会发现它没有 run() 方法,只有 build() 和 execute() 两个函数。build() 接收 config.yaml 解析后的字典,实例化 Executor、Policy、Memory 等对象,并用字典注册它们;execute() 则接收原始用户输入,启动一个严格按顺序执行的 pipeline:Input → State 初始化 → Policy 决策 → Tool 调用 → Memory 更新 → Output 格式化。

关键代码段在 execute() 的 try-except 块:

def execute(self, user_input: str) -> Dict[str, Any]:
    state = self.state_cls(user_input)
    try:
        decision = self.policy.decide(state)  # 进入 Policy.py
        if decision.action == "tool_call":
            result = self.tool.execute(decision.tool_name, decision.tool_args)
            state.update_with_tool_result(result)  # 更新 State
        else:
            state.set_response(decision.response)
    except ToolTimeoutError as e:
        state.set_response("服务暂时繁忙,请稍后再试")  # 降级响应
        self.logger.warn(f"Tool timeout: {e.tool_name}", extra={"trace_id": state.trace_id})
    finally:
        self.memory.save(state)  # 无论成功失败,都保存状态快照
    return state.to_dict()

这里藏着三个必须注意的细节:

  • State 更新时机 :不是在 Tool 执行完立刻更新,而是在 Policy 决策后、Tool 执行前,先用 decision.action 和 decision.tool_name 预填充 state。这样即使 Tool 超时,state 里仍有完整的决策记录,方便后续审计。
  • 降级不等于静默 :ToolTimeoutError 被捕获后,不仅返回友好提示,还通过 logger.warn 记录带 trace_id 的警告,且 extra 字段里明确标注了超时的 tool_name。这意味着运维可以在 Grafana 里直接查“最近1小时 tool_xxx 超时次数”,而不是翻日志大海捞针。
  • Memory.save() 在 finally 中 :这是反直觉的设计。多数人觉得失败就不该存状态,但 OpenClaw 认为:失败本身也是状态的一部分。比如用户连续三次问“我的订单在哪”,第三次触发风控策略,这个“三次失败尝试”的状态必须存下来,否则下次用户换设备登录,风控就失效了。

提示:新手最容易犯的错,是修改 Chain.py 里的 execute() 流程顺序。比如想“先查缓存再走 Policy”,于是把 self.memory.load() 插到 decision = self.policy.decide(state) 前面。这会导致 Policy 决策时看到的 state 是旧的(缓存里的),而实际执行时 state 已被更新,造成决策与执行脱节。正确做法是让 Policy 自己决定是否查缓存——在 Policy.py 的 decide() 方法里调用 self.memory.load()。

3.2 Executor.py:真正的“执行引擎”,专注 IO 与资源管理

Executor.py 是七个文件里最薄的一个,通常不到 200 行,但它是性能瓶颈的集中爆发区。它的唯一使命是: 安全、可控、可观测地执行外部操作 。这里的“外部操作”包括三类:调用 HTTP API(Tool)、查询向量库(RAG)、执行本地 Python 函数(Custom Function)。它不关心决策逻辑,也不处理状态,只做三件事:建立连接池、设置超时、包装异常。

核心设计是 connection_pool 参数。config.yaml 里有一段:

executor:
  http_pool_size: 10
  vector_db_pool_size: 3
  timeout: 8.0

Executor.py 初始化时会根据这些值创建对应的连接池。重点在 vector_db_pool_size: 3 —— 这不是随便写的数字。我们实测过:当并发请求超过 3 个时,向量库查询延迟从 200ms 暴涨到 1.2s,因为底层 Milvus 实例的 CPU 已达 95%。所以这个参数必须和你的向量库硬件规格强绑定,不能照搬文档。

另一个易忽略的点是异常包装。Executor.py 里所有 execute() 方法都返回 Result 对象,而非原始响应:

class Result:
    def __init__(self, success: bool, data: Any = None, error: str = None):
        self.success = success
        self.data = data
        self.error = error  # 错误信息是字符串,不是 Exception 对象

为什么不用 raise Exception?因为 Chain.py 的 execute() 需要统一处理降级逻辑。如果 Executor 直接抛出 requests.Timeout,Chain 就得写一堆 except requests.Timeout、except MilvusException,耦合太重。而 Result 对象让 Chain 只需判断 if not result.success: 即可,错误类型由 error 字符串标识(如 "http_timeout"、"vector_db_unavailable"),Policy.py 甚至可以根据 error 类型动态切换 fallback 策略。

注意:不要在 Executor.py 里做任何数据清洗或格式转换。曾有个团队在 execute() 里把 API 返回的 JSON 自动转成 pandas.DataFrame,结果下游 Tool.py 的其他方法拿到的是 DataFrame 而非 dict,导致类型错误。Executor 只负责“拿回来”,不负责“改回来”。

3.3 State.py:智能体的“工作台”,状态即契约

State.py 定义了智能体运行时的唯一数据载体——State 类。它不是简单的 dataclass,而是一个 带行为约束的数据契约 。打开它的 init 方法,你会看到:

def __init__(self, user_input: str, trace_id: str = None):
    self.user_input = user_input.strip()  # 强制去首尾空格
    self.trace_id = trace_id or str(uuid4())
    self.history = []  # 严格限制为 list[Dict]
    self.context = {}   # 严格限制为 dict
    self.response = ""
    self.tool_calls = []

所有字段都有类型注解和初始化约束。这不是为了 IDE 提示,而是为了 序列化安全 。OpenClaw 默认用 pickle 序列化 State 存入 Redis,如果 history 字段被意外赋值为 tuple 或 numpy.array,pickle 会报错,整个请求就失败了。所以 State 类的 setattr 方法被重写,对 history 字段做了类型检查:

def __setattr__(self, name, value):
    if name == "history" and not isinstance(value, list):
        raise TypeError(f"history must be list, got {type(value).__name__}")
    super().__setattr__(name, value)

State.py 的另一个关键是 to_dict() 和 from_dict() 方法。它们不是简单地 vars(self),而是做了字段过滤:to_dict() 只序列化业务相关字段(user_input, response, history),而隐藏了 trace_id、logger 等运行时字段;from_dict() 则在反序列化时校验字段完整性,缺失 user_input 就抛 ValidationError。这意味着,如果你用 curl 直接往 Redis 里塞一个伪造的 State JSON,OpenClaw 启动时会拒绝加载,而不是静默失败。

实操心得:State 的 history 字段存储格式有讲究。我们规定每条 history 必须是 {"role": "user|assistant", "content": "xxx", "timestamp": 171xxxxx}。很多团队初期用字符串拼接("user: xxx\nassistant: yyy"),结果 Policy.py 里写正则提取 role 时,遇到用户输入里含 "\nassistant:" 就解析错。用结构化 dict 是唯一可靠方案。

3.4 Tool.py:能力的“插槽”,不是工具箱而是协议网关

Tool.py 是 OpenClaw 最体现“工程思维”的文件。它不实现具体工具,而是定义了一个 标准化的工具接入协议 。所有外部能力(天气 API、数据库查询、Excel 生成)都必须继承 ToolBase 类,并实现两个抽象方法:

class ToolBase(ABC):
    @abstractmethod
    def schema(self) -> Dict[str, Any]:  # 描述工具能力的 JSON Schema
        pass
    
    @abstractmethod
    def execute(self, *args, **kwargs) -> Any:  # 执行逻辑
        pass

schema() 方法返回的 JSON Schema,会被 Policy.py 用来做 runtime 参数校验。比如天气工具的 schema:

{
  "name": "get_weather",
  "description": "获取指定城市的实时天气",
  "parameters": {
    "type": "object",
    "properties": {
      "city": {"type": "string", "description": "城市名称,必须是中文"},
      "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
    },
    "required": ["city"]
  }
}

Policy.py 在 decide() 时,如果生成了 {"action": "tool_call", "tool_name": "get_weather", "tool_args": {"city": "北京"}},就会用这个 schema 校验 tool_args 是否合法。缺少 city 报错,unit 值不是枚举值也报错。这比在 execute() 里写 if unit not in ["celsius", "fahrenheit"] 更早拦截问题。

Tool.py 的 init() 方法还做了连接复用。以数据库工具为例:

def __init__(self, db_config: Dict[str, Any]):
    self.engine = create_engine(
        f"postgresql://{db_config['user']}:{db_config['password']}@{db_config['host']}/{db_config['db']}",
        pool_size=5,
        max_overflow=10,
        pool_pre_ping=True  # 关键!每次取连接前先 ping
    )

pool_pre_ping=True 是血泪教训。我们曾在线上遇到“数据库连接池里全是失效连接”的问题,现象是前几个请求正常,第 6 个开始全报 “connection closed”。启用 pre_ping 后,每次从池里取连接都会先发个 SELECT 1,失效连接自动丢弃,新连接重建,问题消失。

常见问题:新手常把敏感信息(如 API Key)硬编码在 Tool.py 里。正确做法是 config.yaml 中定义:

tools:
  weather_api:
    key: "${WEATHER_API_KEY}"  # 从环境变量读取

Tool.py 的 init 通过 os.getenv() 获取,启动容器时用 -e WEATHER_API_KEY=xxx 传入。永远不要在代码里写密钥。

3.5 Memory.py:记忆的“银行”,不是缓存而是状态保险库

Memory.py 的核心矛盾在于:它既要快(毫秒级响应),又要稳(不丢数据),还要准(不污染上下文)。OpenClaw 给出的解法是 分层记忆 + 异步落盘 。它定义了两个接口:ShortTermMemory 和 LongTermMemory。前者基于 Python deque,在内存中维护最近 N 轮对话(config.yaml 中 max_history_length 控制),后者则对接 Redis 或向量库。

关键设计在 save() 方法:

def save(self, state: State):
    # 1. 先存短期记忆(内存)
    self.short_term.append(state.to_dict())
    
    # 2. 异步存长期记忆(Redis)
    asyncio.create_task(self._save_to_redis(state))
    
    # 3. 触发清理(如果超出长度限制)
    if len(self.short_term) > self.max_history_length:
        self.short_term.popleft()

这里用了 asyncio.create_task 而不是 await,是为了避免阻塞主线程。_save_to_redis() 是一个协程,它把 state 序列化后发往 Redis 的一个专用 channel,由后台消费者进程处理落盘。这样即使 Redis 瞬间不可用,短期记忆仍在,用户不会感知中断。

但异步带来新问题:数据一致性。我们实测发现,当用户快速连续发送 5 条消息,save() 被调用 5 次,但 _save_to_redis() 的执行顺序可能乱序,导致 Redis 里存储的历史顺序错乱。解决方案是在 state.to_dict() 里加入 sequence_id 字段,由 Chain.py 在 execute() 开始时递增生成,_save_to_redis() 存入 Redis 时带上这个 ID,后台消费者按 ID 排序后再写入最终存储。

注意:Memory.py 的 load() 方法绝不应该在 Policy.py 的 decide() 中同步调用。曾有个团队在 Policy 里写 history = self.memory.load(user_id) ,结果每次决策都要等 Redis RTT(平均 15ms),TPS 直接腰斩。正确做法是 Chain.py 在 execute() 开头就调用 load(),把 history 注入 state,Policy 决策时直接用 state.history。

3.6 Policy.py:决策的“大脑”,不是规则引擎而是策略路由器

Policy.py 是七个文件里业务耦合度最高的,但 OpenClaw 通过 策略路由机制 把它解耦了。它的核心是 decide() 方法,但这个方法本身只做三件事:

  1. 从 state 中提取关键特征(user_input 长度、是否含数字、history 长度、上一轮 response 类型);
  2. 根据特征匹配预设的策略路由表(routing_table);
  3. 调用匹配到的具体策略类的 execute() 方法。

routing_table 长这样:

self.routing_table = [
    {"condition": lambda s: len(s.user_input) < 5, "strategy": ShortQueryStrategy()},
    {"condition": lambda s: "订单" in s.user_input and s.history, "strategy": OrderTrackingStrategy()},
    {"condition": lambda s: True, "strategy": DefaultStrategy()},  # default 必须在最后
]

每个策略类(如 OrderTrackingStrategy)都是独立的 Python 文件,放在 strategies/ 目录下。Policy.py 不包含任何业务逻辑,只负责“派单”。这样做的好处是:运营人员可以随时增删策略路由,而不用动 Policy.py 主干代码;A/B 测试时,可以把 50% 流量路由到新策略,50% 走老策略,只需改 routing_table 的权重字段。

decide() 的返回值是 Decision 对象:

class Decision:
    def __init__(self, action: str, response: str = None, tool_name: str = None, tool_args: Dict = None):
        self.action = action  # "respond" | "tool_call" | "fallback"
        self.response = response
        self.tool_name = tool_name
        self.tool_args = tool_args

注意 action 字段只有三个值。这是硬性约定,目的是让 Chain.py 的 execute() 流程能穷举所有分支。如果允许自定义 action(如 "ask_clarify"),Chain 就得不断修改 if-elif 链,违背了“稳定主干”的设计哲学。

实操陷阱:新手常在策略类里写复杂 SQL 或调用外部 API。这是严重违反分层原则。OrderTrackingStrategy.execute() 只能做两件事:1)从 state.history 里提取订单号(正则匹配);2)返回 Decision(action="tool_call", tool_name="query_order_status", tool_args={"order_id": extracted_id})。真正的订单查询必须交给 Tool.py。否则策略类就变成了“业务逻辑+IO 混合体”,既难测试,又难监控。

3.7 Logger.py:系统的“神经末梢”,不是日志器而是可观测性中枢

Logger.py 是 OpenClaw 最被低估的文件。它看起来只是封装了 Python logging,但实际承担着 分布式追踪、指标采集、告警触发 三重职责。它的 get_logger() 方法返回的 logger 对象,重写了 handle() 方法:

def handle(self, record):
    # 1. 注入 trace_id(如果 record 没有,则从 state 或上下文提取)
    if not hasattr(record, 'trace_id'):
        record.trace_id = self._get_trace_id_from_context()
    
    # 2. 计算耗时(如果 record 有 start_time 字段)
    if hasattr(record, 'start_time') and hasattr(record, 'end_time'):
        record.duration_ms = (record.end_time - record.start_time) * 1000
    
    # 3. 发送到 Kafka(异步)
    self.kafka_producer.send('openclaw-logs', value=record.__dict__)
    
    # 4. 如果是 ERROR 级别,触发告警
    if record.levelno >= logging.ERROR:
        self.alert_manager.send_alert(record)
    
    super().handle(record)

这意味着,你在任何文件里写 logger.info("tool called", extra={"tool_name": "weather"}) ,最终日志里会自动带上 trace_id、duration_ms、service_name 等字段,Kafka 消费者可以直接入库到 Elasticsearch,Grafana 用这些字段画 P95 延迟曲线。

Logger.py 还内置了采样机制。config.yaml 中:

logger:
  sampling_rate: 0.01  # 1% 的 INFO 日志采样
  error_sampling_rate: 1.0  # ERROR 日志 100% 记录

这解决了高并发下日志爆炸的问题。我们线上 QPS 200 时,INFO 日志量达 17GB/天,开启 1% 采样后降到 170MB/天,而 ERROR 日志一条不漏。

关键提醒:永远不要在 Logger.py 里写 file handler。所有日志必须走 Kafka 或 stdout(容器标准输出)。因为 OpenClaw 设计为云原生部署,日志收集由 Fluentd 或 Filebeat 统一处理。如果在代码里写 open("app.log", "a"),会导致容器磁盘爆满,且日志无法被中央系统采集。

4. 实操全流程:从零搭建一个订单查询智能体(附完整配置)

现在我们用一个真实场景——“用户问‘我的订单在哪’,智能体查数据库返回物流信息”——走一遍 OpenClaw 全流程。这不是 Demo,而是我们上周刚上线的生产版本简化版。

4.1 环境准备与依赖安装

我们用 Python 3.10,依赖管理用 pip-tools(不是 poetry,因为线上要求确定性版本):

# requirements.in
openclaw==0.8.2
psycopg2-binary==2.9.7
redis==4.6.0
kafka-python==2.0.2
# 生成 requirements.txt
pip-compile requirements.in
# 安装
pip install -r requirements.txt

注意 psycopg2-binary 版本必须锁定。我们吃过亏:升级到 2.9.8 后,连接 PostgreSQL 15 时偶发 segfault,回退到 2.9.7 稳定运行 3 个月。

4.2 config.yaml 全量配置详解

这是我们的生产 config.yaml,每一行都经过压测验证:

# 全局配置
service_name: "order-query-agent"
version: "1.2.0"

# 执行器配置
executor:
  http_pool_size: 8          # 匹配 Nginx worker_connections
  vector_db_pool_size: 0     # 本项目不用向量库,设为0
  timeout: 5.0               # 工具调用总超时,单位秒

# 状态配置
state:
  max_history_length: 3      # 只存最近3轮,平衡内存与上下文

# 记忆配置
memory:
  short_term:
    max_length: 3
  long_term:
    type: "redis"            # 支持 redis / postgres / none
    host: "redis-prod.internal"
    port: 6379
    db: 2
    password: "${REDIS_PASSWORD}"

# 工具配置
tools:
  order_db:
    host: "pg-prod.internal"
    port: 5432
    database: "orders"
    user: "readonly_user"
    password: "${PG_PASSWORD}"
    # 连接池参数,与 executor.http_pool_size 独立
    pool_size: 5
    max_overflow: 10

# 策略配置
policy:
  routing_table:
    - condition: |
        lambda s: "订单" in s.user_input and ("在哪" in s.user_input or "物流" in s.user_input)
      strategy: "OrderTrackingStrategy"
    - condition: "lambda s: True"
      strategy: "DefaultStrategy"

# 日志配置
logger:
  level: "INFO"
  sampling_rate: 0.005       # 0.5% INFO 采样
  error_sampling_rate: 1.0
  kafka:
    bootstrap_servers: "kafka-prod.internal:9092"
    topic: "openclaw-logs"

提示:所有密码字段都用 ${VAR_NAME} 占位,启动时用 envsubst 替换:

envsubst < config.yaml > config.prod.yaml
python -m openclaw.chain --config config.prod.yaml

4.3 编写自定义 Tool:order_db.py

在 tools/ 目录下新建 order_db.py:

from openclaw.tool import ToolBase
from sqlalchemy import create_engine, text
from typing import Dict, Any

class OrderDBTool(ToolBase):
    def __init__(self, config: Dict[str, Any]):
        self.engine = create_engine(
            f"postgresql://{config['user']}:{config['password']}@{config['host']}:{config['port']}/{config['database']}",
            pool_size=config.get("pool_size", 5),
            max_overflow=config.get("max_overflow", 10),
            pool_pre_ping=True,
            echo=False  # 生产关闭 SQL 输出
        )

    def schema(self) -> Dict[str, Any]:
        return {
            "name": "query_order_status",
            "description": "查询指定订单号的最新物流状态",
            "parameters": {
                "type": "object",
                "properties": {
                    "order_id": {"type": "string", "description": "16位数字订单号"}
                },
                "required": ["order_id"]
            }
        }

    def execute(self, order_id: str) -> Dict[str, Any]:
        # 1. 校验订单号格式
        if not order_id.isdigit() or len(order_id) != 16:
            return {"error": "订单号格式错误,应为16位数字"}

        # 2. 查询数据库(带超时)
        try:
            with self.engine.connect().execution_options(timeout=3.0) as conn:
                stmt = text("SELECT status, logistics_no, updated_at FROM orders WHERE order_id = :oid")
                result = conn.execute(stmt, {"oid": order_id}).fetchone()
                
                if not result:
                    return {"error": "未找到该订单"}
                
                return {
                    "status": result.status,
                    "logistics_no": result.logistics_no,
                    "updated_at": result.updated_at.isoformat()
                }
        except Exception as e:
            return {"error": f"数据库查询失败: {str(e)}"}

注意两点:

  • execute() 里用了 execution_options(timeout=3.0) ,这是 SQLAlchemy 的语句级超时,比连接池超时更细粒度;
  • 返回值是 dict,不是 ORM 对象,确保序列化安全。

4.4 编写自定义 Policy:strategies/order_tracking.py

在 strategies/ 目录下:

import re
from openclaw.policy import StrategyBase
from openclaw.state import State

class OrderTrackingStrategy(StrategyBase):
    def execute(self, state: State) -> Decision:
        # 1. 从用户输入提取订单号(支持多种格式)
        user_input = state.user_input
        # 匹配 16 位数字,或 "订单号:1234567890123456"
        match = re.search(r'(\d{16})|订单号[::\s]*(\d{16})', user_input)
        if not match:
            # 从历史记录里找(用户可能之前提过)
            for msg in reversed(state.history[-2:]):
                if msg.get("role") == "user":
                    sub_match = re.search(r'(\d{16})', msg.get("content", ""))
                    if sub_match:
                        order_id = sub_match.group(1)
                        break
            else:
                return Decision(action="respond", response="请提供16位订单号,例如:1234567890123456")
        else:
            order_id = match.group(1) or match.group(2)

        # 2. 返回工具调用指令
        return Decision(
            action="tool_call",
            tool_name="query_order_status",
            tool_args={"order_id": order_id}
        )

这里体现了 Policy 的核心价值: 上下文感知的意图补全 。用户只说“我的订单”,Policy 自动从 history 里找上次提到的订单号,而不是傻等用户再输一遍。

4.5 启动与验证

启动命令:

python -m openclaw.chain --config config.prod.yaml

验证用 curl:

curl -X POST http://localhost:8000/execute \
  -H "Content-Type: application/json" \
  -d '{"user_input": "我的订单在哪"}'

预期响应:

{
  "response": "您的订单 1234567890123456 当前状态:已发货,物流单号 SF123456789CN,最后更新时间:2024-05-20T14:22:33",
  "trace_id": "a1b2c3d4-e5f6-7890-g1h2-i3j4k5l6m7n8"
}

同时,Kafka 的 openclaw-logs topic 里会有一条结构化日志,包含 trace_id、duration_ms、tool_name、status_code 等字段,可直接接入监控大盘。

5. 常见问题与排查技巧实录:那些文档里不会写的坑

5.1 问题速查表:高频故障与定位路径

现象 可能原因 定位命令/日志关键词 解决方案
请求卡住,CPU 100%,无响应 Executor 连接池耗尽,线程死锁 kubectl top pod 查 CPU; grep "acquiring connection" logs 增大 executor.http_pool_size ;检查 Tool.py 是否漏关连接
返回“服务暂时繁忙”,但工具 API 正常 Policy.py 的 decide() 抛出未捕获异常 grep "ERROR.*Policy" logs ;检查 routing_table condition lambda 语法 ast.parse() 预检 condition 字符串;加 try-catch 包裹 execute()
Redis 里 memory 数据为空 Memory.py 的 _save_to_redis() 任务被 event loop 拒绝 grep "Task was destroyed but it is pending" logs 降低并发量;检查 asyncio.run() 是否被多次调用
同一 trace_id 出现多条日志,内容不一致 多个线程/协程共享同一个 State 实例 grep "trace_id.*duplicate" logs ;检查 Chain.py 是否复用 state State 必须每次 execute() 新建,禁止全局变量
工具调用返回 {"error": "connection closed"} PostgreSQL 连接被服务端主动断开(idle_in_transaction_timeout) SELECT * FROM pg_stat_activity WHERE state = 'idle in transaction'; 在 DB 配置里加 idle_in_transaction_session_timeout = 0

5.2 独家避坑技巧:来自三次线上事故的总结

技巧一:用 py-spy record 抓取实时火焰图,而不是猜哪里卡住
当出现“请求慢但日志没报错”时,90% 是 IO 等待。我们不再看日志,而是直接:

py-spy record -p $(pgrep -f "openclaw.chain") -o profile.svg --duration 30

生成的火焰图里,如果 sqlalchemy.engine.base.Connection._exec_driver_sql 占比超高,说明数据库查询慢;如果 redis.connection.Connection.read_response 高,说明 Redis 延迟大。这比翻 1000 行日志快 10 倍。

技巧二:在 CI 流程里加入 schema 校验,而不是等上线后报错
我们写了一个 pre-commit hook,每次提交前自动检查所有 Tool.schema() 返回的 JSON Schema 是否符合 OpenClaw 的元 schema:

# validate_schema.py
import jsonschema
from openclaw.tool import ToolBase

def validate_tool_schemas():
    for tool_class in get_all_tool_classes():
        schema = tool_class().schema()
        # 加载 OpenClaw 定义的元 schema
        with open("tool_schema.json") as f:
            meta_schema = json.load(f)
        jsonschema.validate(instance=schema, schema=meta_schema)

这样,如果有人在 schema 里漏写 required 字段,CI 直接 fail,不许提交。

技巧三:用 pytest --tb=short 跑单元测试,但必须 mock 所有外部依赖
Policy.py 的单元测试,我们从不连真实 Redis 或 DB:

def test_order_tracking_strategy_with_history(mocker):
    # mock
Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐