langgraph核心模块及代码详解
·
LangGraph 核心模块及代码详解
LangGraph是LangChain生态中专门用于构建有状态、多步骤、可循环的Agent系统的图计算框架。它的核心思想是将Agent的工作流抽象为有向图,通过节点(Node)和边(Edge)来定义执行逻辑。
一、核心架构概览
LangGraph采用三层架构:
| 层级 | 作用 | 核心组件 |
|---|---|---|
| 图结构层 | 定义任务流程 | StateGraph, Node, Edge |
| 执行引擎层 | 调度与状态管理 | Pregel (运行时), Checkpointer |
| 扩展接口层 | 生态集成 | MCP协议, LangSmith |
二、四大核心模块详解
1. State(状态)—— 数据的共享容器
State是贯穿整个工作流的共享数据结构,存储对话历史、工具执行结果、中间变量等信息。
定义方式:
from typing import TypedDict, Annotated, List
import operator
# 基础定义
class AgentState(TypedDict):
messages: Annotated[List, operator.add] # 自动累加,不是覆盖
user_query: str
intermediate_steps: List[tuple]
# 带合并策略的状态
from langgraph.graph.message import add_messages
class ChatState(TypedDict):
messages: Annotated[List, add_messages] # 专用消息合并函数
next_step: str
关键点:
Annotated+operator.add:列表自动拼接- 节点只需返回状态子集(要更新的字段),框架自动合并
代码示例(完整工作流):
from typing import TypedDict, Annotated, List
import operator
from langgraph.graph import StateGraph, END
# 1. 定义状态
class State(TypedDict):
text: str
classification: str
entities: List[str]
summary: str
2. Node(节点)—— 执行的基本单元
每个节点是一个处理函数,接收State,返回State的更新子集。
节点设计原则:
- 单一职责:一个节点只做一件事
- 无状态:不保存内部状态,所有数据通过State传递
- 可测试:纯函数风格
代码示例(三个功能节点):
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.schema import HumanMessage
llm = ChatOpenAI(model="gpt-4", temperature=0)
# 节点1:分类
def classification_node(state: State) -> dict:
prompt = PromptTemplate(
input_variables=["text"],
template="将以下文本分类为:新闻、博客、研究、其他。\n文本:{text}\n类别:"
)
message = HumanMessage(content=prompt.format(text=state["text"]))
classification = llm.invoke([message]).content.strip()
return {"classification": classification}
# 节点2:实体提取
def entity_extraction_node(state: State) -> dict:
prompt = PromptTemplate(
input_variables=["text"],
template="从以下文本中提取实体(人物、组织、地点),逗号分隔。\n文本:{text}\n实体:"
)
message = HumanMessage(content=prompt.format(text=state["text"]))
entities = llm.invoke([message]).content.strip().split(", ")
return {"entities": entities}
# 节点3:摘要生成
def summarize_node(state: State) -> dict:
prompt = PromptTemplate.from_template("用一句话总结:\n{text}\n摘要:")
chain = prompt | llm
response = chain.invoke({"text": state["text"]})
return {"summary": response.content}
高级特性:
# 带重试策略的节点
from langgraph.graph import RetryPolicy
retry_policy = RetryPolicy(
max_attempts=3,
initial_interval=1,
backoff_factor=2,
retry_on=[Exception]
)
graph.add_node("llm_call", llm_node, retry=retry_policy)
# 带缓存的节点
from langgraph.graph import CachePolicy
graph.add_node("expensive_op", expensive_node,
cache_policy=CachePolicy(ttl=60))
3. Edge(边)—— 控制流的连接线
边定义了节点之间的执行顺序和跳转逻辑。
边的类型:
| 类型 | 说明 | 适用场景 |
|---|---|---|
| 普通边 | 固定顺序执行 | 线性流程 |
| 条件边 | 根据状态动态选择 | 分支逻辑、工具调用判断 |
| 并行边 | 多个节点同时执行 | Map-Reduce模式 |
代码示例:
from langgraph.graph import StateGraph, START, END
# 创建图
graph = StateGraph(State)
# 添加节点
graph.add_node("classify", classification_node)
graph.add_node("extract", entity_extraction_node)
graph.add_node("summarize", summarize_node)
# 普通边:顺序执行
graph.add_edge(START, "classify")
graph.add_edge("classify", "extract")
graph.add_edge("extract", "summarize")
graph.add_edge("summarize", END)
# 条件边:动态路由
def route_after_classification(state: State) -> str:
if state["classification"] == "研究":
return "extract" # 研究类文章需要实体提取
else:
return "summarize" # 其他类型只做摘要
graph.add_conditional_edges(
"classify",
route_after_classification,
{
"extract": "extract",
"summarize": "summarize"
}
)
工具调用场景的条件边:
def exists_tool_call(state: State) -> bool:
"""检测是否需要调用工具"""
last_message = state["messages"][-1]
return hasattr(last_message, "tool_calls") and len(last_message.tool_calls) > 0
graph.add_conditional_edges(
"llm",
exists_tool_call,
{True: "tool_executor", False: END}
)
4. Pregel(运行时引擎)—— 编译后的执行器
当调用.compile()后,StateGraph被编译成CompiledGraph(即Pregel实例),这是真正的执行引擎。
核心能力:
# 编译图
app = graph.compile()
# 此时app的类型是CompiledGraph,继承自Pregel
print(type(app)) # <class 'langgraph.graph.state.CompiledGraph'>
Pregel提供的方法:
| 方法 | 作用 | 返回类型 |
|---|---|---|
invoke(input) |
同步执行,返回最终状态 | State |
ainvoke(input) |
异步执行 | State |
stream(input) |
流式输出中间状态 | Iterator |
get_state(config) |
获取当前状态快照 | StateSnapshot |
update_state(config, values) |
手动更新状态(人机交互) | - |
get_state_history(config) |
获取历史状态(时间旅行) | Iterator |
代码示例:
# 1. 基本调用
result = app.invoke({"text": "Anthropic发布了MCP协议..."})
print(f"分类: {result['classification']}")
print(f"实体: {result['entities']}")
print(f"摘要: {result['summary']}")
# 2. 流式执行(观察每一步)
for step in app.stream({"text": sample_text}):
print(step) # 每次迭代输出状态变化
# 3. 异步调用
result = await app.ainvoke({"text": sample_text})
三、完整代码示例:文本分析Agent
以下是一个可直接运行的完整LangGraph Agent:
import os
from typing import TypedDict, List
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from dotenv import load_dotenv
load_dotenv()
# 1. 定义状态
class AnalysisState(TypedDict):
text: str
classification: str
entities: List[str]
summary: str
# 2. 初始化LLM
llm = ChatOpenAI(
model="gpt-3.5-turbo",
temperature=0,
api_key=os.getenv("OPENAI_API_KEY")
)
# 3. 定义节点函数
def classify(state: AnalysisState) -> dict:
"""分类节点"""
prompt = PromptTemplate(
input_variables=["text"],
template="将文本分类为:技术、新闻、其他。\n文本:{text}\n类别:"
)
chain = prompt | llm
result = chain.invoke({"text": state["text"]})
return {"classification": result.content.strip()}
def extract_entities(state: AnalysisState) -> dict:
"""实体提取节点"""
prompt = PromptTemplate(
input_variables=["text"],
template="提取文本中的关键实体(人名、组织、产品),逗号分隔。\n文本:{text}\n实体:"
)
chain = prompt | llm
result = chain.invoke({"text": state["text"]})
entities = [e.strip() for e in result.content.split(",")]
return {"entities": entities}
def summarize(state: AnalysisState) -> dict:
"""摘要节点"""
prompt = PromptTemplate(
input_variables=["text"],
template="用一句话总结:\n{text}\n摘要:"
)
chain = prompt | llm
result = chain.invoke({"text": state["text"]})
return {"summary": result.content}
# 4. 构建图
def build_agent():
graph = StateGraph(AnalysisState)
# 添加节点
graph.add_node("classify", classify)
graph.add_node("extract", extract_entities)
graph.add_node("summarize", summarize)
# 添加边
graph.add_edge(START, "classify")
graph.add_edge("classify", "extract")
graph.add_edge("extract", "summarize")
graph.add_edge("summarize", END)
return graph.compile()
# 5. 执行
if __name__ == "__main__":
agent = build_agent()
result = agent.invoke({
"text": "Anthropic的Claude 3模型在编程和推理任务上表现出色"
})
print(f"分类: {result['classification']}")
print(f"实体: {result['entities']}")
print(f"摘要: {result['summary']}")
输出示例:
分类: 技术
实体: ['Anthropic', 'Claude 3']
摘要: Anthropic开发的Claude 3模型在编程和推理任务上具有出色表现。
四、高级特性速览
1. 状态持久化与断点续跑
from langgraph.checkpoint.sqlite import SqliteSaver
# 创建内存检查点(生产环境用Redis/PostgreSQL)
memory = SqliteSaver.from_conn_string("checkpoints.db")
# 编译时传入checkpointer
app = graph.compile(checkpointer=memory)
# 使用thread_id隔离不同会话
config = {"configurable": {"thread_id": "user_123"}}
# 第一次执行
result = app.invoke(input_data, config)
# 从断点恢复(自动加载上次状态)
result = app.invoke(new_input, config)
2. 人机交互(Human-in-the-Loop)
# 编译时设置中断点
app = graph.compile(
checkpointer=memory,
interrupt_before=["dangerous_action"] # 执行前中断
)
# 执行到中断点
app.invoke(input_data, config)
# 获取当前状态
state = app.get_state(config)
# 手动更新后继续
app.update_state(config, {"approved": True})
app.invoke(None, config) # 从中断点继续
3. 并行执行
from langgraph.graph import add_nodes
# 添加多个并行节点
graph.add_node("search_web", web_search)
graph.add_node("query_db", database_query)
graph.add_node("call_api", api_call)
# 并行执行后聚合
graph.add_edge("classify", ["search_web", "query_db", "call_api"])
graph.add_edge(["search_web", "query_db", "call_api"], "aggregate")
五、与传统方案的对比
| 维度 | 手写循环 | LangChain Agent | LangGraph |
|---|---|---|---|
| 状态管理 | 手动维护列表 | 内置但有限 | 自动合并+持久化 |
| 循环控制 | 嵌套while | 有限支持 | 原生支持 |
| 分支逻辑 | if-else堆砌 | 简单条件 | 条件边+动态路由 |
| 断点恢复 | 不支持 | 不支持 | 原生支持 |
| 可视化调试 | 无 | 基础 | 生成拓扑图 |
| 代码量 | 200+行 | 50行 | 50行内 |
六、学习资源
- 官方文档:LangGraph.js Reference
- 快速创建项目:
npx create-agent-chat-app@latest - LangGraph Academy:免费结构化课程
- 社区:LangChain Forum
总结
LangGraph的核心价值在于将复杂的Agent逻辑图形化:
- State 作为数据总线
- Node 作为处理单元
- Edge 作为控制流
- Pregel 作为执行引擎
这种设计让开发者能够以"绘制流程图"的方式构建企业级Agent,同时获得状态持久化、断点续跑、人机交互等高级特性。
什么时候用state,什么时候用持久化数据库,比如postgresql?
State vs 持久化数据库:何时使用
这是LangGraph开发中最常见的架构决策问题。核心区别在于生命周期和作用域。
一、核心原则速查表
| 维度 | State | PostgreSQL/持久化DB |
|---|---|---|
| 生命周期 | 单次会话执行期间 | 跨会话、永久保存 |
| 存储位置 | 内存(可序列化到checkpoint) | 磁盘 |
| 访问速度 | 微秒级 | 毫秒级 |
| 数据量 | 适合小规模(<1MB) | 无限制 |
| 并发访问 | 单线程 | 多线程/多进程 |
| 查询能力 | 无(只能整体读取) | 强大的SQL查询 |
| 典型用途 | 中间结果、对话历史、临时状态 | 用户配置、知识库、审计日志 |
二、LangGraph State的定位
什么时候用State?
1. Agent执行过程中的临时数据
class AgentState(TypedDict):
messages: Annotated[List, add_messages] # 当前对话
current_step: str # 当前执行到哪一步
intermediate_results: List[dict] # 中间计算结果
tool_calls: List[dict] # 待执行的工具
2. 需要在节点间共享的上下文
# ✅ 正确:用State传递节点间数据
def research_node(state: AgentState):
# 搜索结果存入State
return {"search_results": perform_search(state["query"])}
def analyze_node(state: AgentState):
# 从State读取搜索结果
results = state["search_results"] # 直接访问
return {"analysis": analyze(results)}
3. 需要自动合并的数据(使用Annotated)
# 自动追加,不需要手动管理列表
from typing import Annotated
import operator
class State(TypedDict):
# 每次返回新元素时自动append
visited_urls: Annotated[List[str], operator.add]
errors: Annotated[List[str], operator.add]
State的局限性
# ❌ 不要这样做:State不适合大文件
class BadState(TypedDict):
large_file_content: str # 10MB的日志文件 → 内存爆炸
# ❌ 不要这样做:State不适合跨Agent共享
class SharedState(TypedDict):
user_profile: dict # 多个Agent实例需要访问 → 数据不一致
三、PostgreSQL的定位
什么时候用PostgreSQL?
1. 用户配置和偏好
# ✅ 从DB读取配置,State只存会话临时数据
async def load_user_config(user_id: str) -> dict:
async with db_pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT preferences, api_keys FROM user_config WHERE user_id = $1",
user_id
)
return dict(row)
def agent_node(state: AgentState):
# 配置存DB,State只存当前会话的cursor
user_config = load_user_config(state["user_id"])
return {"config_cursor": user_config["preferences"]["model"]}
2. 知识库和RAG数据
# 使用pgvector存储 embeddings
async def retrieve_context(query: str) -> List[dict]:
query_embedding = await get_embedding(query)
async with db_pool.acquire() as conn:
results = await conn.fetch(
"""
SELECT content, metadata
FROM documents
ORDER BY embedding <-> $1
LIMIT 5
""",
query_embedding
)
return [dict(r) for r in results]
def rag_node(state: AgentState):
# 检索到的上下文存State(临时)
context = retrieve_context(state["query"])
return {"retrieved_context": context} # State只存小批量
3. 审计日志和监控
async def log_agent_action(
session_id: str,
action: str,
timestamp: datetime,
metadata: dict
):
async with db_pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO agent_logs (session_id, action, timestamp, metadata)
VALUES ($1, $2, $3, $4)
""",
session_id, action, timestamp, metadata
)
def tool_calling_node(state: AgentState):
result = call_tool(state["tool_name"])
# 记录到DB,不占State空间
asyncio.create_task(log_agent_action(
state["session_id"],
"tool_call",
datetime.now(),
{"tool": state["tool_name"], "result": result[:100]}
))
return {"tool_result": result} # State只存关键结果
4. 长历史记录的持久化
# 当单次会话对话超过100轮时,压缩旧消息到DB
class State(TypedDict):
recent_messages: Annotated[List, add_messages] # 只保留最近20条
session_id: str
async def compact_history(state: State):
if len(state["recent_messages"]) > 20:
# 压缩前10条到DB
old_messages = state["recent_messages"][:10]
async with db_pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO session_history (session_id, messages, compressed_at)
VALUES ($1, $2, NOW())
""",
state["session_id"],
json.dumps(old_messages)
)
# State只保留最近10条
return {"recent_messages": state["recent_messages"][-10:]}
四、混合架构实战
典型的三层存储架构
class HybridAgent:
"""
L1: State(内存) - 当前会话活跃数据
L2: Redis(可选) - 会话级缓存
L3: PostgreSQL - 永久存储
"""
def __init__(self, db_pool, redis_client):
self.db_pool = db_pool
self.redis = redis_client
self.graph = self._build_graph()
async def process(self, user_id: str, query: str):
# 1. 从DB加载用户配置(L3)
user_config = await self._load_user_config(user_id)
# 2. 尝试从Redis恢复会话状态(L2)
session_state = await self.redis.get(f"session:{user_id}")
# 3. 初始化LangGraph State(L1)
initial_state = {
"user_id": user_id,
"query": query,
"user_config": user_config, # 只存当前会话需要的配置
"recent_messages": session_state.get("messages", []) if session_state else [],
"temp_results": [] # 临时计算结果
}
# 4. 执行Agent
final_state = await self.graph.ainvoke(initial_state)
# 5. 保存关键状态到Redis(会话缓存,TTL 1小时)
await self.redis.setex(
f"session:{user_id}",
3600,
{"messages": final_state["recent_messages"][-10:]}
)
# 6. 异步保存审计日志到PostgreSQL(L3)
asyncio.create_task(self._log_interaction(user_id, query, final_state))
return final_state["answer"]
async def _load_user_config(self, user_id: str) -> dict:
"""从PostgreSQL加载(L3)"""
async with self.db_pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT config FROM users WHERE id = $1",
user_id
)
return row["config"] if row else {}
async def _log_interaction(self, user_id: str, query: str, state: dict):
"""异步写日志到PostgreSQL(L3)"""
async with self.db_pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO agent_interactions
(user_id, query, answer, tokens_used, created_at)
VALUES ($1, $2, $3, $4, NOW())
""",
user_id, query, state["answer"], state.get("total_tokens")
)
PostgreSQL表结构示例
-- 用户配置表(L3永久存储)
CREATE TABLE user_config (
user_id VARCHAR(255) PRIMARY KEY,
config JSONB NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- 会话历史表(压缩后的旧消息)
CREATE TABLE session_history (
id SERIAL PRIMARY KEY,
session_id VARCHAR(255) NOT NULL,
messages JSONB NOT NULL, -- 压缩后的消息数组
compressed_at TIMESTAMP DEFAULT NOW(),
INDEX idx_session_id (session_id)
);
-- 审计日志表(只追加)
CREATE TABLE agent_interactions (
id BIGSERIAL PRIMARY KEY,
user_id VARCHAR(255),
query TEXT,
answer TEXT,
tokens_used INT,
latency_ms INT,
created_at TIMESTAMP DEFAULT NOW(),
INDEX idx_user_created (user_id, created_at)
);
-- 知识库表(RAG用pgvector)
CREATE EXTENSION vector;
CREATE TABLE documents (
id SERIAL PRIMARY KEY,
content TEXT,
embedding VECTOR(1536), -- OpenAI embedding维度
metadata JSONB,
created_at TIMESTAMP DEFAULT NOW()
);
五、决策树
是否需要数据在Agent执行结束后保留?
├─ 否 → 使用State
└─ 是 → 是否需要被多个Agent实例/用户共享?
├─ 否 → 使用State + Checkpointer(单用户断点续跑)
└─ 是 → 是否需要复杂查询(WHERE, JOIN, 全文搜索)?
├─ 否 → Redis(KV缓存,高性能)
└─ 是 → PostgreSQL(关系型需求)
六、性能对比测试
# 模拟不同存储策略的性能
import asyncio
import time
async def benchmark_storage():
# State(内存)
start = time.time()
state = {"data": "x" * 1000}
for _ in range(1000):
state["data"] = state["data"][:100] # 模拟更新
print(f"State: {time.time() - start:.3f}s")
# PostgreSQL(网络+磁盘)
start = time.time()
async with db_pool.acquire() as conn:
for i in range(1000):
await conn.execute(
"UPDATE test SET data = $1 WHERE id = 1",
"x" * 100
)
print(f"PostgreSQL: {time.time() - start:.3f}s")
# 典型结果:
# State: 0.002s
# PostgreSQL: 2.345s (慢1000倍)
七、反模式警示
# ❌ 反模式1:在State中存储大数据
class BadState(TypedDict):
uploaded_file_base64: str # 10MB图片 → 内存爆炸
# ✅ 正确:存S3 URL
class GoodState(TypedDict):
uploaded_file_url: str # 只存引用
# ❌ 反模式2:用PostgreSQL做临时缓存
async def bad_node(state):
# 每次工具调用都写DB
await db.execute("INSERT INTO temp VALUES ($1)", state["temp_data"])
# ✅ 正确:临时数据用State,定期flush到DB
class GoodNode:
def __init__(self):
self.buffer = [] # 批量积累
async def __call__(self, state):
self.buffer.append(state["temp_data"])
if len(self.buffer) > 100:
await db.executemany("INSERT INTO temp VALUES ($1)", self.buffer)
self.buffer = []
总结
- State:Agent的"工作台",放正在处理的数据,追求极致性能
- PostgreSQL:Agent的"仓库",放需要永久保存、跨会话共享、复杂查询的数据
实际项目中,90%的数据用State + Checkpointer就够了,只有用户配置、知识库、审计日志等才需要PostgreSQL。先用State快速迭代,遇到痛点再引入DB。
更多推荐

所有评论(0)