LangGraph 核心模块及代码详解

LangGraph是LangChain生态中专门用于构建有状态、多步骤、可循环的Agent系统的图计算框架。它的核心思想是将Agent的工作流抽象为有向图,通过节点(Node)和边(Edge)来定义执行逻辑。

一、核心架构概览

LangGraph采用三层架构

层级 作用 核心组件
图结构层 定义任务流程 StateGraph, Node, Edge
执行引擎层 调度与状态管理 Pregel (运行时), Checkpointer
扩展接口层 生态集成 MCP协议, LangSmith

二、四大核心模块详解

1. State(状态)—— 数据的共享容器

State是贯穿整个工作流的共享数据结构,存储对话历史、工具执行结果、中间变量等信息。

定义方式

from typing import TypedDict, Annotated, List
import operator

# 基础定义
class AgentState(TypedDict):
    messages: Annotated[List, operator.add]  # 自动累加,不是覆盖
    user_query: str
    intermediate_steps: List[tuple]

# 带合并策略的状态
from langgraph.graph.message import add_messages

class ChatState(TypedDict):
    messages: Annotated[List, add_messages]  # 专用消息合并函数
    next_step: str

关键点

  • Annotated + operator.add:列表自动拼接
  • 节点只需返回状态子集(要更新的字段),框架自动合并

代码示例(完整工作流):

from typing import TypedDict, Annotated, List
import operator
from langgraph.graph import StateGraph, END

# 1. 定义状态
class State(TypedDict):
    text: str
    classification: str
    entities: List[str]
    summary: str

2. Node(节点)—— 执行的基本单元

每个节点是一个处理函数,接收State,返回State的更新子集。

节点设计原则

  • 单一职责:一个节点只做一件事
  • 无状态:不保存内部状态,所有数据通过State传递
  • 可测试:纯函数风格

代码示例(三个功能节点):

from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.schema import HumanMessage

llm = ChatOpenAI(model="gpt-4", temperature=0)

# 节点1:分类
def classification_node(state: State) -> dict:
    prompt = PromptTemplate(
        input_variables=["text"],
        template="将以下文本分类为:新闻、博客、研究、其他。\n文本:{text}\n类别:"
    )
    message = HumanMessage(content=prompt.format(text=state["text"]))
    classification = llm.invoke([message]).content.strip()
    return {"classification": classification}

# 节点2:实体提取
def entity_extraction_node(state: State) -> dict:
    prompt = PromptTemplate(
        input_variables=["text"],
        template="从以下文本中提取实体(人物、组织、地点),逗号分隔。\n文本:{text}\n实体:"
    )
    message = HumanMessage(content=prompt.format(text=state["text"]))
    entities = llm.invoke([message]).content.strip().split(", ")
    return {"entities": entities}

# 节点3:摘要生成
def summarize_node(state: State) -> dict:
    prompt = PromptTemplate.from_template("用一句话总结:\n{text}\n摘要:")
    chain = prompt | llm
    response = chain.invoke({"text": state["text"]})
    return {"summary": response.content}

高级特性

# 带重试策略的节点
from langgraph.graph import RetryPolicy

retry_policy = RetryPolicy(
    max_attempts=3,
    initial_interval=1,
    backoff_factor=2,
    retry_on=[Exception]
)

graph.add_node("llm_call", llm_node, retry=retry_policy)

# 带缓存的节点
from langgraph.graph import CachePolicy

graph.add_node("expensive_op", expensive_node, 
               cache_policy=CachePolicy(ttl=60))

3. Edge(边)—— 控制流的连接线

边定义了节点之间的执行顺序跳转逻辑

边的类型

类型 说明 适用场景
普通边 固定顺序执行 线性流程
条件边 根据状态动态选择 分支逻辑、工具调用判断
并行边 多个节点同时执行 Map-Reduce模式

代码示例

from langgraph.graph import StateGraph, START, END

# 创建图
graph = StateGraph(State)

# 添加节点
graph.add_node("classify", classification_node)
graph.add_node("extract", entity_extraction_node)
graph.add_node("summarize", summarize_node)

# 普通边:顺序执行
graph.add_edge(START, "classify")
graph.add_edge("classify", "extract")
graph.add_edge("extract", "summarize")
graph.add_edge("summarize", END)

