🌈 个人主页:Zfox_
🔥 系列专栏:LangChain-AI 应用开发框架

一:🔥 运⾏时上下⽂(Runtime context)

🦋 什么是运行上下文?

🎀 上下文定义与分类

上下文(Context)是程序运行时可访问的数据和环境信息。在 LangGraph 中,上下文用于传递:

  • 用户身份、配置参数
  • 数据库连接、API 密钥
  • 会话状态、历史记录等

上下文可按两个维度分类:

维度 类型 描述 示例
可变性 静态上下文 运行中不变的数据 用户ID、数据库连接
可变性 动态上下文 运行中会变化的数据 对话记录、中间结果
生命周期 运行时上下文 单次运行/线程有效 当前请求的临时数据
生命周期 跨会话上下文 多次会话持久化 用户偏好、历史记录

因此,在 LangGraph 中,包含三种上下文:

类型 可变性 生命周期 访问方式
静态运行时上下文 静态 单次运行/线程 context 参数传入
动态运行时上下文 动态 单次运行 图状态对象
动态跨会话上下文 动态 跨会话 存储(Store)

关于我们之前学习过的 checkpoints 和 store,则分别代表动态运行时上下文动态跨会话上下文。接下来一起来看下静态运行时上下文的使用方式。

🎀 场景练习

做个小练习:根据具体场景,分析各种数据如何保存。例如我们需要开发一个智能旅行规划助手,该助手需要:

  1. 根据用户的母语提供个性化回答
  2. 根据用户的会员等级提供不同服务
  3. 连接到旅游数据库查询信息
  4. 根据季节推荐不同的活动
  5. 记住用户的历史查询,提供更精准的建议

这些信息如何保存?

数据类型 上下文类型 为什么
数据库连接 静态运行时上下文 每次查询都需要,单次运行中不变
用户语言偏好 静态运行时上下文 个性化回答,单次运行中不变
用户会员等级 静态运行时上下文 服务分级,单次运行中不变
当前季节 静态运行时上下文 推荐季节性活动,单次运行中不变
对话历史 动态运行时上下文 了解上下文,单次运行中变化
用户旅行偏好 跨会话上下文 长期记忆,跨会话

通过以上练习,应该能够理解 LangGraph 中运行上下文的概念,并在实际项目中正确使用不同类型的上下文来构建更智能、更个性化的 AI 应用。

记住:**良好的上下文管理是构建复杂、可维护 AI 应用的关键!**正确的上下文设计能让我们的AI应用:

  • ✅ 更高效:避免重复传递不变数据
  • ✅ 更智能:基于上下文提供个性化服务
  • ✅ 更可维护:清晰的数据边界和职责分离
  • ✅ 更易扩展:支持多用户、多场景、长期记忆

🦋 配置运行时上下文

🎀 定义上下文模式

首先需要定义一个上下文的数据结构(通常用 dataclass 或 TypedDict)。下面我们演示一个实际示例,在该参数中我们配置了三个参数:用户ID、LLM 和系统消息,以便在运行时使用。

from dataclasses import dataclass

@dataclass
class ContextSchema:
    user_id: str
    model_provider: str = "openai"  # 默认值
    system_message: str = "你是一个乐于助人的助手。"

🎀 在图中使用上下文模式

创建图时传入 context_schema 参数。

from langgraph.graph import StateGraph

builder = StateGraph(
    State,  # 状态模式
    context_schema=ContextSchema  # 添加上下文模式
)

🎀 在节点中访问上下文

节点函数可通过 runtime 参数访问上下文。

from langgraph.runtime import Runtime

def my_node(state: State, runtime: Runtime[ContextSchema]):
    user_id = runtime.context.user_id
    model_provider = runtime.context.model_provider
    if model_provider == "openai":
        # 使用 OpenAI 模型
        pass
    elif model_provider == "anthropic":
        # 使用 Anthropic 模型
        pass
    return {"result": f"用户{user_id}处理完成"}

🎀 运行图时传入上下文

graph.invoke(
    {"input": "Hello"},
    context={
        "user_id": "user123",
        "model_provider": "anthropic",
        "system_message": "请用中文回答"
    }
)

🎀 【完整示例】

from dataclasses import dataclass
from langgraph.runtime import Runtime
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from langchain.messages import AnyMessage

@dataclass
class AppContext:
    user_id: str
    language: str = "en"

