langgraph官网翻译:workflows
文章目录
本文翻译自langgraph官网:https://langchain-ai.github.io/langgraph/tutorials/workflows/
Workflows and Agents
本指南回顾了智能体系统的常见模式。在描述这些系统时,区分“工作流”和“智能体”会很有帮助。Anthropic 的一篇博客文章《Building Effective Agents》很好地解释了二者之间的差异。
Workflows是通过预定义的代码路径来编排大语言模型和工具的系统。而Agents则是由大语言模型动态地指导其自身流程和工具使用的系统,并自主控制如何完成任务。
以下是可视化这些方法:
在构建智能体和工作流时,LangGraph 提供了诸多优势,包括持久化、流式传输、调试支持以及部署能力。
Set up
您可以使用任何支持structured outputs 和 tool calling的聊天模型。下面,我们展示了安装软件包、设置 API 密钥以及测试 Anthropic 的结构化输出/工具调用的过程。
# Install dependencies
pip install langchain_core langchain-anthropic langgraph
# 初始化一个 LLM:
import os
import getpass
from langchain_anthropic import ChatAnthropic
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("ANTHROPIC_API_KEY")
llm = ChatAnthropic(model="claude-3-5-sonnet-latest")
Building Blocks: The Augmented LLM
大语言模型拥有一些增强功能,可用于构建工作流和智能体。这些功能包括结构化输出和工具调用,如下图所示(该图取自 Anthropic 的博客文章《Building Effective Agents》):

# Schema for structured output 结构化输出的模式
from pydantic import BaseModel, Field
class SearchQuery(BaseModel):
search_query: str = Field(None, description="Query that is optimized web search.")
justification: str = Field(
None, description="Why this query is relevant to the user's request."
)
# Augment the LLM with schema for structured output 用模式增强LLM以实现结构化输出
structured_llm = llm.with_structured_output(SearchQuery)
# Invoke the augmented LLM 调用增强型LLM
output = structured_llm.invoke("How does Calcium CT score relate to high cholesterol?")
# Define a tool
def multiply(a: int, b: int) -> int:
return a * b
# Augment the LLM with tools
llm_with_tools = llm.bind_tools([multiply])
# Invoke the LLM with input that triggers the tool call
# 调用LLM并输入触发工具调用的内容
msg = llm_with_tools.invoke("What is 2 times 3?")
# Get the tool call
msg.tool_calls
Prompt chaining 提示链
在Prompt chaining中,每一次大语言模型(LLM)调用都会处理前一次调用的输出。
正如 Anthropic 博客文章《Building Effective Agents》中所述:
提示链将一个任务分解为一系列步骤,其中每个 LLM
调用都处理前一个调用的输出。你可以在任何中间步骤添加程序化检查(见下图中的“门”),以确保整个过程仍在正轨上。
何时使用此工作流:当一个任务可以被轻松、清晰地分解为固定的子任务时,此工作流是理想选择。其主要目标是通过让每一次 LLM 调用都成为一个更简单的任务,来以延迟换取更高的准确性。

