系列文章目录

LangGraph 入门教程:从邮件审核工作流说起
LangGraph 进阶:人工审核与中断机制详解
LangGraph 进阶:Send 并发执行——从串行到并行



前言

如果你已经用过 LangChain,一定对 Chain 不陌生——把 LLM 调用、工具、解析器串成一条流水线,数据从头流到尾。但真实业务很少是一条直线。审核要打回重写、订单要人工确认、出错了要重试……这些"回头路"用 Chain 很难表达。

LangGraph 就是来解决这个问题的。它用来组织工作流——节点是处理单元,边是流转方向。因为有向图天然支持分支和循环,所以"打回重写"这种事就变成了图上一条回头边。

本文不会罗列所有 API,而是围绕一个实际例子——邮件草稿生成 + 人工审核回路——把 LangGraph 的核心概念讲透。读完你应该能自己搭一个带人工审核的工作流。

本文基于 langgraph>=1.2.2,依赖 langchain-openai 作为 LLM 后端。代码完整可运行,环境变量配置好 API Key 就能跑。


一、先看成品:这个工作流长什么样

approve

reject

START

generate_draft
生成邮件草稿

human_review
人工审核

send_email
发送邮件

END

如果审核通过,流程是 START → 生成草稿 → 人工审核 → 发送邮件 → END,一条线走到底。

如果审核不通过,审核者写下修改意见,流程回到"生成草稿"节点重新写,再送审——这个循环可以一直重复,直到通过为止。

这个模式在内容审核、合同审批、工单流转里都非常常见。下面我们一步步把它搭出来。


二、准备工作:定义"状态"

LangGraph 里最核心的数据结构叫 State(状态)。你可以把它理解成一张"随流程传递的便签纸"——每个节点往上面写点东西,下一个节点接着用。

from typing import TypedDict

class AgentState(TypedDict):
    subject: str          # 邮件主题
    draft: str            # 草稿内容
    feedback: str         # 人工审核的修改意见
    review_decision: str  # 审核决策:approve / reject
    final_status: str     # 最终发送结果

TypedDict 是 Python 标准库的类型提示工具,LangGraph 用它来推断状态里有哪些字段。这里五个字段各司其职:

字段 谁写入 谁读取
subject 初始输入 生成草稿、发送邮件
draft 生成草稿节点 审核节点、发送节点
feedback 审核节点(拒绝时) 生成草稿节点(改写时)
review_decision 审核节点 条件边(决定走哪条路)
final_status 发送节点 最终输出

关键点:节点函数的返回值不需要包含所有字段,LangGraph 会自动把你返回的字段合并到当前状态上——这叫"部分状态更新"。比如 generate_draft 返回 {"draft": "xxx", "feedback": ""},其他字段保持不变。


三、LLM 实例:整个工作流的引擎

import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI

load_dotenv()

llm = ChatOpenAI(
    model=os.getenv("AL_MODEL_NAME"),
    api_key=os.getenv("AL_API_KEY"),
    base_url=os.getenv("AL_BASE_URL"),
)

这里用了 OpenAI 兼容接口——你配的 AL_BASE_URL 可以是任何兼容 /v1/chat/completions 的服务(国内各种代理、本地模型都行),不一定是 OpenAI 官方。三个环境变量写在 .env 文件里,load_dotenv() 一行加载。


四、搭节点:三个函数,三种职责

LangGraph 的**节点(Node)**就是一个普通的 Python 函数。它接收当前状态,返回一个字典(部分更新),仅此而已。

4.1 生成草稿 —— generate_draft

这个节点负责调 LLM 写邮件正文。它有两种模式:

  • 首次生成feedback 为空,根据 subject 直接写
  • 打回重写feedback 有内容,根据修改意见重写
def generate_draft(state: AgentState) -> dict:
    subject = state["subject"]
    feedback = state.get("feedback", "")

    system_prompt = (
        "你是一个专业的商务邮件撰写助手。"
        "请根据用户提供的主题写一封简洁、友好的邮件草稿。"
    )

    if feedback:
        human_message = (
            f"主题:{subject}\n\n"
            f"请根据以下修改意见重新撰写邮件:\n{feedback}\n\n"
            f"只输出邮件正文,不要包含其他内容。"
        )
    else:
        human_message = (
            f"主题:{subject}\n\n"
            f"请写一封邮件草稿,通知客户最新进展。"
            f"只输出邮件正文,不要包含任何其他内容。"
        )

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": human_message},
    ]

    response = llm.invoke(messages)
    return {"draft": response.content, "feedback": ""}

值得注意的细节:返回时把 feedback 置空。因为如果上次审核给了修改意见,这次 LLM 已经按意见重写了,旧的反馈应该清掉,否则下次再走到这个节点会重复使用同一条反馈。

4.2 人工审核 —— human_review

这个节点用 input() 模拟人工介入。真实场景里这里可能是一个前端页面等待按钮点击,或者一条 Slack 消息等回复——但目前用命令行输入就够说明问题了。