class AppState(TypedDict):
    messages: list[AnyMessage]
    user_name: str = ""

def greet_user(state: AppState, runtime: Runtime[AppContext]):
    greeting = "Hello" if runtime.context.language == "en" else "你好"
    user_name = state.get("user_name", "Guest")
    return {"messages": [f"{greeting}, {user_name}!"]}

builder = StateGraph(AppState, context_schema=AppContext)
builder.add_node("greet", greet_user)
builder.add_edge(START, "greet")
builder.add_edge("greet", END)
graph = builder.compile()

# 英文用户
result_en = graph.invoke(
    {"user_name": "Alice"}, context={"user_id": "123", "language": "en"}
)
# 输出: Hello, Alice!

# 中文用户
result_zh = graph.invoke(
    {"user_name": "张三"}, context={"user_id": "456", "language": "zh"}
)
# 输出: 你好, 张三!

🦋 在工具中访问上下文

工具是调用外部系统、API、数据库交互或执行计算的功能。因此,对于用户身份、配置参数、数据库连接、API 密钥等这类调用 API 的参数信息和配置信息,则需要传递给工具。

上下文对工具的重要性:

  • 个性化响应:根据用户上下文提供定制化回答
  • 权限控制:基于用户身份限制工具访问
  • 状态感知:工具可以根据当前状态决定行为
  • 依赖注入:避免硬编码配置,提高可测试性

🎀 基本用法

工具可以通过 ToolRuntime 参数访问运行时信息。这个参数,为工具提供包括:

  • State:图状态数据
  • Context:静态上下文
  • Store:持久化存储等

使用 ToolRuntime 时,只需在工具签名中添加 runtime: ToolRuntime,它会自动注入。调用时,无需手动传输。

定义一个带有运行时信息的工具如下所示:

from langchain.tools import tool, ToolRuntime

@tool
def get_user_info(runtime: ToolRuntime) -> str:
    """获取当前用户的信息"""
    user_id = runtime.context.user_id  # 访问上下文
    user_state = runtime.state["user_name"]  # 访问状态
    return f"User {user_id}, state: {user_state}"

🎀 【完整示例】

构建一个支持搜索的 AI 系统,假设调用搜索 API 需要用户数据作为参数,则需要向工具中传入相关信息。关键步骤如下:

  1. 定义状态、上下文结构
  2. 定义工具节点(ToolNode)、定义 LLM 节点
  3. 构建并编译图,需加入状态和上下文参数
  4. 执行并验证结果
from dataclasses import dataclass
from langchain.chat_models import init_chat_model
from langchain.messages import SystemMessage, HumanMessage
from langchain.tools import tool, ToolRuntime
from langchain.messages import AnyMessage
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict, Annotated
import operator

class MessagesState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]
    user_name: str = ""

@dataclass
class Context:
    user_id: str

@tool
def search(runtime: ToolRuntime[Context]) -> str:
    """调用搜索工具"""
    user_id = runtime.context.user_id  # 访问上下文
    user_name = runtime.state["user_name"]  # 访问状态
    print(f"日志记录:user_id: {user_id}, user_name: {user_name} 调用查询工具")
    return f"查询天气:晴天,15-20度"  # 模拟调用

# 绑定工具
model_with_tools = init_chat_model("gpt-4o-mini", temperature=0).bind_tools([search])

def llm_call(state: dict):
    """LLM决定是否调用工具"""
    return {
        "messages": [
            model_with_tools.invoke(
                [SystemMessage(content="你是一个乐于助人的助手,支持调用工具进行搜索。")] + state["messages"]
            )
        ]
    }

# 定义并编译图
builder = StateGraph(MessagesState, context_schema=Context)
builder.add_node("llm_call", llm_call)
builder.add_node("tool_node", ToolNode([search]))
builder.add_edge(START, "llm_call")
builder.add_conditional_edges(
    "llm_call",
    tools_condition, {
        "tools": "tool_node",
        "__end__": END,
    },
)
builder.add_edge("tool_node", "llm_call")
graph = builder.compile()

for chunk in graph.stream(
    {
        "messages": [HumanMessage(content="今天西安的天气如何?")],
        "user_name": "小明"
    },
    context={"user_id": "123"}
):
    for node, update in chunk.items():
        update["messages"][-1].pretty_print()

打印结果如下:

================================== Ai Message ==================================
Tool Calls:
  search (call_YU9O02F9JBdiz3J7CMmUYLNE)
  Call ID: call_YU9O02F9JBdiz3J7CMmUYLNE
  Args:
  日志记录:user_id: 123, user_name:小明调用查询工具
================================= Tool Message =================================
Name: search
查询天气:晴天,15-20度
================================== Ai Message ==================================
今天西安的天气是晴天,气温在15到20度之间。

二:🔥 流(Streaming)

🦋 概念

在LangChain篇章中,我们已经知道了流式传输用来逐步输出数据,无需等待全部处理完成。这可以提升用户体验,减少等待感,尤其适用于大语言模型(LLM)这类延迟较高的任务。就像看电影时,画面一帧帧播放,而不是等全部下载完再看。

在LangGraph篇章中,流式处理可将图运行的实时数据反馈显示到应用程序中,如:状态、LLM生成的文本、自定义数据等。且支持多种流模式。

🦋 五种流模式

LangGraph支持以下五种流模式:

模式 说明 适用场景
values 流式输出完整状态 需要知道每一步的完整状态
updates 流式输出状态变化 关注每一步更新了哪些字段
messages 流式输出LLM生成的token 实时展示LLM生成内容
custom 流式输出自定义数据 自定义进度条、日志等
debug 输出所有调试信息 开发调试阶段

将一种或多种流模式作为列表传递给stream或astream方法是其使用姿势。

🦋 基础示例:流式输出状态值

from langgraph.graph import StateGraph, START

# 定义状态结构
class State(dict):
    topic: str
    joke: str

# 创建节点函数
def refine_topic(state):
    return {"topic": state["topic"] + "和猫"}

def generate_joke(state):
    return {"joke": f"这是一个关于{state['topic']}的笑话"}

# 构建图
graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .compile()
)

# 流式输出状态更新
for chunk in graph.stream(
    {"topic": "冰激凌"}, stream_mode="updates"  # 只看更新部分
):
    print(chunk)

for chunk in graph.stream(
    {"topic": "冰激凌"}, stream_mode="values"  # 每一步的完整状态
):
    print(chunk)

stream_mode="updates" 时输出:

{'refine_topic': {'topic': '冰激凌和猫'}}
{'generate_joke': {'joke': '这是一个关于冰激凌和猫的笑话'}}

stream_mode="values" 时输出:

{'topic': '冰激凌'}
{'topic': '冰激凌和猫'}
{'topic': '冰激凌和猫', 'joke': '这是一个关于冰激凌和猫的笑话'}

🦋 流式传输自定义数据

LangGraph 不仅支持输出状态这类的数据,还支持从节点或工具中输出用户自定义的数据。步骤如下:

  • 使用 get_stream_writer() 访问流编写器并发出自定义数据。
  • 调用 .stream().astream() 时设置 stream_mode="custom" 以获取流中的自定义数据。还可以组合多种模式(如 ["updates"、"custom"]),但至少必须有一个是 "custom"

🎀 基本用法

2.4.1.1 从节点和工具中输出用户自定义数据

这里改造下【运行时上下文-在工具中访问上下文】的代码:

from dataclasses import dataclass
from langchain.chat_models import init_chat_model
from langchain.messages import SystemMessage, HumanMessage
from langchain.tools import tool, ToolRuntime
from langchain.messages import AnyMessage
from langgraph.config import get_stream_writer
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict, Annotated
import operator

class MessagesState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]
    user_name: str = ""

@dataclass
class Context:
    user_id: str

@tool
def search(runtime: ToolRuntime[Context]) -> str:
    """调用搜索工具"""
    user_id = runtime.context.user_id  # 访问上下文
    user_name = runtime.state["user_name"]  # 访问状态
    # 获取流式写入器
    writer = get_stream_writer()
    # 发送开始信号
    writer({
        "type": "search_tool",
        "status": "start",
        "user_id": user_id,
        "user_name": user_name
    })
    # 模拟搜索过程
    writer({
        "type": "search_tool",
        "status": "searching",
        "user_id": user_id,
        "user_name": user_name
    })
    # 模拟处理时间
    import time
    time.sleep(2)
    # 结束
    writer({
        "type": "search_tool",
        "status": "end",
        "user_id": user_id,
        "user_name": user_name
    })
    return f"查询天气:晴天,15-20度"  # 模拟调用

# 绑定工具
model_with_tools = init_chat_model("gpt-4o-mini", temperature=0).bind_tools([search])

