系列文章目录

LangGraph 入门教程:从邮件审核工作流说起
LangGraph 进阶:人工审核与中断机制详解
LangGraph 进阶:Send 并发执行——从串行到并行


前言

先看一个很常见的场景:你有 10 篇文章需要翻译成英文。串行做法是写个 for 循环,一篇翻完再翻下一篇,10 篇文章就是 10 次 LLM 调用排队等结果。如果一篇文章翻译要 2 秒,10 篇就是 20 秒——这还是网络状况好的时候。

你可能会想,能不能 10 篇同时发给 LLM?反正翻译之间没依赖关系,谁先翻完谁先回来。传统做法是多线程、asyncio、消息队列,代码写起来不麻烦但也不简单——尤其是线程间还得协调结果收集。

LangGraph 在图上用一个叫 Send 的机制解决了这个问题。你不用管线程、不用管异步,只需要在条件边里返回一组 Send 对象,LangGraph 的引擎自动帮你并行调度。本文就围绕 send.py 里的翻译工作流,把 Send 怎么用、底层怎么跑、跟普通条件边有什么区别,一点一点掰开讲。

本文基于 langgraph>=1.2.2,依赖 langchain-openai 作为 LLM 后端。代码完整可运行。


一、先跑一遍:翻译工作流长什么样

Send

Send

START

get_articles
获取原始文章列表

translate_articles
翻译文章 ①

translate_articles
翻译文章 ②

store_articles
收集并输出翻译结果

END

这个图跟前面两篇教程最大的不同:get_articles 出来后,同一个 translate_articles 节点跑了两次,分别处理不同的文章,然后各自的结果一起汇入 store_articles

如果文章有 10 篇,translate_articles 就会被并发调起 10 个实例——每个实例独立跑,互不干扰。


二、准备工作:一个有点不一样的状态定义

import operator
from typing import Annotated, TypedDict

class AgentState(TypedDict):
    articles: list[str]                                        # 原始文章
    translated_articles: Annotated[list[str], operator.add]    # 翻译后的文章

articles 没啥特别的,就是个普通列表。关键是 translated_articles 的类型注解:

Annotated[list[str], operator.add]

这行代码的信息量很大。Annotated 是 Python 标准库 typing 里的一个工具,它允许你给类型"贴标签"——第一个参数是正常类型(list[str]),第二个参数是元数据。LangGraph 把这个元数据拿来做了一件很聪明的事:用它来指定状态合并策略

具体来说,Annotated[list[str], operator.add] 的意思是:

  • 字段的类型是 list[str]
  • 当多个节点同时往这个字段写数据时,用 operator.add 来合并——也就是列表拼接

举个例子,3 篇文章并发翻译,各个翻译结果分别是 ["Hello"]["World"]["!"]。这三个列表不会互相覆盖,LangGraph 会自动调用 operator.add 把它们拼成 ["Hello", "World", "!"]

为什么这里必须用 reducer?普通字段的默认行为是覆盖——后写的值把先写的值盖掉。并发场景下,三个翻译节点几乎同时返回,如果不指定合并策略,你只能拿到最后一个完成的翻译结果,前面两个就丢了。

reducer 是 Send 并发机制能够工作的基石。没有它,多路并发的产出就没法安全地汇聚到同一个状态字段里。

operator.add 是一个很简单的 reducer。你完全可以写自定义的,比如 lambda acc, new: sorted(acc + new) 来在合并时顺便排序。只要是一个 (accumulated, incoming) -> new_accumulated 的函数就行。

LLM 实例的创建跟前两篇一样:

import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI

load_dotenv()

llm = ChatOpenAI(
    model=os.getenv("AL_MODEL_NAME"),
    api_key=os.getenv("AL_API_KEY"),
    base_url=os.getenv("AL_BASE_URL"),
)

三、节点:三个函数,只有翻译节点是重点

3.1 获取原始文章 —— get_articles

def get_articles(state: AgentState) -> dict:
    articles = ["这是一篇文章", "这是另一篇文章"]
    return {"articles": articles}

硬编码了两篇文章作为模拟数据。实际项目里换成数据库查询、文件读取、API 调用都行。它返回一个字典,把 articles 字段填上,状态里就有了"待翻译文章列表"。

