目录

一、LangGraph 是什么

二、为什么需要 LangGraph

三、核心概念详解

3.1 图(Graph)

3.2 节点(Node)

3.3 边(Edge)

3.4 状态(State)

四、LangGraph 和 LangChain 的关系

五、安装

六、基础案例:多功能对话机器人

6.1 需求分析

6.2 流程设计

6.3 完整代码实现

6.4 可视化输出图

七、进阶篇

7.1 设置 Config 配置

基本用法

7.2 thread_id:会话身份标识

7.3 加入记忆:InMemorySaver

7.4 持久化记忆:SQLite 存储

7.5 人工介入(Human-in-the-Loop)

7.6 并行执行

基础并行

并行节点共享列表状态时的陷阱

7.7 子图(Subgraph)

基本用法

主图与子图之间的数据传递

7.8 流式输出

7.9 异步操作

ainvoke:异步调用

astream:异步流式输出

八、总结


一、LangGraph 是什么

LangGraph 是由 LangChain 团队开发的一个 Python 库,专门用于构建有状态的、多步骤的、由大语言模型驱动的应用程序

它的核心思想很直观:把应用的工作流建模为一张"图"。图里有节点(执行具体功能)和边(定义流转关系),数据(状态)在图中流转和更新。

打个比方,如果 LangChain 的 Chain 是一条单行道,那 LangGraph 就是一个立交桥——有多条路径、有岔路口、有环路,车辆(数据)可以根据路况(条件)选择不同的方向。

你可以把它理解成一个专门为 LLM 应用设计的"工作流引擎",或者把它看成代码版的 Coze / Dify——只不过用纯代码来定义流程,灵活性更高。


二、为什么需要 LangGraph

在 LangGraph 出现之前,我们用 LangChain 构建的主要是"链"(Chain)——输入经过检索、然后生成答案、最后输出,一条直线走到底。这种模式对简单的问答场景够用了,但一旦遇到下面这些情况就会捉襟见肘:

场景一:需要根据中间结果做决策的多步流程

比如用户问了一个问题,系统需要先判断这个问题属于哪个领域,再决定调用哪个专业工具来回答。这种"分叉"逻辑在纯链式结构中很难表达。

场景二:多轮对话中的状态管理

聊天机器人需要记住用户之前说了什么、已经执行了哪些操作、当前处于什么阶段。这些信息需要在多轮交互中持久保存和更新。

场景三:多角色协作系统

模拟一个项目团队,产品经理提需求、工程师写代码、测试员做测试,多个角色交替执行任务,彼此之间需要传递和共享信息。

场景四:需要人工介入的审批流程

比如一个自动化的申请处理系统,机器跑完自动审批步骤后,需要暂停等人来确认,确认完再继续后面的流程。

这些场景的共同特征是:需要条件分支、循环执行、状态持久化、以及流程中断与恢复。这正是 LangGraph 着力解决的核心问题。


三、核心概念详解

学 LangGraph 其实就是在理解四个东西:图、节点、边、状态。把这四个概念吃透了,上手就很快。

3.1 图(Graph)

图是整个应用的骨架。它由节点和边组成,负责管理和调度整个工作流的执行顺序。

在代码层面,我们通过 StateGraph 类来创建图:

from langgraph.graph import StateGraph

graph_builder = StateGraph(你的状态类型)

3.2 节点(Node)

节点是图中的基本功能单元,本质上就是一个 Python 函数。

每个节点做的事情很统一:接收当前的共享状态 → 执行业务逻辑 → 返回更新后的状态。

比如一个节点负责调用大模型做翻译,另一个节点负责调搜索引擎做信息检索,还有一个节点负责做意图分类——每个节点各司其职。

def my_node(state):
    # 做点事情,修改 state
    state["某个字段"] = "新的值"
    return state

add_node 把节点注册到图上:

graph_builder.add_node("节点名称", 对应的函数) 

3.3 边(Edge)

边连接节点,定义了执行完一个节点之后,下一步该去哪里。

LangGraph 里有两种边:

普通边——无条件跳转,执行完 A 就去 B:

graph_builder.add_edge("节点A", "节点B") 

条件边——根据当前状态的值动态决定走向:

def routing_logic(state):
    if state["某个条件"]:
        return "节点A"
    else:
        return "节点B"

graph_builder.add_conditional_edges("当前节点", routing_logic)

条件边是 LangGraph 实现分支逻辑的关键机制。

3.4 状态(State)

状态是在整个图中流转和更新的共享数据。它是所有节点之间传递信息的载体。

LangGraph 推荐用 TypedDict 或 Pydantic 模型来定义状态的结构,这样每个字段都有明确的类型标注,代码更清晰,也方便 IDE 做类型检查:

from typing import TypedDict

class MyState(TypedDict):
    input: str
    output: str
    is_done: bool

状态的设计是 LangGraph 应用的核心。你需要想清楚整个流程中需要传递哪些数据,然后把它们都定义在状态里。


