什么是LangGraph?

1. LangGraph 与 传统 LangChain

技术定义: LangChain 主要用于构建线性的工作流(DAG,有向无环图),数据从头传到尾。而 LangGraph 是 LangChain 的扩展框架,专门用于构建包含“循环(Cycles)”和“状态记忆(Stateful)”的多智能体(Multi-Agent)应用。

简单举例: 传统的 LangChain 像是一条单向流水线,原材料进去,经过几道工序,成品出来,不能回头。LangGraph 像是一个带环岛的交通网络或者一个带有反馈机制的办公室。员工(大模型)在遇到无法解答的问题时,可以去查资料(调用工具),查完资料后带着结果重新循环回到大脑中进行二次思考,直到问题真正解决才输出最终答案。

# 传统 LangChain (单向 Pipeline,无法循环)
chain = prompt | llm | output_parser

# LangGraph (基于状态图,支持复杂循环)
builder = StateGraph(AgentState)
# ... 添加节点与边 ...
graph = builder.compile()

2. State (状态)

技术定义: 图中所有节点共享的数据结构(通常是 Python 的 TypedDict 或 Pydantic 模型)。每次节点执行完毕后,都会返回一个更新,LangGraph 会将这个更新合并到全局状态中,传递给下一个节点。

简单举例: State 就像是会议室桌子上的一块公共白板。所有的参会者(节点)都能看到白板上的历史讨论记录(上下文对话),当轮到某个人发言或工作时,他会把自己的新发现写在白板上,供下一个人参考。

from typing import TypedDict

# 定义这块“公共白板”上能写哪些字段
class AgentState(TypedDict):
    input_query: str          # 用户的初始问题
    search_results: list[str] # 存放查找到的资料
    final_answer: str         # 最终输出的答案

3. Node (节点)

技术定义: 图中的处理单元,本质上就是接收 State、执行特定逻辑(如调用 LLM 或执行 Python 代码),然后返回 State 更新的普通 Python 函数。

简单举例: 节点就是公司里的具体岗位员工。比如“研究员节点”负责去搜索引擎找资料,“总结员节点”负责把资料写成报告。他们只负责自己份内的工作。

# 一个典型的节点函数(员工)
def researcher_node(state: AgentState):
    query = state["input_query"]
    # 模拟执行外部搜索
    results = [f"关于 {query} 的搜索结果A", "结果B"]
    
    # 只需要返回需要更新到白板上的增量数据
    return {"search_results": results}

4. Edge (边) 与 Conditional Edge (条件边)

技术定义: 边定义了节点之间的执行顺序。条件边(Conditional Edge)则是一个路由函数,它根据当前的 State 动态决定下一步该走向哪个节点。

简单举例: 边就像是公司的汇报流程图。而“条件边”则是项目经理,他会根据当前白板(State)上的情况做判断:“如果资料查全了,就发给总结员(走向结束);如果资料不够,就打回去让研究员继续查(形成循环)。”

# 路由函数(项目经理)
def route_logic(state: AgentState):
    if len(state.get("search_results", [])) > 0:
        return "summarizer" # 资料够了,去写总结
    return "researcher"     # 资料不够,打回去继续查

# 普通边:researcher 节点做完后,强制走到 router_node
builder.add_edge("researcher", "router_node")

# 条件边:根据 route_logic 的返回值动态决定去向
builder.add_conditional_edges("router_node", route_logic)

5. Reducer (聚合器)

技术定义: 定义在 State 字段上的数据合并逻辑。默认情况下,节点返回的数据会覆盖全局 State 中的同名字段;如果配置了 Reducer,LangGraph 会调用该函数,将新数据合并或追加到旧数据上。

简单举例: 如果没有 Reducer,会议白板上的字会被下一个人直接擦掉重写;配置了 Reducer,相当于在白板上划分了专门的“留言区”,新来的内容会排在旧留言下面,保留完整历史。

import operator
from typing import Annotated, TypedDict

class AgentState(TypedDict):
    # 没有 Reducer:状态被直接覆盖 (擦掉重写)
    current_status: str 
    
    # 有 Reducer (operator.add):新列表会追加到老列表后面 (保留历史)
    messages: Annotated[list[str], operator.add]

6. Checkpointer (检查点)

