LangGraph 从入门到进阶:构建有状态的 LLM 工作流应用
本文介绍了LangGraph这一由LangChain团队开发的Python库,它通过图结构工作流编排能力,为构建复杂的大语言模型应用提供了强大支持。文章从基础概念入手,详细讲解了图、节点、边和状态四个核心要素,并通过多功能对话机器人的案例演示了条件分支的实现。进阶部分涵盖了配置管理、会话标识、记忆持久化、人工介入、并行执行、子图封装、流式输出和异步操作等实用功能。LangGraph弥补了LangC
目录
一、LangGraph 是什么
LangGraph 是由 LangChain 团队开发的一个 Python 库,专门用于构建有状态的、多步骤的、由大语言模型驱动的应用程序。
它的核心思想很直观:把应用的工作流建模为一张"图"。图里有节点(执行具体功能)和边(定义流转关系),数据(状态)在图中流转和更新。
打个比方,如果 LangChain 的 Chain 是一条单行道,那 LangGraph 就是一个立交桥——有多条路径、有岔路口、有环路,车辆(数据)可以根据路况(条件)选择不同的方向。
你可以把它理解成一个专门为 LLM 应用设计的"工作流引擎",或者把它看成代码版的 Coze / Dify——只不过用纯代码来定义流程,灵活性更高。
二、为什么需要 LangGraph
在 LangGraph 出现之前,我们用 LangChain 构建的主要是"链"(Chain)——输入经过检索、然后生成答案、最后输出,一条直线走到底。这种模式对简单的问答场景够用了,但一旦遇到下面这些情况就会捉襟见肘:
场景一:需要根据中间结果做决策的多步流程
比如用户问了一个问题,系统需要先判断这个问题属于哪个领域,再决定调用哪个专业工具来回答。这种"分叉"逻辑在纯链式结构中很难表达。
场景二:多轮对话中的状态管理
聊天机器人需要记住用户之前说了什么、已经执行了哪些操作、当前处于什么阶段。这些信息需要在多轮交互中持久保存和更新。
场景三:多角色协作系统
模拟一个项目团队,产品经理提需求、工程师写代码、测试员做测试,多个角色交替执行任务,彼此之间需要传递和共享信息。
场景四:需要人工介入的审批流程
比如一个自动化的申请处理系统,机器跑完自动审批步骤后,需要暂停等人来确认,确认完再继续后面的流程。
这些场景的共同特征是:需要条件分支、循环执行、状态持久化、以及流程中断与恢复。这正是 LangGraph 着力解决的核心问题。
三、核心概念详解
学 LangGraph 其实就是在理解四个东西:图、节点、边、状态。把这四个概念吃透了,上手就很快。
3.1 图(Graph)
图是整个应用的骨架。它由节点和边组成,负责管理和调度整个工作流的执行顺序。
在代码层面,我们通过 StateGraph 类来创建图:
from langgraph.graph import StateGraph
graph_builder = StateGraph(你的状态类型)
3.2 节点(Node)
节点是图中的基本功能单元,本质上就是一个 Python 函数。
每个节点做的事情很统一:接收当前的共享状态 → 执行业务逻辑 → 返回更新后的状态。
比如一个节点负责调用大模型做翻译,另一个节点负责调搜索引擎做信息检索,还有一个节点负责做意图分类——每个节点各司其职。
def my_node(state):
# 做点事情,修改 state
state["某个字段"] = "新的值"
return state
用 add_node 把节点注册到图上:
graph_builder.add_node("节点名称", 对应的函数)
3.3 边(Edge)
边连接节点,定义了执行完一个节点之后,下一步该去哪里。
LangGraph 里有两种边:
普通边——无条件跳转,执行完 A 就去 B:
graph_builder.add_edge("节点A", "节点B")
条件边——根据当前状态的值动态决定走向:
def routing_logic(state):
if state["某个条件"]:
return "节点A"
else:
return "节点B"
graph_builder.add_conditional_edges("当前节点", routing_logic)
条件边是 LangGraph 实现分支逻辑的关键机制。
3.4 状态(State)
状态是在整个图中流转和更新的共享数据。它是所有节点之间传递信息的载体。
LangGraph 推荐用 TypedDict 或 Pydantic 模型来定义状态的结构,这样每个字段都有明确的类型标注,代码更清晰,也方便 IDE 做类型检查:
from typing import TypedDict
class MyState(TypedDict):
input: str
output: str
is_done: bool
状态的设计是 LangGraph 应用的核心。你需要想清楚整个流程中需要传递哪些数据,然后把它们都定义在状态里。
四、LangGraph 和 LangChain 的关系
很多人初学的时候会搞混这两个东西,这里做个梳理。
LangChain 提供的是基础组件:LLM 封装、Prompt 模板、Tool 定义、Chain 等。它是构建 LLM 应用的"零件库"。
LangGraph 在 LangChain 的基础上,提供了一套图结构的工作流编排能力。它不是要取代 LangChain,而是在 LangChain 之上加了一层"编排层"。
用一个实际的类比来说:
- LangChain 像是给了你各种食材和厨具(LLM、Tool、Prompt)
- LangGraph 则是给了你一张菜谱和烹饪流程图(先炒菜再炖汤,如果口味偏辣就多加辣椒)
两者通常是配合使用的。在 LangGraph 的节点里面,你经常会调用 LangChain 的 LLM、Tool 或 Chain。
| 维度 | LangChain | LangGraph |
|---|---|---|
| 执行模式 | 线性链式,单向流动 | 图结构,支持分支和循环 |
| 状态管理 | 无内置状态管理 | 内置状态定义和流转机制 |
| 条件分支 | 需要自己写逻辑 | 原生支持条件边 |
| 人工介入 | 不支持 | 支持 interrupt 机制 |
| 并行执行 | 不支持 | 支持并行节点 |
| 适用场景 | 简单问答、单步任务 | 复杂 Agent、多步工作流 |
简单说:LangChain 是"直线型"流程,LangGraph 是"地图型"流程。
五、安装
安装 LangGraph 及其依赖:
pip install langchain pip install langgraph
如果需要使用 OpenAI 兼容接口(比如 DeepSeek):
pip install langchain-openai
六、基础案例:多功能对话机器人
6.1 需求分析
我们要实现一个多功能对话机器人,功能如下:
- 用户输入的内容如果是一个翻译请求(比如"把这句话翻译成英文"),机器人自动识别语言并进行中英互译,只输出译文。
- 如果用户输入的是普通问题(比如"什么是大模型"),机器人用中文给出回答。
核心逻辑就是:先做意图分类 → 根据分类结果走不同的分支 → 输出结果。
6.2 流程设计
整个流程用文字描述如下:
用户输入
│
▼
意图识别节点(判断是翻译还是问答)
│
├── 是翻译 ──▶ 抽取翻译内容 ──▶ 翻译 ──▶ 输出译文
│
└── 是问答 ──▶ 回答问题 ──▶ 输出答案
这是一个典型的条件分支图结构,正好用 LangGraph 来实现。
6.3 完整代码实现
第一步:导入依赖
from typing import TypedDict
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END, START
from langchain_core.messages import SystemMessage, HumanMessage
第二步:配置 LLM
这里以 DeepSeek 为例,你也可以换成其他兼容 OpenAI 接口的模型:
llm = ChatOpenAI(
model="deepseek-chat",
base_url="https://api.deepseek.com",
api_key="sk-你的api密钥"
)
第三步:定义状态
class AgentState(dict):
input: str # 用户输入
is_translate_or_not: bool # 是否是翻译任务
translate_sentence: str # 提取出来的待翻译句子
translate_result: str # 翻译结果
qa_result: str # 问答结果
output: str # 最终输出
第四步:定义各个节点
意图识别节点——让 LLM 判断用户输入属于翻译还是问答:
def identify_intent_node(state: AgentState) -> AgentState:
"""判断用户输入是翻译任务还是普通问答"""
user_input = state['input']
messages = [
SystemMessage(content=(
"你是一个意图分类助手,用来判断用户输入是'翻译任务'还是'普通回答任务'。\n"
"翻译任务的特征:用户明确要求翻译文章、句子、单词,或要求将中文翻译成英文/英文翻译成中文。\n"
"普通回答任务的特征:用户是直接提问、咨询信息、讨论话题,而不是要求翻译。\n"
"请你只回答:翻译 或 普通回答,不要补充其他内容。"
)),
HumanMessage(content=user_input)
]
response = llm.invoke(messages).content.strip()
state['is_translate_or_not'] = "翻译" in response
return state
抽取翻译内容节点——从用户输入中提取出真正需要翻译的文本:
def extract_translate_sentence_node(state: AgentState) -> AgentState:
"""提取待翻译的句子"""
user_input = state['input']
prompt = (
"你是翻译助手。请先自动识别语言,然后提取待翻译的句子。"
"请将待翻译的句子提取出来,并输出。"
"请不要输出其他内容。\n"
f"待翻译内容:{user_input}"
)
result = llm.invoke([HumanMessage(content=prompt)]).content
state['translate_sentence'] = result
return state
翻译节点——对提取出来的文本做中英互译:
def translate_node(state: AgentState) -> AgentState:
"""执行翻译"""
sentence = state["translate_sentence"]
prompt = (
"你是翻译助手。请先自动识别语言,然后在中文与英文之间互译。"
"只输出译文本身,不要额外解释。\n"
f"待翻译内容:{sentence}"
)
result = llm.invoke([HumanMessage(content=prompt)]).content
state['translate_result'] = result
state['output'] = result
return state
问答节点——直接回答用户的问题:
def qa_node(state: AgentState) -> AgentState:
"""回答用户问题"""
question = state["input"]
prompt = (
"你是一个问答助手,请回答问题。"
"请使用中文回答。\n"
f"问题:{question}"
)
result = llm.invoke([HumanMessage(content=prompt)]).content
state['qa_result'] = result
state['output'] = result
return state
第五步:构建图
def build_langgraph():
graph_builder = StateGraph(AgentState)
# 注册节点
graph_builder.add_node("意图识别节点", identify_intent_node)
graph_builder.add_node("抽取翻译内容节点", extract_translate_sentence_node)
graph_builder.add_node("翻译节点", translate_node)
graph_builder.add_node("问答节点", qa_node)
# 定义边
graph_builder.add_edge(START, "意图识别节点")
# 条件边:根据意图识别结果走不同分支
def route_by_intent(state: AgentState):
if state['is_translate_or_not']:
return "抽取翻译内容节点"
else:
return "问答节点"
graph_builder.add_conditional_edges("意图识别节点", route_by_intent)
graph_builder.add_edge("抽取翻译内容节点", "翻译节点")
graph_builder.add_edge("翻译节点", END)
graph_builder.add_edge("问答节点", END)
graph = graph_builder.compile()
return graph
第六步:测试
if __name__ == "__main__":
graph = build_langgraph()
# 测试翻译任务
print("=== 翻译测试 ===")
state = graph.invoke({"input": "请将这句话翻译成英文:你好,世界!"})
print("翻译结果:", state['output'])
# 测试普通问答任务
print("\n=== 问答测试 ===")
state = graph.invoke({"input": "解释下什么是大模型"})
print("回答结果:", state['output'])
6.4 可视化输出图
LangGraph 内置了图的可视化能力,可以将构建好的图导出为图片:
from langgraph.graph.state import CompiledStateGraph
def output_pic_graph(graph: CompiledStateGraph, filename: str = "graph.png"):
"""将图导出为图片"""
try:
mermaid_code = graph.get_graph().draw_mermaid_png()
with open(filename, 'wb') as f:
f.write(mermaid_code)
print(f"图已保存到 {filename}")
except Exception as e:
print(f"导出失败:{e}")
调用 output_pic_graph(graph) 就会在当前目录生成一张 Mermaid 格式的流程图。前提是需要安装 pip install pygraphviz(部分环境下需要额外安装系统级依赖)。
如果安装 graphviz 有困难,也可以直接在终端打印 Mermaid 代码:
print(graph.get_graph().draw_mermaid())
然后把输出的 Mermaid 代码粘贴到支持 Mermaid 的编辑器(比如 Typora、飞书文档、CSDN 的 Mermaid 代码块)中即可渲染。
七、进阶篇
基础案例跑通之后,我们来看 LangGraph 的进阶功能。这些功能在实际项目中非常高频。
7.1 设置 Config 配置
基本用法
LangGraph 支持在运行时通过 config 参数向图或节点传递配置信息。这个机制很实用——比如你想在不修改代码的情况下切换 LLM 的模型参数、设置 API Key、或者传入用户级别的配置。
Config 的本质就是一个字典,通过 RunnableConfig 来封装:
from langgraph.types import RunnableConfig
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class State(TypedDict):
input: str
step1: str
step2: str
result: str
def step1(state: State, config: RunnableConfig):
prefix = config.get("configurable", {}).get("prefix", "")
text = f"{prefix}{state['input']}"
state["step1"] = text
print(f"[Step1] 使用 prefix = {prefix}")
return state
def step2(state: State, config: RunnableConfig):
suffix = config.get("configurable", {}).get("suffix", "")
result = f"{state['step1']}{suffix}"
state["step2"] = result
state["result"] = result
print(f"[Step2] 使用 suffix = {suffix}")
return state
def build_graph():
graph_builder = StateGraph(State)
graph_builder.add_node("step1", step1)
graph_builder.add_node("step2", step2)
graph_builder.add_edge(START, "step1")
graph_builder.add_edge("step1", "step2")
graph_builder.add_edge("step2", END)
return graph_builder.compile()
graph = build_graph()
# 运行时传入 config
config = RunnableConfig(configurable={
"prefix": "[前缀] ",
"suffix": " [后缀]"
})
result = graph.invoke({"input": "你好,LangGraph!"}, config=config)
print("最终输出:", result)
注意看节点函数的签名:它多了一个 config: RunnableConfig 参数。LangGraph 在调用节点时会自动把 config 注入进去,你不需要手动传递。
实际项目中,config 经常用来做这些事情:
- 给 LLM 传入不同的 API Key 和 base_url
- 设置模型的 temperature、max_tokens 等参数
- 传入用户身份信息做权限控制
- 设置 thread_id 来标识会话
7.2 thread_id:会话身份标识
在所有 config 参数中,thread_id 是最特殊也最重要的一个。
它相当于给每个会话分配了一个唯一编号。LangGraph 靠它来识别"这是同一个用户的连续对话"还是"这是一个全新的会话"。
打个比方:你去银行办业务,第一次去取了号(thread_id),办到一半离开,回来时再报同一个号码,柜员就能从上次中断的地方继续。如果你重新取号,那就是一笔新业务。
thread_id 的典型应用场景:
- 连续对话记忆:记住用户之前说了什么
- 中断恢复:流程暂停后(比如等人工审批),通过同一个 thread_id 恢复执行
- 异步任务:一个复杂任务分多次完成,thread_id 确保每次都接上之前的进度
使用方式:
config = RunnableConfig(configurable={"thread_id": "用户A的会话"})
同一个 thread_id 配合记忆模块(后面会讲),就能实现跨轮次的状态保持。
7.3 加入记忆:InMemorySaver
没有记忆的 LangGraph 图,每次运行都是独立的,不会保留上次的状态。加上记忆之后,同一个 thread_id 的多次调用可以共享状态。
实现方式非常简单,只需要两步:
第一步,创建一个 checkpointer(记忆存储器):
from langgraph.checkpoint.memory import InMemorySaver
memory = InMemorySaver()
第二步,在编译图时传入:
graph = graph_builder.compile(checkpointer=memory)
这样就搞定了。来看一个完整的累加器例子,直观感受记忆的效果:
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import RunnableConfig
class State(dict):
total: int # 累加总和
new_value: int # 本次输入的数字
def add_node(state: State):
old_total = state.get("total", 0)
new_value = state["new_value"]
new_total = old_total + new_value
state["total"] = new_total
return state
def build_graph():
graph_builder = StateGraph(State)
graph_builder.add_node("add", add_node)
graph_builder.add_edge(START, "add")
graph_builder.add_edge("add", END)
memory = InMemorySaver()
graph = graph_builder.compile(checkpointer=memory)
return graph
graph = build_graph()
# 第一次调用:thread_id = "12345"
config = RunnableConfig(configurable={"thread_id": "12345"})
result1 = graph.invoke({"new_value": 5}, config=config)
print("结果1:", result1) # total = 5
# 第二次调用:同一个 thread_id,状态会从上次的结果继续
result2 = graph.invoke({"new_value": 7}, config=config)
print("结果2:", result2) # total = 12(5 + 7)
# 第三次调用:不同的 thread_id,全新状态
config2 = RunnableConfig(configurable={"thread_id": "67890"})
result3 = graph.invoke({"new_value": 10}, config=config2)
print("结果3:", result3) # total = 10
输出结果:
结果1: {'total': 5, 'new_value': 5}
结果2: {'total': 12, 'new_value': 7}
结果3: {'total': 10, 'new_value': 10}
第二次调用因为 thread_id 相同,所以 total 从 5 开始累加,得到了 12。第三次用了不同的 thread_id,所以是从 0 开始的全新状态。
7.4 持久化记忆:SQLite 存储
InMemorySaver 把记忆存在内存里,有两个明显的问题:
- 1.重启丢失——程序一关,记忆全没了
- 2.内存瓶颈——用户多了,内存会被撑爆
生产环境通常需要把记忆持久化到磁盘上。LangGraph 官方提供了基于 SQLite 的方案。
安装依赖:
pip install langgraph-checkpoint-sqlite
完整示例:
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import RunnableConfig
from langgraph.checkpoint.sqlite import SqliteSaver
class SqliteManager:
"""SQLite 记忆管理器,负责数据库的初始化和关闭"""
def __init__(self):
self.memory_ctx = SqliteSaver.from_conn_string("checkpoints.db")
self.memory = self.memory_ctx.__enter__()
def close(self):
self.memory_ctx.__exit__(None, None, None)
class State(TypedDict, total=False):
total: int
new_value: int
def add_node(state: State) -> State:
old_total = state.get("total", 0)
new_value = state.get("new_value", 0)
new_total = old_total + new_value
return {"total": new_total, "new_value": new_value}
def build_graph(memory):
graph_builder = StateGraph(State)
graph_builder.add_node("add_node", add_node)
graph_builder.add_edge(START, "add_node")
graph_builder.add_edge("add_node", END)
return graph_builder.compile(checkpointer=memory)
# 初始化
sqlite_manager = SqliteManager()
graph = build_graph(sqlite_manager.memory)
if __name__ == "__main__":
try:
# 第一次调用
config1 = RunnableConfig(configurable={"thread_id": "aaabbb"})
result1 = graph.invoke({"new_value": 100}, config=config1)
print("第一次结果:", result1)
# 同一个 thread_id,状态从上次继续
config2 = RunnableConfig(configurable={"thread_id": "aaabbb"})
result2 = graph.invoke({"new_value": 20}, config=config2)
print("第二次结果:", result2)
# 不同 thread_id
config3 = RunnableConfig(configurable={"thread_id": "cccddd"})
result3 = graph.invoke({"new_value": 50}, config=config3)
print("第三次结果:", result3)
finally:
sqlite_manager.close()
运行后会在当前目录生成一个 checkpoints.db 文件,所有的状态变更都记录在里面。即使程序重启,只要连接同一个数据库文件,之前的记忆就还在。
需要注意 SqliteSaver 使用了上下文管理器,用完记得调用 close() 释放连接。
7.5 人工介入(Human-in-the-Loop)
这是 LangGraph 很有特色的一个能力——在流程执行过程中暂停,等待人工输入,然后从中断处继续执行。
这个功能在审批流程、内容审核、人机协作等场景中非常实用。
实现思路:
- 1.在需要暂停的节点中调用
interrupt()函数 - 2.第一次运行图时,执行到 interrupt 节点会自动暂停,并返回中断信息
- 3.第二次运行时,通过
Command(resume=...)传入人工的输入,图从上次暂停的地方继续执行
前提条件: 必须配置 thread_id 和记忆模块(checkpointer),否则图无法记住上次执行到了哪里。
完整示例:审批流程
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import RunnableConfig, interrupt, Command
class State(dict):
task: str
approval: str
def create_task(state: State):
"""自动步骤:创建一个待审批的任务"""
task = "审批预算申请(金额:5000元)"
print(f"系统创建任务:{task}")
state["task"] = task
return state
def human_approval(state: State):
"""人工步骤:等待人工审批"""
print("等待人工审批中...")
# interrupt 会暂停图的执行,等待外部传入数据
approval = interrupt("请输入审批结果(批准 / 拒绝):")
state["approval"] = approval
return state
def finalize(state: State):
"""后续步骤:根据审批结果做处理"""
if state["approval"] == "批准":
print(f"审批通过:{state['task']}")
else:
print(f"审批拒绝:{state['task']}")
return state
def build_graph():
graph_builder = StateGraph(State)
graph_builder.add_node("create_task", create_task)
graph_builder.add_node("human_approval", human_approval)
graph_builder.add_node("finalize", finalize)
graph_builder.add_edge(START, "create_task")
graph_builder.add_edge("create_task", "human_approval")
graph_builder.add_edge("human_approval", "finalize")
graph_builder.add_edge("finalize", END)
memory = InMemorySaver()
return graph_builder.compile(checkpointer=memory)
graph = build_graph()
# 第一次运行:执行到 interrupt 处暂停
config = RunnableConfig(configurable={"thread_id": "12345"})
print("=== 第一次运行 ===")
state = graph.invoke({}, config=config)
print("运行状态:", state)
# 输出中会包含 __interrupt__ 字段,说明图暂停了
# 第二次运行:传入人工输入,从暂停处继续
print("\n=== 第二次运行 ===")
user_decision = input("是否批准?(批准/拒绝):")
state = graph.invoke(Command(resume=user_decision), config=config)
print("最终状态:", state)
运行输出:
=== 第一次运行 ===
系统创建任务:审批预算申请(金额:5000元)
等待人工审批中...
运行状态: {'task': '审批预算申请(金额:5000元)', '__interrupt__': [...]}
=== 第二次运行 ===
是否批准?(批准/拒绝):批准
等待人工审批中...
审批通过:审批预算申请(金额:5000元)
最终状态: {'task': '审批预算申请(金额:5000元)', 'approval': '批准'}
第一次运行走到了 human_approval 节点就停住了。第二次用 Command(resume=...) 把人工输入传进去,图从暂停的地方恢复执行,走完 finalize 节点。
这个机制可以做的事情很多:人工审核内容、人工选择下一步策略、人工确认关键操作等。
7.6 并行执行
LangGraph 支持节点的并行执行,这在需要同时处理多个独立任务时非常有用。
基础并行
只需要把多个节点作为边的目标,LangGraph 会自动并行执行它们。
来看一个做饭的例子——炒菜和热米饭可以同时进行,两样都好了再上菜:
import time
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
class MyState(TypedDict):
make_vegetables: bool
rice_heated: bool
dish_ready: bool
def start_cooking(state: MyState):
print("开始做饭!")
return state
def make_vegetables(state: MyState):
print("正在炒菜...")
time.sleep(3)
state["make_vegetables"] = True
print("炒完菜了")
return state
def heat_rice(state: MyState):
print("微波炉正在加热米饭...")
time.sleep(2)
state["rice_heated"] = True
print("米饭加热完成")
return state
def serve_dish(state: MyState):
print("菜和饭都准备好了,上菜!")
state["dish_ready"] = True
return state
def build_graph():
graph_builder = StateGraph(MyState)
graph_builder.add_node("start", start_cooking)
graph_builder.add_node("make_vegetables", make_vegetables)
graph_builder.add_node("heat_rice", heat_rice)
graph_builder.add_node("serve", serve_dish)
graph_builder.add_edge(START, "start")
# start 之后,两个任务并行
graph_builder.add_edge("start", "make_vegetables")
graph_builder.add_edge("start", "heat_rice")
# 两个都完成后,汇总到 serve
graph_builder.add_edge(["make_vegetables", "heat_rice"], "serve")
graph_builder.add_edge("serve", END)
return graph_builder.compile()
graph = build_graph()
result = graph.invoke({})
print("最终结果:", result)
输出(注意炒菜和热米饭是交替打印的,说明确实是并行执行):
开始做饭!
微波炉正在加热米饭...
正在炒菜...
米饭加热完成
炒完菜了
菜和饭都准备好了,上菜!
最终结果: {'make_vegetables': True, 'rice_heated': True, 'dish_ready': True}
总耗时约 3 秒(取最长的那个),而不是 5 秒(3+2)。这就是并行的价值。
并行节点共享列表状态时的陷阱
当并行节点需要往同一个列表字段追加数据时,会遇到并发写入的问题。直接 append 可能导致数据丢失。
来看一个反面例子——三个厨师并行做菜,都想往 menu 列表里加菜:
# 错误写法:直接 append 会有并发问题
class MyState(TypedDict):
menu: List[str]
all_ready: bool
解决方法是用 Annotated 给列表字段指定一个合并策略:
from typing import TypedDict, List, Annotated
from operator import add
class MyState(TypedDict):
# add 是内置的合并策略:把多个并行节点的输出列表拼接起来
menu: Annotated[List[str], add]
all_ready: bool
add 策略的含义是:当多个并行节点都往同一个字段写数据时,把它们的输出做列表拼接(而不是互相覆盖)。
你也可以自定义合并策略:
def my_merge(old_value, new_value):
"""自定义合并逻辑:始终使用新值"""
return new_value
class MyState(TypedDict):
menu: Annotated[List[str], my_merge]
all_ready: bool
完整的三个厨师并行做菜示例:
import time
from typing import TypedDict, List, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END
class MyState(TypedDict):
menu: Annotated[List[str], add]
all_ready: bool
def start_cooking(state: MyState):
print("厨房准备开始!")
return state
def chef_a(state: MyState):
print("A 厨师在做:番茄炒蛋...")
time.sleep(2)
return {"menu": ["番茄炒蛋"]}
def chef_b(state: MyState):
print("B 厨师在做:宫保鸡丁...")
time.sleep(3)
return {"menu": ["宫保鸡丁"]}
def chef_c(state: MyState):
print("C 厨师在做:鱼香肉丝...")
time.sleep(1)
return {"menu": ["鱼香肉丝"]}
def serve_dish(state: MyState):
print("\n所有菜都准备好了!菜单如下:")
for dish in state["menu"]:
print(f" - {dish}")
return {"all_ready": True}
def build_graph():
graph_builder = StateGraph(MyState)
graph_builder.add_node("start", start_cooking)
graph_builder.add_node("chef_a", chef_a)
graph_builder.add_node("chef_b", chef_b)
graph_builder.add_node("chef_c", chef_c)
graph_builder.add_node("serve", serve_dish)
graph_builder.add_edge(START, "start")
graph_builder.add_edge("start", "chef_a")
graph_builder.add_edge("start", "chef_b")
graph_builder.add_edge("start", "chef_c")
graph_builder.add_edge(["chef_a", "chef_b", "chef_c"], "serve")
graph_builder.add_edge("serve", END)
return graph_builder.compile()
if __name__ == "__main__":
graph = build_graph()
result = graph.invoke({})
print("\n最终结果:", result)
输出:
厨房准备开始!
A 厨师在做:番茄炒蛋...
C 厨师在做:鱼香肉丝...
B 厨师在做:宫保鸡丁...
所有菜都准备好了!菜单如下:
- 鱼香肉丝
- 番茄炒蛋
- 宫保鸡丁
最终结果: {'menu': ['鱼香肉丝', '番茄炒蛋', '宫保鸡丁'], 'all_ready': True}
注意:并行节点的完成顺序是不确定的,所以列表中菜品的顺序每次运行可能不一样。这是正常的。
7.7 子图(Subgraph)
当你的工作流变得复杂时,可以把一部分逻辑封装成一个子图,然后在主图中把子图当作一个普通节点来使用。这和编程中"提取函数"的思想是一样的——把一段逻辑打包,提高可复用性和可读性。
基本用法
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
# ===== 子图 =====
class SubState(TypedDict):
msg: str
def say_hello(state: SubState):
print("子图:你好,我是子流程!")
return state
def say_bye(state: SubState):
print("子图:再见!")
return state
def build_subgraph():
g = StateGraph(SubState)
g.add_node("hello", say_hello)
g.add_node("bye", say_bye)
g.add_edge(START, "hello")
g.add_edge("hello", "bye")
g.add_edge("bye", END)
return g.compile()
# ===== 主图 =====
class MainState(TypedDict):
done: bool
def start_main(state: MainState):
print("主图开始运行!")
return state
def finish_main(state: MainState):
print("主图:全部完成!")
state["done"] = True
return state
def build_main_graph():
subgraph = build_subgraph() # 创建子图实例
g = StateGraph(MainState)
g.add_node("start", start_main)
g.add_node("sub", subgraph) # 把子图当作节点
g.add_node("finish", finish_main)
g.add_edge(START, "start")
g.add_edge("start", "sub")
g.add_edge("sub", "finish")
g.add_edge("finish", END)
return g.compile()
if __name__ == "__main__":
graph = build_main_graph()
result = graph.invoke({})
print("结果:", result)
主图与子图之间的数据传递
如果子图和主图的状态结构不同,直接连接可能会有字段不匹配的问题。更稳妥的做法是把子图包装成一个普通的节点函数,在函数内部手动构造子图输入、调用子图、再把结果合并回主图状态:
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
# ===== 子图 =====
class SubState(TypedDict):
name: str
def say_hello(state: SubState):
name = state["name"]
print(f"子图:你好,{name},我是子流程!")
return state
def say_bye(state: SubState):
name = state["name"]
print(f"子图:{name},再见!")
return state
def build_subgraph():
g = StateGraph(SubState)
g.add_node("hello", say_hello)
g.add_node("bye", say_bye)
g.add_edge(START, "hello")
g.add_edge("hello", "bye")
g.add_edge("bye", END)
return g.compile()
# ===== 主图 =====
class MainState(TypedDict):
done: bool
sub_output: dict
def start_main(state: MainState):
print("主图开始运行!")
return state
def run_subgraph(state: MainState):
"""包装子图:构造输入 → 调用子图 → 合并结果"""
subgraph = build_subgraph()
# 从主图状态中构造子图输入
sub_input = {"name": "小帅"}
# 调用子图
result = subgraph.invoke(sub_input)
# 将子图结果合并回主图状态
state["sub_output"] = result
return state
def finish_main(state: MainState):
print("主图:全部完成!")
state["done"] = True
return state
def build_main_graph():
g = StateGraph(MainState)
g.add_node("start", start_main)
g.add_node("sub", run_subgraph)
g.add_node("finish", finish_main)
g.add_edge(START, "start")
g.add_edge("start", "sub")
g.add_edge("sub", "finish")
g.add_edge("finish", END)
return g.compile()
if __name__ == "__main__":
graph = build_main_graph()
result = graph.invoke({})
print("结果:", result)
输出:
主图开始运行!
子图:你好,小帅,我是子流程!
子图:小帅,再见!
主图:全部完成!
结果: {'done': True, 'sub_output': {'name': '小帅'}}
这种"包装节点"的方式让你对数据的流向有完全的控制权,推荐在主图和子图状态结构不同的时候使用。
7.8 流式输出
在实际的产品中,用户通常不想等到全部处理完才看到结果,而是希望"边生成边显示"。LangGraph 的 stream 方法支持按节点粒度做流式输出。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class MyState(TypedDict):
question: str
thought: str
answer: str
def think_node(state: MyState):
print("思考中...")
state["thought"] = "是怎么染色的呢?"
return state
def answer_node(state: MyState):
print("回答完成!")
state["answer"] = "是用彩笔染色的。"
return state
def build_graph():
graph_builder = StateGraph(MyState)
graph_builder.add_node("思考节点", think_node)
graph_builder.add_node("回答节点", answer_node)
graph_builder.add_edge(START, "思考节点")
graph_builder.add_edge("思考节点", "回答节点")
graph_builder.add_edge("回答节点", END)
return graph_builder.compile()
if __name__ == "__main__":
graph = build_graph()
for chunk in graph.stream({"question": "为什么天空是蓝色的?"}):
print(chunk)
print("---")
输出:
思考中...
{'思考节点': {'question': '为什么天空是蓝色的?', 'thought': '是怎么染色的呢?'}}
---
回答完成!
{'回答节点': {'question': '为什么天空是蓝色的?', 'thought': '是怎么染色的呢?', 'answer': '是用彩笔染色的。'}}
---
每次循环拿到的 chunk 是一个字典,key 是刚执行完的节点名,value 是该节点输出的状态快照。
在实际应用中,你可以把每个 chunk 推送给前端,实现逐步展示的效果。如果你在节点内部调用 LLM,还可以进一步做 token 级别的流式输出(配合 LLM 的 streaming 参数)。
7.9 异步操作
LangGraph 原生支持异步调用,通过 ainvoke 和 astream 两个方法实现。
异步在以下场景特别有价值:
- 需要同时运行多个图实例(比如同时处理多个用户的请求)
- 节点中有 I/O 密集型操作(调用 API、读写数据库)
- 集成到 FastAPI 等异步框架中
ainvoke:异步调用
import asyncio
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class MyState(TypedDict):
question: str
thought: str
answer: str
def think_node(state: MyState):
print("思考中...")
state["thought"] = "是怎么染色的呢?"
return state
def answer_node(state: MyState):
print("回答完成!")
state["answer"] = "是用彩笔染色的。"
return state
def build_graph():
graph_builder = StateGraph(MyState)
graph_builder.add_node("思考节点", think_node)
graph_builder.add_node("回答节点", answer_node)
graph_builder.add_edge(START, "思考节点")
graph_builder.add_edge("思考节点", "回答节点")
graph_builder.add_edge("回答节点", END)
return graph_builder.compile()
async def main(state: MyState):
graph = build_graph()
result = await graph.ainvoke(state)
return result
if __name__ == "__main__":
result = asyncio.run(main({"question": "为什么天空是蓝色的?"}))
print("结果:", result)
astream:异步流式输出
import asyncio
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class MyState(TypedDict):
question: str
thought: str
answer: str
def think_node(state: MyState):
print("思考中...")
state["thought"] = "是怎么染色的呢?"
return state
def answer_node(state: MyState):
print("回答完成!")
state["answer"] = "是用彩笔染色的。"
return state
def build_graph():
graph_builder = StateGraph(MyState)
graph_builder.add_node("思考节点", think_node)
graph_builder.add_node("回答节点", answer_node)
graph_builder.add_edge(START, "思考节点")
graph_builder.add_edge("思考节点", "回答节点")
graph_builder.add_edge("回答节点", END)
return graph_builder.compile()
async def main(state: MyState):
graph = build_graph()
result = None
async for event in graph.astream(state):
print("收到事件:", event)
result = event
return result
if __name__ == "__main__":
result = asyncio.run(main({"question": "为什么天空是蓝色的?"}))
print("最终结果:", result)
在实际项目中,如果你用 FastAPI 来搭建后端服务,异步调用几乎是必须的——FastAPI 的路由函数本身就是异步的,用 ainvoke 可以无缝集成,不会阻塞事件循环。
八、总结
最后回顾一下 LangGraph 的核心能力:
| 能力 | 说明 | 关键 API |
|---|---|---|
| 图结构编排 | 节点 + 边定义工作流 | StateGraph, add_node, add_edge |
| 条件分支 | 根据状态动态路由 | add_conditional_edges |
| 状态管理 | 节点间共享和更新数据 | TypedDict / Pydantic 定义状态 |
| Config 配置 | 运行时传入动态参数 | RunnableConfig |
| 记忆持久化 | 跨轮次保持状态 | InMemorySaver, SqliteSaver |
| 人工介入 | 流程暂停等待人工输入 | interrupt, Command(resume=...) |
| 并行执行 | 多个节点同时运行 | add_edge([A, B], C) |
| 子图 | 模块化封装子流程 | 子图实例作为节点添加 |
| 流式输出 | 按节点粒度逐步返回 | graph.stream() |
| 异步调用 | 非阻塞执行 | ainvoke, astream |
LangGraph 本质上是在 LangChain 之上提供了一层图结构的编排能力,让 LLM 应用从"一问一答"进化到"有状态、有分支、有循环、能协作"的复杂智能体。如果你的项目只是简单的问答,LangChain 的 Chain 就够了;但一旦涉及多步决策、状态管理、人工干预等需求,LangGraph 就是更合适的选择。
这篇文章记录了我学习 LangGraph 的过程,涵盖了从基础到进阶的完整内容。如果你也在做大模型应用开发,希望这些笔记能给你一些参考。
更多推荐


所有评论(0)