def llm_call(state: dict):
    """LLM决定是否调用工具"""
    # 获取流式写入器
    writer = get_stream_writer()
    # 发送开始处理的信号
    writer({
        "type": "llm_call",
        "status": "start", "message": "开始调用LLM", "content": state["messages"][-1].content
    })
    result = model_with_tools.invoke(
        [SystemMessage(content="你是一个乐于助人的助手,支持调用工具进行搜索。")] + state["messages"]
    )
    # 调用结束
    writer({
        "type": "llm_call",
        "status": "end", "message": "调用LLM完成"
    })
    return {"messages": [result]}

# 定义并编译图
builder = StateGraph(MessagesState, context_schema=Context)
builder.add_node("llm_call", llm_call)
builder.add_node("tool_node", ToolNode([search]))
builder.add_edge(START, "llm_call")
builder.add_conditional_edges(
    "llm_call",
    tools_condition, {
        "tools": "tool_node",
        "__end__": END,
    },
)
builder.add_edge("tool_node", "llm_call")
graph = builder.compile()

for chunk in graph.stream(
    {
        "messages": [HumanMessage(content="今天西安的天气如何?")],
        "user_name": "小明"
    },
    context={"user_id": "123"},
    stream_mode=["custom"]
):
    print(chunk)

打印结果:

('custom', {'type': 'llm_call', 'status': 'start', 'message': '开始调用LLM', 'content': '今天西安的天气如何?'})
('custom', {'type': 'llm_call', 'status': 'end', 'message': '调用LLM完成'})
('custom', {'type': 'search_tool', 'status': 'start', 'user_id': '123', 'user_name': '小明'})
('custom', {'type': 'search_tool', 'status': 'searching', 'user_id': '123', 'user_name': '小明'})
('custom', {'type': 'search_tool', 'status': 'end', 'user_id': '123', 'user_name': '小明'})
('custom', {'type': 'llm_call', 'status': 'start', 'message': '开始调用LLM', 'content': '查询天气:晴天,15-20度'})
('custom', {'type': 'llm_call', 'status': 'end', 'message': '调用LLM完成'})
2.4.1.2 设置多种传输模式

还可以组合多种模式(如 ["updates"、"custom"]),但至少必须有一个是 "custom"。如下所示:

for chunk in graph.stream(
    {
        "messages": [HumanMessage(content="今天西安的天气如何?")],
        "user_name": "小明"
    },
    context={"user_id": "123"},
    stream_mode=["custom", "updates"]
):
    print(chunk)

可以看到 LangGraph 运行将一种或多种流模式作为列表传递给 stream 或 astream。

🎀 应用场景

2.4.2.1 创建自定义监控面板

那么自定义流式数据到底有什么用?让我们改造一下代码。

改造一:将工具调用模拟为多步骤执行

@tool
def search(runtime: ToolRuntime[Context]) -> str:
    """调用搜索工具"""
    user_id = runtime.context.user_id
    user_name = runtime.state["user_name"]
    writer = get_stream_writer()
    writer({
        "type": "search_tool",
        "status": "start",
        "user_id": user_id,
        "user_name": user_name
    })
    search_steps = [
        {"name": "搜索1", "time": 1, "result": "晴天,"},
        {"name": "搜索2", "time": 2, "result": "15-20度"},
    ]
    all_result = "查询天气:"
    import time
    for i, step in enumerate(search_steps, 1):
        writer({
            "type": "search_tool",
            "status": "searching",
            "step": step['name'],
            "all_step": len(search_steps),
            "cur_step": i,
            "user_id": user_id,
            "user_name": user_name
        })
        time.sleep(step['time'])
        all_result += step['result']
    writer({
        "type": "search_tool",
        "status": "end",
        "user_id": user_id,
        "user_name": user_name,
        "result": all_result
    })
    return all_result

改造二:输出结果重定义(这里只处理了工具部分自定义数据流。其它同理自行改造)

# 运行并监控自定义流
print("图开始执行:")
for chunk in graph.stream(
    {
        "messages": [HumanMessage(content="今天西安的天气如何?")],
        "user_name": "小明"
    },
    context={"user_id": "123"},
    stream_mode=["custom", "updates"]
):
    if chunk[0] == "custom":
        info = chunk[-1]
        if info.get("type") == "llm_call":
            pass
        elif info.get("type") == "search_tool":
            status = info.get("status")
            if status == "start":
                print(f"用户id:{info['user_id']}, 用户名称:{info['user_name']}开始调用工具...")
            elif status == "searching":
                print(f"[{info['cur_step']}/{info['all_step']}] 正在处理: {info['step']}")
            elif status == "end":
                print(f"调用完成!结果: {info['result']}")
    elif chunk[0] == "updates":
        pass

