【LangGraph】其他核⼼能⼒
🌈 个人主页:Zfox_
🔥 系列专栏:LangChain-AI 应用开发框架
一:🔥 运⾏时上下⽂(Runtime context)
🦋 什么是运行上下文?
🎀 上下文定义与分类
上下文(Context)是程序运行时可访问的数据和环境信息。在 LangGraph 中,上下文用于传递:
- 用户身份、配置参数
- 数据库连接、API 密钥
- 会话状态、历史记录等
上下文可按两个维度分类:
| 维度 | 类型 | 描述 | 示例 |
|---|---|---|---|
| 可变性 | 静态上下文 | 运行中不变的数据 | 用户ID、数据库连接 |
| 可变性 | 动态上下文 | 运行中会变化的数据 | 对话记录、中间结果 |
| 生命周期 | 运行时上下文 | 单次运行/线程有效 | 当前请求的临时数据 |
| 生命周期 | 跨会话上下文 | 多次会话持久化 | 用户偏好、历史记录 |
因此,在 LangGraph 中,包含三种上下文:
| 类型 | 可变性 | 生命周期 | 访问方式 |
|---|---|---|---|
| 静态运行时上下文 | 静态 | 单次运行/线程 | context 参数传入 |
| 动态运行时上下文 | 动态 | 单次运行 | 图状态对象 |
| 动态跨会话上下文 | 动态 | 跨会话 | 存储(Store) |
关于我们之前学习过的 checkpoints 和 store,则分别代表动态运行时上下文和动态跨会话上下文。接下来一起来看下静态运行时上下文的使用方式。
🎀 场景练习
做个小练习:根据具体场景,分析各种数据如何保存。例如我们需要开发一个智能旅行规划助手,该助手需要:
- 根据用户的母语提供个性化回答
- 根据用户的会员等级提供不同服务
- 连接到旅游数据库查询信息
- 根据季节推荐不同的活动
- 记住用户的历史查询,提供更精准的建议
这些信息如何保存?
| 数据类型 | 上下文类型 | 为什么 |
|---|---|---|
| 数据库连接 | 静态运行时上下文 | 每次查询都需要,单次运行中不变 |
| 用户语言偏好 | 静态运行时上下文 | 个性化回答,单次运行中不变 |
| 用户会员等级 | 静态运行时上下文 | 服务分级,单次运行中不变 |
| 当前季节 | 静态运行时上下文 | 推荐季节性活动,单次运行中不变 |
| 对话历史 | 动态运行时上下文 | 了解上下文,单次运行中变化 |
| 用户旅行偏好 | 跨会话上下文 | 长期记忆,跨会话 |
通过以上练习,应该能够理解 LangGraph 中运行上下文的概念,并在实际项目中正确使用不同类型的上下文来构建更智能、更个性化的 AI 应用。
记住:**良好的上下文管理是构建复杂、可维护 AI 应用的关键!**正确的上下文设计能让我们的AI应用:
- ✅ 更高效:避免重复传递不变数据
- ✅ 更智能:基于上下文提供个性化服务
- ✅ 更可维护:清晰的数据边界和职责分离
- ✅ 更易扩展:支持多用户、多场景、长期记忆
🦋 配置运行时上下文
🎀 定义上下文模式
首先需要定义一个上下文的数据结构(通常用 dataclass 或 TypedDict)。下面我们演示一个实际示例,在该参数中我们配置了三个参数:用户ID、LLM 和系统消息,以便在运行时使用。
from dataclasses import dataclass
@dataclass
class ContextSchema:
user_id: str
model_provider: str = "openai" # 默认值
system_message: str = "你是一个乐于助人的助手。"
🎀 在图中使用上下文模式
创建图时传入 context_schema 参数。
from langgraph.graph import StateGraph
builder = StateGraph(
State, # 状态模式
context_schema=ContextSchema # 添加上下文模式
)
🎀 在节点中访问上下文
节点函数可通过 runtime 参数访问上下文。
from langgraph.runtime import Runtime
def my_node(state: State, runtime: Runtime[ContextSchema]):
user_id = runtime.context.user_id
model_provider = runtime.context.model_provider
if model_provider == "openai":
# 使用 OpenAI 模型
pass
elif model_provider == "anthropic":
# 使用 Anthropic 模型
pass
return {"result": f"用户{user_id}处理完成"}
🎀 运行图时传入上下文
graph.invoke(
{"input": "Hello"},
context={
"user_id": "user123",
"model_provider": "anthropic",
"system_message": "请用中文回答"
}
)
🎀 【完整示例】
from dataclasses import dataclass
from langgraph.runtime import Runtime
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from langchain.messages import AnyMessage
@dataclass
class AppContext:
user_id: str
language: str = "en"
class AppState(TypedDict):
messages: list[AnyMessage]
user_name: str = ""
def greet_user(state: AppState, runtime: Runtime[AppContext]):
greeting = "Hello" if runtime.context.language == "en" else "你好"
user_name = state.get("user_name", "Guest")
return {"messages": [f"{greeting}, {user_name}!"]}
builder = StateGraph(AppState, context_schema=AppContext)
builder.add_node("greet", greet_user)
builder.add_edge(START, "greet")
builder.add_edge("greet", END)
graph = builder.compile()
# 英文用户
result_en = graph.invoke(
{"user_name": "Alice"}, context={"user_id": "123", "language": "en"}
)
# 输出: Hello, Alice!
# 中文用户
result_zh = graph.invoke(
{"user_name": "张三"}, context={"user_id": "456", "language": "zh"}
)
# 输出: 你好, 张三!
🦋 在工具中访问上下文
工具是调用外部系统、API、数据库交互或执行计算的功能。因此,对于用户身份、配置参数、数据库连接、API 密钥等这类调用 API 的参数信息和配置信息,则需要传递给工具。
上下文对工具的重要性:
- 个性化响应:根据用户上下文提供定制化回答
- 权限控制:基于用户身份限制工具访问
- 状态感知:工具可以根据当前状态决定行为
- 依赖注入:避免硬编码配置,提高可测试性
🎀 基本用法
工具可以通过 ToolRuntime 参数访问运行时信息。这个参数,为工具提供包括:
- State:图状态数据
- Context:静态上下文
- Store:持久化存储等
使用 ToolRuntime 时,只需在工具签名中添加 runtime: ToolRuntime,它会自动注入。调用时,无需手动传输。
定义一个带有运行时信息的工具如下所示:
from langchain.tools import tool, ToolRuntime
@tool
def get_user_info(runtime: ToolRuntime) -> str:
"""获取当前用户的信息"""
user_id = runtime.context.user_id # 访问上下文
user_state = runtime.state["user_name"] # 访问状态
return f"User {user_id}, state: {user_state}"
🎀 【完整示例】
构建一个支持搜索的 AI 系统,假设调用搜索 API 需要用户数据作为参数,则需要向工具中传入相关信息。关键步骤如下:
- 定义状态、上下文结构
- 定义工具节点(ToolNode)、定义 LLM 节点
- 构建并编译图,需加入状态和上下文参数
- 执行并验证结果
from dataclasses import dataclass
from langchain.chat_models import init_chat_model
from langchain.messages import SystemMessage, HumanMessage
from langchain.tools import tool, ToolRuntime
from langchain.messages import AnyMessage
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict, Annotated
import operator
class MessagesState(TypedDict):
messages: Annotated[list[AnyMessage], operator.add]
user_name: str = ""
@dataclass
class Context:
user_id: str
@tool
def search(runtime: ToolRuntime[Context]) -> str:
"""调用搜索工具"""
user_id = runtime.context.user_id # 访问上下文
user_name = runtime.state["user_name"] # 访问状态
print(f"日志记录:user_id: {user_id}, user_name: {user_name} 调用查询工具")
return f"查询天气:晴天,15-20度" # 模拟调用
# 绑定工具
model_with_tools = init_chat_model("gpt-4o-mini", temperature=0).bind_tools([search])
def llm_call(state: dict):
"""LLM决定是否调用工具"""
return {
"messages": [
model_with_tools.invoke(
[SystemMessage(content="你是一个乐于助人的助手,支持调用工具进行搜索。")] + state["messages"]
)
]
}
# 定义并编译图
builder = StateGraph(MessagesState, context_schema=Context)
builder.add_node("llm_call", llm_call)
builder.add_node("tool_node", ToolNode([search]))
builder.add_edge(START, "llm_call")
builder.add_conditional_edges(
"llm_call",
tools_condition, {
"tools": "tool_node",
"__end__": END,
},
)
builder.add_edge("tool_node", "llm_call")
graph = builder.compile()
for chunk in graph.stream(
{
"messages": [HumanMessage(content="今天西安的天气如何?")],
"user_name": "小明"
},
context={"user_id": "123"}
):
for node, update in chunk.items():
update["messages"][-1].pretty_print()
打印结果如下:
================================== Ai Message ==================================
Tool Calls:
search (call_YU9O02F9JBdiz3J7CMmUYLNE)
Call ID: call_YU9O02F9JBdiz3J7CMmUYLNE
Args:
日志记录:user_id: 123, user_name:小明调用查询工具
================================= Tool Message =================================
Name: search
查询天气:晴天,15-20度
================================== Ai Message ==================================
今天西安的天气是晴天,气温在15到20度之间。
二:🔥 流(Streaming)
🦋 概念
在LangChain篇章中,我们已经知道了流式传输用来逐步输出数据,无需等待全部处理完成。这可以提升用户体验,减少等待感,尤其适用于大语言模型(LLM)这类延迟较高的任务。就像看电影时,画面一帧帧播放,而不是等全部下载完再看。
在LangGraph篇章中,流式处理可将图运行的实时数据反馈显示到应用程序中,如:状态、LLM生成的文本、自定义数据等。且支持多种流模式。
🦋 五种流模式
LangGraph支持以下五种流模式:
| 模式 | 说明 | 适用场景 |
|---|---|---|
| values | 流式输出完整状态 | 需要知道每一步的完整状态 |
| updates | 流式输出状态变化 | 关注每一步更新了哪些字段 |
| messages | 流式输出LLM生成的token | 实时展示LLM生成内容 |
| custom | 流式输出自定义数据 | 自定义进度条、日志等 |
| debug | 输出所有调试信息 | 开发调试阶段 |
将一种或多种流模式作为列表传递给stream或astream方法是其使用姿势。
🦋 基础示例:流式输出状态值
from langgraph.graph import StateGraph, START
# 定义状态结构
class State(dict):
topic: str
joke: str
# 创建节点函数
def refine_topic(state):
return {"topic": state["topic"] + "和猫"}
def generate_joke(state):
return {"joke": f"这是一个关于{state['topic']}的笑话"}
# 构建图
graph = (
StateGraph(State)
.add_node(refine_topic)
.add_node(generate_joke)
.add_edge(START, "refine_topic")
.add_edge("refine_topic", "generate_joke")
.compile()
)
# 流式输出状态更新
for chunk in graph.stream(
{"topic": "冰激凌"}, stream_mode="updates" # 只看更新部分
):
print(chunk)
for chunk in graph.stream(
{"topic": "冰激凌"}, stream_mode="values" # 每一步的完整状态
):
print(chunk)
stream_mode="updates" 时输出:
{'refine_topic': {'topic': '冰激凌和猫'}}
{'generate_joke': {'joke': '这是一个关于冰激凌和猫的笑话'}}
stream_mode="values" 时输出:
{'topic': '冰激凌'}
{'topic': '冰激凌和猫'}
{'topic': '冰激凌和猫', 'joke': '这是一个关于冰激凌和猫的笑话'}
🦋 流式传输自定义数据
LangGraph 不仅支持输出状态这类的数据,还支持从节点或工具中输出用户自定义的数据。步骤如下:
- 使用
get_stream_writer()访问流编写器并发出自定义数据。 - 调用
.stream()或.astream()时设置stream_mode="custom"以获取流中的自定义数据。还可以组合多种模式(如["updates"、"custom"]),但至少必须有一个是"custom"。
🎀 基本用法
2.4.1.1 从节点和工具中输出用户自定义数据
这里改造下【运行时上下文-在工具中访问上下文】的代码:
from dataclasses import dataclass
from langchain.chat_models import init_chat_model
from langchain.messages import SystemMessage, HumanMessage
from langchain.tools import tool, ToolRuntime
from langchain.messages import AnyMessage
from langgraph.config import get_stream_writer
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict, Annotated
import operator
class MessagesState(TypedDict):
messages: Annotated[list[AnyMessage], operator.add]
user_name: str = ""
@dataclass
class Context:
user_id: str
@tool
def search(runtime: ToolRuntime[Context]) -> str:
"""调用搜索工具"""
user_id = runtime.context.user_id # 访问上下文
user_name = runtime.state["user_name"] # 访问状态
# 获取流式写入器
writer = get_stream_writer()
# 发送开始信号
writer({
"type": "search_tool",
"status": "start",
"user_id": user_id,
"user_name": user_name
})
# 模拟搜索过程
writer({
"type": "search_tool",
"status": "searching",
"user_id": user_id,
"user_name": user_name
})
# 模拟处理时间
import time
time.sleep(2)
# 结束
writer({
"type": "search_tool",
"status": "end",
"user_id": user_id,
"user_name": user_name
})
return f"查询天气:晴天,15-20度" # 模拟调用
# 绑定工具
model_with_tools = init_chat_model("gpt-4o-mini", temperature=0).bind_tools([search])
def llm_call(state: dict):
"""LLM决定是否调用工具"""
# 获取流式写入器
writer = get_stream_writer()
# 发送开始处理的信号
writer({
"type": "llm_call",
"status": "start", "message": "开始调用LLM", "content": state["messages"][-1].content
})
result = model_with_tools.invoke(
[SystemMessage(content="你是一个乐于助人的助手,支持调用工具进行搜索。")] + state["messages"]
)
# 调用结束
writer({
"type": "llm_call",
"status": "end", "message": "调用LLM完成"
})
return {"messages": [result]}
# 定义并编译图
builder = StateGraph(MessagesState, context_schema=Context)
builder.add_node("llm_call", llm_call)
builder.add_node("tool_node", ToolNode([search]))
builder.add_edge(START, "llm_call")
builder.add_conditional_edges(
"llm_call",
tools_condition, {
"tools": "tool_node",
"__end__": END,
},
)
builder.add_edge("tool_node", "llm_call")
graph = builder.compile()
for chunk in graph.stream(
{
"messages": [HumanMessage(content="今天西安的天气如何?")],
"user_name": "小明"
},
context={"user_id": "123"},
stream_mode=["custom"]
):
print(chunk)
打印结果:
('custom', {'type': 'llm_call', 'status': 'start', 'message': '开始调用LLM', 'content': '今天西安的天气如何?'})
('custom', {'type': 'llm_call', 'status': 'end', 'message': '调用LLM完成'})
('custom', {'type': 'search_tool', 'status': 'start', 'user_id': '123', 'user_name': '小明'})
('custom', {'type': 'search_tool', 'status': 'searching', 'user_id': '123', 'user_name': '小明'})
('custom', {'type': 'search_tool', 'status': 'end', 'user_id': '123', 'user_name': '小明'})
('custom', {'type': 'llm_call', 'status': 'start', 'message': '开始调用LLM', 'content': '查询天气:晴天,15-20度'})
('custom', {'type': 'llm_call', 'status': 'end', 'message': '调用LLM完成'})
2.4.1.2 设置多种传输模式
还可以组合多种模式(如 ["updates"、"custom"]),但至少必须有一个是 "custom"。如下所示:
for chunk in graph.stream(
{
"messages": [HumanMessage(content="今天西安的天气如何?")],
"user_name": "小明"
},
context={"user_id": "123"},
stream_mode=["custom", "updates"]
):
print(chunk)
可以看到 LangGraph 运行将一种或多种流模式作为列表传递给 stream 或 astream。
🎀 应用场景
2.4.2.1 创建自定义监控面板
那么自定义流式数据到底有什么用?让我们改造一下代码。
改造一:将工具调用模拟为多步骤执行
@tool
def search(runtime: ToolRuntime[Context]) -> str:
"""调用搜索工具"""
user_id = runtime.context.user_id
user_name = runtime.state["user_name"]
writer = get_stream_writer()
writer({
"type": "search_tool",
"status": "start",
"user_id": user_id,
"user_name": user_name
})
search_steps = [
{"name": "搜索1", "time": 1, "result": "晴天,"},
{"name": "搜索2", "time": 2, "result": "15-20度"},
]
all_result = "查询天气:"
import time
for i, step in enumerate(search_steps, 1):
writer({
"type": "search_tool",
"status": "searching",
"step": step['name'],
"all_step": len(search_steps),
"cur_step": i,
"user_id": user_id,
"user_name": user_name
})
time.sleep(step['time'])
all_result += step['result']
writer({
"type": "search_tool",
"status": "end",
"user_id": user_id,
"user_name": user_name,
"result": all_result
})
return all_result
改造二:输出结果重定义(这里只处理了工具部分自定义数据流。其它同理自行改造)
# 运行并监控自定义流
print("图开始执行:")
for chunk in graph.stream(
{
"messages": [HumanMessage(content="今天西安的天气如何?")],
"user_name": "小明"
},
context={"user_id": "123"},
stream_mode=["custom", "updates"]
):
if chunk[0] == "custom":
info = chunk[-1]
if info.get("type") == "llm_call":
pass
elif info.get("type") == "search_tool":
status = info.get("status")
if status == "start":
print(f"用户id:{info['user_id']}, 用户名称:{info['user_name']}开始调用工具...")
elif status == "searching":
print(f"[{info['cur_step']}/{info['all_step']}] 正在处理: {info['step']}")
elif status == "end":
print(f"调用完成!结果: {info['result']}")
elif chunk[0] == "updates":
pass
打印结果:
图开始执行:
用户id:123, 用户名称:小明开始调用工具...
[1/2] 正在处理: 搜索1
[2/2] 正在处理: 搜索2
调用完成!结果: 查询天气:晴天,15-20度
可以看到,我们可以通过自定义数据创建自定义的监控面板!这个过程中,还可以计算中间进度([1/2]、[2/2])。
因此,自定义流式数据的用法,可以做到:
- 实时进度反馈:在长时间处理任务中显示进度
- 调试信息输出:输出中间计算结果
- 多源数据整合:同时流式输出不同类型的数据
- 自定义监控:创建自定义的监控面板
🦋 流式传输 LLM tokens
Token 是大语言模型处理文本的基本单位。因此 LangGraph 可以:
- 使用
stream_mode="messages"模式可以从 graph 的任何部分(包括节点、工具等)逐Token 流式传输 LLM 输出。 - 输出格式为
(message_chunk, metadata)元组。
2.5.1 基本用法
from typing import TypedDict
from langgraph.graph import StateGraph, START
from langchain_openai import ChatOpenAI
# 定义状态
class State(TypedDict):
input: str
output: str
# 初始化模型
model = ChatOpenAI(model="gpt-4o-mini")
def llm_node(state: State):
"""生成答案的节点"""
return {"output": model.invoke([
{"role": "system", "content": "你是一个乐于助人的助手。"},
{"role": "user", "content": state["input"]}
])
}
# 构建图
builder = StateGraph(State)
builder.add_node(llm_node)
builder.add_edge(START, "llm_node")
graph = builder.compile()
# 流式输出 LLM Tokens
# 输出格式为(message_chunk, metadata) 元组。
for token_chunk, metadata in graph.stream(
{"input": "请解释什么是机器学习?"}, stream_mode="messages"
):
if token_chunk.content:
# 逐 Token 输出
print(token_chunk.content, end="", flush=True)
2.5.2 高级功能
2.5.2.1 按 Tags 过滤 Tokens
我们还可以将 tags 与 LLM 调用相关联,以按 LLM 调用筛选流式令牌。
from typing import TypedDict
from langgraph.graph import StateGraph, START
from langchain_openai import ChatOpenAI
# 初始化带标签的模型
joke_model = ChatOpenAI(
model="gpt-4o-mini", model_kwargs={"tags": ["joke"]} # 给模型添加标签
)
poem_model = ChatOpenAI(
model="gpt-4o-mini", model_kwargs={"tags": ["poem"]} # 给模型添加标签
)
class CreativeState(TypedDict):
topic: str
joke: str
poem: str
def generate_creative_content(state: CreativeState):
"""同时生成笑话和诗歌"""
topic = state["topic"]
print(f"\n生成关于 {topic} 的笑话:")
joke_response = joke_model.invoke([
{"role": "user", "content": f"讲一个关于 {topic} 的笑话"}
])
print(f"\n生成关于 {topic} 的诗歌:")
poem_response = poem_model.invoke([
{"role": "user", "content": f"写一首关于 {topic} 的短诗"}
])
return {
"joke": joke_response.content,
"poem": poem_response.content
}
# 构建图
builder = StateGraph(CreativeState)
builder.add_node("creative", generate_creative_content)
builder.add_edge(START, "creative")
graph = builder.compile()
# 流式输出并过滤
for token_chunk, metadata in graph.stream(
{"topic": "猫"}, stream_mode="messages"
):
# 只输出笑话相关的 Tokens
tags = metadata.get("tags", [])
if "joke" in tags:
print(token_chunk.content, end="", flush=True)
2.5.2.2 按节点名称过滤
可以指定特定节点流式传输Tokens,需按流式传输元数据中的 langgraph_node 字段筛选输出:
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o-mini")
class State(TypedDict):
query: str
summary: str
translation: str
def generate_summary(state: State):
"""生成摘要"""
response = model.invoke([
{"role": "user", "content": f"请为以下内容生成摘要:{state['query']}"}
])
return {"summary": response.content}
def generate_translation(state: State):
"""生成翻译"""
response = model.invoke([
{"role": "user", "content": f"请将以下内容翻译成英文:{state['query']}"}
])
return {"translation": response.content}
# 构建并行处理图
builder = StateGraph(State)
builder.add_node("summarize", generate_summary)
builder.add_node("translate", generate_translation)
builder.add_edge(START, "summarize")
builder.add_edge(START, "translate")
builder.add_edge("summarize", END)
builder.add_edge("translate", END)
graph = builder.compile()
# 流式输出并只显示某个节点的 Tokens
target_node = "summarize" # 可以改为 "translate"
for token_chunk, metadata in graph.stream(
{"query": "人工智能是计算机科学的一个分支,致力于创造能够执行通常需要人类智能的任务的机器。"},
stream_mode="messages"
):
node_name = metadata.get("langgraph_node", "")
if token_chunk.content and node_name == target_node:
print(f"{token_chunk.content}", end="", flush=True)
三:🔥 子图(Subgraphs)
🦋 什么是子图?
在 LangGraph 中,子图是另一个图中的一个节点,可以独立开发和测试,也可以被多个主图复用。子图可用于:
- 模块化开发:不同团队可以独立开发不同部分
- 代码复用:相同逻辑的图只需开发一次
🦋 使用子图的两种方式
🎀 方式一:从节点调用子图(不同状态模式)
这种方式是从一个图(如主图)的节点内部调用另一个图(如子图)。因此其使用特点是:子图和主图的状态结构可以完全不同。
from typing_extensions import TypedDict
from langgraph.graph.state import StateGraph, START
# 1. 定义子图
class SubState(TypedDict):
# 注意,这些键都不与父图状态共享
sub_1: str
sub_2: str
def sub_node_1(state: SubState):
return {"sub_1": "sub_1"}
def sub_node_2(state: SubState):
return {"sub_2": state["sub_2"] + state["sub_1"]}
sub_builder = StateGraph(SubState)
sub_builder.add_node(sub_node_1)
sub_builder.add_node(sub_node_2)
sub_builder.add_edge(START, "sub_node_1")
sub_builder.add_edge("sub_node_1", "sub_node_2")
subgraph = sub_builder.compile()
# 2. 定义主图
class ParentState(TypedDict):
parent: str
def node_1(state: ParentState):
return {"parent": "hi! " + state["parent"]}
def node_2(state: ParentState):
# 将状态转换为子图状态
response = subgraph.invoke({"sub_2": state["parent"]})
# 将响应转换回父状态
return {"parent": response["sub_2"]}
builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()
# 要在流式输出中包含子图的输出,可以在父图的 .stream() 方法中设置 subgraphs=True。
for chunk in graph.stream({"parent": "parent"}, subgraphs=True):
print(chunk)
输出结果:
((), {'node_1': {'parent': 'hi! parent'}})
(('node_2:60e75a5e-41d7-8ec4-915d-afb8f8f62c1c',), {'sub_node_1': {'sub_1': 'sub_1'}})
(('node_2:60e75a5e-41d7-8ec4-915d-afb8f8f62c1c',), {'sub_node_2': {'sub_2': 'hi! parentsub_1'}})
((), {'node_2': {'parent': 'hi! parentsub_1'}})
扩展:定义主图、子图、孙子图进行调用。
上述代码中,要在流式输出中包含子图的输出,可以在父图的 .stream() 方法中设置 subgraphs=True。这将从父图和任何子图流式传输输出。
🎀 方式二:将子图作为节点(共享状态模式)
这种方式可以将图添加为另一个图中的节点,如下所示:
# 子图和主图使用相同的状态结构
子图 = 创建子图()
# 直接把子图作为节点加入主图
主图.add_node("子图节点", 子图)
其特点是:子图和主图共享部分状态。
from typing_extensions import TypedDict
from langgraph.graph.state import StateGraph, START
# 1. 定义子图
class SubState(TypedDict):
parent: str # 共享父图状态
sub: str # Sub私有
def sub_node_1(state: SubState):
return {"sub": "sub"}
def sub_node_2(state: SubState):
return {"parent": state["parent"] + state["sub"]}
sub_builder = StateGraph(SubState)
sub_builder.add_node(sub_node_1)
sub_builder.add_node(sub_node_2)
sub_builder.add_edge(START, "sub_node_1")
sub_builder.add_edge("sub_node_1", "sub_node_2")
subgraph = sub_builder.compile()
# 2. 定义主图
class ParentState(TypedDict):
parent: str
def node_1(state: ParentState):
return {"parent": "hi! " + state["parent"]}
builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()
for chunk in graph.stream({"parent": "parent"}):
print(chunk)
输出结果:
{'node_1': {'parent': 'hi! parent'}}
{'node_2': {'parent': 'hi! parentsub'}}
🦋 为子图添加短期记忆
如果图包含子图,则只需在编译父图时提供 checkpoint。LangGraph 会自动将 checkpoint 传播到子图。
from langgraph.graph import START, StateGraph
from langgraph.checkpoint.memory import InMemorySaver
from typing import TypedDict
class State(TypedDict):
foo: str
# 子图
def subgraph_node_1(state: State):
return {"foo": state["foo"] + "bar"}
subgraph_builder = StateGraph(State)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph = subgraph_builder.compile()
# 主图
builder = StateGraph(State)
builder.add_node("node_1", subgraph)
builder.add_edge(START, "node_1")
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)
🦋 在子图中使用中断
🎀 基本用法
在子图中,同样可以使用中断。且添加短期记忆后,可以检查图状态(检查点)。但要注意,只有当子图中断时,才能查看子图状态;恢复后,将无法访问子图形状态。
例如:
from langgraph.graph import START, StateGraph
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command
from typing_extensions import TypedDict
class State(TypedDict):
foo: str
# 子图
def subgraph_node_1(state: State):
print("sub_node_1")
return {}
def subgraph_node_2(state: State):
print("sub_node_2")
value = interrupt("输入值:")
return {"foo": state["foo"] + value}
subgraph_builder = StateGraph(State)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()
# 主图
builder = StateGraph(State)
builder.add_node("node_1", subgraph)
builder.add_edge(START, "node_1")
graph = builder.compile(checkpointer=InMemorySaver())
config = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": ""}, config)
parent_state = graph.get_state(config)
# 访问子图状态只能在子图被中断时才可用。
# 一旦恢复了图,将无法访问子图状态。
subgraph_state = graph.get_state(config, subgraphs=True).tasks[0].state
print(subgraph_state)
print(graph.invoke(Command(resume="bar"), config))
结果如下:
sub_node_1
sub_node_2
StateSnapshot(
values={'foo': ''},
next=('subgraph_node_2',),
...
interrupts=(
Interrupt(value='输入值:', id='...'),
),
)
sub_node_2
{'foo': 'bar'}
🎀 恢复时的注意事项
之前讲过,当节点恢复执行时,发起中断的节点会从头再跑一遍。因此,对于中断前的代码,会多重复执行!
而在子图场景下,子图的不同调用方式有不同的执行结果。
3.4.2.1 将子图作为节点时
上面的例子就是将子图作为节点的示例,去掉状态看看调用结果:
sub_node_1
sub_node_2
sub_node_2
{'foo': 'bar'}
由于是 subgraph_node_2 节点发起中断调用,我们可以看到 subgraph_node_2 节点被调用两次,符合预期。
3.4.2.2 节点内调用子图时
但当是节点内调用子图时:
- 父图将从调用子图并触发中断的节点的开头恢复执行。
- 同样,子图也将从调用中断的节点的开头恢复。
修改下代码:
# 主图
def node_1(state: State):
print("node_1")
response = subgraph.invoke({"foo": state["foo"]})
return {"foo": response["foo"]}
builder = StateGraph(State)
builder.add_node("node_1", node_1)
builder.add_edge(START, "node_1")
graph = builder.compile(checkpointer=InMemorySaver())
config = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": ""}, config)
print(graph.invoke(Command(resume="bar"), config))
打印结果:
node_1
sub_node_1
sub_node_2
node_1 # 主图节点重新执行
sub_node_2 # 子图节点重新执行
{'foo': 'bar'}
可以看到不仅是调用中断的子图节点重新执行,就连调用子图的主图节点也会被重新调用!
四:🔥 共勉
😋 以上就是我对 【LangGraph】其他核⼼能⼒ 的理解, 觉得这篇博客对你有帮助的,可以点赞收藏关注支持一波~ 😉
更多推荐



所有评论(0)