3.2 翻译文章 —— translate_articles

def translate_articles(state: AgentState) -> dict:
    current_article = state["current_article"]
    response = llm.invoke(f"请将以下文章翻译成英文只返回翻译结果不包含其他内容:{current_article}")
    return {"translated_articles": [response.content]}

这个函数有一个很微妙的点:它从状态里读的是 current_article,而不是 articles。回顾状态定义:

class AgentState(TypedDict):
    articles: list[str]
    translated_articles: Annotated[list[str], operator.add]

current_article 不在 AgentState 里。那它从哪来?答案在后面的 Send 调用里——Send 可以在派发时往状态里注入临时字段。这就是 Send 的灵活之处:你可以给每个并发实例传入不同的"上下文",让同一个节点函数跑出不同的结果。

返回 {"translated_articles": [response.content]} 时,注意是包装成列表的单元素。并发执行时,每个翻译实例返回它自己的那份 [result],LangGraph 用 operator.add 把它们全拼起来,最终在 store_articles 里拿到的就是一个完整的结果列表。

3.3 存储翻译结果 —— store_articles

def store_articles(state: AgentState) -> dict:
    translated_articles = state["translated_articles"]
    print("所有翻译结果:")
    for i, text in enumerate(translated_articles, 1):
        print(f"第{i}篇文章的翻译结果: {text}")
    return {}

这个节点只是把汇聚好的翻译结果打印出来。到这里所有并发翻译已经完成了,translated_articles 是一个拼接完毕的完整列表。


四、Send 的核心魔法:一条返回 Send 列表的条件边

这是整篇文章最关键的代码:

from langgraph.types import Send

def route_to_translate(state: AgentState):
    return [
        Send("translate_articles", {"current_article": article})
        for article in state["articles"]
    ]

这个函数的签名跟普通条件边路由函数一模一样:(state) -> ...。但以前我们返回的是字符串(要去的节点名),这次返回的是一个 Send 对象的列表

Send 的构造器就两个参数:

参数 含义
第一个参数 "translate_articles" 目标节点名——派发到哪个节点
第二个参数 {"current_article": article} 传给目标节点的状态数据

当 LangGraph 的引擎走到 get_articles → 条件边 → route_to_translate 时,它检查返回值:

  • 如果返回 "some_node"(字符串)→ 走单路,跟普通条件边一样
  • 如果返回 [Send("node", {...}), Send("node", {...}), ...](Send 列表)→ 走多路,每个 Send 被同时调度到目标节点

以上面例子,articles 里有 2 篇文章,所以 route_to_translate 返回 2 个 Send 对象。引擎看到后,并行创建 2 个 translate_articles 实例,各自拿到不同的 current_article

实例 ①:state["current_article"] = "这是一篇文章"
实例 ②:state["current_article"] = "这是另一篇文章"

两个实例同时调用 LLM 翻译。如果一篇文章翻 2 秒,总耗时还是 2 秒——因为两个实例在并发等结果。

重点Send 的第二个参数会跟当前状态做一次浅合并。也就是说 {"current_article": "这是一篇文章"} 不会覆盖已有的 articlestranslated_articles 字段,只是在传给 translate_articles 的状态快照里多塞了一个 current_article 键。原来的字段该怎么流转还怎么流转。


五、搭图:add_conditional_edges 在这里不一样了

from langgraph.graph import StateGraph, END, START

workflow = StateGraph(AgentState)

workflow.add_node("get_articles", get_articles)
workflow.add_node("translate_articles", translate_articles)
workflow.add_node("store_articles", store_articles)

workflow.add_edge(START, "get_articles")
workflow.add_conditional_edges("get_articles", route_to_translate)
workflow.add_edge("translate_articles", "store_articles")
workflow.add_edge("store_articles", END)

agent = workflow.compile()
agent.invoke({"articles": [], "translated_articles": []})

图的搭建整体跟前两篇一致,只有两处值得专门说一下。

条件边的路由函数返回了 Send 列表

workflow.add_conditional_edges("get_articles", route_to_translate)

add_conditional_edges 的第二个参数仍然是一个路由函数,但这个路由函数不再返回节点名字符串,而是返回 Send 对象列表。LangGraph 会根据返回类型自动区分——你不需要在 add_conditional_edges 里多传任何参数。