四、LangGraph 和 LangChain 的关系

很多人初学的时候会搞混这两个东西,这里做个梳理。

LangChain 提供的是基础组件:LLM 封装、Prompt 模板、Tool 定义、Chain 等。它是构建 LLM 应用的"零件库"。

LangGraph 在 LangChain 的基础上,提供了一套图结构的工作流编排能力。它不是要取代 LangChain,而是在 LangChain 之上加了一层"编排层"。

用一个实际的类比来说:

  • LangChain 像是给了你各种食材和厨具(LLM、Tool、Prompt)
  • LangGraph 则是给了你一张菜谱和烹饪流程图(先炒菜再炖汤,如果口味偏辣就多加辣椒)

两者通常是配合使用的。在 LangGraph 的节点里面,你经常会调用 LangChain 的 LLM、Tool 或 Chain。

维度 LangChain LangGraph
执行模式 线性链式,单向流动 图结构,支持分支和循环
状态管理 无内置状态管理 内置状态定义和流转机制
条件分支 需要自己写逻辑 原生支持条件边
人工介入 不支持 支持 interrupt 机制
并行执行 不支持 支持并行节点
适用场景 简单问答、单步任务 复杂 Agent、多步工作流

简单说:LangChain 是"直线型"流程,LangGraph 是"地图型"流程。


五、安装

安装 LangGraph 及其依赖:

pip install langchain pip install langgraph 

如果需要使用 OpenAI 兼容接口(比如 DeepSeek):

pip install langchain-openai 

六、基础案例:多功能对话机器人

6.1 需求分析

我们要实现一个多功能对话机器人,功能如下:

  • 用户输入的内容如果是一个翻译请求(比如"把这句话翻译成英文"),机器人自动识别语言并进行中英互译,只输出译文。
  • 如果用户输入的是普通问题(比如"什么是大模型"),机器人用中文给出回答。

核心逻辑就是:先做意图分类 → 根据分类结果走不同的分支 → 输出结果。

6.2 流程设计

整个流程用文字描述如下:

用户输入
  │
  ▼
意图识别节点(判断是翻译还是问答)
  │
  ├── 是翻译 ──▶ 抽取翻译内容 ──▶ 翻译 ──▶ 输出译文
  │
  └── 是问答 ──▶ 回答问题 ──▶ 输出答案

这是一个典型的条件分支图结构,正好用 LangGraph 来实现。

6.3 完整代码实现

第一步:导入依赖

from typing import TypedDict
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END, START
from langchain_core.messages import SystemMessage, HumanMessage

第二步:配置 LLM

这里以 DeepSeek 为例,你也可以换成其他兼容 OpenAI 接口的模型:

llm = ChatOpenAI(
    model="deepseek-chat",
    base_url="https://api.deepseek.com",
    api_key="sk-你的api密钥"
)

第三步:定义状态

class AgentState(dict):
    input: str                   # 用户输入
    is_translate_or_not: bool    # 是否是翻译任务
    translate_sentence: str      # 提取出来的待翻译句子
    translate_result: str        # 翻译结果
    qa_result: str               # 问答结果
    output: str                  # 最终输出

第四步:定义各个节点

意图识别节点——让 LLM 判断用户输入属于翻译还是问答:

def identify_intent_node(state: AgentState) -> AgentState:
    """判断用户输入是翻译任务还是普通问答"""
    user_input = state['input']
    messages = [
        SystemMessage(content=(
            "你是一个意图分类助手,用来判断用户输入是'翻译任务'还是'普通回答任务'。\n"
            "翻译任务的特征:用户明确要求翻译文章、句子、单词,或要求将中文翻译成英文/英文翻译成中文。\n"
            "普通回答任务的特征:用户是直接提问、咨询信息、讨论话题,而不是要求翻译。\n"
            "请你只回答:翻译 或 普通回答,不要补充其他内容。"
        )),
        HumanMessage(content=user_input)
    ]
    response = llm.invoke(messages).content.strip()
    state['is_translate_or_not'] = "翻译" in response
    return state

抽取翻译内容节点——从用户输入中提取出真正需要翻译的文本:

def extract_translate_sentence_node(state: AgentState) -> AgentState:
    """提取待翻译的句子"""
    user_input = state['input']
    prompt = (
        "你是翻译助手。请先自动识别语言,然后提取待翻译的句子。"
        "请将待翻译的句子提取出来,并输出。"
        "请不要输出其他内容。\n"
        f"待翻译内容:{user_input}"
    )
    result = llm.invoke([HumanMessage(content=prompt)]).content
    state['translate_sentence'] = result
    return state

翻译节点——对提取出来的文本做中英互译:

def translate_node(state: AgentState) -> AgentState:
    """执行翻译"""
    sentence = state["translate_sentence"]
    prompt = (
        "你是翻译助手。请先自动识别语言,然后在中文与英文之间互译。"
        "只输出译文本身,不要额外解释。\n"
        f"待翻译内容:{sentence}"
    )
    result = llm.invoke([HumanMessage(content=prompt)]).content
    state['translate_result'] = result
    state['output'] = result
    return state