# 条件边:动态路由
def route_after_classification(state: State) -> str:
    if state["classification"] == "研究":
        return "extract"  # 研究类文章需要实体提取
    else:
        return "summarize"  # 其他类型只做摘要

graph.add_conditional_edges(
    "classify",
    route_after_classification,
    {
        "extract": "extract",
        "summarize": "summarize"
    }
)

工具调用场景的条件边

def exists_tool_call(state: State) -> bool:
    """检测是否需要调用工具"""
    last_message = state["messages"][-1]
    return hasattr(last_message, "tool_calls") and len(last_message.tool_calls) > 0

graph.add_conditional_edges(
    "llm",
    exists_tool_call,
    {True: "tool_executor", False: END}
)

4. Pregel(运行时引擎)—— 编译后的执行器

当调用.compile()后,StateGraph被编译成CompiledGraph(即Pregel实例),这是真正的执行引擎。

核心能力

# 编译图
app = graph.compile()

# 此时app的类型是CompiledGraph,继承自Pregel
print(type(app))  # <class 'langgraph.graph.state.CompiledGraph'>

Pregel提供的方法

方法 作用 返回类型
invoke(input) 同步执行,返回最终状态 State
ainvoke(input) 异步执行 State
stream(input) 流式输出中间状态 Iterator
get_state(config) 获取当前状态快照 StateSnapshot
update_state(config, values) 手动更新状态(人机交互) -
get_state_history(config) 获取历史状态(时间旅行) Iterator

代码示例

# 1. 基本调用
result = app.invoke({"text": "Anthropic发布了MCP协议..."})

print(f"分类: {result['classification']}")
print(f"实体: {result['entities']}")
print(f"摘要: {result['summary']}")

# 2. 流式执行(观察每一步)
for step in app.stream({"text": sample_text}):
    print(step)  # 每次迭代输出状态变化

# 3. 异步调用
result = await app.ainvoke({"text": sample_text})

三、完整代码示例:文本分析Agent

以下是一个可直接运行的完整LangGraph Agent:

import os
from typing import TypedDict, List
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from dotenv import load_dotenv

load_dotenv()

# 1. 定义状态
class AnalysisState(TypedDict):
    text: str
    classification: str
    entities: List[str]
    summary: str

# 2. 初始化LLM
llm = ChatOpenAI(
    model="gpt-3.5-turbo",
    temperature=0,
    api_key=os.getenv("OPENAI_API_KEY")
)

# 3. 定义节点函数
def classify(state: AnalysisState) -> dict:
    """分类节点"""
    prompt = PromptTemplate(
        input_variables=["text"],
        template="将文本分类为:技术、新闻、其他。\n文本:{text}\n类别:"
    )
    chain = prompt | llm
    result = chain.invoke({"text": state["text"]})
    return {"classification": result.content.strip()}

def extract_entities(state: AnalysisState) -> dict:
    """实体提取节点"""
    prompt = PromptTemplate(
        input_variables=["text"],
        template="提取文本中的关键实体(人名、组织、产品),逗号分隔。\n文本:{text}\n实体:"
    )
    chain = prompt | llm
    result = chain.invoke({"text": state["text"]})
    entities = [e.strip() for e in result.content.split(",")]
    return {"entities": entities}

def summarize(state: AnalysisState) -> dict:
    """摘要节点"""
    prompt = PromptTemplate(
        input_variables=["text"],
        template="用一句话总结:\n{text}\n摘要:"
    )
    chain = prompt | llm
    result = chain.invoke({"text": state["text"]})
    return {"summary": result.content}

# 4. 构建图
def build_agent():
    graph = StateGraph(AnalysisState)
    
    # 添加节点
    graph.add_node("classify", classify)
    graph.add_node("extract", extract_entities)
    graph.add_node("summarize", summarize)
    
    # 添加边
    graph.add_edge(START, "classify")
    graph.add_edge("classify", "extract")
    graph.add_edge("extract", "summarize")
    graph.add_edge("summarize", END)
    
    return graph.compile()

# 5. 执行
if __name__ == "__main__":
    agent = build_agent()
    
    result = agent.invoke({
        "text": "Anthropic的Claude 3模型在编程和推理任务上表现出色"
    })
    
    print(f"分类: {result['classification']}")
    print(f"实体: {result['entities']}")
    print(f"摘要: {result['summary']}")