# 从 typing_extensions 模块导入 TypedDict,用于创建有明确键类型的字典
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from IPython.display import Image, display
# 定义图的状态结构
class State(TypedDict):
topic: str
joke: str
improved_joke: str
final_joke: str
# 定义节点(即工作流中的各个步骤)
def generate_joke(state: State):
"""第一次 LLM 调用,用于生成初始笑话"""
# 调用大语言模型,根据传入的主题生成一个笑话
msg = llm.invoke(f"Write a short joke about {state['topic']}")
# 返回一个字典,更新状态中的 'joke' 字段
return {"joke": msg.content}
def check_punchline(state: State):
"""门控函数,用于检查笑话是否包含点睛之笔"""
# 简单的检查逻辑:笑话中是否包含 "?" 或 "!"
if "?" in state["joke"] or "!" in state["joke"]:
return "Pass" # 如果包含,则通过
return "Fail" # 否则,不通过
def improve_joke(state: State):
"""第二次 LLM 调用,用于改进笑话"""
# 调用大语言模型,通过添加文字游戏让笑话更有趣
msg = llm.invoke(f"Make this joke funnier by adding wordplay: {state['joke']}")
# 返回一个字典,更新状态中的 'improved_joke' 字段
return {"improved_joke": msg.content}
def polish_joke(state: State):
"""第三次 LLM 调用,进行最终打磨"""
# 调用大语言模型,为笑话添加一个令人惊讶的转折
msg = llm.invoke(f"Add a surprising twist to this joke: {state['improved_joke']}")
# 返回一个字典,更新状态中的 'final_joke' 字段
return {"final_joke": msg.content}
# 构建工作流
workflow = StateGraph(State)
# 添加节点
workflow.add_node("generate_joke", generate_joke)
workflow.add_node("improve_joke", improve_joke)
workflow.add_node("polish_joke", polish_joke)
# 添加边来连接节点
# 设置起始点,工作流从 START 节点直接进入 "generate_joke" 节点
workflow.add_edge(START, "generate_joke")
# 添加条件边,从 "generate_joke" 节点出发
# 根据 check_punchline 函数的返回值决定下一步:
# 如果返回 "Fail",则进入 "improve_joke" 节点
# 如果返回 "Pass",则直接进入 END 节点,结束流程
workflow.add_conditional_edges(
"generate_joke", check_punchline, {"Fail": "improve_joke", "Pass": END}
)
# "improve_joke" 节点执行完毕后,进入 "polish_joke" 节点
workflow.add_edge("improve_joke", "polish_joke")
# "polish_joke" 节点执行完毕后,进入 END 节点,结束流程
workflow.add_edge("polish_joke", END)
# 编译工作流,创建一个可执行链
chain = workflow.compile()
# 可视化并显示工作流图
display(Image(chain.get_graph().draw_mermaid_png()))
# 调用工作流链,并传入初始状态(主题为 "cats")
state = chain.invoke({"topic": "cats"})
# 打印初始生成的笑话
print("Initial joke:")
print(state["joke"])
print("\n--- --- ---\n")
# 检查状态中是否存在 'improved_joke' 字段
# 如果存在,说明初始笑话未通过质量检查,并经过了后续的改进和打磨
if "improved_joke" in state:
print("Improved joke:")
print(state["improved_joke"])
print("\n--- --- ---\n")
print("Final joke:")
print(state["final_joke"])
else:
# 如果不存在,说明初始笑话质量足够好,流程已提前结束
print("Joke failed quality gate - no punchline detected!")
Parallelization
通过并行化,大型语言模型可以同时处理一项任务:
大型语言模型有时可以同时处理一项任务,并通过程序化的方式汇总其输出。这种并行化工作流,主要有两种模式: 分块:将一个任务分解为多个独立的子任务,并并行运行。 投票:重复执行相同任务以获取多样化输出。
适用场景:当分解后的子任务可通过并行化提升速度,或者需要多个视角或多次尝试来获得更高可信度的结果时,并行化是有效的。对于包含多个考量因素的复杂任务,如果每个考量因素都由一次独立的 LLM 调用来处理,让模型能专注于每个特定方面,那么 LLM 的整体表现通常会更好。
# 图状态
class State(TypedDict):
topic: str
joke: str
story: str
poem: str
combined_output: str
# 节点
def call_llm_1(state: State):
"""第一次 LLM 调用,用于生成初始笑话"""
msg = llm.invoke(f"请写一个关于 {state['topic']} 的笑话")
return {"joke": msg.content}
def call_llm_2(state: State):
"""第二次 LLM 调用,用于生成故事"""
msg = llm.invoke(f"请写一个关于 {state['topic']} 的故事")
return {"story": msg.content}
def call_llm_3(state: State):
"""第三次 LLM 调用,用于生成诗歌"""
msg = llm.invoke(f"请写一首关于 {state['topic']} 的诗歌")
return {"poem": msg.content}
def aggregator(state: State):
"""将笑话、故事和诗歌组合成一个单一的输出"""
combined = f"这是一个关于 {state['topic']} 的故事、笑话和诗歌!\n\n"
combined += f"故事:\n{state['story']}\n\n"
combined += f"笑话:\n{state['joke']}\n\n"
combined += f"诗歌:\n{state['poem']}"
return {"combined_output": combined}
# 构建工作流
parallel_builder = StateGraph(State)
# 添加节点
parallel_builder.add_node("call_llm_1", call_llm_1)
parallel_builder.add_node("call_llm_2", call_llm_2)
parallel_builder.add_node("call_llm_3", call_llm_3)
parallel_builder.add_node("aggregator", aggregator)
# 添加边来连接节点
parallel_builder.add_edge(START, "call_llm_1")
parallel_builder.add_edge(START, "call_llm_2")
parallel_builder.add_edge(START, "call_llm_3")
parallel_builder.add_edge("call_llm_1", "aggregator")
parallel_builder.add_edge("call_llm_2", "aggregator")
parallel_builder.add_edge("call_llm_3", "aggregator")
parallel_builder.add_edge("aggregator", END)
parallel_workflow = parallel_builder.compile()
# 显示工作流
display(Image(parallel_workflow.get_graph().draw_mermaid_png()))
# 调用工作流
state = parallel_workflow.invoke({"topic": "cats"})
print(state["combined_output"])
Routing
路由对输入进行分类,并将其引导至一个后续的专门任务。正如 Anthropic 博客文章中所述:
路由对输入进行分类,并将其引导至一个专门的后续任务。这种工作流实现了关注点分离,并能构建更具针对性的提示。如果没有这种工作流,针对某一类输入进行优化可能会损害其在其他输入上的性能。
何时使用此工作流: 路由适用于复杂的任务,这些任务存在可以被更好分开处理的、截然不同的类别,并且分类本身可以由大语言模型或更传统的分类模型/算法准确处理。