问答节点——直接回答用户的问题:

def qa_node(state: AgentState) -> AgentState:
    """回答用户问题"""
    question = state["input"]
    prompt = (
        "你是一个问答助手,请回答问题。"
        "请使用中文回答。\n"
        f"问题:{question}"
    )
    result = llm.invoke([HumanMessage(content=prompt)]).content
    state['qa_result'] = result
    state['output'] = result
    return state

第五步:构建图

def build_langgraph():
    graph_builder = StateGraph(AgentState)

    # 注册节点
    graph_builder.add_node("意图识别节点", identify_intent_node)
    graph_builder.add_node("抽取翻译内容节点", extract_translate_sentence_node)
    graph_builder.add_node("翻译节点", translate_node)
    graph_builder.add_node("问答节点", qa_node)

    # 定义边
    graph_builder.add_edge(START, "意图识别节点")

    # 条件边:根据意图识别结果走不同分支
    def route_by_intent(state: AgentState):
        if state['is_translate_or_not']:
            return "抽取翻译内容节点"
        else:
            return "问答节点"

    graph_builder.add_conditional_edges("意图识别节点", route_by_intent)
    graph_builder.add_edge("抽取翻译内容节点", "翻译节点")
    graph_builder.add_edge("翻译节点", END)
    graph_builder.add_edge("问答节点", END)

    graph = graph_builder.compile()
    return graph

第六步:测试

if __name__ == "__main__":
    graph = build_langgraph()

    # 测试翻译任务
    print("=== 翻译测试 ===")
    state = graph.invoke({"input": "请将这句话翻译成英文:你好,世界!"})
    print("翻译结果:", state['output'])

    # 测试普通问答任务
    print("\n=== 问答测试 ===")
    state = graph.invoke({"input": "解释下什么是大模型"})
    print("回答结果:", state['output'])

6.4 可视化输出图

LangGraph 内置了图的可视化能力,可以将构建好的图导出为图片:

from langgraph.graph.state import CompiledStateGraph

def output_pic_graph(graph: CompiledStateGraph, filename: str = "graph.png"):
    """将图导出为图片"""
    try:
        mermaid_code = graph.get_graph().draw_mermaid_png()
        with open(filename, 'wb') as f:
            f.write(mermaid_code)
        print(f"图已保存到 {filename}")
    except Exception as e:
        print(f"导出失败:{e}")

调用 output_pic_graph(graph) 就会在当前目录生成一张 Mermaid 格式的流程图。前提是需要安装 pip install pygraphviz(部分环境下需要额外安装系统级依赖)。

如果安装 graphviz 有困难,也可以直接在终端打印 Mermaid 代码:

print(graph.get_graph().draw_mermaid())

然后把输出的 Mermaid 代码粘贴到支持 Mermaid 的编辑器(比如 Typora、飞书文档、CSDN 的 Mermaid 代码块)中即可渲染。


七、进阶篇

基础案例跑通之后,我们来看 LangGraph 的进阶功能。这些功能在实际项目中非常高频。

7.1 设置 Config 配置

基本用法

LangGraph 支持在运行时通过 config 参数向图或节点传递配置信息。这个机制很实用——比如你想在不修改代码的情况下切换 LLM 的模型参数、设置 API Key、或者传入用户级别的配置。

Config 的本质就是一个字典,通过 RunnableConfig 来封装:

from langgraph.types import RunnableConfig
from langgraph.graph import StateGraph, START, END
from typing import TypedDict


class State(TypedDict):
    input: str
    step1: str
    step2: str
    result: str


def step1(state: State, config: RunnableConfig):
    prefix = config.get("configurable", {}).get("prefix", "")
    text = f"{prefix}{state['input']}"
    state["step1"] = text
    print(f"[Step1] 使用 prefix = {prefix}")
    return state


def step2(state: State, config: RunnableConfig):
    suffix = config.get("configurable", {}).get("suffix", "")
    result = f"{state['step1']}{suffix}"
    state["step2"] = result
    state["result"] = result
    print(f"[Step2] 使用 suffix = {suffix}")
    return state


def build_graph():
    graph_builder = StateGraph(State)
    graph_builder.add_node("step1", step1)
    graph_builder.add_node("step2", step2)
    graph_builder.add_edge(START, "step1")
    graph_builder.add_edge("step1", "step2")
    graph_builder.add_edge("step2", END)
    return graph_builder.compile()


graph = build_graph()

# 运行时传入 config
config = RunnableConfig(configurable={
    "prefix": "[前缀] ",
    "suffix": " [后缀]"
})

result = graph.invoke({"input": "你好,LangGraph!"}, config=config)
print("最终输出:", result)