输出示例

分类: 技术
实体: ['Anthropic', 'Claude 3']
摘要: Anthropic开发的Claude 3模型在编程和推理任务上具有出色表现。

四、高级特性速览

1. 状态持久化与断点续跑

from langgraph.checkpoint.sqlite import SqliteSaver

# 创建内存检查点(生产环境用Redis/PostgreSQL)
memory = SqliteSaver.from_conn_string("checkpoints.db")

# 编译时传入checkpointer
app = graph.compile(checkpointer=memory)

# 使用thread_id隔离不同会话
config = {"configurable": {"thread_id": "user_123"}}

# 第一次执行
result = app.invoke(input_data, config)

# 从断点恢复(自动加载上次状态)
result = app.invoke(new_input, config)

2. 人机交互(Human-in-the-Loop)

# 编译时设置中断点
app = graph.compile(
    checkpointer=memory,
    interrupt_before=["dangerous_action"]  # 执行前中断
)

# 执行到中断点
app.invoke(input_data, config)

# 获取当前状态
state = app.get_state(config)

# 手动更新后继续
app.update_state(config, {"approved": True})
app.invoke(None, config)  # 从中断点继续

3. 并行执行

from langgraph.graph import add_nodes

# 添加多个并行节点
graph.add_node("search_web", web_search)
graph.add_node("query_db", database_query)
graph.add_node("call_api", api_call)

# 并行执行后聚合
graph.add_edge("classify", ["search_web", "query_db", "call_api"])
graph.add_edge(["search_web", "query_db", "call_api"], "aggregate")

五、与传统方案的对比

维度 手写循环 LangChain Agent LangGraph
状态管理 手动维护列表 内置但有限 自动合并+持久化
循环控制 嵌套while 有限支持 原生支持
分支逻辑 if-else堆砌 简单条件 条件边+动态路由
断点恢复 不支持 不支持 原生支持
可视化调试 基础 生成拓扑图
代码量 200+行 50行 50行内

六、学习资源

总结

LangGraph的核心价值在于将复杂的Agent逻辑图形化

  • State 作为数据总线
  • Node 作为处理单元
  • Edge 作为控制流
  • Pregel 作为执行引擎

这种设计让开发者能够以"绘制流程图"的方式构建企业级Agent,同时获得状态持久化、断点续跑、人机交互等高级特性。


什么时候用state,什么时候用持久化数据库,比如postgresql?

State vs 持久化数据库:何时使用

这是LangGraph开发中最常见的架构决策问题。核心区别在于生命周期作用域

一、核心原则速查表

维度 State PostgreSQL/持久化DB
生命周期 单次会话执行期间 跨会话、永久保存
存储位置 内存(可序列化到checkpoint) 磁盘
访问速度 微秒级 毫秒级
数据量 适合小规模(<1MB) 无限制
并发访问 单线程 多线程/多进程
查询能力 无(只能整体读取) 强大的SQL查询
典型用途 中间结果、对话历史、临时状态 用户配置、知识库、审计日志

二、LangGraph State的定位

什么时候用State?

1. Agent执行过程中的临时数据

class AgentState(TypedDict):
    messages: Annotated[List, add_messages]  # 当前对话
    current_step: str                         # 当前执行到哪一步
    intermediate_results: List[dict]         # 中间计算结果
    tool_calls: List[dict]                   # 待执行的工具

2. 需要在节点间共享的上下文

# ✅ 正确:用State传递节点间数据
def research_node(state: AgentState):
    # 搜索结果存入State
    return {"search_results": perform_search(state["query"])}

def analyze_node(state: AgentState):
    # 从State读取搜索结果
    results = state["search_results"]  # 直接访问
    return {"analysis": analyze(results)}

3. 需要自动合并的数据(使用Annotated)

# 自动追加,不需要手动管理列表
from typing import Annotated
import operator

class State(TypedDict):
    # 每次返回新元素时自动append
    visited_urls: Annotated[List[str], operator.add]
    errors: Annotated[List[str], operator.add]

State的局限性

# ❌ 不要这样做:State不适合大文件
class BadState(TypedDict):
    large_file_content: str  # 10MB的日志文件 → 内存爆炸