打印结果:

图开始执行:
用户id:123, 用户名称:小明开始调用工具...
[1/2] 正在处理: 搜索1
[2/2] 正在处理: 搜索2
调用完成!结果: 查询天气:晴天,15-20度

可以看到,我们可以通过自定义数据创建自定义的监控面板!这个过程中,还可以计算中间进度([1/2][2/2])。

因此,自定义流式数据的用法,可以做到:

  1. 实时进度反馈:在长时间处理任务中显示进度
  2. 调试信息输出:输出中间计算结果
  3. 多源数据整合:同时流式输出不同类型的数据
  4. 自定义监控:创建自定义的监控面板

🦋 流式传输 LLM tokens

Token 是大语言模型处理文本的基本单位。因此 LangGraph 可以:

  • 使用 stream_mode="messages" 模式可以从 graph 的任何部分(包括节点、工具等)逐Token 流式传输 LLM 输出。
  • 输出格式为 (message_chunk, metadata) 元组。
2.5.1 基本用法
from typing import TypedDict
from langgraph.graph import StateGraph, START
from langchain_openai import ChatOpenAI

# 定义状态
class State(TypedDict):
    input: str
    output: str

# 初始化模型
model = ChatOpenAI(model="gpt-4o-mini")

def llm_node(state: State):
    """生成答案的节点"""
    return {"output": model.invoke([
        {"role": "system", "content": "你是一个乐于助人的助手。"},
        {"role": "user", "content": state["input"]}
    ])
    }

# 构建图
builder = StateGraph(State)
builder.add_node(llm_node)
builder.add_edge(START, "llm_node")
graph = builder.compile()

# 流式输出 LLM Tokens
# 输出格式为(message_chunk, metadata) 元组。
for token_chunk, metadata in graph.stream(
    {"input": "请解释什么是机器学习?"}, stream_mode="messages"
):
    if token_chunk.content:
        # 逐 Token 输出
        print(token_chunk.content, end="", flush=True)
2.5.2 高级功能
2.5.2.1 按 Tags 过滤 Tokens

我们还可以将 tags 与 LLM 调用相关联,以按 LLM 调用筛选流式令牌。

from typing import TypedDict
from langgraph.graph import StateGraph, START
from langchain_openai import ChatOpenAI

# 初始化带标签的模型
joke_model = ChatOpenAI(
    model="gpt-4o-mini", model_kwargs={"tags": ["joke"]}  # 给模型添加标签
)
poem_model = ChatOpenAI(
    model="gpt-4o-mini", model_kwargs={"tags": ["poem"]}  # 给模型添加标签
)

class CreativeState(TypedDict):
    topic: str
    joke: str
    poem: str

def generate_creative_content(state: CreativeState):
    """同时生成笑话和诗歌"""
    topic = state["topic"]
    print(f"\n生成关于 {topic} 的笑话:")
    joke_response = joke_model.invoke([
        {"role": "user", "content": f"讲一个关于 {topic} 的笑话"}
    ])
    print(f"\n生成关于 {topic} 的诗歌:")
    poem_response = poem_model.invoke([
        {"role": "user", "content": f"写一首关于 {topic} 的短诗"}
    ])
    return {
        "joke": joke_response.content,
        "poem": poem_response.content
    }

# 构建图
builder = StateGraph(CreativeState)
builder.add_node("creative", generate_creative_content)
builder.add_edge(START, "creative")
graph = builder.compile()

# 流式输出并过滤
for token_chunk, metadata in graph.stream(
    {"topic": "猫"}, stream_mode="messages"
):
    # 只输出笑话相关的 Tokens
    tags = metadata.get("tags", [])
    if "joke" in tags:
        print(token_chunk.content, end="", flush=True)
2.5.2.2 按节点名称过滤

可以指定特定节点流式传输Tokens,需按流式传输元数据中的 langgraph_node 字段筛选输出:

from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini")

class State(TypedDict):
    query: str
    summary: str
    translation: str

def generate_summary(state: State):
    """生成摘要"""
    response = model.invoke([
        {"role": "user", "content": f"请为以下内容生成摘要:{state['query']}"}
    ])
    return {"summary": response.content}