注意看节点函数的签名:它多了一个 config: RunnableConfig 参数。LangGraph 在调用节点时会自动把 config 注入进去,你不需要手动传递。

实际项目中,config 经常用来做这些事情:

  • 给 LLM 传入不同的 API Key 和 base_url
  • 设置模型的 temperature、max_tokens 等参数
  • 传入用户身份信息做权限控制
  • 设置 thread_id 来标识会话

7.2 thread_id:会话身份标识

在所有 config 参数中,thread_id 是最特殊也最重要的一个。

它相当于给每个会话分配了一个唯一编号。LangGraph 靠它来识别"这是同一个用户的连续对话"还是"这是一个全新的会话"。

打个比方:你去银行办业务,第一次去取了号(thread_id),办到一半离开,回来时再报同一个号码,柜员就能从上次中断的地方继续。如果你重新取号,那就是一笔新业务。

thread_id 的典型应用场景:

  • 连续对话记忆:记住用户之前说了什么
  • 中断恢复:流程暂停后(比如等人工审批),通过同一个 thread_id 恢复执行
  • 异步任务:一个复杂任务分多次完成,thread_id 确保每次都接上之前的进度

使用方式:

config = RunnableConfig(configurable={"thread_id": "用户A的会话"})

同一个 thread_id 配合记忆模块(后面会讲),就能实现跨轮次的状态保持。

7.3 加入记忆:InMemorySaver

没有记忆的 LangGraph 图,每次运行都是独立的,不会保留上次的状态。加上记忆之后,同一个 thread_id 的多次调用可以共享状态。

实现方式非常简单,只需要两步:

第一步,创建一个 checkpointer(记忆存储器):

from langgraph.checkpoint.memory import InMemorySaver

memory = InMemorySaver()

第二步,在编译图时传入:

graph = graph_builder.compile(checkpointer=memory) 

这样就搞定了。来看一个完整的累加器例子,直观感受记忆的效果:

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import RunnableConfig


class State(dict):
    total: int       # 累加总和
    new_value: int   # 本次输入的数字


def add_node(state: State):
    old_total = state.get("total", 0)
    new_value = state["new_value"]
    new_total = old_total + new_value
    state["total"] = new_total
    return state


def build_graph():
    graph_builder = StateGraph(State)
    graph_builder.add_node("add", add_node)
    graph_builder.add_edge(START, "add")
    graph_builder.add_edge("add", END)

    memory = InMemorySaver()
    graph = graph_builder.compile(checkpointer=memory)
    return graph


graph = build_graph()

# 第一次调用:thread_id = "12345"
config = RunnableConfig(configurable={"thread_id": "12345"})
result1 = graph.invoke({"new_value": 5}, config=config)
print("结果1:", result1)   # total = 5

# 第二次调用:同一个 thread_id,状态会从上次的结果继续
result2 = graph.invoke({"new_value": 7}, config=config)
print("结果2:", result2)   # total = 12(5 + 7)

# 第三次调用:不同的 thread_id,全新状态
config2 = RunnableConfig(configurable={"thread_id": "67890"})
result3 = graph.invoke({"new_value": 10}, config=config2)
print("结果3:", result3)   # total = 10

输出结果:

结果1: {'total': 5, 'new_value': 5}
结果2: {'total': 12, 'new_value': 7}
结果3: {'total': 10, 'new_value': 10}

第二次调用因为 thread_id 相同,所以 total 从 5 开始累加,得到了 12。第三次用了不同的 thread_id,所以是从 0 开始的全新状态。

7.4 持久化记忆:SQLite 存储

InMemorySaver 把记忆存在内存里,有两个明显的问题:

  1. 1.重启丢失——程序一关,记忆全没了
  2. 2.内存瓶颈——用户多了,内存会被撑爆

生产环境通常需要把记忆持久化到磁盘上。LangGraph 官方提供了基于 SQLite 的方案。

安装依赖:

pip install langgraph-checkpoint-sqlite 

完整示例:

from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import RunnableConfig
from langgraph.checkpoint.sqlite import SqliteSaver


class SqliteManager:
    """SQLite 记忆管理器,负责数据库的初始化和关闭"""
    def __init__(self):
        self.memory_ctx = SqliteSaver.from_conn_string("checkpoints.db")
        self.memory = self.memory_ctx.__enter__()

    def close(self):
        self.memory_ctx.__exit__(None, None, None)


class State(TypedDict, total=False):
    total: int
    new_value: int


def add_node(state: State) -> State:
    old_total = state.get("total", 0)
    new_value = state.get("new_value", 0)
    new_total = old_total + new_value
    return {"total": new_total, "new_value": new_value}


def build_graph(memory):
    graph_builder = StateGraph(State)
    graph_builder.add_node("add_node", add_node)
    graph_builder.add_edge(START, "add_node")
    graph_builder.add_edge("add_node", END)
    return graph_builder.compile(checkpointer=memory)