# ❌ 不要这样做:State不适合跨Agent共享
class SharedState(TypedDict):
    user_profile: dict  # 多个Agent实例需要访问 → 数据不一致

三、PostgreSQL的定位

什么时候用PostgreSQL?

1. 用户配置和偏好

# ✅ 从DB读取配置,State只存会话临时数据
async def load_user_config(user_id: str) -> dict:
    async with db_pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT preferences, api_keys FROM user_config WHERE user_id = $1",
            user_id
        )
        return dict(row)

def agent_node(state: AgentState):
    # 配置存DB,State只存当前会话的cursor
    user_config = load_user_config(state["user_id"])
    return {"config_cursor": user_config["preferences"]["model"]}

2. 知识库和RAG数据

# 使用pgvector存储 embeddings
async def retrieve_context(query: str) -> List[dict]:
    query_embedding = await get_embedding(query)
    async with db_pool.acquire() as conn:
        results = await conn.fetch(
            """
            SELECT content, metadata 
            FROM documents 
            ORDER BY embedding <-> $1 
            LIMIT 5
            """,
            query_embedding
        )
        return [dict(r) for r in results]

def rag_node(state: AgentState):
    # 检索到的上下文存State(临时)
    context = retrieve_context(state["query"])
    return {"retrieved_context": context}  # State只存小批量

3. 审计日志和监控

async def log_agent_action(
    session_id: str,
    action: str,
    timestamp: datetime,
    metadata: dict
):
    async with db_pool.acquire() as conn:
        await conn.execute(
            """
            INSERT INTO agent_logs (session_id, action, timestamp, metadata)
            VALUES ($1, $2, $3, $4)
            """,
            session_id, action, timestamp, metadata
        )

def tool_calling_node(state: AgentState):
    result = call_tool(state["tool_name"])
    # 记录到DB,不占State空间
    asyncio.create_task(log_agent_action(
        state["session_id"],
        "tool_call",
        datetime.now(),
        {"tool": state["tool_name"], "result": result[:100]}
    ))
    return {"tool_result": result}  # State只存关键结果

4. 长历史记录的持久化

# 当单次会话对话超过100轮时,压缩旧消息到DB
class State(TypedDict):
    recent_messages: Annotated[List, add_messages]  # 只保留最近20条
    session_id: str

async def compact_history(state: State):
    if len(state["recent_messages"]) > 20:
        # 压缩前10条到DB
        old_messages = state["recent_messages"][:10]
        async with db_pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO session_history (session_id, messages, compressed_at)
                VALUES ($1, $2, NOW())
                """,
                state["session_id"],
                json.dumps(old_messages)
            )
        # State只保留最近10条
        return {"recent_messages": state["recent_messages"][-10:]}

四、混合架构实战

典型的三层存储架构

class HybridAgent:
    """
    L1: State(内存) - 当前会话活跃数据
    L2: Redis(可选) - 会话级缓存
    L3: PostgreSQL - 永久存储
    """
    
    def __init__(self, db_pool, redis_client):
        self.db_pool = db_pool
        self.redis = redis_client
        self.graph = self._build_graph()
    
    async def process(self, user_id: str, query: str):
        # 1. 从DB加载用户配置(L3)
        user_config = await self._load_user_config(user_id)
        
        # 2. 尝试从Redis恢复会话状态(L2)
        session_state = await self.redis.get(f"session:{user_id}")
        
        # 3. 初始化LangGraph State(L1)
        initial_state = {
            "user_id": user_id,
            "query": query,
            "user_config": user_config,  # 只存当前会话需要的配置
            "recent_messages": session_state.get("messages", []) if session_state else [],
            "temp_results": []  # 临时计算结果
        }
        
        # 4. 执行Agent
        final_state = await self.graph.ainvoke(initial_state)
        
        # 5. 保存关键状态到Redis(会话缓存,TTL 1小时)
        await self.redis.setex(
            f"session:{user_id}",
            3600,
            {"messages": final_state["recent_messages"][-10:]}
        )
        
        # 6. 异步保存审计日志到PostgreSQL(L3)
        asyncio.create_task(self._log_interaction(user_id, query, final_state))
        
        return final_state["answer"]
    
    async def _load_user_config(self, user_id: str) -> dict:
        """从PostgreSQL加载(L3)"""
        async with self.db_pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT config FROM users WHERE id = $1",
                user_id
            )
            return row["config"] if row else {}
    
    async def _log_interaction(self, user_id: str, query: str, state: dict):
        """异步写日志到PostgreSQL(L3)"""
        async with self.db_pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO agent_interactions 
                (user_id, query, answer, tokens_used, created_at)
                VALUES ($1, $2, $3, $4, NOW())
                """,
                user_id, query, state["answer"], state.get("total_tokens")
            )