def generate_translation(state: State):
    """生成翻译"""
    response = model.invoke([
        {"role": "user", "content": f"请将以下内容翻译成英文:{state['query']}"}
    ])
    return {"translation": response.content}

# 构建并行处理图
builder = StateGraph(State)
builder.add_node("summarize", generate_summary)
builder.add_node("translate", generate_translation)
builder.add_edge(START, "summarize")
builder.add_edge(START, "translate")
builder.add_edge("summarize", END)
builder.add_edge("translate", END)
graph = builder.compile()

# 流式输出并只显示某个节点的 Tokens
target_node = "summarize"  # 可以改为 "translate"
for token_chunk, metadata in graph.stream(
    {"query": "人工智能是计算机科学的一个分支,致力于创造能够执行通常需要人类智能的任务的机器。"},
    stream_mode="messages"
):
    node_name = metadata.get("langgraph_node", "")
    if token_chunk.content and node_name == target_node:
        print(f"{token_chunk.content}", end="", flush=True)

三:🔥 子图(Subgraphs)

🦋 什么是子图?

在 LangGraph 中,子图是另一个图中的一个节点,可以独立开发和测试,也可以被多个主图复用。子图可用于:

  • 模块化开发:不同团队可以独立开发不同部分
  • 代码复用:相同逻辑的图只需开发一次

🦋 使用子图的两种方式

🎀 方式一:从节点调用子图(不同状态模式)

这种方式是从一个图(如主图)的节点内部调用另一个图(如子图)。因此其使用特点是:子图和主图的状态结构可以完全不同。

from typing_extensions import TypedDict
from langgraph.graph.state import StateGraph, START

# 1. 定义子图
class SubState(TypedDict):
    # 注意,这些键都不与父图状态共享
    sub_1: str
    sub_2: str

def sub_node_1(state: SubState):
    return {"sub_1": "sub_1"}

def sub_node_2(state: SubState):
    return {"sub_2": state["sub_2"] + state["sub_1"]}

sub_builder = StateGraph(SubState)
sub_builder.add_node(sub_node_1)
sub_builder.add_node(sub_node_2)
sub_builder.add_edge(START, "sub_node_1")
sub_builder.add_edge("sub_node_1", "sub_node_2")
subgraph = sub_builder.compile()

# 2. 定义主图
class ParentState(TypedDict):
    parent: str

def node_1(state: ParentState):
    return {"parent": "hi! " + state["parent"]}

def node_2(state: ParentState):
    # 将状态转换为子图状态
    response = subgraph.invoke({"sub_2": state["parent"]})
    # 将响应转换回父状态
    return {"parent": response["sub_2"]}

builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()

# 要在流式输出中包含子图的输出,可以在父图的 .stream() 方法中设置 subgraphs=True。
for chunk in graph.stream({"parent": "parent"}, subgraphs=True):
    print(chunk)

输出结果:

((), {'node_1': {'parent': 'hi! parent'}})
(('node_2:60e75a5e-41d7-8ec4-915d-afb8f8f62c1c',), {'sub_node_1': {'sub_1': 'sub_1'}})
(('node_2:60e75a5e-41d7-8ec4-915d-afb8f8f62c1c',), {'sub_node_2': {'sub_2': 'hi! parentsub_1'}})
((), {'node_2': {'parent': 'hi! parentsub_1'}})

扩展:定义主图、子图、孙子图进行调用。

上述代码中,要在流式输出中包含子图的输出,可以在父图的 .stream() 方法中设置 subgraphs=True。这将从父图和任何子图流式传输输出。

🎀 方式二:将子图作为节点(共享状态模式)

这种方式可以将图添加为另一个图中的节点,如下所示:

# 子图和主图使用相同的状态结构
子图 = 创建子图()
# 直接把子图作为节点加入主图
主图.add_node("子图节点", 子图)

其特点是:子图和主图共享部分状态。

from typing_extensions import TypedDict
from langgraph.graph.state import StateGraph, START

# 1. 定义子图
class SubState(TypedDict):
    parent: str  # 共享父图状态
    sub: str     # Sub私有

def sub_node_1(state: SubState):
    return {"sub": "sub"}

def sub_node_2(state: SubState):
    return {"parent": state["parent"] + state["sub"]}