# 初始化
sqlite_manager = SqliteManager()
graph = build_graph(sqlite_manager.memory)

if __name__ == "__main__":
    try:
        # 第一次调用
        config1 = RunnableConfig(configurable={"thread_id": "aaabbb"})
        result1 = graph.invoke({"new_value": 100}, config=config1)
        print("第一次结果:", result1)

        # 同一个 thread_id,状态从上次继续
        config2 = RunnableConfig(configurable={"thread_id": "aaabbb"})
        result2 = graph.invoke({"new_value": 20}, config=config2)
        print("第二次结果:", result2)

        # 不同 thread_id
        config3 = RunnableConfig(configurable={"thread_id": "cccddd"})
        result3 = graph.invoke({"new_value": 50}, config=config3)
        print("第三次结果:", result3)
    finally:
        sqlite_manager.close()

运行后会在当前目录生成一个 checkpoints.db 文件,所有的状态变更都记录在里面。即使程序重启,只要连接同一个数据库文件,之前的记忆就还在。

需要注意 SqliteSaver 使用了上下文管理器,用完记得调用 close() 释放连接。

7.5 人工介入(Human-in-the-Loop)

这是 LangGraph 很有特色的一个能力——在流程执行过程中暂停,等待人工输入,然后从中断处继续执行。

这个功能在审批流程、内容审核、人机协作等场景中非常实用。

实现思路:

  1. 1.在需要暂停的节点中调用 interrupt() 函数
  2. 2.第一次运行图时,执行到 interrupt 节点会自动暂停,并返回中断信息
  3. 3.第二次运行时,通过 Command(resume=...) 传入人工的输入,图从上次暂停的地方继续执行

前提条件: 必须配置 thread_id 和记忆模块(checkpointer),否则图无法记住上次执行到了哪里。

完整示例:审批流程

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import RunnableConfig, interrupt, Command


class State(dict):
    task: str
    approval: str


def create_task(state: State):
    """自动步骤:创建一个待审批的任务"""
    task = "审批预算申请(金额:5000元)"
    print(f"系统创建任务:{task}")
    state["task"] = task
    return state


def human_approval(state: State):
    """人工步骤:等待人工审批"""
    print("等待人工审批中...")
    # interrupt 会暂停图的执行,等待外部传入数据
    approval = interrupt("请输入审批结果(批准 / 拒绝):")
    state["approval"] = approval
    return state


def finalize(state: State):
    """后续步骤:根据审批结果做处理"""
    if state["approval"] == "批准":
        print(f"审批通过:{state['task']}")
    else:
        print(f"审批拒绝:{state['task']}")
    return state


def build_graph():
    graph_builder = StateGraph(State)
    graph_builder.add_node("create_task", create_task)
    graph_builder.add_node("human_approval", human_approval)
    graph_builder.add_node("finalize", finalize)

    graph_builder.add_edge(START, "create_task")
    graph_builder.add_edge("create_task", "human_approval")
    graph_builder.add_edge("human_approval", "finalize")
    graph_builder.add_edge("finalize", END)

    memory = InMemorySaver()
    return graph_builder.compile(checkpointer=memory)


graph = build_graph()

# 第一次运行:执行到 interrupt 处暂停
config = RunnableConfig(configurable={"thread_id": "12345"})
print("=== 第一次运行 ===")
state = graph.invoke({}, config=config)
print("运行状态:", state)
# 输出中会包含 __interrupt__ 字段,说明图暂停了

# 第二次运行:传入人工输入,从暂停处继续
print("\n=== 第二次运行 ===")
user_decision = input("是否批准?(批准/拒绝):")
state = graph.invoke(Command(resume=user_decision), config=config)
print("最终状态:", state)

运行输出:

=== 第一次运行 ===
系统创建任务:审批预算申请(金额:5000元)
等待人工审批中...
运行状态: {'task': '审批预算申请(金额:5000元)', '__interrupt__': [...]}

=== 第二次运行 ===
是否批准?(批准/拒绝):批准
等待人工审批中...
审批通过:审批预算申请(金额:5000元)
最终状态: {'task': '审批预算申请(金额:5000元)', 'approval': '批准'}

第一次运行走到了 human_approval 节点就停住了。第二次用 Command(resume=...) 把人工输入传进去,图从暂停的地方恢复执行,走完 finalize 节点。

这个机制可以做的事情很多:人工审核内容、人工选择下一步策略、人工确认关键操作等。

7.6 并行执行

LangGraph 支持节点的并行执行,这在需要同时处理多个独立任务时非常有用。

基础并行

只需要把多个节点作为边的目标,LangGraph 会自动并行执行它们。

来看一个做饭的例子——炒菜和热米饭可以同时进行,两样都好了再上菜:

import time
from typing import TypedDict
from langgraph.graph import StateGraph, START, END


class MyState(TypedDict):
    make_vegetables: bool
    rice_heated: bool
    dish_ready: bool


def start_cooking(state: MyState):
    print("开始做饭!")
    return state


