LangGraph 进阶:Send 并发执行——从串行到并行
系列文章目录
LangGraph 入门教程:从邮件审核工作流说起
LangGraph 进阶:人工审核与中断机制详解
LangGraph 进阶:Send 并发执行——从串行到并行
文章目录
前言
先看一个很常见的场景:你有 10 篇文章需要翻译成英文。串行做法是写个 for 循环,一篇翻完再翻下一篇,10 篇文章就是 10 次 LLM 调用排队等结果。如果一篇文章翻译要 2 秒,10 篇就是 20 秒——这还是网络状况好的时候。
你可能会想,能不能 10 篇同时发给 LLM?反正翻译之间没依赖关系,谁先翻完谁先回来。传统做法是多线程、asyncio、消息队列,代码写起来不麻烦但也不简单——尤其是线程间还得协调结果收集。
LangGraph 在图上用一个叫 Send 的机制解决了这个问题。你不用管线程、不用管异步,只需要在条件边里返回一组 Send 对象,LangGraph 的引擎自动帮你并行调度。本文就围绕 send.py 里的翻译工作流,把 Send 怎么用、底层怎么跑、跟普通条件边有什么区别,一点一点掰开讲。
本文基于
langgraph>=1.2.2,依赖 langchain-openai 作为 LLM 后端。代码完整可运行。
一、先跑一遍:翻译工作流长什么样
这个图跟前面两篇教程最大的不同:get_articles 出来后,同一个 translate_articles 节点跑了两次,分别处理不同的文章,然后各自的结果一起汇入 store_articles。
如果文章有 10 篇,translate_articles 就会被并发调起 10 个实例——每个实例独立跑,互不干扰。
二、准备工作:一个有点不一样的状态定义
import operator
from typing import Annotated, TypedDict
class AgentState(TypedDict):
articles: list[str] # 原始文章
translated_articles: Annotated[list[str], operator.add] # 翻译后的文章
articles 没啥特别的,就是个普通列表。关键是 translated_articles 的类型注解:
Annotated[list[str], operator.add]
这行代码的信息量很大。Annotated 是 Python 标准库 typing 里的一个工具,它允许你给类型"贴标签"——第一个参数是正常类型(list[str]),第二个参数是元数据。LangGraph 把这个元数据拿来做了一件很聪明的事:用它来指定状态合并策略。
具体来说,Annotated[list[str], operator.add] 的意思是:
- 字段的类型是
list[str] - 当多个节点同时往这个字段写数据时,用
operator.add来合并——也就是列表拼接
举个例子,3 篇文章并发翻译,各个翻译结果分别是 ["Hello"]、["World"]、["!"]。这三个列表不会互相覆盖,LangGraph 会自动调用 operator.add 把它们拼成 ["Hello", "World", "!"]。
为什么这里必须用 reducer?普通字段的默认行为是覆盖——后写的值把先写的值盖掉。并发场景下,三个翻译节点几乎同时返回,如果不指定合并策略,你只能拿到最后一个完成的翻译结果,前面两个就丢了。
reducer 是 Send 并发机制能够工作的基石。没有它,多路并发的产出就没法安全地汇聚到同一个状态字段里。
operator.add是一个很简单的 reducer。你完全可以写自定义的,比如lambda acc, new: sorted(acc + new)来在合并时顺便排序。只要是一个(accumulated, incoming) -> new_accumulated的函数就行。
LLM 实例的创建跟前两篇一样:
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
load_dotenv()
llm = ChatOpenAI(
model=os.getenv("AL_MODEL_NAME"),
api_key=os.getenv("AL_API_KEY"),
base_url=os.getenv("AL_BASE_URL"),
)
三、节点:三个函数,只有翻译节点是重点
3.1 获取原始文章 —— get_articles
def get_articles(state: AgentState) -> dict:
articles = ["这是一篇文章", "这是另一篇文章"]
return {"articles": articles}
硬编码了两篇文章作为模拟数据。实际项目里换成数据库查询、文件读取、API 调用都行。它返回一个字典,把 articles 字段填上,状态里就有了"待翻译文章列表"。
3.2 翻译文章 —— translate_articles
def translate_articles(state: AgentState) -> dict:
current_article = state["current_article"]
response = llm.invoke(f"请将以下文章翻译成英文只返回翻译结果不包含其他内容:{current_article}")
return {"translated_articles": [response.content]}
这个函数有一个很微妙的点:它从状态里读的是 current_article,而不是 articles。回顾状态定义:
class AgentState(TypedDict):
articles: list[str]
translated_articles: Annotated[list[str], operator.add]
current_article 不在 AgentState 里。那它从哪来?答案在后面的 Send 调用里——Send 可以在派发时往状态里注入临时字段。这就是 Send 的灵活之处:你可以给每个并发实例传入不同的"上下文",让同一个节点函数跑出不同的结果。
返回 {"translated_articles": [response.content]} 时,注意是包装成列表的单元素。并发执行时,每个翻译实例返回它自己的那份 [result],LangGraph 用 operator.add 把它们全拼起来,最终在 store_articles 里拿到的就是一个完整的结果列表。
3.3 存储翻译结果 —— store_articles
def store_articles(state: AgentState) -> dict:
translated_articles = state["translated_articles"]
print("所有翻译结果:")
for i, text in enumerate(translated_articles, 1):
print(f"第{i}篇文章的翻译结果: {text}")
return {}
这个节点只是把汇聚好的翻译结果打印出来。到这里所有并发翻译已经完成了,translated_articles 是一个拼接完毕的完整列表。
四、Send 的核心魔法:一条返回 Send 列表的条件边
这是整篇文章最关键的代码:
from langgraph.types import Send
def route_to_translate(state: AgentState):
return [
Send("translate_articles", {"current_article": article})
for article in state["articles"]
]
这个函数的签名跟普通条件边路由函数一模一样:(state) -> ...。但以前我们返回的是字符串(要去的节点名),这次返回的是一个 Send 对象的列表。
Send 的构造器就两个参数:
| 参数 | 值 | 含义 |
|---|---|---|
| 第一个参数 | "translate_articles" |
目标节点名——派发到哪个节点 |
| 第二个参数 | {"current_article": article} |
传给目标节点的状态数据 |
当 LangGraph 的引擎走到 get_articles → 条件边 → route_to_translate 时,它检查返回值:
- 如果返回
"some_node"(字符串)→ 走单路,跟普通条件边一样 - 如果返回
[Send("node", {...}), Send("node", {...}), ...](Send 列表)→ 走多路,每个Send被同时调度到目标节点
以上面例子,articles 里有 2 篇文章,所以 route_to_translate 返回 2 个 Send 对象。引擎看到后,并行创建 2 个 translate_articles 实例,各自拿到不同的 current_article:
实例 ①:state["current_article"] = "这是一篇文章"
实例 ②:state["current_article"] = "这是另一篇文章"
两个实例同时调用 LLM 翻译。如果一篇文章翻 2 秒,总耗时还是 2 秒——因为两个实例在并发等结果。
重点:Send 的第二个参数会跟当前状态做一次浅合并。也就是说 {"current_article": "这是一篇文章"} 不会覆盖已有的 articles 和 translated_articles 字段,只是在传给 translate_articles 的状态快照里多塞了一个 current_article 键。原来的字段该怎么流转还怎么流转。
五、搭图:add_conditional_edges 在这里不一样了
from langgraph.graph import StateGraph, END, START
workflow = StateGraph(AgentState)
workflow.add_node("get_articles", get_articles)
workflow.add_node("translate_articles", translate_articles)
workflow.add_node("store_articles", store_articles)
workflow.add_edge(START, "get_articles")
workflow.add_conditional_edges("get_articles", route_to_translate)
workflow.add_edge("translate_articles", "store_articles")
workflow.add_edge("store_articles", END)
agent = workflow.compile()
agent.invoke({"articles": [], "translated_articles": []})
图的搭建整体跟前两篇一致,只有两处值得专门说一下。
条件边的路由函数返回了 Send 列表
workflow.add_conditional_edges("get_articles", route_to_translate)
add_conditional_edges 的第二个参数仍然是一个路由函数,但这个路由函数不再返回节点名字符串,而是返回 Send 对象列表。LangGraph 会根据返回类型自动区分——你不需要在 add_conditional_edges 里多传任何参数。
translate_articles → store_articles 的边
workflow.add_edge("translate_articles", "store_articles")
这条边描述的是:每一个 translate_articles 实例完成后,都去 store_articles。如果并发跑了 3 个翻译实例,3 个都会各自走到 store_articles。
store_articles 不会被跑 3 次——LangGraph 的调度器会等所有指向它的入边全部就绪后,只执行一次。实际上 store_articles 节点在内部依然是执行一次,它拿到的 state["translated_articles"] 已经是合并好的完整列表。
另外注意 invoke 传的是 {"articles": [], "translated_articles": []} 而不是空字典。因为状态里两个字段都标注了类型,传空字典的话 LangGraph 无法推断初始结构。显式给两个空列表,引擎就知道怎么初始化了。后面 get_articles 节点会覆盖 articles 字段,translated_articles 则靠 reducer 逐步累积。
六、执行模型:引擎到底干了什么
调用 agent.invoke({"articles": [], "translated_articles": []}) 后,引擎内部大概是这么几步:
1. START → get_articles
└─ 拿到 articles: ["这是一篇文章", "这是另一篇文章"]
2. get_articles → route_to_translate(state)
└─ 返回 [Send("translate_articles", {"current_article": "这是一篇文章"}),
Send("translate_articles", {"current_article": "这是另一篇文章"})]
3. 引擎看到 Send 列表,并行启动 2 个 translate_articles 实例
├─ 实例 A: llm.invoke("请将以下文章翻译成英文...这是一篇文章")
└─ 实例 B: llm.invoke("请将以下文章翻译成英文...这是另一篇文章")
4. 两个实例各自返回 {"translated_articles": ["..."]}
5. 引擎用 operator.add 合并: ["Article 1 EN", "Article 2 EN"]
6. store_articles 执行,打印合并后的列表
7. END
用一张时序图来看更直观:
get_articles ──┬── translate_articles(article=这是一篇文章) ──┬── store_articles ── END
│ │
└── translate_articles(article=这是另一篇文章) ──┘
两个翻译分支是并发跑的,store_articles 等两个都跑完才执行。
七、 完整代码
import operator
import os
from typing import Annotated, TypedDict
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END, START
from langgraph.types import Send
load_dotenv()
# 定义状态
class AgentState(TypedDict):
articles: list[str] # 原始文章
translated_articles: Annotated[list[str], operator.add] # 翻译后的文章
llm = ChatOpenAI(
model=os.getenv("AL_MODEL_NAME"), # 模型名称,从环境变量读取
api_key=os.getenv("AL_API_KEY"),
base_url=os.getenv("AL_BASE_URL"),
)
# 节点-获取原始文章
def get_articles(state: AgentState) -> dict:
# 模拟获取原始文章,真实场景中从数据库或文件中读取
articles = ["这是一篇文章", "这是另一篇文章"]
return {
"articles": articles,
}
# 节点-翻译文章
def translate_articles(state: AgentState) -> dict:
# 从状态中获取当前文章
current_article = state["current_article"]
# 调用模型翻译文章
response = llm.invoke(
f"请将以下文章翻译成英文只返回翻译结果不包含其他内容:{current_article}"
)
return {"translated_articles": [response.content]}
# 节点-存储翻译后的文章
def store_articles(state: AgentState) -> dict:
# 从状态中获取翻译后的文章
translated_articles = state["translated_articles"]
print("所有翻译结果:")
# 打印所有翻译结果,实际场景中可以保存到数据库或文件中
for i, text in enumerate(translated_articles, 1):
print(f"第{i}篇文章的翻译结果: {text}")
return {}
# 条件边-动态路由函数
def route_to_translate(state: AgentState):
return [
# 并发翻译文章
Send("translate_articles", {"current_article": article})
for article in state["articles"]
]
# 定义工作流
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("get_articles", get_articles)
workflow.add_node("translate_articles", translate_articles)
workflow.add_node("store_articles", store_articles)
workflow.add_edge(START, "get_articles")
# 添加边-从获取到文章到翻译文章
workflow.add_conditional_edges("get_articles", route_to_translate)
workflow.add_edge("translate_articles", "store_articles")
workflow.add_edge("store_articles", END)
# 编译工作流
agent = workflow.compile()
# 执行工作流
agent.invoke({"articles": [], "translated_articles": []})
运行效果:
八、Send vs 普通条件边:什么时候用哪个
| 场景 | 路由函数返回 | 行为 |
|---|---|---|
| 普通分支 | "node_name" |
走一条路,选哪个节点由逻辑决定 |
| 并发分发 | [Send("node", {}), ...] |
同一个节点同时跑多个实例,数据不同 |
| 混合分发 | [Send("node_a", {}), Send("node_b", {})] |
同时并发派发到不同节点(不常见但支持) |
简单来说:返回字符串是"去哪个节点",返回 Send 列表是"去这个节点,跑 N 次"。
什么时候用 Send?
- 列表里的每项处理彼此独立(翻译、审核、数据分析……)
- 每项处理是 IO 密集型的(LLM 调用、API 请求……),并发能显著缩短总耗时
- 有 reducer(
Annotated[x, operator.add]之类)来安全合并并发结果
什么时候不适合?
- 处理流程有严格顺序依赖(第三步依赖第二步的结果)
- 状态合并逻辑很复杂,reducer 写不明白
- 并发数太大(一次 invoke 派发上千个 Send 不现实,也容易打爆 LLM 的速率限制)
九、常见问题
Send 支持多少个并发实例?
没有硬性限制,但实际受两个因素制约:
- LLM 速率限制:你同时发 100 个请求过去,模型 API 大概率会返回 429(rate limit)。做好重试和并发数控制。
- Python 的 GIL:虽然翻译是 IO 密集的(等网络响应),GIL 影响不大。但如果在节点里做 CPU 密集的计算,并发未必比串行快。
建议根据实际 API 的并发上限来控制 articles 列表的长度,或者在 route_to_translate 里做分批。
能不能在 Send 里传复杂对象?
可以。Send 的第二个参数是一个字典,值可以是任意 Python 对象——字符串、数字、列表、字典都行。但因为 LangGraph 的 checkpoint 需要序列化状态,如果你传了不能被 pickle 的对象,加了 checkpointer 后就会报错。
reducer 不写会怎样?
节点的函数返回 {"translated_articles": ["..."]} 时,没有 reducer 的话,后返回的值会覆盖先返回的值。并发场景下,你得到的永远只有一个翻译结果(而且哪个实例最后完成是不确定的)。
如果你的业务场景是"只关心最快的那个结果",那覆盖反而是你想要的——但大多数 map-reduce 场景需要聚合,reducer 就是干这个的。
Send 和 interrupt() 能一起用吗?
可以。在并发翻译节点里加 interrupt(),每个并发分支都能独立中断。恢复时用 interrupt_id 指定恢复哪一个。
十、总结
本文围绕 send.py 的翻译工作流,把 Send 并发执行机制从头说了一遍:
Annotated[list, operator.add]定义了状态字段的合并策略——没有它,并发结果会互相踩踏Send(node, state_dict)是派发指令——告诉引擎"用这组数据跑一次这个节点"- 条件边返回
[Send, Send, ...]是并发的触发点——列表里几个 Send,就并发跑几次 translate_articles→store_articles这条边是汇聚点——所有并发分支完成后在这里合流
把前三篇串起来,LangGraph 的能力版图大概是这样的:
StateGraph + 节点 + 边 → 串行工作流(第一篇)
+ 条件边 → 分支与循环(第一篇)
+ interrupt + Command + checkpoint → 人工审核与中断(第二篇)
+ Annotated reducer + Send → 并发执行(本篇)
这四块拼在一起,已经能覆盖绝大多数业务工作流的需求——串行、分支、循环、暂停、恢复、并发。LangGraph 的学习成本主要在前两篇(节点和边的思维切换、中断的 checkpoint 概念),Send 其实是比较轻量的一个扩展——本质上就是把条件边的返回值从"一个节点名"扩展成"一组带数据的派发指令"。
从串行到并行,不用换框架,不用学新概念,改一下路由函数的返回值就行。这种"同一个抽象层解决不同复杂度问题"的设计,是 LangGraph 最舒服的地方。
更多推荐



所有评论(0)