从 nanobot 理解 openclaw 核心设计
❝当 Manus、OpenAI Codex、Claude Code 动辄几十万行代码时,一个仅 4000 行核心代码的 AI Agent 框架却五脏俱全:ReAct 推理循环、双层记忆系统、多渠道接入、子 Agent 协作、MCP 协议……这就是 nanobot。本文将带你逐层拆解它的技术内核。
点击上方 程序员成长指北,关注公众号
回复1,加入高级Node交流群
1. 什么是 nanobot?
1.1 介绍
在 AI Agent 的赛道上,你可能见过 Cursor、Cline、OpenAI Codex 这些庞然大物——动辄几十万行代码、复杂的微服务架构。
而 nanobot 走了一条截然不同的路:核心代码仅约 4000 行,却实现了 openclaw 的核心能力:
-
支持 10+ 聊天渠道(Telegram、Discord、飞书、钉钉、微信、Slack、QQ、Email、WhatsApp、Matrix)
-
内置 ReAct推理循环,支持多轮工具调用
-
双层记忆系统:长期记忆 + 可检索历史
-
支持 20+LLM提供商(OpenAI、Claude、DeepSeek、Gemini、通义千问、Kimi……)
-
MCP协议原生支持,可动态接入外部工具
-
子 Agent 协作、定时任务、心跳机制
1.2 安装和配置
安装:
# 使用 PyPI 安装
pip install nanobot-ai
# 使用 uv 安装
uv tool install nanobot-ai
nanobot 的配置核心在 ~/.nanobot/config.json。通常你先执行一次:
nanobot onboard
这个命令会帮你初始化工作区、模板文件和基础配置。
接下来真正需要你关心的配置,主要有三块:模型提供方、Agent 默认参数、聊天渠道。
我这里给出一份使用方舟作为提供方,集成飞书的配置:
{
"providers": {
"openai": {
"apiKey": "xxx",
"apiBase": "https://ark.cn-beijing.volces.com/api/v3"
}
},
"agents": {
"defaults": {
"workspace": "~/.nanobot/workspace",
"model": "doubao-seed-1-8-251228",
"maxTokens": 8192,
"temperature": 0.7,
"maxToolIterations": 20
}
},
"channels": {
"feishu": {
"enabled": true,
"appId": "xxx",
"appSecret": "xxx",
"encryptKey": "",
"verificationToken": "",
"allowFrom": ["*"]
}
}
}
飞书的 appId 和 appSecret 需要到 飞书开放平台 创建一个应用,开启机器人功能,配置使用长连接接收事件,添加 im.message.receive_v1 事件。 注意:需要本地先启动 nanobot gateway 之后,才能使用长连接接收事件。
配置完成之后,将机器人发布上线,你就可以和他开始愉快对话了。
2. 整体流程
2.1 架构
官方 README 已经给出了一张架构图:

但如果要用更容易理解的话来概括,我更愿意把 nanobot 描述为下面这个结构:
nanobot 的代码组织也非常清晰。
nanobot/
├── agent/ # 核心:Agent 循环、上下文、记忆、技能、子代理
│ ├── loop.py # Agent Loop —— 大脑中枢
│ ├── context.py # 上下文构建器(System Prompt 组装)
│ ├── memory.py # 记忆系统(MEMORY.md + HISTORY.md)
│ ├── skills.py # 技能加载器
│ ├── subagent.py # 子代理管理器
│ └── tools/ # 内建工具集
│ ├── base.py # 工具基类
│ ├── registry.py # 工具注册表
│ ├── filesystem.py # 文件读写编辑
│ ├── shell.py # Shell 命令执行
│ ├── web.py # Web 搜索与网页抓取
│ ├── message.py # 跨渠道消息发送
│ ├── spawn.py # 子代理生成
│ ├── cron.py # 定时任务管理
│ └── mcp.py # MCP 协议客户端
├── bus/ # 消息总线(解耦渠道与 Agent)
│ ├── events.py # 事件数据结构
│ └── queue.py # 异步消息队列
├── channels/ # 多渠道接入层
│ ├── base.py # 渠道抽象基类
│ ├── manager.py # 渠道管理器
│ ├── feishu.py # 飞书
│ ├── telegram.py # Telegram
│ └── ... # Discord、Slack、WhatsApp 等
├── session/ # 会话管理
│ └── manager.py # 会话持久化(JSONL)
├── cron/ # 定时任务引擎
│ ├── service.py # 调度服务
│ └── types.py # 任务数据结构
├── heartbeat/ # 心跳服务
│ └── service.py # 周期性唤醒
├── providers/ # LLM 提供商抽象层
│ ├── base.py # 统一接口
│ ├── litellm_provider.py # LiteLLM 适配器
│ └── ...
├── config/ # 配置加载
├── skills/ # 内置技能包(Markdown 文件)
├── templates/ # 工作区模板
└── cli/ # CLI 入口
在这个结构里,有几个模块需要先在脑子里立住:
| 模块 | 作用 |
|---|---|
| Channel |
负责“接消息和发消息”。它知道怎么跟飞书长连接,怎么从 Telegram 收消息,怎么把最后的回复发回去,但它并不决定内容怎么生成。 |
| MessageBus |
负责“转运消息”。渠道层把收到的消息丢进 |
| Gateway |
整个系统真正跑起来的现场总控。你执行 |
| AgentLoop |
nanobot 的核心脑回路。用户说了什么、最近聊过什么、长期记忆里有什么、系统 prompt 长什么样、这轮是不是该调工具、工具结果回来之后要不要继续思考,这些事都发生在这里。 |
| SessionManager |
会话历史。它的作用不是让消息“临时过一下”,而是让每个渠道、每个 chat 都有自己的连续上下文。 |
| MemoryStore |
解决的是比 Session 更长期的问题。会话历史可以很长,但不能永远全塞进上下文,所以必须沉淀成更高层次的长期记忆与历史日志。 |
| SkillsLoader |
解决的是另一个经典问题:模型再强,也不可能天然掌握你所有的工作流和领域技巧。所以 nanobot 把这类“能力说明书”做成 Skill,让模型按需加载。 |
| ToolRegistry |
把模型和现实世界接起来的那座桥。文件系统、shell、Web、message、cron、spawn,甚至外部 MCP 服务,最终都会变成模型可调用的“工具”。 |
可以看到,nanobot 不追求每个模块都做得特别炫,而是追求边界足够清晰。你会发现它几乎所有机制都围绕一个原则展开,那就是把复杂度拆散,分别安放在合适的层级里,而不是堆在一个“大脑”里面。
2.2 核心处理流程
理解一个系统最好的方式,就是跟着一条数据走完它的全部旅程。现在,假设用户在飞书里给 nanobot 发了一条消息:"帮我查一下今天的天气"。让我们跟着这条消息,完整地走一遍 Agent 的处理流程。
2.2.1 Channel 层
消息的起点在 FeishuChannel。飞书渠道采用了一种非常巧妙的接入方式——WebSocket 长连接,而不是传统的 Webhook 回调。这意味着部署 nanobot 不需要公网 IP,不需要域名,不需要 SSL 证书,一台能上网的电脑就够了。
# nanobot/channels/feishu.py
class FeishuChannel(BaseChannel):
"""
Feishu/Lark channel using WebSocket long connection.
Uses WebSocket to receive events - no public IP or webhook required.
"""
asyncdef start(self) -> None:
self._ws_client = lark.ws.Client(
self.config.app_id,
self.config.app_secret,
event_handler=event_handler,
log_level=lark.LogLevel.INFO
)
# WebSocket 在独立线程中运行,自带断线重连
def run_ws():
while self._running:
try:
self._ws_client.start()
except Exception as e:
logger.warning("Feishu WebSocket error: {}", e)
if self._running:
time.sleep(5)
self._ws_thread = threading.Thread(target=run_ws, daemon=True)
self._ws_thread.start()
在异步的 _on_message 方法中,飞书的各种消息类型(文本、富文本 Post、图片、文件、卡片等)被统一解析成纯文本 + 媒体文件的形式。
如果消息中有图片,会被下载到本地并转为 base64 编码传给 LLM。还有一个贴心的小设计——收到消息后会立即给用户一个 Emoji 回应(默认是 👍),让用户知道消息已经被接收,Agent 正在处理中。
async def _on_message(self, data: "P2ImMessageReceiveV1") -> None:
# 消息去重
message_id = message.message_id
if message_id in self._processed_message_ids:
return
self._processed_message_ids[message_id] = None
# 给用户一个"收到"的反应
await self._add_reaction(message_id, self.config.react_emoji)
# 解析消息内容...
# 最终推送到消息总线
await self._handle_message(
sender_id=sender_id,
chat_id=reply_to,
content=content,
media=media_paths,
metadata={"message_id": message_id, "chat_type": chat_type, "msg_type": msg_type}
)
2.2.2 消息总线
_handle_message 最终做的事情很简单——构造一个 InboundMessage 对象,推入消息总线。
# nanobot/bus/events.py
@dataclass
class InboundMessage:
channel: str # "feishu"
sender_id: str # 发送者 ID
chat_id: str # 聊天 ID
content: str # 消息文本
media: list[str] # 媒体文件路径列表
metadata: dict # 渠道元数据
@property
def session_key(self) -> str:
return self.session_key_override or f"{self.channel}:{self.chat_id}"
MessageBus 的实现极简到只有两个 asyncio.Queue:
# nanobot/bus/queue.py
class MessageBus:
def __init__(self):
self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()
async def publish_inbound(self, msg: InboundMessage) -> None:
await self.inbound.put(msg)
async def consume_inbound(self) -> InboundMessage:
return await self.inbound.get()
就是两个队列,一进一出。对于单进程的个人助手来说,asyncio.Queue 提供了恰到好处的异步解耦能力,而且零依赖、零延迟、零运维成本。这正是 nanobot 设计哲学的体现——不要引入超出实际需要的复杂度。
2.2.3 Agent Loop
消息进入总线后,就来到了 nanobot 的核心——AgentLoop。这是整个系统的大脑中枢,它的 run() 方法是一个永不停歇的消费循环:
# nanobot/agent/loop.py
asyncdef run(self) -> None:
self._running = True
await self._connect_mcp()
logger.info("Agent loop started")
while self._running:
try:
msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)
except asyncio.TimeoutError:
continue
if msg.content.strip().lower() == "/stop":
await self._handle_stop(msg)
else:
task = asyncio.create_task(self._dispatch(msg))
self._active_tasks.setdefault(msg.session_key, []).append(task)
这里有两个精心设计的地方。
-
消息消费设置了 1 秒超时,而不是无限阻塞。这让循环始终保持响应性——如果用户发了
/stop命令,不需要等到当前消息处理完,随时都能响应。 -
每条消息被封装为一个独立的
asyncio.Task,并按session_key分组跟踪。这意味着可以精确地取消某个会话的所有进行中任务,而不影响其他会话。
消息进入 _dispatch 后,在全局处理锁 _processing_lock 的保护下交给 _process_message,接下来就是真正的 Agent 推理流程了。
2.2.4 Process Message
_process_message 方法大约有 120 行,它编排了从消息接收到最终回复的全部逻辑,如下图所示:
我们逐步来看关键步骤的代码。
-
系统消息, 如果消息来自
system渠道(比如 Subagent 的结果回报),会走一条简化的处理路径——不检查斜杠命令、不触发记忆整合,直接构建上下文并执行。系统消息的chat_id格式是channel:chat_id(如feishu:oc_xxx),需要先拆分出真实的渠道和聊天 ID。 -
获取或创建 Session, 根据
session_key(格式为channel:chat_id,如feishu:ou_abc123)查找已有会话,找不到就创建一个新的。SessionManager内部有一层内存缓存,热路径上不需要每次都读磁盘。 -
命令, 在做任何 LLM 调用之前,先检查用户是否发送了斜杠命令。
/new会触发同步的全量记忆归档(这里是阻塞的,因为必须确保归档成功后才能清空会话),/help则直接返回帮助文本。如果不是斜杠命令,继续后续处理。 -
检查记忆整合, 这是很容易被忽略但至关重要的一步——在构建上下文之前,先检查当前会话中未合并的消息数是否达到了阈值(
memory_window,默认 100):
unconsolidated = len(session.messages) - session.last_consolidated
if (unconsolidated >= self.memory_window and session.key notin self._consolidating):
self._consolidating.add(session.key)
lock = self._consolidation_locks.setdefault(session.key, asyncio.Lock())
asyncdef _consolidate_and_unlock():
try:
asyncwith lock:
await self._consolidate_memory(session)
finally:
self._consolidating.discard(session.key)
_task = asyncio.current_task()
if _task isnotNone:
self._consolidation_tasks.discard(_task)
_task = asyncio.create_task(_consolidate_and_unlock())
self._consolidation_tasks.add(_task)
为什么要在构建上下文之前做这个检查?
因为记忆整合会更新 MEMORY.md 的内容和 session.last_consolidated ,而这两者都会影响后续步骤——MEMORY.md 会被注入到 System Prompt 中(步骤 7),last_consolidated 决定了 get_history 返回哪些消息(步骤 6)。
但这里有个微妙之处:整合任务是通过 asyncio.create_task 放到后台的,不会阻塞当前消息的处理。所以当前这轮对话可能还用的是"旧"的记忆内容,但下一轮对话就能用到整合后的新记忆了。这是一个有意的权衡——如果等待整合完成再继续,用户每隔 100 条消息就会遇到一次明显的延迟。
注意 _consolidating 集合的作用:它确保同一个会话不会被重复触发整合(比如用户连续发了两条消息,第一条触发了整合,第二条检查时发现整合正在进行就不会再触发)。_consolidation_locks 提供了更细粒度的互斥保护,_consolidation_tasks 保持强引用防止 GC 回收后台任务。
-
设置工具上下文。 把当前消息的 channel、chat_id、message_id 注入到
message、spawn、cron等工具中,这样这些工具在执行时知道应该往哪个渠道发送消息。同时重置message_tool的_sent_in_turn标记,用于在最后判断 Agent 是否已经主动发过消息。
最后,就是我们接下来要详细展开的上下文构建、ReAct 循环、保存和回复流程。
2.2.5 上下文构建
在调用 LLM 之前,nanobot 需要组装完整的上下文。这件事由 ContextBuilder 负责。System Prompt 的构建过程就像搭积木,每一层都有明确的职责:
身份认知(Identity)
这是 System Prompt 的基石。nanobot 在这里告诉 LLM "你是谁"、"你在哪里工作"、"你应该遵循什么规则":
def _get_identity(self) -> str:
workspace_path = str(self.workspace.expanduser().resolve())
system = platform.system()
runtime = f"{'macOS' if system == 'Darwin' else system} {platform.machine()}, Python {platform.python_version()}"
return f"""# nanobot 🐈
You are nanobot, a helpful AI assistant.
## Runtime
{runtime}
## Workspace
Your workspace is at: {workspace_path}
- Long-term memory: {workspace_path}/memory/MEMORY.md (write important facts here)
- History log: {workspace_path}/memory/HISTORY.md (grep-searchable). Each entry starts with [YYYY-MM-DD HH:MM].
- Custom skills: {workspace_path}/skills/{{skill-name}}/SKILL.md
## nanobot Guidelines
- State intent before tool calls, but NEVER predict or claim results before receiving them.
- Before modifying a file, read it first. Do not assume files or directories exist.
- After writing or editing a file, re-read it if accuracy matters.
- If a tool call fails, analyze the error before retrying with a different approach.
- Ask for clarification when the request is ambiguous.
Reply directly with text for conversations. Only use the 'message' tool to send to a specific chat channel."""
这段 prompt 里有几个精心雕琢的细节。首先,它明确告知了 LLM 记忆文件的路径和用法,这使得 Agent 在用户要求"记住某件事"时知道往哪里写。
其次,Guidelines 中的 "NEVER predict or claim results before receiving them" 这条规则非常关键——它阻止了 LLM 在调用工具之前就自信满满地"编造"结果。
最后,明确区分了"直接回复"和"使用 message 工具"的场景,避免 Agent 在不需要的时候调用消息发送工具。
引导文件(Bootstrap Files)
这是 nanobot 最具可定制性的部分。系统会按照固定的顺序从工作区加载五个可选的 Markdown 文件:
BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md", "IDENTITY.md"]
def _load_bootstrap_files(self) -> str:
parts = []
for filename in self.BOOTSTRAP_FILES:
file_path = self.workspace / filename
if file_path.exists():
content = file_path.read_text(encoding="utf-8")
parts.append(f"## {filename}\n\n{content}")
return "\n\n".join(parts) if parts else ""
每个文件都有特定的职能,下面详细展开:AGENTS.md 是 Agent 的行为指令手册,类似于 Cursor 中的 .cursorrules。它定义了 Agent 在各种场景下应该如何行动。nanobot 的默认模板中包含了关于定时提醒和心跳任务的指导:
# Agent Instructions
You are a helpful AI assistant. Be concise, accurate, and friendly.
## Scheduled Reminders
Before scheduling reminders, check available skills and follow skill guidance first.
Use the built-in `cron` tool to create/list/remove jobs.
Get USER_ID and CHANNEL from the current session.
**Do NOT just write reminders to MEMORY.md** — that won't trigger actual notifications.
## Heartbeat Tasks
`HEARTBEAT.md` is checked on the configured heartbeat interval.
When the user asks for a recurring/periodic task, update `HEARTBEAT.md`
instead of creating a one-time cron reminder.
这个文件的一个妙处在于那句 "Do NOT just write reminders to MEMORY.md"——这是为了防止一个常见的 Agent 误解:用户说"提醒我明天开会"时,Agent 可能会把这条信息写进记忆文件就觉得完事了,但实际上需要创建一个真正的定时任务才能在到点时触发通知。这是用"告诉 AI 不要做什么"来修正行为偏差的经典做法。
|
文件 |
作用 |
源码 |
|---|---|---|
| SOUL.md |
定义了 Agent 的性格、价值观和沟通风格; |
# Soul I am nanobot 🐈, a personal AI assistant. ## Personality - Helpful and friendly - Concise and to the point - Curious and eager to learn ## Values - Accuracy over speed - User privacy and safety - Transparency in actions ## Communication Style - Be clear and direct - Explain reasoning when helpful - Ask clarifying questions when needed |
| USER.md |
记录了用户画像信息,帮助 Agent 更好地个性化服务 |
# User Profile ## Basic Information - **Name**: (your name) - **Timezone**: (your timezone, e.g., UTC+8) - **Language**: (preferred language) ## Work Context - **Primary Role**: (your role, e.g., developer, researcher) - **Main Projects**: (what you're working on) - **Tools You Use**: (IDEs, languages, frameworks) |
| TOOLS.md |
补充了工具的非显式约束,虽然工具的参数和描述已经通过 Function Calling 的 schema 提供了,但有些约束是 schema 无法表达的(比如"危险命令会被拦截")。 |
# Tool Usage Notes ## exec — Safety Limits - Commands have a configurable timeout (default 60s) - Dangerous commands are blocked (rm -rf, format, dd, shutdown, etc.) - Output is truncated at 10,000 characters - `restrictToWorkspace` config can limit file access to the workspace |
| IDENTITY.md |
引导文件,留给用户做完全自定义的身份覆盖——如果用户想让 nanobot 扮演一个完全不同的角色,可以在这个文件里写入新的身份定义。 |
五个 Bootstrap Files 的加载顺序是精心安排的:先是行为指令(AGENTS),然后是性格定义(SOUL),接着是用户信息(USER),再是工具补充(TOOLS),最后是身份覆盖(IDENTITY)。 后面的文件可以补充或覆盖前面的设定,但整体保持了一个从"通用"到"个性化"的渐进过程。
长期记忆
从 MEMORY.md 文件加载。记忆系统的详细设计我们在后面的章节重点展开。
Skills
采用了一种"两级加载"策略:标记为 always=true 的技能全文注入 System Prompt(因为它们总是需要),而其他技能只放一个 XML 格式的摘要清单。
当 Agent 需要使用某个技能时,通过 read_file 工具按需加载完整内容。这是一个非常聪明的 Token 优化策略——在不牺牲能力的前提下,最大限度减少 System Prompt 的长度。
除了 System Prompt,还有一段运行时上下文会被注入到用户消息前面:
@staticmethod
def _build_runtime_context(channel: str | None, chat_id: str | None) -> str:
now = datetime.now().strftime("%Y-%m-%d %H:%M (%A)")
tz = time.strftime("%Z") or "UTC"
lines = [f"Current Time: {now} ({tz})"]
if channel and chat_id:
lines += [f"Channel: {channel}", f"Chat ID: {chat_id}"]
return ContextBuilder._RUNTIME_CONTEXT_TAG + "\n" + "\n".join(lines)
这段上下文以一个特殊标记 [Runtime Context — metadata only, not instructions] 开头,明确告诉 LLM 这只是元数据,不是用户指令。
为什么要把运行时信息放在 user 消息里而不是 system prompt 中?因为时间和渠道信息每轮对话都会变化,如果放在 system prompt 中会破坏 Prompt Cache 的前缀匹配。
放在 user 消息里,前面的 system prompt 和 history 部分就能稳定地命中缓存。
这个小技巧同时还能有效防止 Prompt Injection——恶意用户不能通过在消息中伪造运行时上下文来欺骗 Agent。
最终,完整的消息列表被组装为标准的 OpenAI Chat 格式:
def build_messages(self, history, current_message, media=None, channel=None, chat_id=None):
runtime_ctx = self._build_runtime_context(channel, chat_id)
user_content = self._build_user_content(current_message, media)
# 合并运行时上下文和用户内容为单条 user 消息
if isinstance(user_content, str):
merged = f"{runtime_ctx}\n\n{user_content}"
else:
merged = [{"type": "text", "text": runtime_ctx}] + user_content
return [
{"role": "system", "content": self.build_system_prompt(skill_names)},
*history, # 历史对话
{"role": "user", "content": merged}, # 当前消息(含运行时上下文)
]
这里把运行时上下文和用户消息合并为一条 user 消息,而不是分成两条,是因为某些 LLM Provider 不允许连续出现相同 role 的消息(比如两条连续的 user 消息)。
如果用户发送了图片,_build_user_content 会把图片编码为 base64 的 image_url 格式,构成多模态内容列表。
2.2.6 ReAct 循环
上下文组装完毕后,进入 Agent 的核心推理循环 _run_agent_loop。这是一个经典的 ReAct(Reasoning + Acting)模式实现:
async def _run_agent_loop(self, initial_messages, on_progress=None):
messages = initial_messages
iteration = 0
final_content = None
tools_used: list[str] = []
while iteration < self.max_iterations:
iteration += 1
# 1. 调用 LLM
response = await self.provider.chat(
messages=messages,
tools=self.tools.get_definitions(),
model=self.model,
temperature=self.temperature,
max_tokens=self.max_tokens,
reasoning_effort=self.reasoning_effort,
)
if response.has_tool_calls:
# 2. LLM 决定使用工具 —— 执行并将结果加入上下文
for tool_call in response.tool_calls:
tools_used.append(tool_call.name)
result = await self.tools.execute(tool_call.name, tool_call.arguments)
messages = self.context.add_tool_result(
messages, tool_call.id, tool_call.name, result
)
else:
# 3. LLM 给出最终回复 —— 退出循环
final_content = self._strip_think(response.content)
break
return final_content, tools_used, messages
整个循环的逻辑非常直观:把消息列表和所有可用工具的定义发给 LLM,如果 LLM 返回了工具调用请求(has_tool_calls),就执行相应的工具,把结果追加到消息列表中,然后再次调用 LLM;如果 LLM 返回的是纯文本回复(没有工具调用),说明它已经完成了思考,循环结束。
这个循环的上限是 max_iterations(默认 40 次),防止 Agent 陷入无限循环。当达到上限时,会返回一个友好的提示:"I reached the maximum number of tool call iterations. You can try breaking the task into smaller steps."
注意这里还有一个 _strip_think 方法,用于清理某些模型(如 DeepSeek)在回复中嵌入的 <think>...</think> 块。
同时,如果 LLM 返回了错误(finish_reason == "error"),不会将错误消息保存到会话历史中,避免"毒化"后续的上下文。
这个处理来自一个实际踩到的坑——错误消息一旦进入 session history,会导致后续请求持续返回 400 错误。
2.2.7 工具注册和执行
工具的执行通过 ToolRegistry 统一管理。每个工具继承 Tool 基类,注册到 Registry 中:
# nanobot/agent/tools/registry.py
asyncdef execute(self, name: str, params: dict[str, Any]) -> str:
_HINT = "\n\n[Analyze the error above and try a different approach.]"
tool = self._tools.get(name)
ifnot tool:
returnf"Error: Tool '{name}' not found. Available: {', '.join(self.tool_names)}"
try:
errors = tool.validate_params(params)
if errors:
returnf"Error: Invalid parameters for tool '{name}': " + "; ".join(errors) + _HINT
result = await tool.execute(**params)
if isinstance(result, str) and result.startswith("Error"):
return result + _HINT
return result
except Exception as e:
returnf"Error executing {name}: {str(e)}" + _HINT
这里有个温暖的小设计:每当工具执行出错时,返回的错误消息末尾都会追加一句 [Analyze the error above and try a different approach.]。
这不是给人看的,而是给 LLM 看的——它温柔地提醒 LLM "分析一下哪里出了问题,换个思路再试试"。在实践中,这个小小的 hint 能显著提升 Agent 的纠错能力。
nanobot 默认注册的工具包括:
def _register_default_tools(self) -> None:
for cls in (ReadFileTool, WriteFileTool, EditFileTool, ListDirTool):
self.tools.register(cls(workspace=self.workspace, allowed_dir=allowed_dir))
self.tools.register(ExecTool(...))
self.tools.register(WebSearchTool(api_key=self.brave_api_key, proxy=self.web_proxy))
self.tools.register(WebFetchTool(proxy=self.web_proxy))
self.tools.register(MessageTool(send_callback=self.bus.publish_outbound))
self.tools.register(SpawnTool(manager=self.subagents))
if self.cron_service:
self.tools.register(CronTool(self.cron_service))
文件操作、Shell 执行、Web 搜索/抓取、跨渠道消息发送、子代理生成、定时任务——这些工具覆盖了日常使用的绝大多数场景。而且由于 Registry 的设计,MCP 外部工具在注册后和内建工具完全等价。
2.2.8 会话管理
每轮对话结束后,新产生的消息需要被保存到会话中。nanobot 的会话设计有一个核心原则——append-only(只追加不修改)。
# nanobot/session/manager.py
@dataclass
class Session:
"""
A conversation session.
Important: Messages are append-only for LLM cache efficiency.
"""
key: str # channel:chat_id
messages: list[dict] # 所有历史消息
last_consolidated: int = 0 # 已被整合到记忆的消息数
为什么是 append-only?因为现代 LLM API(如 Claude 和 GPT)都有 Prompt Cache 机制:如果两次请求的前缀相同,Provider 可以复用之前的 KV Cache,大幅减少首 Token 延迟和成本。
如果我们修改或删除历史消息,就会破坏这个前缀匹配,Cache 全部失效。所以 nanobot 的做法是永远只追加,即使做了记忆整合(consolidation),也不会修改已有的消息列表,而是通过 last_consolidated 指针来标记哪些消息已经被"归档"。
用图来说明 last_consolidated 指针的作用:
Session.messages 数组:
┌─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┐
│ m0 │ m1 │ m2 │ m3 │ m4 │ m5 │ m6 │ m7 │ m8 │ m9 │
└─────┴─────┴─────┴─────┴─────┴─────┴─────┴─────┴─────┴─────┘
▲ ▲
│ │
last_consolidated = 4 最新消息位置
◄─ 已整合到 MEMORY.md ─► ◄── 活跃上下文 ──►
(不再发给 LLM) (作为 history 发给 LLM)
会话持久化使用 JSONL 格式(每行一个 JSON 对象),第一行是元数据,后续每行是一条消息:
{"_type": "metadata", "key": "cli:direct", "created_at": "2026-03-01T17:27:02.381065", "updated_at": "2026-03-05T22:44:17.518718", "metadata": {}, "last_consolidated": 58}
{"role": "user", "content": "你好,请做一下自我介绍", "timestamp": "2026-03-01T17:27:20.804618"}
{"role": "assistant", "content": "你好!我是 **nanobot** 🐈,一个轻量级的AI助手,很高兴认识你!\n\n### 我的特点:\n- **乐于助人且友好**:我会尽力用简洁准确的方式帮你解决问题\n- **能力多样**:\n - 文件操作:读写编辑文件、浏览目录\n - 系统交互:执行shell命令、管理任务\n - 网络能力:网页搜索、内容抓取\n - 技能扩展:支持查天气、设置定时任务等(还能安装更多技能)\n- **注重靠谱**:我会优先保证准确性,并且透明地告诉你我在做什么\n\n### 我能帮你做这些:\n- 日常信息查询(比如天气、知识科普)\n- 文件处理和自动化脚本\n- 技术问题排查\n- 网页内容整理\n- 定时任务提醒\n\n有任何需求或问题,随时告诉我吧! 😊", "timestamp": "2026-03-01T17:27:20.804628"}
{"role": "user", "content": "你好,今天深圳天气怎么样", "timestamp": "2026-03-01T17:28:53.205910"}
{"role": "assistant", "content": "🌤️ 深圳今日天气(3月1日 周日)\n\n**当前状况**:阴天 \n**气温**:24°C(最高26°C) \n**风速**:13 km/h 西北风 \n**能见度**:10 km \n**降水量**:0.0 mm\n\n**今日分时段预报**:\n- 早晨:局部小雨,19°C,湿度82%,风速18-22 km/h\n- 中午:局部小雨,21°C,湿度83%,风速17-19 km/h\n- 傍晚:局部小雨,21°C,湿度80%,风速13-16 km/h\n- 夜间:多云,21°C,风速10-14 km/h", "timestamp": "2026-03-01T17:28:53.205931"}
在保存新消息时,_save_turn 方法做了几件重要的清理工作。
-
截断过长的工具结果——把超过 500 字符的工具输出截断并追加
... (truncated)标记。这是因为工具(比如exec、read_file)可能返回大量文本,如果全部保存会让 session 文件迅速膨胀。 -
替换 base64 图片——把内嵌的 base64 编码图片替换为
[image]占位符,一张图片可能有几百 KB 的 base64 文本,替换后只有几个字符。 -
剥离运行时上下文——检测到
_RUNTIME_CONTEXT_TAG标记的内容会被剥离,因为时间戳等信息在下一轮对话中已经过时。 -
跳过空的 assistant 消息——如果 assistant 消息既没有文本也没有工具调用,就不保存,因为空消息会"毒化"后续的上下文。
def _save_turn(self, session, messages, skip):
for m in messages[skip:]:
entry = dict(m)
role, content = entry.get("role"), entry.get("content")
if role == "assistant"andnot content andnot entry.get("tool_calls"):
continue# 跳过空 assistant 消息
if role == "tool"and isinstance(content, str) and len(content) > self._TOOL_RESULT_MAX_CHARS:
entry["content"] = content[:self._TOOL_RESULT_MAX_CHARS] + "\n... (truncated)"
elif role == "user":
if isinstance(content, str) and content.startswith(ContextBuilder._RUNTIME_CONTEXT_TAG):
parts = content.split("\n\n", 1)
if len(parts) > 1and parts[1].strip():
entry["content"] = parts[1]
else:
continue
entry.setdefault("timestamp", datetime.now().isoformat())
session.messages.append(entry)
get_history 方法在提取历史消息时,还有一个精巧的对齐逻辑——始终从一条 user 角色的消息开始返回。
这样做的原因是:如果历史消息的开头是一个孤立的 tool 消息(没有对应的 assistant 消息带 tool_calls),LLM API 会直接报错。对齐到 user 消息可以保证消息序列始终是合法的。
3. 记忆系统
记忆系统是 nanobot 设计中最值得深入探讨的部分。
传统的聊天机器人通常只有"短期记忆"——当前会话的上下文窗口。一旦对话太长或者开启新会话,之前的信息就全部丢失了。
nanobot 的记忆系统通过一种优雅的两层架构,同时解决了长期记忆和可检索性的问题。
3.1 双层记忆架构
nanobot 的记忆分为两层,物理上就是两个 Markdown 文件:
# nanobot/agent/memory.py
class MemoryStore:
"""Two-layer memory: MEMORY.md (long-term facts) + HISTORY.md (grep-searchable log)."""
def __init__(self, workspace: Path):
self.memory_dir = ensure_dir(workspace / "memory")
self.memory_file = self.memory_dir / "MEMORY.md"
self.history_file = self.memory_dir / "HISTORY.md"
用图来展示两层记忆的定位和关系:

MEMORY.md 是长期事实记忆,类似于人的"陈述性记忆"。 它保存用户的偏好、重要决策、关键信息等结构化知识。每次更新时是全量覆盖写入,由 LLM 负责合并新旧信息。例如,经过几次对话后,MEMORY.md 可能看起来像这样:
## 用户偏好
- 偏好使用 Python 编写代码
- 工作时区:UTC+8
## 项目信息
- 正在开发一个名为 "nanobot" 的 AI Agent 项目
- 使用 Python 3.12 + asyncio 架构
HISTORY.md 是事件日志,类似于人的"情景记忆"。它是只追加的、带时间戳的摘要记录,专门设计为 grep 友好的格式:
[2026-03-05 14:30] 用户询问了如何配置飞书渠道。讨论了 App ID 和 App Secret 的获取方式,成功完成了配置。
[2026-03-06 10:15] 帮助用户编写了一个 Python 脚本来分析 CSV 数据,使用了 pandas 库,输出结果保存在 output.json 中。
这种设计的巧妙之处在于:MEMORY.md 注入到 System Prompt 中,让 Agent 每次对话都"记得"用户的长期偏好;
而 HISTORY.md 虽然不注入 Prompt(太长了),但 Agent 可以通过 exec 工具运行 grep 命令来搜索历史记录。两层记忆相互补充,一个负责"常识性"的高频信息,一个负责"可检索"的低频信息。
3.2 记忆整合
记忆整合(consolidation)不是随时都在发生的,nanobot 精确地定义了两种触发时机:
-
自动触发——未整合消息数达到阈值。 在
_process_message方法中,每次处理用户消息时都会检查当前会话中未整合消息的数量。
当这个数量达到 memory_window(默认 100 条)时,自动启动一个异步的后台整合任务:
# nanobot/agent/loop.py — _process_message 方法中
unconsolidated = len(session.messages) - session.last_consolidated
if (unconsolidated >= self.memory_window and session.key notin self._consolidating):
self._consolidating.add(session.key)
lock = self._consolidation_locks.setdefault(session.key, asyncio.Lock())
asyncdef _consolidate_and_unlock():
try:
asyncwith lock:
await self._consolidate_memory(session)
finally:
self._consolidating.discard(session.key)
_task = asyncio.create_task(_consolidate_and_unlock())
self._consolidation_tasks.add(_task)
注意这里的几个保护机制:_consolidating 集合防止同一个会话重复触发整合;_consolidation_locks 字典为每个会话提供独立的锁,防止并发整合;
_consolidation_tasks 集合保持对 task 的强引用,防止被 GC 回收。整合是异步后台执行的——不会阻塞当前消息的处理,用户的这次对话照常继续。
-
手动触发——用户发送
/new命令。 当用户想开始一个全新的对话时,nanobot 会先对当前所有未整合的消息做一次全量归档(archive_all=True),确保信息不丢失,然后才清空会话:
if cmd == "/new":
snapshot = session.messages[session.last_consolidated:]
if snapshot:
temp = Session(key=session.key)
temp.messages = list(snapshot)
if not await self._consolidate_memory(temp, archive_all=True):
return OutboundMessage(content="Memory archival failed, session not cleared.")
session.clear()
self.sessions.save(session)
self.sessions.invalidate(session.key)
return OutboundMessage(content="New session started.")
与自动整合不同,/new 触发的整合是同步阻塞的——必须等到整合成功后才清空会话。如果归档失败,会话不会被清空,用户会看到一条错误提示。这是一个关键的防御性设计。
另外注意 invalidate 调用——它从内存缓存中移除旧的 session 对象,确保下次使用时从磁盘重新加载干净的会话。
理解了触发时机后,我们来看整合过程本身。下面用图展示完整的整合流程:
整合过程的核心是一次 LLM 调用,但方式非常有创意——nanobot 定义了一个虚拟的 save_memory 工具:
_SAVE_MEMORY_TOOL = [{
"type": "function",
"function": {
"name": "save_memory",
"parameters": {
"type": "object",
"properties": {
"history_entry": {
"type": "string",
"description": "A paragraph (2-5 sentences) summarizing key events. "
"Start with [YYYY-MM-DD HH:MM]. Include detail useful for grep search.",
},
"memory_update": {
"type": "string",
"description": "Full updated long-term memory as markdown. Include all existing "
"facts plus new ones. Return unchanged if nothing new.",
},
},
"required": ["history_entry", "memory_update"],
},
},
}]
然后把旧对话和当前记忆内容一起发给一个专门的 "memory consolidation agent",让它通过调用这个虚拟工具来"决定"应该记住什么:
async def consolidate(self, session, provider, model, *, archive_all=False, memory_window=50):
# 确定要整合的消息范围
if archive_all:
old_messages = session.messages
keep_count = 0
else:
keep_count = memory_window // 2
old_messages = session.messages[session.last_consolidated:-keep_count]
# 格式化为带时间戳的文本
lines = []
for m in old_messages:
ifnot m.get("content"):
continue
lines.append(f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}: {m['content']}")
current_memory = self.read_long_term()
prompt = f"""Process this conversation and call the save_memory tool with your consolidation.
## Current Long-term Memory
{current_memory or "(empty)"}
## Conversation to Process
{chr(10).join(lines)}"""
response = await provider.chat(
messages=[
{"role": "system", "content": "You are a memory consolidation agent. "
"Call the save_memory tool with your consolidation of the conversation."},
{"role": "user", "content": prompt},
],
tools=_SAVE_MEMORY_TOOL,
model=model,
)
# 从工具调用中提取结果并写入
args = response.tool_calls[0].arguments
if entry := args.get("history_entry"):
self.append_history(entry) # 追加到 HISTORY.md
if update := args.get("memory_update"):
if update != current_memory:
self.write_long_term(update) # 覆盖 MEMORY.md
# 更新指针
session.last_consolidated = 0if archive_all else len(session.messages) - keep_count
为什么要通过虚拟工具调用,而不是直接让 LLM 输出文本然后解析?因为工具调用的输出格式是确定的 JSON,远比自由文本更容易解析和校验。
LLM 在工具调用模式下的输出格式化能力也更强。这个"虚拟工具调用"技巧在 nanobot 中被多次使用,Heartbeat 服务中也有类似的设计。
整合时采用的"保留一半"策略(keep_count = memory_window // 2)也值得关注——只整合前一半的旧消息,后一半保留在活跃上下文中。这确保了不会出现"记忆断层"——刚聊过的内容不会突然被整合掉。
对于 memory_update 字段,LLM 被要求"返回完整的更新后的长期记忆,包含所有现有事实加上新事实",这保证了 MEMORY.md 始终是一个完整的状态快照,而不是需要手动合并的增量。
3.3 记忆注入
记忆最终在 System Prompt 的第三层被注入:
memory = self.memory.get_memory_context()
if memory:
parts.append(f"# Memory\n\n{memory}")
get_memory_context() 很简单——读取 MEMORY.md 的内容,加上一个 "## Long-term Memory" 标题。这意味着 Agent 在每次对话开始时,都会在 System Prompt 中看到完整的长期记忆,自然而然地把这些知识融入到回答中,就像一个了解你的老朋友。
4. Skills
nanobot 的技能系统提供了一种轻量级的能力扩展机制。每个技能就是一个 Markdown 文件(SKILL.md),放在特定目录下即可被自动发现。
4.1 Skills 发现和加载
技能有两个来源:工作区自定义技能(workspace/skills/)和内置技能(nanobot/skills/),工作区技能优先级更高,可以覆盖同名的内置技能:
技能加载优先级:
workspace/skills/weather/SKILL.md ◄── 优先使用(用户自定义)
nanobot/skills/weather/SKILL.md ◄── 被同名工作区技能覆盖
nanobot/skills/github/SKILL.md ◄── 无同名覆盖,正常加载
每个技能可以在 YAML frontmatter 中声明依赖,比如需要某个命令行工具或环境变量。_check_requirements 会在加载时检查这些依赖是否满足,不满足的技能会被标记为不可用(但仍然展示在目录中——Agent 可以尝试帮用户安装缺失的依赖)。
4.2 渐进式加载
技能在 System Prompt 中以 XML 摘要的形式呈现,Agent 只有在需要时才通过 read_file 加载完整内容:
def build_skills_summary(self) -> str:
lines = ["<skills>"]
for s in all_skills:
lines.append(f" <skill available=\"{str(available).lower()}\">")
lines.append(f" <name>{name}</name>")
lines.append(f" <description>{desc}</description>")
lines.append(f" <location>{path}</location>")
lines.append(" </skill>")
lines.append("</skills>")
return "\n".join(lines)
使用 XML 而不是 JSON 或 Markdown 列表来表示技能清单,是因为 LLM 对 XML 结构化数据的理解能力通常更好,尤其是在需要从中提取特定字段(如 location)时。
5. MCP 集成
MCP(Model Context Protocol)是 Anthropic 提出的 AI 工具协议标准,nanobot 对它的支持非常优雅——外部 MCP 工具注册后和内建工具完全等价,Agent 无需知道一个工具到底是内建的还是来自外部 MCP 服务器。
# nanobot/agent/tools/mcp.py
class MCPToolWrapper(Tool):
"""将 MCP 服务器的工具包装为 nanobot 原生 Tool。"""
def __init__(self, session, server_name: str, tool_def, tool_timeout: int = 30):
self._session = session
self._original_name = tool_def.name
self._name = f"mcp_{server_name}_{tool_def.name}"
self._description = tool_def.description or tool_def.name
self._parameters = tool_def.inputSchema or {"type": "object", "properties": {}}
asyncdef execute(self, **kwargs: Any) -> str:
result = await asyncio.wait_for(
self._session.call_tool(self._original_name, arguments=kwargs),
timeout=self._tool_timeout,
)
parts = []
for block in result.content:
if isinstance(block, types.TextContent):
parts.append(block.text)
else:
parts.append(str(block))
return"\n".join(parts) or"(no output)"
MCP 的连接流程可以用图来展示:
工具名称遵循 mcp_{server_name}_{tool_name} 的命名规则,避免不同 MCP 服务器之间的名称冲突。连接过程是懒加载的——只有在第一条消息到来时才会尝试连接 MCP 服务器,而不是在启动时就全部初始化。如果连接失败,会在下一条消息到来时重试。
MCP 同时支持两种传输方式:stdio(通过子进程通信,适合本地工具)和 HTTP(通过 Streamable HTTP 协议,适合远程服务)。
在 HTTP 模式下,nanobot 显式创建了一个没有超时限制的 httpx.AsyncClient,防止 httpx 默认的 5 秒超时抢先于更高层的工具超时。
6. Subagent
当用户提出一个耗时较长的任务时,Agent 可以通过 spawn 工具创建一个子代理(subagent)在后台执行,立即返回用户控制权。
# nanobot/agent/subagent.py
async def spawn(self, task, label=None, origin_channel="cli", origin_chat_id="direct", session_key=None):
task_id = str(uuid.uuid4())[:8]
bg_task = asyncio.create_task(
self._run_subagent(task_id, task, display_label, origin)
)
self._running_tasks[task_id] = bg_task
return f"Subagent [{display_label}] started (id: {task_id}). I'll notify you when it completes."
Subagent 的完整生命周期如下图所示:

Subagent 的几个关键设计特点值得注意。首先,它拥有精简的工具集——没有 message(不能直接发消息给用户)和 spawn(不能再生成子代理)。这种限制是有意为之的,防止子代理产生意外的副作用或者无限递归。
其次,最大迭代次数限制为 15 次(主 Agent 是 40 次),因为后台任务应该是相对短小的。
子代理完成后的结果通知机制很巧妙——它不是直接发消息给用户,而是将结果作为一条 system 渠道的 InboundMessage 注入回消息总线:
async def _announce_result(self, task_id, label, task, result, origin, status):
announce_content = f"""[Subagent '{label}' completed]
Task: {task}
Result: {result}
Summarize this naturally for the user. Keep it brief (1-2 sentences). Do not mention technical details like "subagent" or task IDs."""
msg = InboundMessage(
channel="system",
sender_id="subagent",
chat_id=f"{origin['channel']}:{origin['chat_id']}",
content=announce_content,
)
await self.bus.publish_inbound(msg)
这条系统消息会被主 Agent 当作普通消息处理,经过一轮 LLM 推理后,用自然语言把结果转述给用户。
"Summarize this naturally for the user" 和 "Do not mention technical details like subagent or task IDs" 这些指令确保了用户看到的不是冰冷的任务报告,而是自然流畅的对话回复。这就是好的产品设计——技术细节被隐藏在幕后。
Subagent 还支持按会话批量取消(cancel_by_session),当用户发送 /stop 时,对应会话的所有子代理任务会被一起取消。
7. Cron 与 Heartbeat
到目前为止,我们看到的都是"用户主动发消息 → Agent 响应"的被动模式。但 nanobot 还有两个机制让 Agent 能够主动工作:定时任务(Cron)和心跳(Heartbeat)。
8.1 Cron 系统
Cron 系统允许 Agent 自己设置定时任务。用户可以说"每天早上 9 点给我发一份市场简报",Agent 会通过 cron 工具创建一个定时任务。 Cron 支持三种调度方式:

任务数据存储在 jobs.json 中,CronService 的调度核心是 _arm_timer 方法——它计算出最近一个待执行任务的时间,用 asyncio.sleep 精确等待到那个时刻。支持外部修改 jobs.json 后自动热重载,因为每次触发时会检查文件的 mtime。
定时任务触发后的执行流程很有意思——它不是简单地执行预定义的操作,而是把任务指令发给 Agent,让 Agent 自行决定怎么做:
async def on_cron_job(job: CronJob) -> str | None:
reminder_note = (
"[Scheduled Task] Timer finished.\n\n"
f"Task '{job.name}' has been triggered.\n"
f"Scheduled instruction: {job.payload.message}"
)
# 禁止在 cron 任务中再创建 cron(通过 ContextVar 标记)
cron_tool = agent.tools.get("cron")
cron_token = None
if isinstance(cron_tool, CronTool):
cron_token = cron_tool.set_cron_context(True)
try:
response = await agent.process_direct(
reminder_note,
session_key=f"cron:{job.id}",
channel=job.payload.channel or"cli",
chat_id=job.payload.to or"direct",
)
finally:
if isinstance(cron_tool, CronTool) and cron_token isnotNone:
cron_tool.reset_cron_context(cron_token)
这意味着定时任务可以是任意复杂的自然语言指令,比如"搜索今天的比特币价格并分析趋势"。Agent 会像处理普通用户消息一样,使用所有可用的工具来完成任务。每个 cron 任务有独立的 session(cron:{job_id}),所以任务之间的上下文互不干扰。
安全机制方面——CronTool 使用 Python 的 ContextVar 来标记当前是否处于 cron 执行上下文中。ContextVar 的特性是协程安全的——即使有多个任务并发执行,每个协程看到的值也是独立的。当 Agent 在 cron 上下文中尝试调用 cron add 时,会直接返回错误 "cannot schedule new jobs from within a cron job execution",从根本上阻止了任务无限增殖的可能。
执行完成后,如果任务配置了 deliver=True 且有目标渠道,结果会通过 MessageBus 发送给用户。这里还有一个细节——如果 Agent 在处理过程中已经通过 message 工具主动发过消息了(message_tool._sent_in_turn),就不再重复发送,避免用户收到两条相同的消息。
8.2 Heartbeat 服务
Heartbeat 是一个更加"智能"的周期性唤醒机制。它每隔一段时间(默认 30 分钟)检查工作区中的 HEARTBEAT.md 文件,判断是否有需要处理的任务。
关键的创新在于判断逻辑——它不是用正则表达式或关键词匹配来判断文件内容,而是引入了一个两阶段 LLM 决策过程:

async def _decide(self, content: str) -> tuple[str, str]:
"""Phase 1: 让 LLM 通过虚拟工具调用来决定是否需要执行。"""
response = await self.provider.chat(
messages=[
{"role": "system", "content": "You are a heartbeat agent. "
"Call the heartbeat tool to report your decision."},
{"role": "user", "content": f"Review the following HEARTBEAT.md and decide "
f"whether there are active tasks.\n\n{content}"},
],
tools=_HEARTBEAT_TOOL,
model=self.model,
)
args = response.tool_calls[0].arguments
return args.get("action", "skip"), args.get("tasks", "")
LLM 通过调用虚拟的 heartbeat 工具返回决策——skip(无事可做)或 run(有任务需要执行)。只有当决策为 run 时,才会触发完整的 Agent 执行流程(Phase 2)。这种两阶段设计的好处是:大多数时候 HEARTBEAT.md 中没有待办任务,Phase 1 的轻量调用就足够判断了(只消耗少量 Token),避免了不必要地启动完整的 Agent 循环。
HEARTBEAT.md 的模板设计也很友好——分为"Active Tasks"和"Completed"两个区域,用户只需要把待办事项写在"Active Tasks"下面,完成的任务移到"Completed"下面。Agent 会自动理解这些任务并执行。
8. Gateway:从飞书消息到用户回复的完整链路
现在让我们把所有模块串联起来,完整地追踪一条飞书消息从发送到收到回复的全部旅程。
当用户通过 nanobot gateway 命令启动系统时,Gateway 模式会依次启动所有服务:
nanobot gateway
├── CronService.start() # 启动定时任务调度
├── HeartbeatService.start() # 启动心跳服务
├── AgentLoop.run() # 启动 Agent 主循环
└── ChannelManager.start_all() # 启动所有渠道(含飞书)
├── FeishuChannel.start() # WebSocket 长连接
├── _dispatch_outbound() # 启动出站消息分发器
└── ...其他渠道
整个请求链路如下:

_set_tool_context 是一个容易被忽视但非常重要的环节——它在处理每条消息前,把当前的 channel 和 chat_id 信息注入到 message、spawn 和 cron 工具中。
这样 Agent 在使用这些工具时就知道应该把消息发到哪里、子代理完成后应该通知谁、定时任务触发后应该发送到哪个渠道的哪个聊天。
9. 总结
nanobot 用约 4000 行 Python 代码,实现了一个功能完备的 AI Agent 系统。它的每一个设计决策都在追求一个平衡点:足够简单以便于理解和维护,又足够完整以覆盖真实的使用场景。
Node 社群
我组建了一个氛围特别好的 Node.js 社群,里面有很多 Node.js小伙伴,如果你对Node.js学习感兴趣的话(后续有计划也可以),我们可以一起进行Node.js相关的交流、学习、共建。下方加 考拉 好友回复「Node」即可。
“分享、点赞、在看” 支持一波👍
更多推荐

所有评论(0)