技术定义: LangGraph 的状态持久层组件。它会在图的每一步执行完毕后,将完整的 State 拍一个快照(Snapshot)保存下来。这是实现时光倒流 (Time Travel) 和人类介入 (Human-in-the-loop) 的基石。

简单举例: 就像单机游戏的“自动存档”功能。如果打 Boss(调用工具)时报错了,你可以直接读档回到打 Boss 前的状态重试。

from langgraph.checkpoint.memory import MemorySaver

# 1. 初始化保存器 (也支持 SQLite/Postgres 等数据库)
memory = MemorySaver()

# 2. 编译时注入保存器
app = builder.compile(checkpointer=memory)

# 3. 运行时必须指定线程 ID,用于隔离不同用户的“游戏存档”
config = {"configurable": {"thread_id": "user_123_session_1"}}
app.invoke({"input_query": "你好"}, config=config)

7. Subgraph (子图)

技术定义: 将一个已经编译好的 StateGraph 实例作为一个普通的 Node,无缝嵌入到另一个更大的图(父图)中。专门用于降低复杂 Agent 系统的代码耦合度。

简单举例: 就像大公司里的“部门外包”。父图是总公司,遇到复杂的财务问题时,直接把任务扔给“财务部(子图)”。财务部内部有自己的一套流转节点,搞定后直接把最终结果交回给总公司。

# 1. 编译子图 (财务部门)
finance_graph = StateGraph(FinanceState)
# ... 给财务部添加主管、会计节点 ...
finance_app = finance_graph.compile()

# 2. 将子图作为普通节点,无缝接入父图 (总公司)
main_graph = StateGraph(MainState)
main_graph.add_node("finance_department", finance_app)

8. Send API (并行分发)

技术定义: LangGraph 提供的用于实现动态并行执行的核心 API。在条件边中返回 Send 对象,LangGraph 会同时拉起多个相同的节点并发处理不同数据,最后汇总结果。

简单举例: 就像老师批改 50 份期末试卷。老师不自己挨个批改(串行),而是把试卷同时分发给 5 个助教(并行 Send 拉起 5 个节点),改完后再将分数统一汇总给老师。

from langgraph.constants import Send

def distribute_grading(state: GlobalState):
    papers = state["exam_papers"]
    # 动态返回多个 Send 对象,并行拉起多个 grader_node
    return [Send("grader_node", {"paper_content": p}) for p in papers]

# 起点直接触发分发逻辑
builder.add_conditional_edges("start", distribute_grading)

9. Command (控制指令)

技术定义: 一种在 Node 内部直接返回的特殊数据类。它允许节点在更新 State 的同时,动态指定下一步要跳转的目标节点,将“状态更新”和“路由跳转”合二为一。

简单举例: 以前员工干完活(节点),要把结果写在白板上等经理(条件边)决定下一步。现在员工自带分配权(Command),干完活不仅更新白板,还能直接指定“转交给谁干”。

from langgraph.types import Command

def smart_worker_node(state: AgentState):
    # 干完活了
    result = "数据分析报告已生成"
    
    # 以前只能 return {"final_answer": result},然后靠外部判断
    # 现在直接用 Command 决定去向:更新状态并强制跳到 manager_node
    return Command(
        update={"final_answer": result},
        goto="manager_node"
    )

LangGraph的基本使用

第一步:环境安装与导入

安装 LangGraph 和基础依赖环境。

# 安装基础包
# pip install -U langgraph langchain-openai
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

第二步:定义全局状态 (State)

我们需要定义图在流转过程中传递的数据结构,最典型的用法是保存对话历史记录。

# 定义状态字典,Annotated[list, add_messages] 引入了 Reducer 聚合器
# 告诉框架:新消息要追加到列表中,而不是覆盖
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]

第三步:定义节点 (Nodes)

创建具体的处理函数。这里模拟两个节点:思考的大模型和外部工具。

# 1. 模拟大模型节点:根据当前消息思考
def call_llm(state: AgentState):
    messages = state['messages']
    last_message = messages[-1]
    
    if "天气" in last_message:
        return {"messages": ["CALL_TOOL: weather_api"]}
    else:
        return {"messages": ["我是一个AI,我已经为你解答了问题。"]}