translate_articlesstore_articles 的边

workflow.add_edge("translate_articles", "store_articles")

这条边描述的是:每一个 translate_articles 实例完成后,都去 store_articles。如果并发跑了 3 个翻译实例,3 个都会各自走到 store_articles

store_articles 不会被跑 3 次——LangGraph 的调度器会等所有指向它的入边全部就绪后,只执行一次。实际上 store_articles 节点在内部依然是执行一次,它拿到的 state["translated_articles"] 已经是合并好的完整列表。

另外注意 invoke 传的是 {"articles": [], "translated_articles": []} 而不是空字典。因为状态里两个字段都标注了类型,传空字典的话 LangGraph 无法推断初始结构。显式给两个空列表,引擎就知道怎么初始化了。后面 get_articles 节点会覆盖 articles 字段,translated_articles 则靠 reducer 逐步累积。


六、执行模型:引擎到底干了什么

调用 agent.invoke({"articles": [], "translated_articles": []}) 后,引擎内部大概是这么几步:

1. START → get_articles
   └─ 拿到 articles: ["这是一篇文章", "这是另一篇文章"]

2. get_articles → route_to_translate(state)
   └─ 返回 [Send("translate_articles", {"current_article": "这是一篇文章"}),
            Send("translate_articles", {"current_article": "这是另一篇文章"})]

3. 引擎看到 Send 列表,并行启动 2 个 translate_articles 实例
   ├─ 实例 A: llm.invoke("请将以下文章翻译成英文...这是一篇文章")
   └─ 实例 B: llm.invoke("请将以下文章翻译成英文...这是另一篇文章")

4. 两个实例各自返回 {"translated_articles": ["..."]}

5. 引擎用 operator.add 合并: ["Article 1 EN", "Article 2 EN"]

6. store_articles 执行,打印合并后的列表

7. END

用一张时序图来看更直观:

get_articles ──┬── translate_articles(article=这是一篇文章) ──┬── store_articles ── END
              │                                             │
              └── translate_articles(article=这是另一篇文章) ──┘

两个翻译分支是并发跑的,store_articles 等两个都跑完才执行。


七、 完整代码

import operator
import os
from typing import Annotated, TypedDict
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END, START
from langgraph.types import Send

load_dotenv()


# 定义状态
class AgentState(TypedDict):
    articles: list[str]  # 原始文章
    translated_articles: Annotated[list[str], operator.add]  # 翻译后的文章


llm = ChatOpenAI(
    model=os.getenv("AL_MODEL_NAME"),  # 模型名称,从环境变量读取
    api_key=os.getenv("AL_API_KEY"),
    base_url=os.getenv("AL_BASE_URL"),
)


# 节点-获取原始文章
def get_articles(state: AgentState) -> dict:
    # 模拟获取原始文章,真实场景中从数据库或文件中读取
    articles = ["这是一篇文章", "这是另一篇文章"]
    return {
        "articles": articles,
    }


# 节点-翻译文章
def translate_articles(state: AgentState) -> dict:
    # 从状态中获取当前文章
    current_article = state["current_article"]
    # 调用模型翻译文章
    response = llm.invoke(
        f"请将以下文章翻译成英文只返回翻译结果不包含其他内容:{current_article}"
    )

    return {"translated_articles": [response.content]}


# 节点-存储翻译后的文章
def store_articles(state: AgentState) -> dict:
    # 从状态中获取翻译后的文章
    translated_articles = state["translated_articles"]
    print("所有翻译结果:")
    # 打印所有翻译结果,实际场景中可以保存到数据库或文件中
    for i, text in enumerate(translated_articles, 1):
        print(f"第{i}篇文章的翻译结果: {text}")
    return {}


# 条件边-动态路由函数
def route_to_translate(state: AgentState):

    return [
        # 并发翻译文章
        Send("translate_articles", {"current_article": article})
        for article in state["articles"]
    ]

# 定义工作流
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("get_articles", get_articles)
workflow.add_node("translate_articles", translate_articles)
workflow.add_node("store_articles", store_articles)
workflow.add_edge(START, "get_articles")
# 添加边-从获取到文章到翻译文章
workflow.add_conditional_edges("get_articles", route_to_translate)
workflow.add_edge("translate_articles", "store_articles")
workflow.add_edge("store_articles", END)