sub_builder = StateGraph(SubState)
sub_builder.add_node(sub_node_1)
sub_builder.add_node(sub_node_2)
sub_builder.add_edge(START, "sub_node_1")
sub_builder.add_edge("sub_node_1", "sub_node_2")
subgraph = sub_builder.compile()

# 2. 定义主图
class ParentState(TypedDict):
    parent: str

def node_1(state: ParentState):
    return {"parent": "hi! " + state["parent"]}

builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()

for chunk in graph.stream({"parent": "parent"}):
    print(chunk)

输出结果:

{'node_1': {'parent': 'hi! parent'}}
{'node_2': {'parent': 'hi! parentsub'}}

🦋 为子图添加短期记忆

如果图包含子图,则只需在编译父图时提供 checkpoint。LangGraph 会自动将 checkpoint 传播到子图。

from langgraph.graph import START, StateGraph
from langgraph.checkpoint.memory import InMemorySaver
from typing import TypedDict

class State(TypedDict):
    foo: str

# 子图
def subgraph_node_1(state: State):
    return {"foo": state["foo"] + "bar"}

subgraph_builder = StateGraph(State)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph = subgraph_builder.compile()

# 主图
builder = StateGraph(State)
builder.add_node("node_1", subgraph)
builder.add_edge(START, "node_1")
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

🦋 在子图中使用中断

🎀 基本用法

在子图中,同样可以使用中断。且添加短期记忆后,可以检查图状态(检查点)。但要注意,只有当子图中断时,才能查看子图状态;恢复后,将无法访问子图形状态。

例如:

from langgraph.graph import START, StateGraph
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command
from typing_extensions import TypedDict

class State(TypedDict):
    foo: str

# 子图
def subgraph_node_1(state: State):
    print("sub_node_1")
    return {}

def subgraph_node_2(state: State):
    print("sub_node_2")
    value = interrupt("输入值:")
    return {"foo": state["foo"] + value}

subgraph_builder = StateGraph(State)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()

# 主图
builder = StateGraph(State)
builder.add_node("node_1", subgraph)
builder.add_edge(START, "node_1")
graph = builder.compile(checkpointer=InMemorySaver())

config = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": ""}, config)

parent_state = graph.get_state(config)
# 访问子图状态只能在子图被中断时才可用。
# 一旦恢复了图,将无法访问子图状态。
subgraph_state = graph.get_state(config, subgraphs=True).tasks[0].state
print(subgraph_state)

print(graph.invoke(Command(resume="bar"), config))

结果如下:

sub_node_1
sub_node_2
StateSnapshot(
    values={'foo': ''},
    next=('subgraph_node_2',),
    ...
    interrupts=(
        Interrupt(value='输入值:', id='...'),
    ),
)
sub_node_2
{'foo': 'bar'}

🎀 恢复时的注意事项

之前讲过,当节点恢复执行时,发起中断的节点会从头再跑一遍。因此,对于中断前的代码,会多重复执行!

而在子图场景下,子图的不同调用方式有不同的执行结果。

3.4.2.1 将子图作为节点时

上面的例子就是将子图作为节点的示例,去掉状态看看调用结果:

sub_node_1
sub_node_2
sub_node_2
{'foo': 'bar'}

由于是 subgraph_node_2 节点发起中断调用,我们可以看到 subgraph_node_2 节点被调用两次,符合预期。

3.4.2.2 节点内调用子图时

但当是节点内调用子图时:

  • 父图将从调用子图并触发中断的节点的开头恢复执行。
  • 同样,子图也将从调用中断的节点的开头恢复。

修改下代码:

# 主图
def node_1(state: State):
    print("node_1")
    response = subgraph.invoke({"foo": state["foo"]})
    return {"foo": response["foo"]}

builder = StateGraph(State)
builder.add_node("node_1", node_1)
builder.add_edge(START, "node_1")
graph = builder.compile(checkpointer=InMemorySaver())

config = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": ""}, config)
print(graph.invoke(Command(resume="bar"), config))

打印结果:

node_1
sub_node_1
sub_node_2
node_1        # 主图节点重新执行
sub_node_2    # 子图节点重新执行
{'foo': 'bar'}

可以看到不仅是调用中断的子图节点重新执行,就连调用子图的主图节点也会被重新调用!

四:🔥 共勉

😋 以上就是我对 【LangGraph】其他核⼼能⼒ 的理解, 觉得这篇博客对你有帮助的,可以点赞收藏关注支持一波~ 😉
在这里插入图片描述

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