LangGraph 扩展开发:自定义节点与插件生态的构建方法


关键词

LangGraph、自定义节点、插件生态、Agent框架、图计算、工具扩展、LangChain


摘要

LangChain生态中的LangGraph,是构建状态驱动的、循环可控的Agent应用的革命性工具——它不再是简单的链式调用,而是用图计算的思维,让Agent拥有“分支判断、循环重试、状态共享”等类似人类解决问题的能力。但随着业务场景的复杂化,LangGraph官方节点库(如ToolNodeLLMNodeConditionNode)往往无法完全满足需求:可能需要对接公司内部定制化的业务API、实现领域特定的状态转换逻辑、甚至开发整套可复用的插件包快速搭建同类场景的Agent。

本文将从0到1拆解LangGraph的核心原理,带你理解为什么官方节点“够用但不够快、不够专”;然后用生动的“厨房做菜”类比解释状态、节点、边、子图的关系;接着手把手教你实现三大类自定义节点(工具增强型、状态转换型、领域逻辑型);再深入到插件生态的构建全流程(从单插件封装,到插件仓库的设计与部署,再到插件的版本管理与安全机制);最后结合电商售后智能客服、医疗知识图谱问答、科研论文写作助手三个真实案例,展示如何把自定义节点和插件组合成强大的Agent系统。

全文约10500字,包含20+行Mermaid图表、300+行Python代码、多个数学模型解释,适合已经掌握LangChain基础用法、想深入开发Agent扩展的开发者阅读。


正文


1. 背景介绍:从链式调用到图计算,LangGraph为什么需要扩展?

核心概念

LangChain、LangGraph、状态驱动Agent、LLM节点、节点扩展、插件生态

问题背景

2022年底ChatGPT引爆大语言模型(LLM),随后LangChain的出现,让开发者可以像“搭积木”一样快速串联LLM、向量库、搜索引擎等工具,构建出第一代“问答型、单流程”的应用——比如给PDF加索引的问答机器人、简单的日程安排助手。这些应用虽然简单,但解决了从0到1的问题,很快就火遍了整个AI开发圈。

但随着应用从“玩具级”走向“生产级”,开发者很快遇到了LangChain原始链式调用的三大致命缺陷

  1. 无法循环重试:比如让LLM写Python代码,代码执行报错后,链式调用只能“戛然而止”,不能把错误信息反馈给LLM让它重新修改;
  2. 状态管理混乱:如果一个流程需要多个节点共享同一个数据(比如电商客服需要先获取用户ID、再查订单状态、再调用售后政策库,这些数据都需要在节点间传递),链式调用只能靠input/output层层传递,代码变得冗长且容易出错;
  3. 分支逻辑硬编码脆弱:链式调用虽然有SimpleSequentialChainRouterChain两种分支方式,但RouterChain的规则是用LLM生成自然语言标签再匹配,不够灵活也不够可靠,硬编码分支又会让代码难以维护。

为了解决这些问题,LangChain团队在2023年6月推出了LangGraph——这是一个基于有限状态机(FSM)图计算的框架,它把Agent的整个工作流程抽象成一张有向图,每个节点是一个执行单元(可以是LLM调用、工具执行、状态修改等),每条边是节点间的状态转移规则(可以是固定顺序、条件判断、循环跳转等),整个Agent的运行状态保存在一个共享的状态字典里,所有节点都可以读取和修改这个字典。

LangGraph推出后,迅速成为了Agent开发的主流框架——截至2024年6月,GitHub上LangGraph的Stars已经突破30k,LangChain官方也把它作为了新一代Agent开发的核心工具。但就像所有的开源框架一样,LangGraph的**官方节点库(LangGraph Prebuilt Nodes)**只覆盖了80%的通用场景,剩下的20%(甚至更多的生产级场景)需要开发者自己扩展:

  • 比如电商公司需要对接内部的ERP系统查订单、CRM系统查用户信息、售后系统提交工单;
  • 比如医疗行业需要对接内部的知识图谱API、影像识别API、电子病历系统API;
  • 比如游戏公司需要对接内部的游戏服务器API、玩家数据API、支付系统API;
  • 甚至有些通用场景,官方节点的实现也不够优化——比如官方的ToolNode每次调用工具前都会把整个状态字典传给LLM作为上下文,对于状态很大的Agent,这会严重增加Token消耗和LLM的响应时间。