from typing_extensions import Literal
from langchain_core.messages import HumanMessage, SystemMessage
# 用于结构化输出的模式,用作路由逻辑
class Route(BaseModel):
step: Literal["poem", "story", "joke"] = Field(
None, description="路由流程中的下一步"
)
# 使用结构化输出的模式来增强 LLM
router = llm.with_structured_output(Route)
# 状态
class State(TypedDict):
input: str # 输入
decision: str # 决策
output: str # 输出
# 节点
def llm_call_1(state: State):
"""写一个故事"""
result = llm.invoke(state["input"])
return {"output": result.content}
def llm_call_2(state: State):
"""讲一个笑话"""
result = llm.invoke(state["input"])
return {"output": result.content}
def llm_call_3(state: State):
"""写一首诗"""
result = llm.invoke(state["input"])
return {"output": result.content}
def llm_call_router(state: State):
"""将输入路由到相应的节点"""
# 运行带有结构化输出的增强型 LLM,以作为路由逻辑
decision = router.invoke(
[
SystemMessage(
content="根据用户的请求,将输入路由到故事、笑话或诗歌。"
),
HumanMessage(content=state["input"]),
]
)
return {"decision": decision.step}
# 条件边函数,用于路由到相应的节点
def route_decision(state: State):
# 返回你接下来想要访问的节点名称
if state["decision"] == "story":
return "llm_call_1"
elif state["decision"] == "joke":
return "llm_call_2"
elif state["decision"] == "poem":
return "llm_call_3"
# 构建工作流
router_builder = StateGraph(State)
# 添加节点
router_builder.add_node("llm_call_1", llm_call_1)
router_builder.add_node("llm_call_2", llm_call_2)
router_builder.add_node("llm_call_3", llm_call_3)
router_builder.add_node("llm_call_router", llm_call_router)
# 添加边来连接节点
router_builder.add_edge(START, "llm_call_router")
router_builder.add_conditional_edges(
source="llm_call_router", # 源节点
path=route_decision, # 决定路径的函数
{ # route_decision 返回的名称 : 要访问的下一个节点的名称
"llm_call_1": "llm_call_1",
"llm_call_2": "llm_call_2",
"llm_call_3": "llm_call_3",
},
)
router_builder.add_edge("llm_call_1", END)
router_builder.add_edge("llm_call_2", END)
router_builder.add_edge("llm_call_3", END)
# 编译工作流
router_workflow = router_builder.compile()
# 显示工作流图
display(Image(router_workflow.get_graph().draw_mermaid_png()))
# 调用工作流
state = router_workflow.invoke({"input": "给我讲一个关于猫的笑话"})
print(state["output"])
Orchestrator-Worker
在编排者-工作者模式中,一个编排者负责分解任务,并将每个子任务委派给工作者。正如 Anthropic 博客文章中所述:
在编排者-工作者工作流中,一个中心大语言模型动态地分解任务,将其委派给工作者大语言模型,并整合它们的结果。
何时使用此工作流: 此工作流非常适合那些无法预测所需子任务的复杂任务(例如,在编程中,需要修改的文件数量以及每个文件中的修改性质很可能取决于具体任务)。虽然它在拓扑结构上与并行化相似,但关键区别在于其灵活性——子任务不是预先定义的,而是由编排者根据具体输入动态决定的。