def human_review(state: AgentState) -> dict:
    draft = state["draft"]

    review_decision = input(
        f"请审核邮件草稿:\n{draft}\n"
        f"请输入审核决策(approve(通过)/ reject(拒绝)):"
    )
    while review_decision not in ["approve", "reject"]:
        review_decision = input("请输入正确的审核决策(approve/reject):")

    if review_decision == "approve":
        return {"review_decision": "approve"}
    else:
        feedback = input("请输入修改意见:")
        return {"review_decision": "reject", "feedback": feedback}

逻辑很直白:

  1. 打印草稿,等审核者输入决策
  2. 如果乱输入就一直让重输(简单的输入校验)
  3. 通过 → 只返回 review_decision
  4. 拒绝 → 同时返回 review_decisionfeedback(修改意见)

4.3 发送邮件 —— send_email

这是终点前的最后一步,这里用 print 模拟。真实场景替换成邮件 API 调用即可。

def send_email(state: AgentState) -> dict:
    subject = state["subject"]
    draft = state["draft"]
    print(f"发送邮件:主题:{subject}\n内容:{draft}")
    return {"final_status": "success"}

五、连边:把节点串成图

节点写好了,但它们是孤立的。我们需要告诉 LangGraph"谁之后是谁"。

5.1 固定边:add_edge

固定边就是"无条件、走完 A 就走 B":

from langgraph.graph import StateGraph
from langgraph.func import START, END

workflow = StateGraph(AgentState)

# 注册节点
workflow.add_node("generate_draft", generate_draft)
workflow.add_node("human_review", human_review)
workflow.add_node("send_email", send_email)

# 连边
workflow.add_edge(START, "generate_draft")         # 入口 → 生成草稿
workflow.add_edge("generate_draft", "human_review")  # 生成草稿 → 审核
workflow.add_edge("send_email", END)               # 发送邮件 → 出口

这里有三个点值得说明:

StateGraph(AgentState) 是图的构造器。泛型参数 AgentState 告诉 LangGraph"这个图的状态长这样"——后续所有节点函数的参数签名都会据此校验。

add_node(name, func) 注册一个节点。第一个参数是字符串名字(图上唯一标识),第二个是处理函数。

STARTEND 是两个特殊标记:

  • START 表示图的入口——工作流从哪开始执行
  • END 表示图的出口——走到这就结束了

5.2 条件边:add_conditional_edges

审核后走"发送"还是"打回重写",取决于 review_decision 的值。这就要用到条件边:

def after_review(state: AgentState) -> Literal["send_email", "generate_draft"]:
    if state.get("review_decision", "") == "approve":
        return "send_email"
    else:
        return "generate_draft"

workflow.add_conditional_edges("human_review", after_review)

add_conditional_edges(source, router) 的含义是:从 source 节点出来后,调用 router 函数,根据返回值决定下一个节点。路由函数的返回值必须是已注册的节点名(或者 END)。

返回类型注解 Literal["send_email", "generate_draft"] 不是装饰品——LangGraph 会读取它来绘制图的可视化,所以写清楚能让调试省很多事。

把这四条边拼在一起,图就完整了:

START → generate_draft → human_review → send_email → END
                              ↑              │
                              └── reject ────┘

六、编译与运行

# 编译——把图"固化"成一个可执行对象
agent = workflow.compile()

# 运行——传入初始状态
result = agent.invoke({"subject": "产品更新通知"})

print(result["final_status"], "发送结果")

compile() 做了几件事:校验图结构(有没有断头路、节点名是否都注册了)、建立状态更新的合并逻辑、生成可执行对象。

invoke(initial_state) 以同步方式运行整个图。传入的字典会作为初始状态,图跑到 END 后返回最终状态。

运行结果:

在这里插入图片描述


七、完整代码

import os
from typing import Literal, TypedDict

from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph,END, START

load_dotenv()


# 定义状态
class AgentState(TypedDict):
    subject: str  # 邮件主题
    draft: str  # 肉稿内容
    feedback: str  # 人工审核修改意见
    review_decision: str  # 审核决策,"approve"(通过) 或 "reject"(拒绝)
    final_status: str  # 发送结果


llm = ChatOpenAI(
    model=os.getenv("AL_MODEL_NAME"),  # 模型名称,从环境变量读取
    api_key=os.getenv("AL_API_KEY"),
    base_url=os.getenv("AL_BASE_URL"),
)


# 节点-生成邮件草稿
def generate_draft(state: AgentState) -> dict:
    subject = state["subject"]  # 邮件主题
    feedback = state.get("feedback", "")  # 人工审核修改意见
    # 系统提示
    system_prompt = "你是一个专业的商务邮件撰写助手。请根据用户提供的主题写一封简洁、友好的邮件草稿。"
    if feedback:
        # 有修改意见,根据修改意见重新撰写邮件
        human_message = f"主题:{subject}\n\n请根据以下修改意见重新撰写邮件:\n{feedback}\n\n只输出邮件正文,不要包含其他内容。"
    else:
        # 无修改意见,根据主题撰写邮件
        human_message = f"主题:{subject}\n\n请写一封邮件草稿,通知客户最新进展。只输出邮件正文,不要包含任何其他内容。"

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": human_message},
    ]
    # 调用模型生成邮件草稿
    response = llm.invoke(messages)

    # 返回草稿内容
    return {
        "draft": response.content,
        "feedback": "",
    }


