OpenClaw 中间件:请求拦截、转换与增强的完整指南
目录
摘要
中间件是 OpenClaw 处理链路中最灵活的一环。本文从中间件的设计哲学出发,系统讲解中间件的三种模式(前置、后置、环绕)、洋葱模型执行链、请求/响应变换机制,以及流式消息处理的特殊考量。通过两个完整实战案例——“消息敏感词过滤中间件"和"请求耗时统计仪表盘中间件”——你将掌握从零开发生产级中间件的全部技能。读完你会理解:为什么 AI Agent 框架需要中间件,以及如何用它优雅地解决横切关注点。
1. 引言:为什么 AI Agent 也需要中间件
1.1 场景还原
先说一个真实遇到过的问题。
你的 OpenClaw Agent 接入了企业微信和飞书两个渠道,一切都运行得很好。直到有一天,安全部门找上门来:“你们 Agent 回复的消息里,有没有可能泄露敏感信息?能不能在消息发出去之前做一次检查?”
你第一反应可能是——在每个处理逻辑里加判断。但是 Agent 的回复路径不止一条:直接回复、自动摘要、子代理消息、心跳输出……每条路径都要改一遍?
更麻烦的是,一周后运维团队又提了新需求:“能不能统计每个渠道的消息响应耗时?我们想做性能监控。” 两周后产品说:“飞书渠道的消息需要自动添加加粗格式,但微信渠道不需要。”
面对这些横切关注点(cross-cutting concerns),逐个修改业务逻辑的代价越来越高。这正是中间件的用武之地。
1.2 中间件的本质
| 方案 | 扩展性 | 维护性 | 复用性 |
|---|---|---|---|
| 无中间件(修改核心) | ❌ 每次加需求改核心 | ❌ 核心代码膨胀 | ❌ 无法复用 |
| 有中间件(拦截链) | ✅ 添加新中间件即可 | ✅ 核心保持干净 | ✅ 中间件独立复用 |
一句话总结:中间件让你在不修改 Agent 核心逻辑的前提下,插入任何需要的前置和后置处理逻辑。
2. 中间件架构设计
2.1 三种中间件模式
OpenClaw 的中间件根据切入时机分为三种:
| 模式 | 切入时机 | 典型用途 | 能否修改数据 |
|---|---|---|---|
| Before(前置) | Agent 处理之前 | 鉴权、参数校验、输入清洗 | ✅ 可修改请求 |
| After(后置) | Agent 处理之后 | 格式化、翻译、脱敏 | ✅ 可修改响应 |
| Around(环绕) | 包裹整个处理过程 | 性能统计、异常捕获、缓存 | ✅ 可控制全流程 |
2.2 洋葱模型执行链
多个中间件会形成一个洋葱模型的执行链——请求从外层穿过一层层中间件到达核心,响应再从核心穿过一层层中间件回到外层:
洋葱模型的执行顺序:
请求阶段:Mid 1 → Mid 2 → Agent Core
响应阶段:Agent Core → Mid 2 → Mid 1
💡 这个设计保证了:最先拦截请求的中间件,最后看到响应。就像剥洋葱——先接触的外层,最后离开。
2.3 中间件配置体系
# openclaw.yaml
middleware:
enabled: true
# 全局中间件(对所有渠道生效)
global:
- name: "auth-validator" # 身份验证
enabled: true
order: 10 # 执行顺序(数字越小越靠外)
- name: "rate-limiter" # 速率限制
enabled: true
order: 20
- name: "request-logger" # 请求日志
enabled: true
order: 30
# 按渠道配置中间件
channels:
feishu:
- name: "feishu-formatter" # 飞书格式转换
enabled: true
order: 50
- name: "sensitive-filter" # 敏感信息过滤
enabled: true
order: 60
config:
keywords: ["密码", "token", "密钥"]
mask_char: "*"
wecom:
- name: "wecom-formatter" # 企微格式转换
enabled: true
order: 50
discord:
- name: "discord-sanitizer" # Discord Markdown清洗
enabled: true
order: 50
配置策略表:
| 配置项 | 说明 | 建议 |
|---|---|---|
global |
对所有渠道生效 | 放基础中间件(鉴权、日志、限流) |
channels.<name> |
针对特定渠道 | 放渠道特有的中间件(格式化、清洗) |
order |
执行顺序 | 鉴权(order:10) → 限流(20) → 日志(30) → 格式化(50) → 过滤(60) |
config |
中间件特定配置 | 通过 config 段传递自定义参数 |
3. 中间件开发实战:消息敏感词过滤
3.1 需求分析
目标:开发一个敏感信息过滤中间件,在 Agent 回复消息发送前自动检测并脱敏。不阻止消息发送(非阻塞模式),但标记并记录敏感信息。
3.2 完整实现
"""
middleware_sensitive_filter.py
消息敏感词过滤中间件
功能:
1. 检测消息中的敏感信息(手机号、身份证、银行卡、API Key)
2. 自动脱敏替换(如 138****1234)
3. 记录脱敏日志供审计
4. 支持自定义规则和豁免名单
"""
import re
import json
import time
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
# ============================================
# 1. 敏感信息检测规则定义
# ============================================
@dataclass
class DetectionRule:
"""检测规则定义"""
name: str
pattern: str
mask_fn: callable # 脱敏函数
severity: str = "medium" # low / medium / high
enabled: bool = True
# 预置检测规则库
BUILTIN_RULES: List[DetectionRule] = [
# 中国大陆手机号
DetectionRule(
name="phone_cn",
pattern=r'1[3-9]\d{9}',
mask_fn=lambda m: m.group()[:3] + "****" + m.group()[-4:],
severity="high"
),
# 身份证号码
DetectionRule(
name="id_card_cn",
pattern=r'\d{17}[\dXx]',
mask_fn=lambda m: m.group()[:4] + "**********" + m.group()[-4:],
severity="high"
),
# 银行卡号(16-19位)
DetectionRule(
name="bank_card",
pattern=r'\d{16,19}',
mask_fn=lambda m: m.group()[:4] + " **** **** " + m.group()[-4:],
severity="high"
),
# API Key(常见格式)
DetectionRule(
name="api_key",
pattern=r'(?:api[_-]?key|apikey|token|secret)[:=]\s*["\']?([\w-]{20,})["\']?',
mask_fn=lambda m: m.group(1)[:4] + "***..." + m.group(1)[-4:]
if len(m.group(1)) > 8 else "***",
severity="high"
),
# 邮箱地址
DetectionRule(
name="email",
pattern=r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
mask_fn=lambda m: m.group()[:2] + "****@" + m.group().split("@")[1],
severity="medium"
),
]
# ============================================
# 2. 自定义关键词匹配器
# ============================================
class KeywordMatcher:
"""基于关键词列表的匹配器
支持:
- 精确匹配
- 正则模式匹配
- 自定义脱敏字符
"""
def __init__(self, keywords: List[str], mask_char: str = "*"):
self.keywords = set(keywords)
self.mask_char = mask_char
def check(self, text: str) -> Dict[str, Any]:
"""检查文本是否包含关键词"""
found = []
clean_text = text
for kw in self.keywords:
if kw.lower() in clean_text.lower():
found.append(kw)
# 替换为脱敏字符
masked = self.mask_char * len(kw)
clean_text = clean_text.replace(kw, masked)
return {
"has_sensitive": len(found) > 0,
"matched_keywords": found,
"count": len(found),
"masked_text": clean_text
}
# ============================================
# 3. 核心中间件逻辑
# ============================================
class SensitiveFilterMiddleware:
"""敏感信息过滤中间件
工作流程:
1. 接收消息 → 正则匹配敏感模式
2. 关键词匹配
3. 脱敏替换
4. 记录审计日志
5. 返回脱敏后的消息
"""
def __init__(self, config: Optional[Dict] = None):
cfg = config or {}
# 加载规则
self.rules = BUILTIN_RULES.copy()
# 关键词过滤器
keywords = cfg.get("keywords", [])
mask_char = cfg.get("mask_char", "*")
self.keyword_matcher = KeywordMatcher(keywords, mask_char)
# 豁免名单(不对特定用户做过滤)
self.whitelist = set(cfg.get("whitelist", []))
# 审计日志
self.audit_log = []
# 统计
self.stats = {
"total_checked": 0,
"total_masked": 0,
"by_rule": {}
}
def check(self, text: str, user_id: Optional[str] = None) -> Dict[str, Any]:
"""
检查并脱敏一条消息
Args:
text: 待检查的文本
user_id: 消息发送者ID(用于豁免检查)
Returns:
包含脱敏文本和检测报告的字典
"""
# 豁免检查
if user_id and user_id in self.whitelist:
return {
"original": text,
"masked_text": text,
"masked": False,
"reason": "whitelist",
"details": []
}
clean_text = text
details = []
masked_count = 0
# 1. 正则规则检测
for rule in self.rules:
if not rule.enabled:
continue
matches = list(re.finditer(rule.pattern, clean_text, re.IGNORECASE))
if matches:
# 应用脱敏
for match in matches:
mask_result = rule.mask_fn(match)
clean_text = clean_text.replace(match.group(), mask_result)
details.append({
"rule": rule.name,
"severity": rule.severity,
"matches": len(matches),
"samples": [m.group()[:20] for m in matches[:3]]
})
masked_count += len(matches)
# 更新统计
self.stats["by_rule"][rule.name] = \
self.stats["by_rule"].get(rule.name, 0) + len(matches)
# 2. 关键词检测
kw_result = self.keyword_matcher.check(clean_text)
if kw_result["has_sensitive"]:
clean_text = kw_result["masked_text"]
details.append({
"rule": "keyword_match",
"severity": "low",
"matches": kw_result["count"],
"samples": kw_result["matched_keywords"][:3]
})
masked_count += kw_result["count"]
# 3. 更新统计
self.stats["total_checked"] += 1
if masked_count > 0:
self.stats["total_masked"] += 1
# 4. 审计日志(不记录原文内容,只记录元数据)
audit_entry = {
"timestamp": time.time(),
"user_id": user_id,
"text_length": len(text),
"masked": masked_count > 0,
"match_count": masked_count,
"rule_names": [d["rule"] for d in details]
}
self.audit_log.append(audit_entry)
return {
"original_length": len(text),
"masked_text": clean_text,
"masked": masked_count > 0,
"total_matched": masked_count,
"details": details,
"audit_id": len(self.audit_log)
}
# ============================================
# 4. 中间件钩子函数(OpenClaw 接口)
# ============================================
middleware_instance = None
def on_load(config: Dict = None):
"""中间件加载初始化"""
global middleware_instance
middleware_instance = SensitiveFilterMiddleware(config)
print(f"✅ 敏感信息过滤中间件已加载 "
f"(规则: {len(middleware_instance.rules)}, "
f"关键词: {len(middleware_instance.keyword_matcher.keywords)})")
return {"status": "loaded"}
def before_message(message: Dict) -> Dict:
"""消息发送前拦截"""
if not middleware_instance:
return message
text = message.get("content", "")
user_id = message.get("user_id")
result = middleware_instance.check(text, user_id)
# 替换为脱敏后的内容
message["content"] = result["masked_text"]
# 附加元数据(不影响消息内容,供下游使用)
message["_meta"] = message.get("_meta", {})
message["_meta"]["sensitive_filter"] = {
"masked": result["masked"],
"count": result["total_matched"]
}
return message
def on_unload():
"""卸载前清理"""
global middleware_instance
if middleware_instance:
print(f"📊 敏感信息过滤统计: "
f"检查 {middleware_instance.stats['total_checked']} 条, "
f"脱敏 {middleware_instance.stats['total_masked']} 条")
middleware_instance = None
print("🛑 敏感信息过滤中间件已卸载")
return {"status": "unloaded"}
3.3 效果演示
# 测试敏感词过滤
from middleware_sensitive_filter import SensitiveFilterMiddleware
middleware = SensitiveFilterMiddleware({
"keywords": ["密码", "密钥", "secret"],
"mask_char": "*"
})
test_messages = [
"您的手机号是13812341234,请记录",
"我的身份证号是310101199001011234",
"API密钥是sk-abcdefghijklmnopqrstuvw",
"请帮我重置一下登录密码",
]
for msg in test_messages:
result = middleware.check(msg)
status = "🟢" if not result["masked"] else "🟡"
print(f"{status} 原文: {result['original_length']}字")
print(f" 脱敏后: {result['masked_text']}")
if result["details"]:
for d in result["details"]:
print(f" └─ {d['rule']}: {d['matches']}处匹配")
print()
预期输出:
🟡 原文: 16字
脱敏后: 您的手机号是138****1234,请记录
└─ phone_cn: 1处匹配
🟡 原文: 19字
脱敏后: 我的身份证号是3101**********1234
└─ id_card_cn: 1处匹配
🟡 原文: 18字
脱敏后: API密钥是sk-a***...tuvw
└─ api_key: 1处匹配
🟡 原文: 11字
脱敏后: 请帮我重置一下登录**
└─ keyword_match: 1处匹配
4. 实战案例二:请求耗时统计仪表盘中间件
4.1 需求分析
目标:统计每个渠道的消息处理耗时,按小时聚合,提供 API 查询仪表盘数据。
4.2 环绕中间件实现
"""
middleware_latency_stats.py
请求耗时统计仪表盘中间件
采用环绕模式(Around)实现对 Agent 处理全流程的耗时统计,
并聚合为小时级数据供监控面板查询。
"""
import time
import json
from collections import defaultdict
from typing import Dict, Any, Optional
from datetime import datetime
class LatencyStatsMiddleware:
"""环绕中间件——包裹Agent处理过程并记录耗时"""
def __init__(self, config: Optional[Dict] = None):
# 小时级聚合数据 { "2026-06-21T12": { "feishu": {count, total, min, max} } }
self.hourly_stats = defaultdict(lambda: defaultdict(
lambda: {"count": 0, "total_ms": 0, "min": float("inf"), "max": 0, "p99": 0}
))
self.all_durations = [] # 用于计算P99
def get_hour_key(self) -> str:
"""获取当前小时标识"""
return datetime.now().strftime("%Y-%m-%dT%H")
def update_stats(self, channel: str, duration_ms: float):
"""更新统计指标"""
hour = self.get_hour_key()
stats = self.hourly_stats[hour][channel]
stats["count"] += 1
stats["total_ms"] += duration_ms
stats["min"] = min(stats["min"], duration_ms)
stats["max"] = max(stats["max"], duration_ms)
# 保留最近1000条用于P99计算
self.all_durations.append(duration_ms)
if len(self.all_durations) > 1000:
self.all_durations.pop(0)
def get_stats(self, hours: int = 24) -> Dict:
"""获取最近 N 小时的统计数据,供监控面板 API 查询"""
result = {}
now = datetime.now()
for h in range(hours):
key = (now.replace(hour=now.hour - h)).strftime("%Y-%m-%dT%H")
if key in self.hourly_stats:
hour_data = {}
for channel, stats in self.hourly_stats[key].items():
avg = stats["total_ms"] / stats["count"] if stats["count"] > 0 else 0
hour_data[channel] = {
**stats,
"avg_ms": round(avg, 2),
"min": round(stats["min"], 2) if stats["min"] != float("inf") else 0,
"max": round(stats["max"], 2)
}
result[key] = hour_data
return result
def calculate_p99(self) -> float:
"""计算P99延迟"""
if not self.all_durations:
return 0
sorted_durations = sorted(self.all_durations)
idx = int(len(sorted_durations) * 0.99)
return sorted_durations[idx]
# ============================================
# 环绕中间件——执行包裹
# ============================================
stats_middleware = None
def on_load(config: Dict = None):
global stats_middleware
stats_middleware = LatencyStatsMiddleware(config)
print("✅ 请求耗时统计中间件已加载")
return {"status": "loaded"}
# 环绕中间件需要一个包裹函数来同时处理请求前和请求后
# 在OpenClaw中,通过yield语法实现环绕
def around_message(message: Dict, next_handler: callable):
"""
环绕中间件——包裹完整的消息处理流程
prev_handler: 返回生成器(yield前是前置,yield后是后置)
"""
channel = message.get("channel", "unknown")
start = time.time()
# ====== 前置:记录开始时间 ======
# 可以在这里做限流判断
print(f"📊 [{channel}] 开始处理消息...")
# ====== 调用下一层(Agent处理) ======
result = next_handler(message)
# ====== 后置:计算耗时并更新统计 ======
duration_ms = (time.time() - start) * 1000
stats_middleware.update_stats(channel, duration_ms)
print(f"📊 [{channel}] 处理完成, 耗时: {duration_ms:.2f}ms")
# 将耗时信息附加到响应元数据
if isinstance(result, dict):
result["_meta"] = result.get("_meta", {})
result["_meta"]["latency_ms"] = round(duration_ms, 2)
return result
def get_dashboard_data():
"""查询仪表盘数据的API"""
if not stats_middleware:
return {"error": "中间件未初始化"}
return {
"p99_ms": round(stats_middleware.calculate_p99(), 2),
"hourly": stats_middleware.get_stats(24)
}
EXPORTS = {
"middleware": {
"around_message": around_message
},
"api": {
"get_dashboard_data": get_dashboard_data
}
}
4.3 模拟仪表盘数据查询
# 查询仪表盘数据
data = get_dashboard_data()
for hour_key, channels in sorted(data["hourly"].items()):
print(f"\n⏰ {hour_key}")
for channel, stats in channels.items():
print(f" 📡 {channel}: "
f"请求数={stats['count']}, "
f"平均={stats['avg_ms']}ms, "
f"P99={data['p99_ms']}ms")
5. 中间件开发最佳实践
5.1 中间件设计原则
| 原则 | 说明 | 反例 |
|---|---|---|
| 单一职责 | 一个中间件只做一件事 | 既做鉴权又做统计 |
| 无副作用 | 不改变业务逻辑的正确性 | 过滤中间件把正常内容也删了 |
| 可配置 | 行为由配置控制 | 硬编码所有参数 |
| 幂等 | 多次执行不会出错 | 重复脱敏导致信息丢失 |
| 快速失败 | 检查不通过快速拒绝 | 链路里卡30秒不返回 |
5.2 执行顺序的最佳实践
最外层(先拦截、最后返回)
├── 安全类中间件(鉴权、限流、CORS)
├── 日志类中间件(请求日志)
├── 业务类中间件(格式化、脱敏)
├── 监控类中间件(耗时、错误率)
└── 最内层:Agent 核心处理
| 类型 | order 建议 | 必须最先/最后 |
|---|---|---|
| 鉴权/限流 | 10-19 | ✅ 必须最先(不通过则拒绝) |
| 请求日志 | 20-29 | 记录原始请求 |
| 输入清洗 | 30-39 | 在 Agent 处理前完成 |
| 输出格式化 | 40-49 | 在 Agent 处理后立即执行 |
| 输出脱敏 | 50-59 | ✅ 必须最后(保证干净输出) |
| 性能统计 | 60-69 | 包裹整个流程 |
6. 总结
本文从零构建了两个生产级中间件,覆盖了 OpenClaw 中间件系统的完整知识点:
核心要点:
-
三种模式,一个入口:前置(before)做输入校验,后置(after)做输出增强,环绕(around)做全流程监控
-
洋葱模型:多个中间件形成层层嵌套的执行链——请求从外向内穿过,响应从内向外返回
-
敏感词过滤:正则匹配 → 关键词筛 → 脱敏替换 → 审计日志,四步完成安全过滤
-
耗时统计:环绕中间件记录全流程耗时,按小时+渠道聚合 P99 延迟
-
横切关注点分离:中间件的核心价值——不改核心代码,独立添加任何横向能力
思考题:
-
你有一个中间件需要耗时较长(如调用外部 API 做内容审核)。如何设计异步中间件,避免阻塞整个消息处理链路?
-
两个中间件可能产生冲突——比如"格式化中间件"添加了 Markdown 样式,但"脱敏中间件"把加粗语法也脱敏了。你会如何设计中间件的执行顺序避免这种冲突?
-
中间件本身也会出错(比如统计中间件的 Redis 挂了)。如何设计中间件的降级策略——是让整个流程失败,还是跳过出错的中间件继续处理?
参考资料
更多推荐

所有评论(0)