# 编译工作流
agent = workflow.compile()
# 执行工作流
agent.invoke({"articles": [], "translated_articles": []})

运行效果:
在这里插入图片描述


八、Send vs 普通条件边:什么时候用哪个

场景 路由函数返回 行为
普通分支 "node_name" 走一条路,选哪个节点由逻辑决定
并发分发 [Send("node", {}), ...] 同一个节点同时跑多个实例,数据不同
混合分发 [Send("node_a", {}), Send("node_b", {})] 同时并发派发到不同节点(不常见但支持)

简单来说:返回字符串是"去哪个节点",返回 Send 列表是"去这个节点,跑 N 次"。

什么时候用 Send?

  • 列表里的每项处理彼此独立(翻译、审核、数据分析……)
  • 每项处理是 IO 密集型的(LLM 调用、API 请求……),并发能显著缩短总耗时
  • 有 reducer(Annotated[x, operator.add] 之类)来安全合并并发结果

什么时候不适合?

  • 处理流程有严格顺序依赖(第三步依赖第二步的结果)
  • 状态合并逻辑很复杂,reducer 写不明白
  • 并发数太大(一次 invoke 派发上千个 Send 不现实,也容易打爆 LLM 的速率限制)

九、常见问题

Send 支持多少个并发实例?

没有硬性限制,但实际受两个因素制约:

  1. LLM 速率限制:你同时发 100 个请求过去,模型 API 大概率会返回 429(rate limit)。做好重试和并发数控制。
  2. Python 的 GIL:虽然翻译是 IO 密集的(等网络响应),GIL 影响不大。但如果在节点里做 CPU 密集的计算,并发未必比串行快。

建议根据实际 API 的并发上限来控制 articles 列表的长度,或者在 route_to_translate 里做分批。

能不能在 Send 里传复杂对象?

可以。Send 的第二个参数是一个字典,值可以是任意 Python 对象——字符串、数字、列表、字典都行。但因为 LangGraph 的 checkpoint 需要序列化状态,如果你传了不能被 pickle 的对象,加了 checkpointer 后就会报错。

reducer 不写会怎样?

节点的函数返回 {"translated_articles": ["..."]} 时,没有 reducer 的话,后返回的值会覆盖先返回的值。并发场景下,你得到的永远只有一个翻译结果(而且哪个实例最后完成是不确定的)。

如果你的业务场景是"只关心最快的那个结果",那覆盖反而是你想要的——但大多数 map-reduce 场景需要聚合,reducer 就是干这个的。

Sendinterrupt() 能一起用吗?

可以。在并发翻译节点里加 interrupt(),每个并发分支都能独立中断。恢复时用 interrupt_id 指定恢复哪一个。


十、总结

本文围绕 send.py 的翻译工作流,把 Send 并发执行机制从头说了一遍:

  1. Annotated[list, operator.add] 定义了状态字段的合并策略——没有它,并发结果会互相踩踏
  2. Send(node, state_dict) 是派发指令——告诉引擎"用这组数据跑一次这个节点"
  3. 条件边返回 [Send, Send, ...] 是并发的触发点——列表里几个 Send,就并发跑几次
  4. translate_articlesstore_articles 这条边是汇聚点——所有并发分支完成后在这里合流

把前三篇串起来,LangGraph 的能力版图大概是这样的:

StateGraph + 节点 + 边              → 串行工作流(第一篇)
    + 条件边                         → 分支与循环(第一篇)
    + interrupt + Command + checkpoint → 人工审核与中断(第二篇)
    + Annotated reducer + Send        → 并发执行(本篇)

这四块拼在一起,已经能覆盖绝大多数业务工作流的需求——串行、分支、循环、暂停、恢复、并发。LangGraph 的学习成本主要在前两篇(节点和边的思维切换、中断的 checkpoint 概念),Send 其实是比较轻量的一个扩展——本质上就是把条件边的返回值从"一个节点名"扩展成"一组带数据的派发指令"。

从串行到并行,不用换框架,不用学新概念,改一下路由函数的返回值就行。这种"同一个抽象层解决不同复杂度问题"的设计,是 LangGraph 最舒服的地方。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