def make_vegetables(state: MyState):
    print("正在炒菜...")
    time.sleep(3)
    state["make_vegetables"] = True
    print("炒完菜了")
    return state


def heat_rice(state: MyState):
    print("微波炉正在加热米饭...")
    time.sleep(2)
    state["rice_heated"] = True
    print("米饭加热完成")
    return state


def serve_dish(state: MyState):
    print("菜和饭都准备好了,上菜!")
    state["dish_ready"] = True
    return state


def build_graph():
    graph_builder = StateGraph(MyState)
    graph_builder.add_node("start", start_cooking)
    graph_builder.add_node("make_vegetables", make_vegetables)
    graph_builder.add_node("heat_rice", heat_rice)
    graph_builder.add_node("serve", serve_dish)

    graph_builder.add_edge(START, "start")
    # start 之后,两个任务并行
    graph_builder.add_edge("start", "make_vegetables")
    graph_builder.add_edge("start", "heat_rice")
    # 两个都完成后,汇总到 serve
    graph_builder.add_edge(["make_vegetables", "heat_rice"], "serve")
    graph_builder.add_edge("serve", END)

    return graph_builder.compile()


graph = build_graph()
result = graph.invoke({})
print("最终结果:", result)

输出(注意炒菜和热米饭是交替打印的,说明确实是并行执行):

开始做饭!
微波炉正在加热米饭...
正在炒菜...
米饭加热完成
炒完菜了
菜和饭都准备好了,上菜!
最终结果: {'make_vegetables': True, 'rice_heated': True, 'dish_ready': True}

总耗时约 3 秒(取最长的那个),而不是 5 秒(3+2)。这就是并行的价值。

并行节点共享列表状态时的陷阱

当并行节点需要往同一个列表字段追加数据时,会遇到并发写入的问题。直接 append 可能导致数据丢失。

来看一个反面例子——三个厨师并行做菜,都想往 menu 列表里加菜:

# 错误写法:直接 append 会有并发问题
class MyState(TypedDict):
    menu: List[str]
    all_ready: bool

解决方法是用 Annotated 给列表字段指定一个合并策略:

from typing import TypedDict, List, Annotated
from operator import add

class MyState(TypedDict):
    # add 是内置的合并策略:把多个并行节点的输出列表拼接起来
    menu: Annotated[List[str], add]
    all_ready: bool

add 策略的含义是:当多个并行节点都往同一个字段写数据时,把它们的输出做列表拼接(而不是互相覆盖)。

你也可以自定义合并策略:

def my_merge(old_value, new_value):
    """自定义合并逻辑:始终使用新值"""
    return new_value

class MyState(TypedDict):
    menu: Annotated[List[str], my_merge]
    all_ready: bool

完整的三个厨师并行做菜示例:

import time
from typing import TypedDict, List, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END


class MyState(TypedDict):
    menu: Annotated[List[str], add]
    all_ready: bool


def start_cooking(state: MyState):
    print("厨房准备开始!")
    return state


def chef_a(state: MyState):
    print("A 厨师在做:番茄炒蛋...")
    time.sleep(2)
    return {"menu": ["番茄炒蛋"]}


def chef_b(state: MyState):
    print("B 厨师在做:宫保鸡丁...")
    time.sleep(3)
    return {"menu": ["宫保鸡丁"]}


def chef_c(state: MyState):
    print("C 厨师在做:鱼香肉丝...")
    time.sleep(1)
    return {"menu": ["鱼香肉丝"]}


def serve_dish(state: MyState):
    print("\n所有菜都准备好了!菜单如下:")
    for dish in state["menu"]:
        print(f"  - {dish}")
    return {"all_ready": True}


def build_graph():
    graph_builder = StateGraph(MyState)
    graph_builder.add_node("start", start_cooking)
    graph_builder.add_node("chef_a", chef_a)
    graph_builder.add_node("chef_b", chef_b)
    graph_builder.add_node("chef_c", chef_c)
    graph_builder.add_node("serve", serve_dish)

    graph_builder.add_edge(START, "start")
    graph_builder.add_edge("start", "chef_a")
    graph_builder.add_edge("start", "chef_b")
    graph_builder.add_edge("start", "chef_c")
    graph_builder.add_edge(["chef_a", "chef_b", "chef_c"], "serve")
    graph_builder.add_edge("serve", END)

    return graph_builder.compile()


if __name__ == "__main__":
    graph = build_graph()
    result = graph.invoke({})
    print("\n最终结果:", result)

输出:

厨房准备开始!
A 厨师在做:番茄炒蛋...
C 厨师在做:鱼香肉丝...
B 厨师在做:宫保鸡丁...

所有菜都准备好了!菜单如下:
  - 鱼香肉丝
  - 番茄炒蛋
  - 宫保鸡丁

最终结果: {'menu': ['鱼香肉丝', '番茄炒蛋', '宫保鸡丁'], 'all_ready': True}