# 2. 模拟工具节点:执行具体的外部查询
def execute_tool(state: AgentState):
    return {"messages": ["工具执行结果:今天天气晴朗,气温25度。"]}

第四步:定义条件路由 (Conditional Edge)

写一个逻辑函数,决定大模型思考完之后,是直接结束,还是去调用工具。

# 路由逻辑:判断下一步去哪里
def should_continue(state: AgentState):
    last_message = state['messages'][-1]
    
    if "CALL_TOOL" in last_message:
        return "continue_to_tool"
    return "end_conversation"

第五步:构建并编译计算图 (Graph)

将状态、节点和边像拼图一样组装起来。

workflow = StateGraph(AgentState)

workflow.add_node("agent", call_llm)
workflow.add_node("tool", execute_tool)

workflow.add_edge(START, "agent")

workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "continue_to_tool": "tool",
        "end_conversation": END
    }
)

workflow.add_edge("tool", "agent")

app = workflow.compile()

第六步:运行图

向图中输入初始状态,观察数据的流转。

# 测试 1:不需要工具的普通对话 (START -> agent -> END)
inputs = {"messages": ["你好,请介绍一下你自己。"]}
for output in app.stream(inputs):
    print("当前节点输出:", output)

print("---")

# 测试 2:触发工具调用的复杂对话 (START -> agent -> tool -> agent -> END)
inputs = {"messages": ["北京今天的天气怎么样?"]}
for output in app.stream(inputs):
    print("当前节点输出:", output)

第七步:使用 Send 实现并行处理 (Map-Reduce 进阶)

在需要并行查询多个数据源时使用。

from langgraph.constants import Send
import operator

# 状态定义:使用 operator.add 作为 Reducer 来累加多个并行节点的结果
class ParallelState(TypedDict):
    topics: list[str]
    reports: Annotated[list[str], operator.add] 

def generate_report(state: str):
    return {"reports": [f"【{state}】的调研报告已生成"]}

def continue_to_parallel(state: ParallelState):
    # 返回多个 Send 对象,LangGraph 会并行触发 generate_report 节点
    return [Send("generate_report", topic) for topic in state["topics"]]

parallel_workflow = StateGraph(ParallelState)
parallel_workflow.add_node("generate_report", generate_report)
parallel_workflow.add_conditional_edges(START, continue_to_parallel)
parallel_workflow.add_edge("generate_report", END)

app_parallel = parallel_workflow.compile()

第八步:使用 Command 简化图结构路由 (高阶写法)

抛弃繁琐的 add_conditional_edges,让节点自己决定去向。

from langgraph.types import Command

def smart_agent_node(state: AgentState):
    messages = state['messages']
    last_message = messages[-1]
    
    if "查天气" in last_message:
        # Command 方式:同时更新状态并指定去向
        return Command(
            goto="tool",
            update={"messages": ["准备调用天气工具..."]}
        )
    else:
        return Command(
            goto=END,
            update={"messages": ["直接回答问题,结束流程。"]}
        )

command_workflow = StateGraph(AgentState)
command_workflow.add_node("agent", smart_agent_node)
command_workflow.add_node("tool", execute_tool)
command_workflow.add_edge(START, "agent")
# 节点自带 goto 逻辑,无需再写 add_conditional_edges

进阶与生产环境迁移

当你掌握了基础的图结构后,LangGraph 在生产环境中的两大杀手锏是记忆持久化(Persistence)和人工干预(Human-in-the-loop)。

在编译图时,你可以传入一个 checkpointer(如基于 SQLite 或 Postgres 的保存器)。加入保存器后,图不仅能在单个循环中流转状态,还能将状态永久保存在数据库中。这意味着你可以实现线程级别的断点续传:

  1. 你的 Agent 可以在执行完某个高风险操作(如发送邮件前)自动暂停。

  2. 状态被冻结,等待人类用户确认(或直接在 UI 界面中修改 State 里的 API 请求参数)。

  3. 人类审批通过后,Agent 读取之前的状态,继续往下执行。

# 进阶编译示例(引入持久化检查点)
# from langgraph.checkpoint.sqlite import SqliteSaver
# memory = SqliteSaver.from_conn_string(":memory:")
# app = workflow.compile(checkpointer=memory, interrupt_before=["tool"])
Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