from typing import Annotated, List
import operator
# 用于planning的结构化输出模式
class Section(BaseModel):
name: str = Field(
description="Name for this section of the report.",
)
description: str = Field(
description="Brief overview of the main topics and concepts to be covered in this section.",
)
class Sections(BaseModel):
sections: List[Section] = Field(
description="Sections of the report.",
)
# 使用该模式增强大语言模型,以获得结构化输出
planner = llm.with_structured_output(Sections)
Creating Workers in LangGraph
由于Orchestrator-Worker工作流较为常见,LangGraph提供了Send API来支持此功能。它允许您动态创建工作节点,并向每个节点发送特定输入。每个工作者节点都拥有独立状态,所有工作者输出均写入可被orchestrator图访问的共享状态键。这使orchestrator能够获取所有工作者输出,并将其合成最终输出。如下所示,我们遍历分段列表并将每个分段发送至工作者节点。
from langgraph.types import Send
# 图状态
class State(TypedDict):
topic: str # 报告主题
sections: list[Section] # 报告的各个部分列表
completed_sections: Annotated[
list, operator.add
] # 所有工作者会并行地将结果写入此键
final_report: str # 最终报告
# 工作者状态
class WorkerState(TypedDict):
section: Section # 单个部分的信息
completed_sections: Annotated[list, operator.add]
# 节点
def orchestrator(state: State):
"""编排器:为报告生成一个计划"""
# 生成查询
report_sections = planner.invoke(
[
SystemMessage(content="为报告生成一个计划。"),
HumanMessage(content=f"这是报告的主题:{state['topic']}"),
]
)
return {"sections": report_sections.sections}
def llm_call(state: WorkerState):
"""工作者:撰写报告的一个部分"""
# 生成部分内容
section = llm.invoke(
[
SystemMessage(
content="根据提供的名称和描述撰写报告部分。每个部分不要写前言。使用 markdown 格式。"
),
HumanMessage(
content=f"这是部分的名称:{state['section'].name} 和描述:{state['section'].description}"
),
]
)
# 将更新后的部分写入已完成的部分列表
return {"completed_sections": [section.content]}
def synthesizer(state: State):
"""合成器:从各个部分合成完整的报告"""
# 已完成部分的列表
completed_sections = state["completed_sections"]
# 将已完成的部分格式化为字符串,作为最终报告的上下文
completed_report_sections = "\n\n---\n\n".join(completed_sections)
return {"final_report": completed_report_sections}
# 条件边函数,用于创建多个 llm_call 工作者,每个工作者撰写报告的一个部分
def assign_workers(state: State):
"""为计划中的每个部分分配一个工作者"""
# 通过 Send() API 并行启动各个部分的撰写工作
return [Send("llm_call", {"section": s}) for s in state["sections"]]
# 构建工作流
orchestrator_worker_builder = StateGraph(State)
# 添加节点
orchestrator_worker_builder.add_node("orchestrator", orchestrator)
orchestrator_worker_builder.add_node("llm_call", llm_call)
orchestrator_worker_builder.add_node("synthesizer", synthesizer)
# 添加边来连接节点
orchestrator_worker_builder.add_edge(START, "orchestrator")
orchestrator_worker_builder.add_conditional_edges(
"orchestrator", assign_workers, ["llm_call"]
)
orchestrator_worker_builder.add_edge("llm_call", "synthesizer")
orchestrator_worker_builder.add_edge("synthesizer", END)
# 编译工作流
orchestrator_worker = orchestrator_worker_builder.compile()
# 展示工作流图
display(Image(orchestrator_worker.get_graph().draw_mermaid_png()))
# 调用工作流
state = orchestrator_worker.invoke({"topic": "创建一份关于 LLM 缩放法则的报告"})
from IPython.display import Markdown
Markdown(state["final_report"])
Evaluator-optimizer
在评估器-优化器工作流中,一个 LLM 调用负责生成响应,而另一个则在一个循环中提供评估和反馈:
何时使用此工作流:当我们有明确的评估标准,并且迭代优化能带来可衡量的价值时,这种工作流尤为有效。判断其是否适用的两个标志是:第一,当人类清晰地表达反馈后,LLM 的响应能被证明得到了改善;第二,LLM 本身也能够提供此类反馈。这好比人类作家在撰写一份精炼文档时所经历的反复修改和打磨的过程。