注意:并行节点的完成顺序是不确定的,所以列表中菜品的顺序每次运行可能不一样。这是正常的。

7.7 子图(Subgraph)

当你的工作流变得复杂时,可以把一部分逻辑封装成一个子图,然后在主图中把子图当作一个普通节点来使用。这和编程中"提取函数"的思想是一样的——把一段逻辑打包,提高可复用性和可读性。

基本用法
from langgraph.graph import StateGraph, START, END
from typing import TypedDict


# ===== 子图 =====
class SubState(TypedDict):
    msg: str


def say_hello(state: SubState):
    print("子图:你好,我是子流程!")
    return state


def say_bye(state: SubState):
    print("子图:再见!")
    return state


def build_subgraph():
    g = StateGraph(SubState)
    g.add_node("hello", say_hello)
    g.add_node("bye", say_bye)
    g.add_edge(START, "hello")
    g.add_edge("hello", "bye")
    g.add_edge("bye", END)
    return g.compile()


# ===== 主图 =====
class MainState(TypedDict):
    done: bool


def start_main(state: MainState):
    print("主图开始运行!")
    return state


def finish_main(state: MainState):
    print("主图:全部完成!")
    state["done"] = True
    return state


def build_main_graph():
    subgraph = build_subgraph()   # 创建子图实例

    g = StateGraph(MainState)
    g.add_node("start", start_main)
    g.add_node("sub", subgraph)   # 把子图当作节点
    g.add_node("finish", finish_main)

    g.add_edge(START, "start")
    g.add_edge("start", "sub")
    g.add_edge("sub", "finish")
    g.add_edge("finish", END)
    return g.compile()


if __name__ == "__main__":
    graph = build_main_graph()
    result = graph.invoke({})
    print("结果:", result)
主图与子图之间的数据传递

如果子图和主图的状态结构不同,直接连接可能会有字段不匹配的问题。更稳妥的做法是把子图包装成一个普通的节点函数,在函数内部手动构造子图输入、调用子图、再把结果合并回主图状态:

from langgraph.graph import StateGraph, START, END
from typing import TypedDict


# ===== 子图 =====
class SubState(TypedDict):
    name: str


def say_hello(state: SubState):
    name = state["name"]
    print(f"子图:你好,{name},我是子流程!")
    return state


def say_bye(state: SubState):
    name = state["name"]
    print(f"子图:{name},再见!")
    return state


def build_subgraph():
    g = StateGraph(SubState)
    g.add_node("hello", say_hello)
    g.add_node("bye", say_bye)
    g.add_edge(START, "hello")
    g.add_edge("hello", "bye")
    g.add_edge("bye", END)
    return g.compile()


# ===== 主图 =====
class MainState(TypedDict):
    done: bool
    sub_output: dict


def start_main(state: MainState):
    print("主图开始运行!")
    return state


def run_subgraph(state: MainState):
    """包装子图:构造输入 → 调用子图 → 合并结果"""
    subgraph = build_subgraph()

    # 从主图状态中构造子图输入
    sub_input = {"name": "小帅"}

    # 调用子图
    result = subgraph.invoke(sub_input)

    # 将子图结果合并回主图状态
    state["sub_output"] = result
    return state


def finish_main(state: MainState):
    print("主图:全部完成!")
    state["done"] = True
    return state


def build_main_graph():
    g = StateGraph(MainState)
    g.add_node("start", start_main)
    g.add_node("sub", run_subgraph)
    g.add_node("finish", finish_main)

    g.add_edge(START, "start")
    g.add_edge("start", "sub")
    g.add_edge("sub", "finish")
    g.add_edge("finish", END)
    return g.compile()


if __name__ == "__main__":
    graph = build_main_graph()
    result = graph.invoke({})
    print("结果:", result)

输出:

主图开始运行!
子图:你好,小帅,我是子流程!
子图:小帅,再见!
主图:全部完成!
结果: {'done': True, 'sub_output': {'name': '小帅'}}

这种"包装节点"的方式让你对数据的流向有完全的控制权,推荐在主图和子图状态结构不同的时候使用。

7.8 流式输出

在实际的产品中,用户通常不想等到全部处理完才看到结果,而是希望"边生成边显示"。LangGraph 的 stream 方法支持按节点粒度做流式输出。

from langgraph.graph import StateGraph, START, END
from typing import TypedDict


class MyState(TypedDict):
    question: str
    thought: str
    answer: str


def think_node(state: MyState):
    print("思考中...")
    state["thought"] = "是怎么染色的呢?"
    return state


def answer_node(state: MyState):
    print("回答完成!")
    state["answer"] = "是用彩笔染色的。"
    return state


def build_graph():
    graph_builder = StateGraph(MyState)
    graph_builder.add_node("思考节点", think_node)
    graph_builder.add_node("回答节点", answer_node)
    graph_builder.add_edge(START, "思考节点")
    graph_builder.add_edge("思考节点", "回答节点")
    graph_builder.add_edge("回答节点", END)
    return graph_builder.compile()


