LangGraph 扩展开发:自定义节点与插件生态的构建方法
LangChain生态中的LangGraph,是构建状态驱动的、循环可控的Agent应用的革命性工具——它不再是简单的链式调用,而是用图计算的思维,让Agent拥有“分支判断、循环重试、状态共享”等类似人类解决问题的能力。但随着业务场景的复杂化,LangGraph官方节点库(如ToolNodeLLMNode)往往无法完全满足需求:可能需要对接公司内部定制化的业务API、实现领域特定的状态转换逻辑、
LangGraph 扩展开发:自定义节点与插件生态的构建方法
关键词
LangGraph、自定义节点、插件生态、Agent框架、图计算、工具扩展、LangChain
摘要
LangChain生态中的LangGraph,是构建状态驱动的、循环可控的Agent应用的革命性工具——它不再是简单的链式调用,而是用图计算的思维,让Agent拥有“分支判断、循环重试、状态共享”等类似人类解决问题的能力。但随着业务场景的复杂化,LangGraph官方节点库(如ToolNode、LLMNode、ConditionNode)往往无法完全满足需求:可能需要对接公司内部定制化的业务API、实现领域特定的状态转换逻辑、甚至开发整套可复用的插件包快速搭建同类场景的Agent。
本文将从0到1拆解LangGraph的核心原理,带你理解为什么官方节点“够用但不够快、不够专”;然后用生动的“厨房做菜”类比解释状态、节点、边、子图的关系;接着手把手教你实现三大类自定义节点(工具增强型、状态转换型、领域逻辑型);再深入到插件生态的构建全流程(从单插件封装,到插件仓库的设计与部署,再到插件的版本管理与安全机制);最后结合电商售后智能客服、医疗知识图谱问答、科研论文写作助手三个真实案例,展示如何把自定义节点和插件组合成强大的Agent系统。
全文约10500字,包含20+行Mermaid图表、300+行Python代码、多个数学模型解释,适合已经掌握LangChain基础用法、想深入开发Agent扩展的开发者阅读。
正文
1. 背景介绍:从链式调用到图计算,LangGraph为什么需要扩展?
核心概念
LangChain、LangGraph、状态驱动Agent、LLM节点、节点扩展、插件生态
问题背景
2022年底ChatGPT引爆大语言模型(LLM),随后LangChain的出现,让开发者可以像“搭积木”一样快速串联LLM、向量库、搜索引擎等工具,构建出第一代“问答型、单流程”的应用——比如给PDF加索引的问答机器人、简单的日程安排助手。这些应用虽然简单,但解决了从0到1的问题,很快就火遍了整个AI开发圈。
但随着应用从“玩具级”走向“生产级”,开发者很快遇到了LangChain原始链式调用的三大致命缺陷:
- 无法循环重试:比如让LLM写Python代码,代码执行报错后,链式调用只能“戛然而止”,不能把错误信息反馈给LLM让它重新修改;
- 状态管理混乱:如果一个流程需要多个节点共享同一个数据(比如电商客服需要先获取用户ID、再查订单状态、再调用售后政策库,这些数据都需要在节点间传递),链式调用只能靠
input/output层层传递,代码变得冗长且容易出错; - 分支逻辑硬编码脆弱:链式调用虽然有
SimpleSequentialChain和RouterChain两种分支方式,但RouterChain的规则是用LLM生成自然语言标签再匹配,不够灵活也不够可靠,硬编码分支又会让代码难以维护。
为了解决这些问题,LangChain团队在2023年6月推出了LangGraph——这是一个基于有限状态机(FSM)和图计算的框架,它把Agent的整个工作流程抽象成一张有向图,每个节点是一个执行单元(可以是LLM调用、工具执行、状态修改等),每条边是节点间的状态转移规则(可以是固定顺序、条件判断、循环跳转等),整个Agent的运行状态保存在一个共享的状态字典里,所有节点都可以读取和修改这个字典。
LangGraph推出后,迅速成为了Agent开发的主流框架——截至2024年6月,GitHub上LangGraph的Stars已经突破30k,LangChain官方也把它作为了新一代Agent开发的核心工具。但就像所有的开源框架一样,LangGraph的**官方节点库(LangGraph Prebuilt Nodes)**只覆盖了80%的通用场景,剩下的20%(甚至更多的生产级场景)需要开发者自己扩展:
- 比如电商公司需要对接内部的ERP系统查订单、CRM系统查用户信息、售后系统提交工单;
- 比如医疗行业需要对接内部的知识图谱API、影像识别API、电子病历系统API;
- 比如游戏公司需要对接内部的游戏服务器API、玩家数据API、支付系统API;
- 甚至有些通用场景,官方节点的实现也不够优化——比如官方的
ToolNode每次调用工具前都会把整个状态字典传给LLM作为上下文,对于状态很大的Agent,这会严重增加Token消耗和LLM的响应时间。
为了解决这些生产级的扩展需求,开发者需要掌握三大技能:
- 理解LangGraph的核心原理:只有理解了状态、节点、边、子图的底层实现,才能写出高效、稳定的扩展代码;
- 开发自定义节点:针对特定的业务需求,开发可复用的执行单元;
- 构建插件生态:把多个相关的自定义节点、状态定义、边规则打包成一个插件,方便自己和团队其他开发者快速使用。
目标读者
- 已经掌握Python基础语法和LangChain基础用法的开发者;
- 想用LangGraph构建生产级Agent应用的开发者;
- 想为LangGraph生态贡献插件的开源爱好者;
- 想深入理解Agent框架底层原理的AI爱好者。
核心问题或挑战
在深入讲解技术细节之前,我们先提出三个贯穿全文的核心问题,这些问题也是生产级Agent开发中最常见的挑战:
- 自定义节点应该如何设计?:是应该继承官方的节点基类?还是应该自己实现一个完全独立的节点?如何确保自定义节点的可复用性、可测试性和安全性?
- 状态管理应该如何优化?:共享状态字典的大小会影响Agent的性能和Token消耗,如何设计状态的粒度?如何避免状态污染?如何实现状态的持久化?
- 插件生态应该如何构建?:单插件应该包含哪些内容?插件仓库应该如何设计和部署?如何管理插件的版本?如何确保插件的安全性?
接下来的内容,我们将一步步解决这些问题。
2. 核心概念解析:用“厨房做菜”理解LangGraph的图计算思维
核心概念
共享状态、节点(Node)、边(Edge)、子图(Subgraph)、入口节点(Entry Point)、出口节点(Exit Point)、条件边(Conditional Edge)、循环边(Loop Edge)、状态键(State Key)、状态更新规则(State Update Rule)
问题背景
很多开发者刚接触LangGraph时,都会被“状态驱动的有向图”这个概念搞得晕头转向——到底什么是状态?节点和边在图里到底是怎么工作的?共享状态和链式调用的层层传递到底有什么区别?
为了解释这些抽象的概念,我们用一个大家都熟悉的**“厨房做番茄炒蛋”**的例子来类比——假设我们要请朋友来家里吃饭,做番茄炒蛋是其中的一道菜,我们可以把整个做番茄炒蛋的流程抽象成一张LangGraph的有向图。
核心概念的生活化类比
我们先定义好“厨房做菜”这个类比里的各个角色,然后对应到LangGraph的核心概念:
| 厨房做菜的角色/元素 | LangGraph的核心概念 | 详细说明 |
|---|---|---|
| 厨师的大脑和手上的食材清单、工具清单 | 共享状态字典 | 保存整个流程中所有需要共享的信息——比如食材清单(有没有番茄、有没有鸡蛋、有没有盐)、工具清单(有没有锅、有没有铲子、有没有刀)、当前进度(有没有切番茄、有没有打鸡蛋、有没有下锅炒)、朋友的口味偏好(喜欢甜口还是咸口) |
| 厨师做的每一个具体动作(切番茄、打鸡蛋、尝咸淡、加调料) | 节点(Node) | 执行单元——每个节点都可以读取共享状态字典里的信息,也可以修改共享状态字典里的信息,还可以执行一些外部操作(比如打开冰箱拿鸡蛋、打开水龙头洗番茄) |
| 厨师动作之间的逻辑关系(切完番茄才能打鸡蛋?还是可以同时切番茄和打鸡蛋?尝咸淡后如果太咸就加白糖,太淡就加盐,如果刚好就出锅) | 边(Edge) | 节点间的状态转移规则——可以是固定顺序的普通边,也可以是根据状态判断的条件边,还可以是回到之前节点的循环边 |
| 把做番茄炒蛋的流程单独拿出来,作为“做一桌菜”这个大流程里的一部分 | 子图(Subgraph) | 可复用的图组件——可以把子图当成一个“超级节点”,嵌入到更大的图里,这样可以把复杂的大流程拆分成多个简单的小流程,方便维护和测试 |
| 准备开始做番茄炒蛋(比如先看一下朋友的口味偏好) | 入口节点(Entry Point) | 整个图的起始节点——Agent运行时会从入口节点开始执行 |
| 番茄炒蛋做好了,可以端上桌了 | 出口节点(Exit Point) | 整个图的结束节点——Agent运行到出口节点时会停止 |
共享状态字典里的每一个具体信息(比如has_tomato: True、friend_taste: "sweet"、current_progress: "cutting_tomato") |
状态键(State Key) | 状态字典的基本单元——每个状态键都有一个对应的值,值的类型可以是字符串、数字、布尔值、列表、字典等Python的任意数据类型 |
厨师修改食材清单的规则(比如如果用了一个番茄,就把has_tomato的值从True改成False;如果加了一勺盐,就把salt_level的值从0改成1) |
状态更新规则(State Update Rule) | 控制节点如何修改共享状态字典——默认情况下,LangGraph使用**字典合并(Dictionary Merge)**的规则,也就是节点返回的状态字典会和原来的共享状态字典合并,相同的键会被覆盖,不同的键会保留;但开发者也可以自定义状态更新规则,比如用列表追加、数值累加等。 |
现在,我们把“厨房做番茄炒蛋”的具体流程画成一张Mermaid的有向图,对应到LangGraph的核心概念:
从这张图里,我们可以清楚地看到LangGraph的几个核心优势:
- 并行执行:我们可以把“切番茄”和“打鸡蛋”这两个独立的任务放在一个并行子图里执行,节省时间——这在链式调用里是很难实现的;
- 循环重试:如果食材和工具不齐全,我们可以回到“准备食材和工具”的节点重新检查;如果尝咸淡不符合口味,我们可以回到“下锅炒”的节点重新调整——这在链式调用里是完全不可能的;
- 条件判断灵活:我们可以根据共享状态字典里的
friend_taste、salt_level、sugar_level等信息,灵活地调整边的转移规则——这比LangChain原始的RouterChain可靠得多; - 状态共享清晰:所有需要共享的信息都保存在一个共享状态字典里,所有节点都可以读取和修改——这比链式调用的层层传递要清晰得多,也不容易出错。
概念间的关系和相互作用
现在,我们已经用生活化的类比解释了LangGraph的核心概念,接下来我们用更专业的方式来梳理这些概念间的关系和相互作用。
概念核心属性维度对比
为了更清楚地理解各个概念的区别,我们先做一个概念核心属性维度对比表:
| 核心概念 | 核心属性1:类型 | 核心属性2:是否可执行 | 核心属性3:是否可共享 | 核心属性4:是否可复用 | 核心属性5:是否有输入输出 |
|---|---|---|---|---|---|
| 共享状态字典 | 数据结构(通常是TypedDict) | 否 | 是(所有节点共享) | 是(可以在不同的图里复用) | 是(所有节点的输入和输出都是它的子集) |
| 节点(Node) | 执行单元(函数或类的实例) | 是 | 否(每个节点独立,但可以读取共享状态) | 是(可以在不同的图里复用) | 是(输入是共享状态字典的子集,输出是状态更新字典) |
| 边(Edge) | 状态转移规则(函数或固定字符串) | 否 | 否(每条边属于特定的两个节点) | 是(可以在不同的图里复用规则函数) | 否(只有输入——当前节点的输出状态,没有输出——只有下一个节点的ID或END) |
| 子图(Subgraph) | 可复用的图组件(本质上也是一个节点) | 是 | 是(内部节点共享子图的局部状态,也可以访问外部图的全局状态) | 是(可以嵌入到任意大的图里) | 是(输入是外部图的全局状态的子集,输出是状态更新字典) |
| 入口节点(Entry Point) | 特殊的节点(通常是图的第一个节点) | 是 | 否 | 否(每个图只有一个固定的入口节点,但可以动态指定) | 是(输入是初始化状态,输出是状态更新字典) |
| 出口节点(Exit Point) | 特殊的节点(通常是END常量,也可以是自定义节点) |
否(END是常量,自定义节点可以是) |
否 | 是(END可以在所有图里复用) |
否(END没有输入输出) |
概念联系的ER实体关系图
接下来,我们用一张Mermaid ER实体关系图来梳理这些概念间的联系:
从这张ER图里,我们可以看到:
- 共享状态字典是整个LangGraph的核心——所有的节点、子图都围绕着它工作;
- **图(Graph)**是LangGraph的最高级容器——它包含了全局共享状态、所有的节点、边、子图、入口节点和出口节点;
- **子图(Subgraph)**本质上也是一个图——它包含了局部共享状态、内部节点、内部边、内部入口节点和内部出口节点,同时也可以作为一个超级节点嵌入到更大的图里;
- 状态更新规则控制着节点如何修改共享状态——默认是字典合并,但开发者可以自定义。
概念交互关系图
最后,我们用一张Mermaid交互关系图来展示这些概念在Agent运行时的交互过程:
从这张交互关系图里,我们可以清楚地看到Agent运行时的完整流程:
- 用户初始化Agent,传入初始化状态;
- LangGraph运行时创建全局共享状态字典,合并初始化状态;
- LangGraph运行时从入口节点开始执行,依次执行每个节点;
- 每个节点读取全局共享状态,执行内部逻辑,返回状态更新字典,合并到全局共享状态;
- 每个节点返回下一个节点的ID(或
END); - 如果遇到条件节点,根据条件判断返回不同的下一个节点ID;
- 如果遇到循环边,回到之前的节点重新执行;
- 当执行到
END常量时,Agent停止运行,返回最终的全局共享状态。
3. 技术原理与实现:深入LangGraph的底层源码
核心概念
BaseState、BaseNode、StateGraph、END、GraphRuntime、状态更新器(StateUpdater)、边解析器(EdgeResolver)
问题背景
上一章我们用生活化的类比和图表解释了LangGraph的核心概念,但要写出高效、稳定的自定义节点和插件,我们还需要深入理解LangGraph的底层源码实现——比如共享状态字典是怎么定义的?节点是怎么执行的?边是怎么解析的?状态是怎么更新的?
在这一章里,我们将基于LangGraph 0.2.x版本的源码(这是截至2024年6月的最新稳定版本),一步步拆解LangGraph的核心组件。为了方便讲解,我们会简化一些源码细节,但会保留核心的逻辑。
核心组件的底层实现
LangGraph的核心组件主要有五个:
- BaseState:共享状态字典的基类——所有的共享状态字典都必须继承自它;
- BaseNode:节点的基类——所有的自定义节点都必须继承自它;
- StateGraph:图的构建类——开发者用它来添加节点、边、子图,构建完整的Agent工作流程;
- GraphRuntime:图的运行时类——负责执行Agent工作流程,管理共享状态、节点执行、边解析、状态更新;
- END:出口节点常量——当节点返回
END时,Agent停止运行。
接下来,我们分别讲解这五个核心组件的底层实现。
1. BaseState:共享状态字典的基类
在LangGraph 0.2.x版本之前,共享状态字典通常是用Python的TypedDict来定义的——TypedDict可以给字典的键加上类型提示,方便代码的可读性和静态类型检查(比如用mypy)。但TypedDict有一个缺点:它是一个静态的类型定义,不能在运行时动态添加或删除键,也不能自定义状态更新规则。
为了解决这些问题,LangGraph 0.2.x版本推出了BaseState基类——它本质上是一个继承自dict的Python类,但添加了以下几个核心功能:
- 类型安全的初始化:可以在运行时检查传入的初始化状态是否符合类型定义;
- 自定义状态更新规则:可以重写
__add__方法(或者用@add_updater装饰器),为不同的状态键定义不同的更新规则; - 状态序列化与反序列化:可以轻松地将状态字典序列化为JSON或YAML,也可以反序列化回来;
- 状态历史记录:可以记录状态的每一次更新,方便调试和回溯。
为了更清楚地理解BaseState的实现,我们来看一个简化版的源码:
from typing import Any, Dict, Type, TypeVar, Optional, Callable
from pydantic import BaseModel, Field, ValidationError
# 定义一个泛型类型变量,用于类型提示
T = TypeVar('T', bound='BaseState')
class BaseState(dict):
"""
LangGraph共享状态字典的基类
本质上是一个继承自dict的Python类,但添加了类型安全、自定义更新规则、序列化等功能
"""
# 类变量:用于存储类型定义(可选,用Pydantic BaseModel定义)
schema: Optional[Type[BaseModel]] = None
# 类变量:用于存储自定义状态更新规则(键是状态键,值是更新函数)
updaters: Dict[str, Callable[[Any, Any], Any]] = {}
def __init__(self, *args, **kwargs):
"""
初始化共享状态字典
首先调用dict的__init__方法,然后如果有schema,就验证传入的状态是否符合类型定义
"""
super().__init__(*args, **kwargs)
if self.schema is not None:
self._validate()
def _validate(self) -> None:
"""
验证当前状态是否符合schema的类型定义
如果不符合,就抛出ValidationError
"""
try:
# 用Pydantic BaseModel验证当前状态
self.schema(**self)
except ValidationError as e:
raise ValueError(f"State validation failed: {e}") from e
def __add__(self: T, other: Dict[str, Any]) -> T:
"""
重写__add__方法,实现自定义状态更新规则
默认情况下,使用字典合并的规则(相同的键覆盖,不同的键保留)
如果某个状态键有自定义的updater,就用updater来更新
"""
# 创建一个新的状态字典,复制当前状态
new_state = self.__class__(self)
# 遍历other的键值对
for key, value in other.items():
# 如果key有自定义的updater
if key in self.updaters:
# 如果key在当前状态里,就用updater更新
if key in new_state:
new_state[key] = self.updaters[key](new_state[key], value)
# 如果key不在当前状态里,就直接赋值
else:
new_state[key] = value
# 如果key没有自定义的updater,就用默认的字典合并规则
else:
new_state[key] = value
# 如果有schema,就验证新状态是否符合类型定义
if new_state.schema is not None:
new_state._validate()
# 返回新状态
return new_state
@classmethod
def add_updater(cls, key: str, updater: Callable[[Any, Any], Any]) -> None:
"""
类方法:为某个状态键添加自定义的更新规则
"""
cls.updaters[key] = updater
def to_json(self) -> str:
"""
将状态字典序列化为JSON字符串
"""
import json
return json.dumps(self)
@classmethod
def from_json(cls: Type[T], json_str: str) -> T:
"""
从JSON字符串反序列化为状态字典
"""
import json
return cls(json.loads(json_str))
现在,我们用这个简化版的BaseState来定义一个“厨房做番茄炒蛋”的共享状态字典:
from pydantic import BaseModel, Field
from typing import Literal
# 第一步:定义Pydantic BaseModel作为schema,用于类型安全
class TomatoEggStateSchema(BaseModel):
friend_taste: Literal["sweet", "salty", "spicy"] = Field(..., description="朋友的口味偏好")
salt_level: int = Field(default=0, ge=0, le=10, description="盐的水平(0-10)")
sugar_level: int = Field(default=0, ge=0, le=10, description="糖的水平(0-10)")
current_progress: Literal["start", "prepare", "cut_and_beat", "cook", "taste", "serve"] = Field(default="start", description="当前进度")
has_tomato: bool = Field(default=True, description="有没有番茄")
has_egg: bool = Field(default=True, description="有没有鸡蛋")
has_salt: bool = Field(default=True, description="有没有盐")
has_sugar: bool = Field(default=True, description="有没有糖")
has_pot: bool = Field(default=True, description="有没有锅")
has_spatula: bool = Field(default=True, description="有没有铲子")
has_knife: bool = Field(default=True, description="有没有刀")
message_log: list[str] = Field(default_factory=list, description="消息日志")
# 第二步:定义继承自BaseState的共享状态字典类
class TomatoEggState(BaseState):
# 绑定schema
schema = TomatoEggStateSchema
# 为message_log添加自定义更新规则:列表追加
@classmethod
def _append_message_log(cls, old_value: list[str], new_value: str | list[str]) -> list[str]:
if isinstance(new_value, str):
return old_value + [new_value]
elif isinstance(new_value, list):
return old_value + new_value
else:
raise ValueError(f"Invalid value type for message_log: {type(new_value)}")
# 注册自定义更新规则
updaters = {
"message_log": _append_message_log
}
# 测试一下我们定义的共享状态字典
if __name__ == "__main__":
# 初始化状态
initial_state = TomatoEggState(
friend_taste="sweet",
message_log="开始做番茄炒蛋"
)
print("初始状态:")
print(initial_state)
print("\n")
# 测试默认的字典合并规则
updated_state1 = initial_state + {"current_progress": "prepare", "message_log": "准备食材和工具"}
print("第一次更新后的状态(测试默认合并和自定义追加):")
print(updated_state1)
print("\n")
# 测试自定义的列表追加规则(传入列表)
updated_state2 = updated_state1 + {"message_log": ["检查有没有番茄", "检查有没有鸡蛋", "检查有没有盐、糖、锅、铲子、刀"]}
print("第二次更新后的状态(测试自定义追加列表):")
print(updated_state2)
print("\n")
# 测试类型安全
try:
invalid_state = updated_state2 + {"salt_level": 11} # salt_level的最大值是10
except ValueError as e:
print("类型安全测试失败(预期的结果):")
print(e)
运行这段测试代码,我们会看到:
- 初始状态成功创建,符合schema的类型定义;
- 第一次更新后的状态,
current_progress被覆盖成了"prepare"(默认的字典合并规则),message_log被追加了一条新消息(自定义的更新规则); - 第二次更新后的状态,
message_log被追加了三条新消息(自定义的更新规则支持传入列表); - 类型安全测试成功,当我们试图把
salt_level设置成11时,抛出了ValueError。
2. BaseNode:节点的基类
在LangGraph 0.2.x版本之前,节点通常是用Python的函数来定义的——函数的输入是共享状态字典,输出是状态更新字典或下一个节点的ID(或END)。但函数式节点有一个缺点:它不能保存状态(比如LLM的API密钥、工具的配置等),每次执行都要重新初始化,这在生产级场景里会严重影响性能。
为了解决这些问题,LangGraph 0.2.x版本推出了BaseNode基类——它本质上是一个Python的抽象基类(ABC),所有的自定义节点都必须继承自它,并实现抽象方法__call__。BaseNode添加了以下几个核心功能:
- 可保存状态:可以在节点的
__init__方法里初始化一些持久化的状态(比如LLM的API密钥、工具的配置、数据库连接等); - 类型安全的输入输出:可以用Pydantic BaseModel定义节点的输入和输出类型,方便代码的可读性和静态类型检查;
- 可测试性:可以轻松地对节点进行单元测试,不需要启动整个Agent工作流程;
- 可组合性:可以把多个节点组合成一个更复杂的节点(比如子图)。
为了更清楚地理解BaseNode的实现,我们来看一个简化版的源码:
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Type, TypeVar
from pydantic import BaseModel, Field, ValidationError
# 定义泛型类型变量
S = TypeVar('S', bound='BaseState')
I = TypeVar('I', bound=BaseModel)
O = TypeVar('O', bound=BaseModel)
class BaseNode(ABC):
"""
LangGraph节点的抽象基类
所有的自定义节点都必须继承自它,并实现抽象方法__call__
"""
# 类变量:节点的名称(可选,默认是类名)
name: str = None
# 类变量:节点的输入类型(可选,用Pydantic BaseModel定义)
input_schema: Optional[Type[I]] = None
# 类变量:节点的输出类型(可选,用Pydantic BaseModel定义,输出是状态更新字典的子集)
output_schema: Optional[Type[O]] = None
def __init__(self, name: Optional[str] = None):
"""
初始化节点
可以在这里初始化一些持久化的状态(比如LLM的API密钥、工具的配置、数据库连接等)
"""
if name is not None:
self.name = name
elif self.name is None:
self.name = self.__class__.__name__
@abstractmethod
def _execute(self, state: S) -> Dict[str, Any]:
"""
抽象方法:节点的核心执行逻辑
输入是共享状态字典,输出是状态更新字典
子类必须实现这个方法
"""
pass
def __call__(self, state: S) -> Dict[str, Any]:
"""
节点的调用入口
首先验证输入状态是否符合input_schema,然后调用_execute方法,最后验证输出状态是否符合output_schema
"""
# 第一步:验证输入状态是否符合input_schema(如果有)
if self.input_schema is not None:
try:
# 从共享状态字典里提取input_schema需要的键
input_data = {k: state[k] for k in self.input_schema.__fields__.keys() if k in state}
self.input_schema(**input_data)
except ValidationError as e:
raise ValueError(f"Input validation failed for node {self.name}: {e}") from e
# 第二步:调用_execute方法,执行核心逻辑
output_state = self._execute(state)
# 第三步:验证输出状态是否符合output_schema(如果有)
if self.output_schema is not None:
try:
self.output_schema(**output_state)
except ValidationError as e:
raise ValueError(f"Output validation failed for node {self.name}: {e}") from e
# 第四步:返回输出状态
return output_state
现在,我们用这个简化版的BaseNode来定义一个“厨房做番茄炒蛋”里的**“准备食材和工具”节点**:
from typing import Literal
# 第一步:定义节点的输入类型(用Pydantic BaseModel)
class PrepareNodeInput(BaseModel):
has_tomato: bool = Field(..., description="有没有番茄")
has_egg: bool = Field(..., description="有没有鸡蛋")
has_salt: bool = Field(..., description="有没有盐")
has_sugar: bool = Field(..., description="有没有糖")
has_pot: bool = Field(..., description="有没有锅")
has_spatula: bool = Field(..., description="有没有铲子")
has_knife: bool = Field(..., description="有没有刀")
# 第二步:定义节点的输出类型(用Pydantic BaseModel,是状态更新字典的子集)
class PrepareNodeOutput(BaseModel):
current_progress: Literal["prepare", "check"] = Field(..., description="当前进度")
all_items_ready: bool = Field(..., description="所有食材和工具是否齐全")
message_log: str = Field(..., description="消息日志")
# 第三步:定义继承自BaseNode的PrepareNode类
class PrepareNode(BaseNode):
# 绑定节点名称、输入类型、输出类型
name = "prepare_node"
input_schema = PrepareNodeInput
output_schema = PrepareNodeOutput
def _execute(self, state: TomatoEggState) -> Dict[str, Any]:
"""
节点的核心执行逻辑:检查所有食材和工具是否齐全
"""
# 检查所有食材和工具
all_items_ready = (
state["has_tomato"] and
state["has_egg"] and
state["has_salt"] and
state["has_sugar"] and
state["has_pot"] and
state["has_spatula"] and
state["has_knife"]
)
# 生成消息日志
if all_items_ready:
message_log = "所有食材和工具都准备好了,下一步是切番茄和打鸡蛋"
current_progress = "check"
else:
missing_items = []
if not state["has_tomato"]:
missing_items.append("番茄")
if not state["has_egg"]:
missing_items.append("鸡蛋")
if not state["has_salt"]:
missing_items.append("盐")
if not state["has_sugar"]:
missing_items.append("糖")
if not state["has_pot"]:
missing_items.append("锅")
if not state["has_spatula"]:
missing_items.append("铲子")
if not state["has_knife"]:
missing_items.append("刀")
message_log = f"缺少以下食材和工具:{', '.join(missing_items)},需要去买或借"
current_progress = "prepare"
# 返回状态更新字典
return {
"current_progress": current_progress,
"all_items_ready": all_items_ready,
"message_log": message_log
}
# 测试一下我们定义的PrepareNode
if __name__ == "__main__":
# 初始化状态
initial_state = TomatoEggState(
friend_taste="sweet",
message_log="开始做番茄炒蛋"
)
print("初始状态:")
print(initial_state)
print("\n")
# 初始化节点
prepare_node = PrepareNode()
# 第一次测试:所有食材和工具都齐全
output_state1 = prepare_node(initial_state)
print("第一次测试(所有食材和工具都齐全)的输出状态:")
print(output_state1)
print("\n")
# 第二次测试:缺少番茄和刀
initial_state2 = initial_state + {"has_tomato": False, "has_knife": False}
output_state2 = prepare_node(initial_state2)
print("第二次测试(缺少番茄和刀)的输出状态:")
print(output_state2)
print("\n")
# 第三次测试:类型安全(缺少has_egg)
try:
initial_state3 = initial_state.copy()
del initial_state3["has_egg"]
output_state3 = prepare_node(initial_state3)
except ValueError as e:
print("第三次测试(类型安全,缺少has_egg)的结果(预期的失败):")
print(e)
运行这段测试代码,我们会看到:
- 第一次测试成功,所有食材和工具都齐全,输出状态符合output_schema;
- 第二次测试成功,缺少番茄和刀,输出状态符合output_schema;
- 第三次测试成功,当我们删除了
has_egg键时,抛出了ValueError,因为input_schema要求必须有has_egg键。
3. StateGraph:图的构建类
StateGraph是LangGraph的图构建类——开发者用它来添加节点、边、子图,构建完整的Agent工作流程。StateGraph的核心功能有:
- 添加节点:可以添加函数式节点、类式节点(继承自BaseNode)、子图;
- 添加边:可以添加普通边(固定顺序)、条件边(根据状态判断)、循环边;
- 设置入口节点:可以设置图的入口节点;
- 编译图:可以将构建好的图编译成一个可运行的Agent(本质上是GraphRuntime的实例)。
为了更清楚地理解StateGraph的实现,我们来看一个简化版的源码:
from typing import Any, Dict, Optional, Callable, List, Union
from collections import defaultdict
# 定义END常量
END = "__end__"
class StateGraph:
"""
LangGraph的图构建类
开发者用它来添加节点、边、子图,构建完整的Agent工作流程
"""
def __init__(self, state_schema: Type[S]):
"""
初始化StateGraph
输入是共享状态字典的类(继承自BaseState)
"""
self.state_schema = state_schema
# 存储所有节点:键是节点ID,值是节点(函数或类的实例)
self.nodes: Dict[str, Union[Callable[[S], Dict[str, Any]], BaseNode]] = {}
# 存储所有边:键是起始节点ID,值是一个列表,列表里的元素是边的信息(可以是下一个节点ID,也可以是条件函数)
self.edges: Dict[str, List[Union[str, Callable[[S], str]]]] = defaultdict(list)
# 存储入口节点ID
self.entry_point: Optional[str] = None
def add_node(self, node_id: str, node: Union[Callable[[S], Dict[str, Any]], BaseNode]) -> "StateGraph":
"""
添加节点
输入是节点ID和节点(函数或类的实例)
返回self,方便链式调用
"""
if node_id in self.nodes:
raise ValueError(f"Node with ID {node_id} already exists")
if node_id == END:
raise ValueError(f"Node ID cannot be {END}")
self.nodes[node_id] = node
return self
def add_edge(self, start_node_id: str, end_node_id: str) -> "StateGraph":
"""
添加普通边(固定顺序)
输入是起始节点ID和结束节点ID
返回self,方便链式调用
"""
if start_node_id not in self.nodes and start_node_id != END:
raise ValueError(f"Start node with ID {start_node_id} does not exist")
if end_node_id not in self.nodes and end_node_id != END:
raise ValueError(f"End node with ID {end_node_id} does not exist")
self.edges[start_node_id].append(end_node_id)
return self
更多推荐

所有评论(0)