# 节点-人工审核邮件草稿
def human_review(state: AgentState) -> dict:
    draft = state["draft"]  # 肉稿内容
    # 用input模拟人工审核,返回审核决策,"approve" 或 "reject"
    review_decision = input(
        f"请审核邮件草稿:\n{draft}\n请输入审核决策(approve(通过)/reject(拒绝):"
    )
    while review_decision not in ["approve", "reject"]:
        review_decision = input("请输入正确的审核决策(approve/reject):")
    # 通过
    if review_decision == "approve":
        return {
            "review_decision": review_decision,
        }
    else:
        # 拒绝
        feedback = input("请输入修改意见:")
        return {
            "review_decision": review_decision,
            "feedback": feedback,
        }


# 节点-发送邮件
def send_email(state: AgentState) -> dict:
    subject = state["subject"]  # 邮件主题
    draft = state["draft"]  # 肉稿内容
    # 模拟发送邮件
    print(f"发送邮件:主题:{subject}\n内容:{draft}")
    return {"final_status": "success"}


# 条件边-审核后操作
def after_review(state: AgentState) -> Literal["send_email", "generate_draft"]:
    if state.get("review_decision", "") == "approve":
        # 通过,发送邮件
        return "send_email"
    else:
        # 拒绝,重新生成邮件草稿
        return "generate_draft"


# 定义工作流
workflow = StateGraph(AgentState)

# 添加节点
workflow.add_node("generate_draft", generate_draft)
workflow.add_node("human_review", human_review)
workflow.add_node("send_email", send_email)

# 添加边
workflow.add_edge(START, "generate_draft")  # 添加边-从开始到生成邮件草稿
workflow.add_edge("generate_draft", "human_review")  # 添加边-从生成邮件草稿到人工审核
workflow.add_conditional_edges(
    "human_review", after_review
)  # 添加条件边-从人工审核到发送邮件或重新生成邮件草稿
workflow.add_edge("send_email", END)  # 添加边-从发送邮件到结束

# 编译工作流
agent = workflow.compile()
# 执行工作流
result = agent.invoke({"subject": "产品更新通知"})
# 打印结果
print(result["final_status"], "发送结果")


八、关键 API 速查

API 作用 一句话说明
StateGraph(StateType) 创建状态图 指定状态类型,后续所有操作都基于这个状态
add_node(name, func) 注册节点 name 是字符串 ID,func(state) -> dict 的函数
add_edge(from, to) 添加固定边 from 走完一定走 to,可用 START/END
add_conditional_edges(source, router) 添加条件边 router 返回下一个节点名,实现分支
compile(checkpointer=None) 编译图 校验 + 固化,可选传入 checkpoint 实现持久化
invoke(state, config=None) 同步执行 传入初始状态,阻塞运行直到 END
astream(state) 流式执行 返回异步迭代器,可以逐节点观察状态变化

九、总结

回顾一下我们做了什么:

  1. TypedDict 定义了一个状态——工作流中流转的数据
  2. 写了三个节点函数——生成草稿、人工审核、发送邮件
  3. add_edgeadd_conditional_edges 搭出了带回路的有向图
  4. compile() + invoke() 跑通了整个流程

LangGraph 本质上就是帮你管理"LLM 调用 + 流程控制 + 状态传递"这三件事。对于有 LangChain 基础的开发者,把它理解成"加强版的 Chain,支持循环和条件分支"就对了。

从这个例子出发,你可以很容易地扩展出更多模式:

  • 多加几个审核节点 → 多级审批
  • 加一个并行节点 → 同时生成多个草稿让审核者挑
  • MemorySaver 换成 SqliteSaver → 审核状态持久化到数据库
  • 结合 interrupt() → 做一个真正的 Web 端审核后台

LangGraph 的学习曲线比 LangChain 略陡,但一旦理解了"状态→节点→边"这个心智模型,能做的事情比 Chain 多得多。

另外,不要误以为 LangChain 和 LangGraph 是两个独立竞争的框架——实际上 LangGraph 本身就是 LangChain 生态的一部分,底层复用了 LangChain 的 LLM 调用、工具、消息等基础组件。很多企业级项目会先用 LangChain 快速搭出原型来验证业务可行性,跑通之后再迁移到 LangGraph 处理复杂的流程控制,最终上线。两者是互补关系,不是二选一。

从可控性的角度看,LangChain 的 Chain 更像一个黑盒子——你传入输入等输出,中间的执行过程对你是不可见的,想插一脚改个中间结果很难。而 LangGraph 是白盒模型,整个流程按你定义的节点和边一步步流转,每个节点的输入输出都暴露在状态里,你可以随时在任何节点前后加逻辑、查状态、打断点。这种"流程完全可见"的特点,正是生产环境里排错、审计、人工介入这些需求所依赖的。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