if __name__ == "__main__":
    graph = build_graph()
    for chunk in graph.stream({"question": "为什么天空是蓝色的?"}):
        print(chunk)
        print("---")

输出:

思考中...
{'思考节点': {'question': '为什么天空是蓝色的?', 'thought': '是怎么染色的呢?'}}
---
回答完成!
{'回答节点': {'question': '为什么天空是蓝色的?', 'thought': '是怎么染色的呢?', 'answer': '是用彩笔染色的。'}}
---

每次循环拿到的 chunk 是一个字典,key 是刚执行完的节点名,value 是该节点输出的状态快照。

在实际应用中,你可以把每个 chunk 推送给前端,实现逐步展示的效果。如果你在节点内部调用 LLM,还可以进一步做 token 级别的流式输出(配合 LLM 的 streaming 参数)。

7.9 异步操作

LangGraph 原生支持异步调用,通过 ainvokeastream 两个方法实现。

异步在以下场景特别有价值:

  • 需要同时运行多个图实例(比如同时处理多个用户的请求)
  • 节点中有 I/O 密集型操作(调用 API、读写数据库)
  • 集成到 FastAPI 等异步框架中
ainvoke:异步调用
import asyncio
from langgraph.graph import StateGraph, START, END
from typing import TypedDict


class MyState(TypedDict):
    question: str
    thought: str
    answer: str


def think_node(state: MyState):
    print("思考中...")
    state["thought"] = "是怎么染色的呢?"
    return state


def answer_node(state: MyState):
    print("回答完成!")
    state["answer"] = "是用彩笔染色的。"
    return state


def build_graph():
    graph_builder = StateGraph(MyState)
    graph_builder.add_node("思考节点", think_node)
    graph_builder.add_node("回答节点", answer_node)
    graph_builder.add_edge(START, "思考节点")
    graph_builder.add_edge("思考节点", "回答节点")
    graph_builder.add_edge("回答节点", END)
    return graph_builder.compile()


async def main(state: MyState):
    graph = build_graph()
    result = await graph.ainvoke(state)
    return result


if __name__ == "__main__":
    result = asyncio.run(main({"question": "为什么天空是蓝色的?"}))
    print("结果:", result)
astream:异步流式输出
import asyncio
from langgraph.graph import StateGraph, START, END
from typing import TypedDict


class MyState(TypedDict):
    question: str
    thought: str
    answer: str


def think_node(state: MyState):
    print("思考中...")
    state["thought"] = "是怎么染色的呢?"
    return state


def answer_node(state: MyState):
    print("回答完成!")
    state["answer"] = "是用彩笔染色的。"
    return state


def build_graph():
    graph_builder = StateGraph(MyState)
    graph_builder.add_node("思考节点", think_node)
    graph_builder.add_node("回答节点", answer_node)
    graph_builder.add_edge(START, "思考节点")
    graph_builder.add_edge("思考节点", "回答节点")
    graph_builder.add_edge("回答节点", END)
    return graph_builder.compile()


async def main(state: MyState):
    graph = build_graph()
    result = None
    async for event in graph.astream(state):
        print("收到事件:", event)
        result = event
    return result


if __name__ == "__main__":
    result = asyncio.run(main({"question": "为什么天空是蓝色的?"}))
    print("最终结果:", result)

在实际项目中,如果你用 FastAPI 来搭建后端服务,异步调用几乎是必须的——FastAPI 的路由函数本身就是异步的,用 ainvoke 可以无缝集成,不会阻塞事件循环。


八、总结

最后回顾一下 LangGraph 的核心能力:

能力 说明 关键 API
图结构编排 节点 + 边定义工作流 StateGraph, add_node, add_edge
条件分支 根据状态动态路由 add_conditional_edges
状态管理 节点间共享和更新数据 TypedDict / Pydantic 定义状态
Config 配置 运行时传入动态参数 RunnableConfig
记忆持久化 跨轮次保持状态 InMemorySaver, SqliteSaver
人工介入 流程暂停等待人工输入 interrupt, Command(resume=...)
并行执行 多个节点同时运行 add_edge([A, B], C)
子图 模块化封装子流程 子图实例作为节点添加
流式输出 按节点粒度逐步返回 graph.stream()
异步调用 非阻塞执行 ainvoke, astream

LangGraph 本质上是在 LangChain 之上提供了一层图结构的编排能力,让 LLM 应用从"一问一答"进化到"有状态、有分支、有循环、能协作"的复杂智能体。如果你的项目只是简单的问答,LangChain 的 Chain 就够了;但一旦涉及多步决策、状态管理、人工干预等需求,LangGraph 就是更合适的选择。

这篇文章记录了我学习 LangGraph 的过程,涵盖了从基础到进阶的完整内容。如果你也在做大模型应用开发,希望这些笔记能给你一些参考。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