为了解决这些生产级的扩展需求,开发者需要掌握三大技能

  1. 理解LangGraph的核心原理:只有理解了状态、节点、边、子图的底层实现,才能写出高效、稳定的扩展代码;
  2. 开发自定义节点:针对特定的业务需求,开发可复用的执行单元;
  3. 构建插件生态:把多个相关的自定义节点、状态定义、边规则打包成一个插件,方便自己和团队其他开发者快速使用。
目标读者
  • 已经掌握Python基础语法和LangChain基础用法的开发者;
  • 想用LangGraph构建生产级Agent应用的开发者;
  • 想为LangGraph生态贡献插件的开源爱好者;
  • 想深入理解Agent框架底层原理的AI爱好者。
核心问题或挑战

在深入讲解技术细节之前,我们先提出三个贯穿全文的核心问题,这些问题也是生产级Agent开发中最常见的挑战:

  1. 自定义节点应该如何设计?:是应该继承官方的节点基类?还是应该自己实现一个完全独立的节点?如何确保自定义节点的可复用性、可测试性和安全性?
  2. 状态管理应该如何优化?:共享状态字典的大小会影响Agent的性能和Token消耗,如何设计状态的粒度?如何避免状态污染?如何实现状态的持久化?
  3. 插件生态应该如何构建?:单插件应该包含哪些内容?插件仓库应该如何设计和部署?如何管理插件的版本?如何确保插件的安全性?

接下来的内容,我们将一步步解决这些问题。


2. 核心概念解析:用“厨房做菜”理解LangGraph的图计算思维

核心概念

共享状态、节点(Node)、边(Edge)、子图(Subgraph)、入口节点(Entry Point)、出口节点(Exit Point)、条件边(Conditional Edge)、循环边(Loop Edge)、状态键(State Key)、状态更新规则(State Update Rule)

问题背景

很多开发者刚接触LangGraph时,都会被“状态驱动的有向图”这个概念搞得晕头转向——到底什么是状态?节点和边在图里到底是怎么工作的?共享状态和链式调用的层层传递到底有什么区别?

为了解释这些抽象的概念,我们用一个大家都熟悉的**“厨房做番茄炒蛋”**的例子来类比——假设我们要请朋友来家里吃饭,做番茄炒蛋是其中的一道菜,我们可以把整个做番茄炒蛋的流程抽象成一张LangGraph的有向图。

核心概念的生活化类比

我们先定义好“厨房做菜”这个类比里的各个角色,然后对应到LangGraph的核心概念:

厨房做菜的角色/元素 LangGraph的核心概念 详细说明
厨师的大脑和手上的食材清单、工具清单 共享状态字典 保存整个流程中所有需要共享的信息——比如食材清单(有没有番茄、有没有鸡蛋、有没有盐)、工具清单(有没有锅、有没有铲子、有没有刀)、当前进度(有没有切番茄、有没有打鸡蛋、有没有下锅炒)、朋友的口味偏好(喜欢甜口还是咸口)
厨师做的每一个具体动作(切番茄、打鸡蛋、尝咸淡、加调料) 节点(Node) 执行单元——每个节点都可以读取共享状态字典里的信息,也可以修改共享状态字典里的信息,还可以执行一些外部操作(比如打开冰箱拿鸡蛋、打开水龙头洗番茄)
厨师动作之间的逻辑关系(切完番茄才能打鸡蛋?还是可以同时切番茄和打鸡蛋?尝咸淡后如果太咸就加白糖,太淡就加盐,如果刚好就出锅) 边(Edge) 节点间的状态转移规则——可以是固定顺序的普通边,也可以是根据状态判断的条件边,还可以是回到之前节点的循环边
把做番茄炒蛋的流程单独拿出来,作为“做一桌菜”这个大流程里的一部分 子图(Subgraph) 可复用的图组件——可以把子图当成一个“超级节点”,嵌入到更大的图里,这样可以把复杂的大流程拆分成多个简单的小流程,方便维护和测试
准备开始做番茄炒蛋(比如先看一下朋友的口味偏好) 入口节点(Entry Point) 整个图的起始节点——Agent运行时会从入口节点开始执行
番茄炒蛋做好了,可以端上桌了 出口节点(Exit Point) 整个图的结束节点——Agent运行到出口节点时会停止
共享状态字典里的每一个具体信息(比如has_tomato: Truefriend_taste: "sweet"current_progress: "cutting_tomato" 状态键(State Key) 状态字典的基本单元——每个状态键都有一个对应的值,值的类型可以是字符串、数字、布尔值、列表、字典等Python的任意数据类型
厨师修改食材清单的规则(比如如果用了一个番茄,就把has_tomato的值从True改成False;如果加了一勺盐,就把salt_level的值从0改成1 状态更新规则(State Update Rule) 控制节点如何修改共享状态字典——默认情况下,LangGraph使用**字典合并(Dictionary Merge)**的规则,也就是节点返回的状态字典会和原来的共享状态字典合并,相同的键会被覆盖,不同的键会保留;但开发者也可以自定义状态更新规则,比如用列表追加、数值累加等。

现在,我们把“厨房做番茄炒蛋”的具体流程画成一张Mermaid的有向图,对应到LangGraph的核心概念:

渲染错误: Mermaid 渲染失败: Parse error on line 19: ...ate1[friend_taste: "sweet"\nsalt_level: -----------------------^ Expecting 'SQE', 'DOUBLECIRCLEEND', 'PE', '-)', 'STADIUMEND', 'SUBROUTINEEND', 'PIPE', 'CYLINDEREND', 'DIAMOND_STOP', 'TAGEND', 'TRAPEND', 'INVTRAPEND', 'UNICODE_TEXT', 'TEXT', 'TAGSTART', got 'STR'

从这张图里,我们可以清楚地看到LangGraph的几个核心优势:

  1. 并行执行:我们可以把“切番茄”和“打鸡蛋”这两个独立的任务放在一个并行子图里执行,节省时间——这在链式调用里是很难实现的;
  2. 循环重试:如果食材和工具不齐全,我们可以回到“准备食材和工具”的节点重新检查;如果尝咸淡不符合口味,我们可以回到“下锅炒”的节点重新调整——这在链式调用里是完全不可能的;
  3. 条件判断灵活:我们可以根据共享状态字典里的friend_tastesalt_levelsugar_level等信息,灵活地调整边的转移规则——这比LangChain原始的RouterChain可靠得多;
  4. 状态共享清晰:所有需要共享的信息都保存在一个共享状态字典里,所有节点都可以读取和修改——这比链式调用的层层传递要清晰得多,也不容易出错。
概念间的关系和相互作用

现在,我们已经用生活化的类比解释了LangGraph的核心概念,接下来我们用更专业的方式来梳理这些概念间的关系和相互作用。

概念核心属性维度对比

为了更清楚地理解各个概念的区别,我们先做一个概念核心属性维度对比表

核心概念 核心属性1:类型 核心属性2:是否可执行 核心属性3:是否可共享 核心属性4:是否可复用 核心属性5:是否有输入输出
共享状态字典 数据结构(通常是TypedDict) 是(所有节点共享) 是(可以在不同的图里复用) 是(所有节点的输入和输出都是它的子集)
节点(Node) 执行单元(函数或类的实例) 否(每个节点独立,但可以读取共享状态) 是(可以在不同的图里复用) 是(输入是共享状态字典的子集,输出是状态更新字典)
边(Edge) 状态转移规则(函数或固定字符串) 否(每条边属于特定的两个节点) 是(可以在不同的图里复用规则函数) 否(只有输入——当前节点的输出状态,没有输出——只有下一个节点的ID或END
子图(Subgraph) 可复用的图组件(本质上也是一个节点) 是(内部节点共享子图的局部状态,也可以访问外部图的全局状态) 是(可以嵌入到任意大的图里) 是(输入是外部图的全局状态的子集,输出是状态更新字典)
入口节点(Entry Point) 特殊的节点(通常是图的第一个节点) 否(每个图只有一个固定的入口节点,但可以动态指定) 是(输入是初始化状态,输出是状态更新字典)
出口节点(Exit Point) 特殊的节点(通常是END常量,也可以是自定义节点) 否(END是常量,自定义节点可以是) 是(END可以在所有图里复用) 否(END没有输入输出)
概念联系的ER实体关系图

接下来,我们用一张Mermaid ER实体关系图来梳理这些概念间的联系:

包含

被所有节点读取和修改

被所有子图读取和修改

拥有一个全局的

包含多个

包含多条

包含多个(作为超级节点)

拥有一个固定的

拥有多个(通常是END)

作为起点或终点

作为起点或终点(作为超级节点)

拥有一个局部的

包含多个内部节点

包含多条内部边

拥有一个固定的内部入口节点

拥有多个内部出口节点(通常是END,也可以返回外部图)

被一个或多个规则控制

使用一个或多个规则修改状态

SHARED_STATE

STATE_KEY

NODE

SUBGRAPH

GRAPH

EDGE

ENTRY_POINT

EXIT_POINT

STATE_UPDATE_RULE

从这张ER图里,我们可以看到:

  1. 共享状态字典是整个LangGraph的核心——所有的节点、子图都围绕着它工作;
  2. **图(Graph)**是LangGraph的最高级容器——它包含了全局共享状态、所有的节点、边、子图、入口节点和出口节点;
  3. **子图(Subgraph)**本质上也是一个图——它包含了局部共享状态、内部节点、内部边、内部入口节点和内部出口节点,同时也可以作为一个超级节点嵌入到更大的图里;
  4. 状态更新规则控制着节点如何修改共享状态——默认是字典合并,但开发者可以自定义。
概念交互关系图

最后,我们用一张Mermaid交互关系图来展示这些概念在Agent运行时的交互过程:

出口节点(END) 循环节点 条件节点 普通节点1 入口节点 全局共享状态字典 LangGraph 运行时 用户 出口节点(END) 循环节点 条件节点 普通节点1 入口节点 全局共享状态字典 LangGraph 运行时 用户 alt [条件成立(需要循环)] [条件不成立(不需要循环)] 初始化Agent(传入初始化状态) 创建全局共享状态字典,合并初始化状态 启动Agent,执行入口节点 读取全局共享状态 执行内部逻辑 返回状态更新字典,合并到全局共享状态 返回下一个节点的ID(NormalNode1) 执行普通节点1 读取全局共享状态 执行内部逻辑 返回状态更新字典,合并到全局共享状态 返回下一个节点的ID(ConditionNode) 执行条件节点 读取全局共享状态 执行条件判断逻辑 返回下一个节点的ID(LoopNode) 执行循环节点 读取全局共享状态 执行内部逻辑 返回状态更新字典,合并到全局共享状态 返回下一个节点的ID(NormalNode1) 再次执行普通节点1(循环开始) 返回下一个节点的ID(ExitNode) 执行出口节点(END) 返回END 读取最终的全局共享状态 返回最终结果

从这张交互关系图里,我们可以清楚地看到Agent运行时的完整流程:

  1. 用户初始化Agent,传入初始化状态;
  2. LangGraph运行时创建全局共享状态字典,合并初始化状态;
  3. LangGraph运行时从入口节点开始执行,依次执行每个节点;
  4. 每个节点读取全局共享状态,执行内部逻辑,返回状态更新字典,合并到全局共享状态;
  5. 每个节点返回下一个节点的ID(或END);
  6. 如果遇到条件节点,根据条件判断返回不同的下一个节点ID;
  7. 如果遇到循环边,回到之前的节点重新执行;
  8. 当执行到END常量时,Agent停止运行,返回最终的全局共享状态。

3. 技术原理与实现:深入LangGraph的底层源码

核心概念

BaseState、BaseNode、StateGraph、END、GraphRuntime、状态更新器(StateUpdater)、边解析器(EdgeResolver)

问题背景

上一章我们用生活化的类比和图表解释了LangGraph的核心概念,但要写出高效、稳定的自定义节点和插件,我们还需要深入理解LangGraph的底层源码实现——比如共享状态字典是怎么定义的?节点是怎么执行的?边是怎么解析的?状态是怎么更新的?

在这一章里,我们将基于LangGraph 0.2.x版本的源码(这是截至2024年6月的最新稳定版本),一步步拆解LangGraph的核心组件。为了方便讲解,我们会简化一些源码细节,但会保留核心的逻辑。

核心组件的底层实现

LangGraph的核心组件主要有五个:

  1. BaseState:共享状态字典的基类——所有的共享状态字典都必须继承自它;
  2. BaseNode:节点的基类——所有的自定义节点都必须继承自它;
  3. StateGraph:图的构建类——开发者用它来添加节点、边、子图,构建完整的Agent工作流程;
  4. GraphRuntime:图的运行时类——负责执行Agent工作流程,管理共享状态、节点执行、边解析、状态更新;
  5. END:出口节点常量——当节点返回END时,Agent停止运行。

接下来,我们分别讲解这五个核心组件的底层实现。

1. BaseState:共享状态字典的基类

在LangGraph 0.2.x版本之前,共享状态字典通常是用Python的TypedDict来定义的——TypedDict可以给字典的键加上类型提示,方便代码的可读性和静态类型检查(比如用mypy)。但TypedDict有一个缺点:它是一个静态的类型定义,不能在运行时动态添加或删除键,也不能自定义状态更新规则。

为了解决这些问题,LangGraph 0.2.x版本推出了BaseState基类——它本质上是一个继承自dict的Python类,但添加了以下几个核心功能:

  1. 类型安全的初始化:可以在运行时检查传入的初始化状态是否符合类型定义;
  2. 自定义状态更新规则:可以重写__add__方法(或者用@add_updater装饰器),为不同的状态键定义不同的更新规则;
  3. 状态序列化与反序列化:可以轻松地将状态字典序列化为JSON或YAML,也可以反序列化回来;
  4. 状态历史记录:可以记录状态的每一次更新,方便调试和回溯。

为了更清楚地理解BaseState的实现,我们来看一个简化版的源码:

from typing import Any, Dict, Type, TypeVar, Optional, Callable
from pydantic import BaseModel, Field, ValidationError

# 定义一个泛型类型变量,用于类型提示
T = TypeVar('T', bound='BaseState')

class BaseState(dict):
    """
    LangGraph共享状态字典的基类
    本质上是一个继承自dict的Python类,但添加了类型安全、自定义更新规则、序列化等功能
    """
    # 类变量:用于存储类型定义(可选,用Pydantic BaseModel定义)
    schema: Optional[Type[BaseModel]] = None
    # 类变量:用于存储自定义状态更新规则(键是状态键,值是更新函数)
    updaters: Dict[str, Callable[[Any, Any], Any]] = {}

    def __init__(self, *args, **kwargs):
        """
        初始化共享状态字典
        首先调用dict的__init__方法,然后如果有schema,就验证传入的状态是否符合类型定义
        """
        super().__init__(*args, **kwargs)
        if self.schema is not None:
            self._validate()

    def _validate(self) -> None:
        """
        验证当前状态是否符合schema的类型定义
        如果不符合,就抛出ValidationError
        """
        try:
            # 用Pydantic BaseModel验证当前状态
            self.schema(**self)
        except ValidationError as e:
            raise ValueError(f"State validation failed: {e}") from e

    def __add__(self: T, other: Dict[str, Any]) -> T:
        """
        重写__add__方法,实现自定义状态更新规则
        默认情况下,使用字典合并的规则(相同的键覆盖,不同的键保留)
        如果某个状态键有自定义的updater,就用updater来更新
        """
        # 创建一个新的状态字典,复制当前状态
        new_state = self.__class__(self)
        # 遍历other的键值对
        for key, value in other.items():
            # 如果key有自定义的updater
            if key in self.updaters:
                # 如果key在当前状态里,就用updater更新
                if key in new_state:
                    new_state[key] = self.updaters[key](new_state[key], value)
                # 如果key不在当前状态里,就直接赋值
                else:
                    new_state[key] = value
            # 如果key没有自定义的updater,就用默认的字典合并规则
            else:
                new_state[key] = value
        # 如果有schema,就验证新状态是否符合类型定义
        if new_state.schema is not None:
            new_state._validate()
        # 返回新状态
        return new_state

    @classmethod
    def add_updater(cls, key: str, updater: Callable[[Any, Any], Any]) -> None:
        """
        类方法:为某个状态键添加自定义的更新规则
        """
        cls.updaters[key] = updater

    def to_json(self) -> str:
        """
        将状态字典序列化为JSON字符串
        """
        import json
        return json.dumps(self)

    @classmethod
    def from_json(cls: Type[T], json_str: str) -> T:
        """
        从JSON字符串反序列化为状态字典
        """
        import json
        return cls(json.loads(json_str))

现在,我们用这个简化版的BaseState来定义一个“厨房做番茄炒蛋”的共享状态字典:

from pydantic import BaseModel, Field
from typing import Literal

# 第一步:定义Pydantic BaseModel作为schema,用于类型安全
class TomatoEggStateSchema(BaseModel):
    friend_taste: Literal["sweet", "salty", "spicy"] = Field(..., description="朋友的口味偏好")
    salt_level: int = Field(default=0, ge=0, le=10, description="盐的水平(0-10)")
    sugar_level: int = Field(default=0, ge=0, le=10, description="糖的水平(0-10)")
    current_progress: Literal["start", "prepare", "cut_and_beat", "cook", "taste", "serve"] = Field(default="start", description="当前进度")
    has_tomato: bool = Field(default=True, description="有没有番茄")
    has_egg: bool = Field(default=True, description="有没有鸡蛋")
    has_salt: bool = Field(default=True, description="有没有盐")
    has_sugar: bool = Field(default=True, description="有没有糖")
    has_pot: bool = Field(default=True, description="有没有锅")
    has_spatula: bool = Field(default=True, description="有没有铲子")
    has_knife: bool = Field(default=True, description="有没有刀")
    message_log: list[str] = Field(default_factory=list, description="消息日志")

# 第二步:定义继承自BaseState的共享状态字典类
class TomatoEggState(BaseState):
    # 绑定schema
    schema = TomatoEggStateSchema
    # 为message_log添加自定义更新规则:列表追加
    @classmethod
    def _append_message_log(cls, old_value: list[str], new_value: str | list[str]) -> list[str]:
        if isinstance(new_value, str):
            return old_value + [new_value]
        elif isinstance(new_value, list):
            return old_value + new_value
        else:
            raise ValueError(f"Invalid value type for message_log: {type(new_value)}")
    # 注册自定义更新规则
    updaters = {
        "message_log": _append_message_log
    }

# 测试一下我们定义的共享状态字典
if __name__ == "__main__":
    # 初始化状态
    initial_state = TomatoEggState(
        friend_taste="sweet",
        message_log="开始做番茄炒蛋"
    )
    print("初始状态:")
    print(initial_state)
    print("\n")

    # 测试默认的字典合并规则
    updated_state1 = initial_state + {"current_progress": "prepare", "message_log": "准备食材和工具"}
    print("第一次更新后的状态(测试默认合并和自定义追加):")
    print(updated_state1)
    print("\n")

    # 测试自定义的列表追加规则(传入列表)
    updated_state2 = updated_state1 + {"message_log": ["检查有没有番茄", "检查有没有鸡蛋", "检查有没有盐、糖、锅、铲子、刀"]}
    print("第二次更新后的状态(测试自定义追加列表):")
    print(updated_state2)
    print("\n")

    # 测试类型安全
    try:
        invalid_state = updated_state2 + {"salt_level": 11}  # salt_level的最大值是10
    except ValueError as e:
        print("类型安全测试失败(预期的结果):")
        print(e)

运行这段测试代码,我们会看到:

  1. 初始状态成功创建,符合schema的类型定义;
  2. 第一次更新后的状态,current_progress被覆盖成了"prepare"(默认的字典合并规则),message_log被追加了一条新消息(自定义的更新规则);
  3. 第二次更新后的状态,message_log被追加了三条新消息(自定义的更新规则支持传入列表);
  4. 类型安全测试成功,当我们试图把salt_level设置成11时,抛出了ValueError
2. BaseNode:节点的基类

在LangGraph 0.2.x版本之前,节点通常是用Python的函数来定义的——函数的输入是共享状态字典,输出是状态更新字典或下一个节点的ID(或END)。但函数式节点有一个缺点:它不能保存状态(比如LLM的API密钥、工具的配置等),每次执行都要重新初始化,这在生产级场景里会严重影响性能。

为了解决这些问题,LangGraph 0.2.x版本推出了BaseNode基类——它本质上是一个Python的抽象基类(ABC),所有的自定义节点都必须继承自它,并实现抽象方法__call__。BaseNode添加了以下几个核心功能:

  1. 可保存状态:可以在节点的__init__方法里初始化一些持久化的状态(比如LLM的API密钥、工具的配置、数据库连接等);
  2. 类型安全的输入输出:可以用Pydantic BaseModel定义节点的输入和输出类型,方便代码的可读性和静态类型检查;
  3. 可测试性:可以轻松地对节点进行单元测试,不需要启动整个Agent工作流程;
  4. 可组合性:可以把多个节点组合成一个更复杂的节点(比如子图)。

为了更清楚地理解BaseNode的实现,我们来看一个简化版的源码:

from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Type, TypeVar
from pydantic import BaseModel, Field, ValidationError

# 定义泛型类型变量
S = TypeVar('S', bound='BaseState')
I = TypeVar('I', bound=BaseModel)
O = TypeVar('O', bound=BaseModel)

class BaseNode(ABC):
    """
    LangGraph节点的抽象基类
    所有的自定义节点都必须继承自它,并实现抽象方法__call__
    """
    # 类变量:节点的名称(可选,默认是类名)
    name: str = None
    # 类变量:节点的输入类型(可选,用Pydantic BaseModel定义)
    input_schema: Optional[Type[I]] = None
    # 类变量:节点的输出类型(可选,用Pydantic BaseModel定义,输出是状态更新字典的子集)
    output_schema: Optional[Type[O]] = None

    def __init__(self, name: Optional[str] = None):
        """
        初始化节点
        可以在这里初始化一些持久化的状态(比如LLM的API密钥、工具的配置、数据库连接等)
        """
        if name is not None:
            self.name = name
        elif self.name is None:
            self.name = self.__class__.__name__

    @abstractmethod
    def _execute(self, state: S) -> Dict[str, Any]:
        """
        抽象方法:节点的核心执行逻辑
        输入是共享状态字典,输出是状态更新字典
        子类必须实现这个方法
        """
        pass

    def __call__(self, state: S) -> Dict[str, Any]:
        """
        节点的调用入口
        首先验证输入状态是否符合input_schema,然后调用_execute方法,最后验证输出状态是否符合output_schema
        """
        # 第一步:验证输入状态是否符合input_schema(如果有)
        if self.input_schema is not None:
            try:
                # 从共享状态字典里提取input_schema需要的键
                input_data = {k: state[k] for k in self.input_schema.__fields__.keys() if k in state}
                self.input_schema(**input_data)
            except ValidationError as e:
                raise ValueError(f"Input validation failed for node {self.name}: {e}") from e

        # 第二步:调用_execute方法,执行核心逻辑
        output_state = self._execute(state)

        # 第三步:验证输出状态是否符合output_schema(如果有)
        if self.output_schema is not None:
            try:
                self.output_schema(**output_state)
            except ValidationError as e:
                raise ValueError(f"Output validation failed for node {self.name}: {e}") from e

        # 第四步:返回输出状态
        return output_state

现在,我们用这个简化版的BaseNode来定义一个“厨房做番茄炒蛋”里的**“准备食材和工具”节点**:

from typing import Literal

# 第一步:定义节点的输入类型(用Pydantic BaseModel)
class PrepareNodeInput(BaseModel):
    has_tomato: bool = Field(..., description="有没有番茄")
    has_egg: bool = Field(..., description="有没有鸡蛋")
    has_salt: bool = Field(..., description="有没有盐")
    has_sugar: bool = Field(..., description="有没有糖")
    has_pot: bool = Field(..., description="有没有锅")
    has_spatula: bool = Field(..., description="有没有铲子")
    has_knife: bool = Field(..., description="有没有刀")

# 第二步:定义节点的输出类型(用Pydantic BaseModel,是状态更新字典的子集)
class PrepareNodeOutput(BaseModel):
    current_progress: Literal["prepare", "check"] = Field(..., description="当前进度")
    all_items_ready: bool = Field(..., description="所有食材和工具是否齐全")
    message_log: str = Field(..., description="消息日志")

# 第三步:定义继承自BaseNode的PrepareNode类
class PrepareNode(BaseNode):
    # 绑定节点名称、输入类型、输出类型
    name = "prepare_node"
    input_schema = PrepareNodeInput
    output_schema = PrepareNodeOutput

    def _execute(self, state: TomatoEggState) -> Dict[str, Any]:
        """
        节点的核心执行逻辑:检查所有食材和工具是否齐全
        """
        # 检查所有食材和工具
        all_items_ready = (
            state["has_tomato"] and
            state["has_egg"] and
            state["has_salt"] and
            state["has_sugar"] and
            state["has_pot"] and
            state["has_spatula"] and
            state["has_knife"]
        )
        # 生成消息日志
        if all_items_ready:
            message_log = "所有食材和工具都准备好了,下一步是切番茄和打鸡蛋"
            current_progress = "check"
        else:
            missing_items = []
            if not state["has_tomato"]:
                missing_items.append("番茄")
            if not state["has_egg"]:
                missing_items.append("鸡蛋")
            if not state["has_salt"]:
                missing_items.append("盐")
            if not state["has_sugar"]:
                missing_items.append("糖")
            if not state["has_pot"]:
                missing_items.append("锅")
            if not state["has_spatula"]:
                missing_items.append("铲子")
            if not state["has_knife"]:
                missing_items.append("刀")
            message_log = f"缺少以下食材和工具:{', '.join(missing_items)},需要去买或借"
            current_progress = "prepare"
        # 返回状态更新字典
        return {
            "current_progress": current_progress,
            "all_items_ready": all_items_ready,
            "message_log": message_log
        }

# 测试一下我们定义的PrepareNode
if __name__ == "__main__":
    # 初始化状态
    initial_state = TomatoEggState(
        friend_taste="sweet",
        message_log="开始做番茄炒蛋"
    )
    print("初始状态:")
    print(initial_state)
    print("\n")

    # 初始化节点
    prepare_node = PrepareNode()

    # 第一次测试:所有食材和工具都齐全
    output_state1 = prepare_node(initial_state)
    print("第一次测试(所有食材和工具都齐全)的输出状态:")
    print(output_state1)
    print("\n")

    # 第二次测试:缺少番茄和刀
    initial_state2 = initial_state + {"has_tomato": False, "has_knife": False}
    output_state2 = prepare_node(initial_state2)
    print("第二次测试(缺少番茄和刀)的输出状态:")
    print(output_state2)
    print("\n")

    # 第三次测试:类型安全(缺少has_egg)
    try:
        initial_state3 = initial_state.copy()
        del initial_state3["has_egg"]
        output_state3 = prepare_node(initial_state3)
    except ValueError as e:
        print("第三次测试(类型安全,缺少has_egg)的结果(预期的失败):")
        print(e)

运行这段测试代码,我们会看到:

  1. 第一次测试成功,所有食材和工具都齐全,输出状态符合output_schema;
  2. 第二次测试成功,缺少番茄和刀,输出状态符合output_schema;
  3. 第三次测试成功,当我们删除了has_egg键时,抛出了ValueError,因为input_schema要求必须有has_egg键。
3. StateGraph:图的构建类

StateGraph是LangGraph的图构建类——开发者用它来添加节点、边、子图,构建完整的Agent工作流程。StateGraph的核心功能有:

  1. 添加节点:可以添加函数式节点、类式节点(继承自BaseNode)、子图;
  2. 添加边:可以添加普通边(固定顺序)、条件边(根据状态判断)、循环边;
  3. 设置入口节点:可以设置图的入口节点;
  4. 编译图:可以将构建好的图编译成一个可运行的Agent(本质上是GraphRuntime的实例)。

为了更清楚地理解StateGraph的实现,我们来看一个简化版的源码:

from typing import Any, Dict, Optional, Callable, List, Union
from collections import defaultdict

# 定义END常量
END = "__end__"

class StateGraph:
    """
    LangGraph的图构建类
    开发者用它来添加节点、边、子图,构建完整的Agent工作流程
    """
    def __init__(self, state_schema: Type[S]):
        """
        初始化StateGraph
        输入是共享状态字典的类(继承自BaseState)
        """
        self.state_schema = state_schema
        # 存储所有节点:键是节点ID,值是节点(函数或类的实例)
        self.nodes: Dict[str, Union[Callable[[S], Dict[str, Any]], BaseNode]] = {}
        # 存储所有边:键是起始节点ID,值是一个列表,列表里的元素是边的信息(可以是下一个节点ID,也可以是条件函数)
        self.edges: Dict[str, List[Union[str, Callable[[S], str]]]] = defaultdict(list)
        # 存储入口节点ID
        self.entry_point: Optional[str] = None

    def add_node(self, node_id: str, node: Union[Callable[[S], Dict[str, Any]], BaseNode]) -> "StateGraph":
        """
        添加节点
        输入是节点ID和节点(函数或类的实例)
        返回self,方便链式调用
        """
        if node_id in self.nodes:
            raise ValueError(f"Node with ID {node_id} already exists")
        if node_id == END:
            raise ValueError(f"Node ID cannot be {END}")
        self.nodes[node_id] = node
        return self

    def add_edge(self, start_node_id: str, end_node_id: str) -> "StateGraph":
        """
        添加普通边(固定顺序)
        输入是起始节点ID和结束节点ID
        返回self,方便链式调用
        """
        if start_node_id not in self.nodes and start_node_id != END:
            raise ValueError(f"Start node with ID {start_node_id} does not exist")
        if end_node_id not in self.nodes and end_node_id != END:
            raise ValueError(f"End node with ID {end_node_id} does not exist")
        self.edges[start_node_id].append(end_node_id)
        return self

   
Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