# 图状态
class State(TypedDict):
joke: str # 存储生成的笑话
topic: str # 存储笑话的主题
feedback: str # 存储评估器提供的反馈
funny_or_not: str # 存储评估结果 ("funny" 或 "not funny")
# 用于评估的结构化输出模式
class Feedback(BaseModel):
grade: Literal["funny", "not funny"] = Field(
description="判断这个笑话是否好笑。",
)
feedback: str = Field(
description="如果笑话不好笑,请提供如何改进它的反馈。",
)
# 使用结构化输出模式增强 LLM
# 这会配置 LLM,使其在调用时返回一个符合 Feedback 模式的 Pydantic 对象,而不是纯文本。
evaluator = llm.with_structured_output(Feedback)
# 节点
def llm_call_generator(state: State):
"""LLM 生成一个笑话"""
# 检查状态中是否存在反馈
if state.get("feedback"):
msg = llm.invoke(
f"写一个关于 {state['topic']} 的笑话,但请参考以下反馈进行修改:{state['feedback']}"
)
else:
msg = llm.invoke(f"写一个关于 {state['topic']} 的笑话")
return {"joke": msg.content}
def llm_call_evaluator(state: State):
"""LLM 评估这个笑话"""
# 使用配置好的 evaluator LLM 来评估笑话,它会返回一个结构化的 Feedback 对象
grade = evaluator.invoke(f"请评价这个笑话:{state['joke']}")
# 返回更新后的状态(包含评估结果和反馈)
return {"funny_or_not": grade.grade, "feedback": grade.feedback}
# 条件边函数
# 该函数根据评估器的反馈,决定下一步的流向:是回到笑话生成器,还是结束流程。
def route_joke(state: State):
"""根据评估器的反馈,决定路由回笑话生成器或结束流程"""
if state["funny_or_not"] == "funny":
return "Accepted" # 如果好笑,则接受并结束
elif state["funny_or_not"] == "not funny":
return "Rejected + Feedback" # 如果不好笑,则拒绝并提供反馈,然后返回生成器
# 构建工作流
# 创建一个状态图,并传入我们之前定义的 State 结构。
optimizer_builder = StateGraph(State)
# 添加节点
optimizer_builder.add_node("llm_call_generator", llm_call_generator)
optimizer_builder.add_node("llm_call_evaluator", llm_call_evaluator)
# 添加边来连接节点
optimizer_builder.add_edge(START, "llm_call_generator") # 流程从 START 开始,进入生成器节点
optimizer_builder.add_edge("llm_call_generator", "llm_call_evaluator") # 生成器完成后,进入评估器节点
optimizer_builder.add_conditional_edges(
"llm_call_evaluator", # 从评估器节点出发
route_joke, # 使用 route_joke 函数来决定下一步
{ # route_joke 函数返回的名称 : 要访问的下一个节点的名称
"Accepted": END, # 如果返回 "Accepted",则流程结束
"Rejected + Feedback": "llm_call_generator", # 如果返回 "Rejected + Feedback",则回到生成器节点
},
)
# 编译工作流
# 将定义好的图结构编译成一个可执行的工作流对象。
optimizer_workflow = optimizer_builder.compile()
# 显示工作流
# 将工作流渲染为 Mermaid 格式的 PNG 图片并显示出来,便于可视化整个流程。
display(Image(optimizer_workflow.get_graph().draw_mermaid_png()))
# 调用工作流
# 使用初始状态(主题为 "Cats")启动工作流。
# 工作流会循环执行,直到评估器认为笑话好笑为止。
state = optimizer_workflow.invoke({"topic": "Cats"})
# 打印最终生成的笑话
print(state["joke"])
Agent
智能体(Agents)通常的实现方式是:让大语言模型(LLM)在一个循环中,根据环境反馈执行操作(通过工具调用)。正如Anthropic博客《构建高效智能体》中所指出的:
智能体能够处理复杂任务,但其实现往往很简单。它们通常只是大语言模型在循环中根据环境反馈使用工具。因此,清晰而周密地设计工具集及其文档至关重要。
使用智能体的场景:智能体适用于开放式问题,这类问题难以或无法预测所需步骤数量,且无法对固定路径进行硬编码。大语言模型可能会运行多个轮次,此时必须对其决策能力保持一定信任。智能体的自主性使其成为受信任环境中扩展任务的理想选择。