PostgreSQL表结构示例

-- 用户配置表(L3永久存储)
CREATE TABLE user_config (
    user_id VARCHAR(255) PRIMARY KEY,
    config JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

-- 会话历史表(压缩后的旧消息)
CREATE TABLE session_history (
    id SERIAL PRIMARY KEY,
    session_id VARCHAR(255) NOT NULL,
    messages JSONB NOT NULL,  -- 压缩后的消息数组
    compressed_at TIMESTAMP DEFAULT NOW(),
    INDEX idx_session_id (session_id)
);

-- 审计日志表(只追加)
CREATE TABLE agent_interactions (
    id BIGSERIAL PRIMARY KEY,
    user_id VARCHAR(255),
    query TEXT,
    answer TEXT,
    tokens_used INT,
    latency_ms INT,
    created_at TIMESTAMP DEFAULT NOW(),
    INDEX idx_user_created (user_id, created_at)
);

-- 知识库表(RAG用pgvector)
CREATE EXTENSION vector;
CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    content TEXT,
    embedding VECTOR(1536),  -- OpenAI embedding维度
    metadata JSONB,
    created_at TIMESTAMP DEFAULT NOW()
);

五、决策树

是否需要数据在Agent执行结束后保留?
├─ 否 → 使用State
└─ 是 → 是否需要被多个Agent实例/用户共享?
    ├─ 否 → 使用State + Checkpointer(单用户断点续跑)
    └─ 是 → 是否需要复杂查询(WHERE, JOIN, 全文搜索)?
        ├─ 否 → Redis(KV缓存,高性能)
        └─ 是 → PostgreSQL(关系型需求)

六、性能对比测试

# 模拟不同存储策略的性能
import asyncio
import time

async def benchmark_storage():
    # State(内存)
    start = time.time()
    state = {"data": "x" * 1000}
    for _ in range(1000):
        state["data"] = state["data"][:100]  # 模拟更新
    print(f"State: {time.time() - start:.3f}s")
    
    # PostgreSQL(网络+磁盘)
    start = time.time()
    async with db_pool.acquire() as conn:
        for i in range(1000):
            await conn.execute(
                "UPDATE test SET data = $1 WHERE id = 1",
                "x" * 100
            )
    print(f"PostgreSQL: {time.time() - start:.3f}s")
    
# 典型结果:
# State: 0.002s
# PostgreSQL: 2.345s  (慢1000倍)

七、反模式警示

# ❌ 反模式1:在State中存储大数据
class BadState(TypedDict):
    uploaded_file_base64: str  # 10MB图片 → 内存爆炸

# ✅ 正确:存S3 URL
class GoodState(TypedDict):
    uploaded_file_url: str  # 只存引用

# ❌ 反模式2:用PostgreSQL做临时缓存
async def bad_node(state):
    # 每次工具调用都写DB
    await db.execute("INSERT INTO temp VALUES ($1)", state["temp_data"])
    
# ✅ 正确:临时数据用State,定期flush到DB
class GoodNode:
    def __init__(self):
        self.buffer = []  # 批量积累
    
    async def __call__(self, state):
        self.buffer.append(state["temp_data"])
        if len(self.buffer) > 100:
            await db.executemany("INSERT INTO temp VALUES ($1)", self.buffer)
            self.buffer = []

总结

  • State:Agent的"工作台",放正在处理的数据,追求极致性能
  • PostgreSQL:Agent的"仓库",放需要永久保存、跨会话共享、复杂查询的数据

实际项目中,90%的数据用State + Checkpointer就够了,只有用户配置、知识库、审计日志等才需要PostgreSQL。先用State快速迭代,遇到痛点再引入DB。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