from langchain_core.tools import tool
# Define tools
@tool
def multiply(a: int, b: int) -> int:
"""Multiply a and b.
Args:
a: first int
b: second int
"""
return a * b
@tool
def add(a: int, b: int) -> int:
"""Adds a and b.
Args:
a: first int
b: second int
"""
return a + b
@tool
def divide(a: int, b: int) -> float:
"""Divide a and b.
Args:
a: first int
b: second int
"""
return a / b
# Augment the LLM with tools
tools = [add, multiply, divide]
tools_by_name = {tool.name: tool for tool in tools}
llm_with_tools = llm.bind_tools(tools)
from langgraph.graph import MessagesState
from langchain_core.messages import SystemMessage, HumanMessage, ToolMessage
# Nodes
def llm_call(state: MessagesState):
"""LLM decides whether to call a tool or not"""
return {
"messages": [
llm_with_tools.invoke(
[
SystemMessage(
content="You are a helpful assistant tasked with performing arithmetic on a set of inputs."
)
]
+ state["messages"]
)
]
}
def tool_node(state: dict):
"""Performs the tool call"""
result = []
for tool_call in state["messages"][-1].tool_calls:
tool = tools_by_name[tool_call["name"]]
observation = tool.invoke(tool_call["args"])
result.append(ToolMessage(content=observation, tool_call_id=tool_call["id"]))
return {"messages": result}
# Conditional edge function to route to the tool node or end based upon whether the LLM made a tool call
def should_continue(state: MessagesState) -> Literal["Action", END]:
"""Decide if we should continue the loop or stop based upon whether the LLM made a tool call"""
messages = state["messages"]
last_message = messages[-1]
# If the LLM makes a tool call, then perform an action
if last_message.tool_calls:
return "Action"
# Otherwise, we stop (reply to the user)
return END
# Build workflow
agent_builder = StateGraph(MessagesState)
# Add nodes
agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("environment", tool_node)
# Add edges to connect nodes
agent_builder.add_edge(START, "llm_call")
agent_builder.add_conditional_edges(
"llm_call",
should_continue,
{
# Name returned by should_continue : Name of next node to visit
"Action": "environment",
END: END,
},
)
agent_builder.add_edge("environment", "llm_call")
# Compile the agent
agent = agent_builder.compile()
# Show the agent
display(Image(agent.get_graph(xray=True).draw_mermaid_png()))
# Invoke
messages = [HumanMessage(content="Add 3 and 4.")]
messages = agent.invoke({"messages": messages})
for m in messages["messages"]:
m.pretty_print()
更多推荐


所有评论(0)