LangChain v1.0: A Complete Tutorial

Version: LangChain 1.0+ | Language: Python 3.10+ | Last updated: 2026-06-05


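Contents

Part 1: Fundamentals

  1. v1.0 Overview and Environment Setup
  2. Model Initialization and the Unified Interface
  3. Content Blocks: A Unified Cross-Provider Content View
  4. Prompt Templates and the Message System
  5. Structured Output

Part 2: Core Framework

  6. LCEL: The LangChain Expression Language
  7. The Runnable Protocol in Depth
  8. Defining and Using Tools
  9. create_agent: The Unified Agent Entry Point
  10. The Middleware System in Depth

Part 3: Data and Retrieval

  11. Document Loaders
  12. Text Splitters
  13. Embeddings and Vector Stores
  14. RAG: Retrieval-Augmented Generation
  15. Advanced RAG Strategies

Part 4: Advanced Topics

  16. LangGraph State Graph Basics
  17. Multi-Agent Collaboration and the Supervisor Pattern
  18. Memory and Persistence
  19. Streaming Output in Depth
  20. Human-in-the-Loop
  21. The Callback System and Monitoring
  22. Production Best Practices
  23. Migrating from v0.x to v1.0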

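1. v1.0 Overview and Environment Setup

1.1 Core Changes in LangChain v1.0

LangChain v1.0 was officially released in October 2025 as a thorough architectural overhaul. The core changes:

Dimension | v0.x (old) | v1.0 (new)
Agent creation | Several entry points: AgentExecutor, initialize_agent, create_react_agent | One entry point: create_agent()
Chain composition | Hard-coded chains such as LLMChain and ConversationChain | LCEL composition with the pipe operator
Message content | Provider-specific formats that require adapter code | content_blocks, a unified cross-provider view
Memory | RunnableWithMessageHistory | LangGraph checkpointers
Tool definition | Commonly a BaseTool subclass | Plain Python functions with type hints
Model initialization | Each provider's class imported separately | One entry point: init_chat_model()
Extension mechanism | Callbacks | Middleware system (onion model)
Runtime | Custom runtime | Built on LangGraph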
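The "onion model" in the table means each middleware wraps the next layer: code before the inner call runs on the way in, and code after it runs on the way out. The sketch below illustrates only that ordering with plain functions; `make_logger`, `compose`, and `call_model` are hypothetical names, and this is not LangChain's `AgentMiddleware` API.

```python
from typing import Callable, List

Handler = Callable[[str], str]
Middleware = Callable[[Handler], Handler]

def make_logger(name: str, log: List[str]) -> Middleware:
    """Hypothetical middleware factory that records entry/exit order."""
    def middleware(next_handler: Handler) -> Handler:
        def handler(request: str) -> str:
            log.append(f"{name}:before")   # runs on the way in
            response = next_handler(request)
            log.append(f"{name}:after")    # runs on the way out
            return response
        return handler
    return middleware

def compose(middlewares: List[Middleware], core: Handler) -> Handler:
    """Wrap the core handler so the first middleware is the outermost layer."""
    handler = core
    for mw in reversed(middlewares):
        handler = mw(handler)
    return handler

def call_model(request: str) -> str:
    """Stand-in for the model call at the center of the onion."""
    return f"answer to {request!r}"

log: List[str] = []
agent_call = compose([make_logger("A", log), make_logger("B", log)], call_model)
result = agent_call("hi")
print(result)  # answer to 'hi'
print(log)     # ['A:before', 'B:before', 'B:after', 'A:after']
```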

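1.2 Package Architecture: A Modular Split

v1.0 splits what used to be one large langchain package into several independent packages:

Package | Role | When to install
langchain-core | Core abstractions, LCEL, and the Runnable protocol | Required by every project
langchain | Main package; provides the create_agent() entry point | First choice for everyday development
langchain-openai | Official OpenAI integration | Using OpenAI models
langchain-anthropic | Official Anthropic Claude integration | Using Claude
langchain-google-genai | Official Google Gemini integration | Using Gemini
langchain-deepseek | Official DeepSeek integration | Using DeepSeek
langchain-community | Community third-party integrations | Prototyping
langchain-classic | Legacy compatibility package (v0.x) | Incremental migration of v0.x projects
langchain-text-splitters | Text splitting | Document processing
langgraph | State-graph runtime | Complex workflows, multi-agent systems
langgraph-supervisor | Supervisor multi-agent pattern | Multi-agent orchestration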

1.3 Installation

# === Recommended minimal install for new projects ===
pip install langchain langchain-openai

# Or with uv (faster)
uv add langchain langchain-openai

# === Full install (common combination) ===
pip install langchain
pip install langchain-openai         # OpenAI
pip install langchain-anthropic      # Anthropic Claude
pip install langchain-google-genai   # Google Gemini
pip install langchain-deepseek       # DeepSeek
pip install langchain-community      # Community integrations
pip install langchain-text-splitters # Text splitting
pip install langgraph                # LangGraph runtime

# === Vector databases (install as needed) ===
pip install langchain-chroma chromadb    # Chroma
pip install faiss-cpu                    # FAISS (CPU build)
# pip install faiss-gpu                  # FAISS (GPU build)
pip install langchain-pinecone           # Pinecone

# === Migrating v0.x projects ===
pip install langchain-classic   # Compatibility layer for legacy Chains/Agents

# === Developer tools ===
pip install langsmith            # Official monitoring and debugging platform
pip install tiktoken             # Token counting
pip install python-dotenv        # Environment variable management

1.4 Environment Variables

# Contents of the .env file:
# OPENAI_API_KEY=sk-...
# ANTHROPIC_API_KEY=sk-ant-...
# GOOGLE_API_KEY=...
# DEEPSEEK_API_KEY=sk-...
# LANGCHAIN_TRACING_V2=true    # LangSmith tracing (the LANGSMITH_* names also work)
# LANGCHAIN_API_KEY=ls_...
# LANGCHAIN_PROJECT=my-project

import os
from dotenv import load_dotenv

load_dotenv()  # load the variables in .env into os.environ

# Option 1: set the environment variable directly
os.environ["OPENAI_API_KEY"] = "sk-..."

# Option 2: pass the key to the constructor
from langchain_openai import ChatOpenAI
model = ChatOpenAI(api_key="sk-...", model="gpt-4o")

1.5 Your First v1.0 Application

# === Option 1: an LCEL chain ===
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4o")
prompt = ChatPromptTemplate.from_template("Explain what {topic} is, in {language}")
parser = StrOutputParser()

chain = prompt | model | parser
result = chain.invoke({"language": "English", "topic": "LangChain"})
print(result)

# === Option 2: create_agent (recommended) ===
from langchain.agents import create_agent

agent = create_agent(
    model="openai:gpt-4o",
    system_prompt="You are a helpful AI assistant.",
)
result = agent.invoke({
    "messages": [{"role": "user", "content": "Explain what LangChain is"}]
})
print(result["messages"][-1].content)

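2. Model Initialization and the Unified Interface

2.1 init_chat_model: A Unified Model Entry Point

v1.0 recommends init_chat_model() for provider-agnostic model initialization:

from langchain.chat_models import init_chat_model

# Option 1: the "provider:model" shorthand (most concise)
model = init_chat_model("openai:gpt-4o")
model = init_chat_model("anthropic:claude-sonnet-4-6")
model = init_chat_model("google_genai:gemini-2.0-flash")  # Gemini via langchain-google-genai
model = init_chat_model("deepseek:deepseek-chat")

# Option 2: declare the provider explicitly
model = init_chat_model("gpt-4o", model_provider="openai")
model = init_chat_model("claude-sonnet-4-6", model_provider="anthropic")

# Option 3: instantiate the provider class directly (most flexible).
# init_chat_model expects a model name, not an instance; create_agent and
# LCEL chains accept a ChatModel instance as-is.
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o", temperature=0.7)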

2.2 ChatOpenAI in Detail

from langchain_openai import ChatOpenAI

model = ChatOpenAI(
    model="gpt-4o",              # model name
    temperature=0.7,              # randomness (0-2)
    max_tokens=4096,              # max output tokens
    top_p=0.95,                   # nucleus sampling
    frequency_penalty=0.0,        # frequency penalty (-2.0 to 2.0)
    presence_penalty=0.0,         # presence penalty (-2.0 to 2.0)
    seed=42,                      # random seed (best-effort reproducibility)
    timeout=60,                   # request timeout (seconds)
    max_retries=3,                # max retries
    streaming=True,               # enable streaming
)

# Basic call
response = model.invoke("Hello, please introduce yourself")
print(response.content)

# Using a list of messages
from langchain_core.messages import HumanMessage, SystemMessage

messages = [
    SystemMessage(content="You are a senior Python engineer."),
    HumanMessage(content="How can I improve the performance of a Django API?"),
]
response = model.invoke(messages)
print(response.content)

2.3 ChatAnthropic (Claude)

from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(
    model="claude-sonnet-4-6",
    max_tokens=4096,
    # Extended thinking: leave temperature at its default (it cannot be changed
    # while thinking is enabled); budget_tokens must be >= 1024 and < max_tokens
    thinking={"type": "enabled", "budget_tokens": 2000},
)

response = model.invoke("Walk me through the main ideas behind the proof of Fermat's Last Theorem.")

# Read the reasoning through content_blocks
for block in response.content_blocks:
    if block["type"] == "reasoning":
        print(f"[Reasoning]: {block['reasoning'][:200]}...")
    elif block["type"] == "text":
        print(f"[Answer]: {block['text'][:200]}...")

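2.4 Binding Arguments with bind()

# bind() attaches extra arguments that are passed on every call
model = ChatOpenAI(model="gpt-4o")

# Bind tools (search_tool and calculator_tool are tools defined elsewhere)
model_with_tools = model.bind_tools(
    tools=[search_tool, calculator_tool],
    tool_choice="auto",  # auto | any | none | required
)

# Bind stop sequences
model_with_stop = model.bind(stop=["\n\nHuman:", "\n\nSystem:"])

# Bind a response format (OpenAI JSON mode: the prompt must mention JSON)
model_json = model.bind(response_format={"type": "json_object"})

# Chained binding: keyword arguments are merged, later values win
configured_model = (
    model
    .bind(temperature=0.3)
    .bind(max_tokens=2000)
    .bind(stop=["\n\n"])
)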

2.5 Message Types

from langchain_core.messages import (
    HumanMessage,       # user message
    AIMessage,          # AI reply
    SystemMessage,      # system prompt
    ToolMessage,        # result of a tool call
    AIMessageChunk,     # AI message chunk (streaming)
    ToolCallChunk,      # tool call chunk (streaming; a TypedDict, not a message class)
)

# HumanMessage supports multimodal content
message = HumanMessage(content=[
    {"type": "text", "text": "Describe this image"},
    {"type": "image_url", "image_url": {"url": "https://example.com/photo.jpg"}},
])

# Build a conversation
conversation = [
    SystemMessage(content="You are a helpful assistant."),
    HumanMessage(content="My name is Xiaoming."),
    AIMessage(content="Hi Xiaoming! How can I help you?"),
    HumanMessage(content="What did I just say my name was?"),
]
response = model.invoke(conversation)

2.6 Switching Models and Fallbacks

from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

primary = ChatOpenAI(model="gpt-4o")
backup = ChatAnthropic(model="claude-sonnet-4-6")

# with_fallbacks: switch to the backup automatically when the primary fails
robust_model = primary.with_fallbacks([backup])

# Agent-level fallbacks through middleware
from langchain.agents import create_agent
from langchain.agents.middleware import ModelFallbackMiddleware

agent = create_agent(
    model=primary,  # the primary model
    middleware=[
        # Tried in order only after the primary model raises an error
        ModelFallbackMiddleware(
            ChatAnthropic(model="claude-sonnet-4-6"),  # first fallback
            ChatOpenAI(model="gpt-4o-mini"),           # second fallback
        ),
    ],
)
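`with_fallbacks()` and `ModelFallbackMiddleware` follow the same basic idea: try each candidate in order and return the first result that does not raise. A minimal pure-Python sketch of that loop, using hypothetical stand-in functions instead of real models (LangChain's own implementation handles more, such as streaming and callbacks):

```python
from typing import Callable, List, Sequence, Tuple

def invoke_with_fallbacks(
    candidates: Sequence[Callable[[str], str]],
    prompt: str,
    exceptions: Tuple[type, ...] = (Exception,),
) -> str:
    """Try each callable in order and return the first successful result."""
    if not candidates:
        raise ValueError("at least one candidate is required")
    last_error: Exception | None = None
    for candidate in candidates:
        try:
            return candidate(prompt)
        except exceptions as err:  # only these exception types trigger a fallback
            last_error = err
    raise last_error  # every candidate failed: surface the last error

calls: List[str] = []

def primary(prompt: str) -> str:
    calls.append("primary")
    raise ConnectionError("primary model unavailable")

def backup(prompt: str) -> str:
    calls.append("backup")
    return f"backup answered: {prompt}"

answer = invoke_with_fallbacks([primary, backup], "hello")
print(answer)  # backup answered: hello
print(calls)   # ['primary', 'backup']
```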

3. Content Blocks: A Unified Cross-Provider Content View

3.1 Why Content Blocks

Every LLM provider structures its responses differently. v1.0 exposes a unified view of the content through content_blocks:

from langchain.chat_models import init_chat_model

# Models from different providers
openai_model = init_chat_model("openai:gpt-4o")
anthropic_model = init_chat_model("anthropic:claude-sonnet-4-6")

# Handle every provider through content_blocks, with no provider-specific adapter code
def print_response(response):
    print(f"[Model: {response.response_metadata.get('model_name', 'unknown')}]")
    for block in response.content_blocks:
        block_type = block["type"]
        if block_type == "text":
            print(f"  Text: {block['text'][:100]}...")
        elif block_type == "reasoning":
            print(f"  Reasoning: {block['reasoning'][:100]}...")
        elif block_type == "tool_call":
            print(f"  Tool call: {block['name']}({block['args']})")
        print("  ---")

# The same logic works for every provider
print_response(openai_model.invoke("hello"))
print_response(anthropic_model.invoke("hello"))

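3.2 Content Block Types

# Standard content block types in v1.0 (the "type" field):
# - text:               text output; citations are attached as "annotations"
# - reasoning:          model reasoning (thinking / chain-of-thought)
# - tool_call:          a tool/function call request (name, args, id)
# - tool_call_chunk:    a partial tool call while streaming
# - invalid_tool_call:  a tool call whose arguments could not be parsed
# - server_tool_call:   a tool executed on the provider side (e.g. web search)
# - server_tool_result: the result of a server-side tool call
# - image / audio / video / file: multimodal data
# - text-plain:         plain-text document content
# - non_standard:       provider-specific data with no standard equivalent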
3.3 Handling Responses with Reasoning

# Enable Claude's extended thinking
model = init_chat_model(
    "anthropic:claude-sonnet-4-6",
    thinking={"type": "enabled", "budget_tokens": 5000},
)

response = model.invoke("Explain the basic principles of quantum entanglement")

# content_blocks separates the reasoning from the answer
for block in response.content_blocks:
    if block["type"] == "reasoning":
        print("=" * 40)
        print("[Reasoning]")
        print("=" * 40)
        print(block["reasoning"])
        print()
    elif block["type"] == "text":
        print("=" * 40)
        print("[Final answer]")
        print("=" * 40)
        print(block["text"])

# .content holds the provider-native payload (for Claude with thinking, a list of
# blocks); .text concatenates only the text blocks
print(response.text)  # text only

3.4 Handling Tool-Call Responses

from langchain_core.messages import HumanMessage

# search_tool and calculator_tool are tools defined elsewhere
model = init_chat_model("openai:gpt-4o")
model_with_tools = model.bind_tools([search_tool, calculator_tool])

response = model_with_tools.invoke([
    HumanMessage(content="Search for the latest AI developments in 2026 and compute 15*23")
])

for block in response.content_blocks:
    if block["type"] == "text":
        print(f"[Text] {block['text']}")
    elif block["type"] == "tool_call":
        print(f"[Tool call] {block['name']}")
        print(f"  Args: {block['args']}")
        print(f"  ID: {block['id']}")

3.5 Building Multimodal Input Messages

from langchain_core.messages import HumanMessage

# HumanMessage accepts standard content blocks directly
message = HumanMessage(content=[
    {"type": "text", "text": "Please analyze this data:"},
    {"type": "image", "url": "https://example.com/chart.png"},
])

# Or send a local file as base64
import base64

with open("document.pdf", "rb") as f:
    pdf_base64 = base64.b64encode(f.read()).decode()

message = HumanMessage(content=[
    {"type": "text", "text": "Please summarize this PDF document"},
    {
        "type": "file",
        "base64": pdf_base64,
        "mime_type": "application/pdf",
    },
])

response = model.invoke([message])

4. Prompt Templates and the Message System

4.1 ChatPromptTemplate Basics

from langchain_core.prompts import ChatPromptTemplate

# from_template: the simplest form
prompt = ChatPromptTemplate.from_template(
    "You are a {role}. Explain {topic} in a {style} style."
)
messages = prompt.invoke({
    "role": "Python expert", "style": "plain, accessible", "topic": "decorators"
})
print(messages)

# from_messages: build a multi-message template
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a {role}. Answer in {language}."),
    ("human", "{question}"),
    ("ai", "Sure, I'll answer in {language}."),
    ("human", "Please elaborate."),
])

messages = prompt.invoke({
    "role": "math teacher", "language": "English",
    "question": "What is calculus?",
})
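Template strings such as "You are a {role}..." use Python f-string style placeholders, so the required input variables can be read straight from the string. The standard-library sketch below shows the idea with `string.Formatter`; it approximates how a template can infer its inputs, and `template_variables` is a hypothetical helper name.

```python
from string import Formatter
from typing import List

def template_variables(template: str) -> List[str]:
    """Return placeholder names in order of first appearance, without duplicates."""
    names: List[str] = []
    for _literal, field_name, _spec, _conv in Formatter().parse(template):
        if field_name and field_name not in names:
            names.append(field_name)
    return names

system = "You are a {role}. Answer in {language}."
ai = "Sure, I'll answer in {language}."
variables = template_variables(system + "\n" + ai + "\n{question}")
print(variables)  # ['role', 'language', 'question']
print(system.format(role="math teacher", language="English"))
# You are a math teacher. Answer in English.
```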

4.2 MessagesPlaceholder: Inserting Messages Dynamically

from langchain_core.prompts import (
    ChatPromptTemplate,
    MessagesPlaceholder,
)

# Insert the conversation history dynamically (a common agent pattern)
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{question}"),
])

from langchain_core.messages import HumanMessage, AIMessage

result = prompt.invoke({
    "history": [
        HumanMessage(content="I like Python"),
        AIMessage(content="Python is a great language!"),
        HumanMessage(content="Which libraries are worth using?"),
        AIMessage(content="NumPy, Pandas, FastAPI, and more."),
    ],
    "question": "So what is NumPy mainly used for?",
})
print(result)

4.3 Partial Variables (partial)

prompt = ChatPromptTemplate.from_template(
    "You are a {role}. Today's date is {current_date}. Please answer: {question}"
)

# Option 1: pre-fill a static value with partial
partial_prompt = prompt.partial(role="legal advisor")
result = partial_prompt.invoke({
    "current_date": "2026-06-05",
    "question": "What are the requirements for a contract to take effect?",
})

# Option 2: pass a function to partial; it is called each time the prompt is formatted
from datetime import datetime

def get_today():
    return datetime.now().strftime("%Y-%m-%d")

prompt_with_date = prompt.partial(current_date=get_today)
result = prompt_with_date.invoke({
    "role": "assistant",
    "question": "What day of the week is it today?",
})

4.4 Few-Shot Prompting

from langchain_core.prompts import (
    ChatPromptTemplate,
    FewShotChatMessagePromptTemplate,
)
from langchain_core.output_parsers import StrOutputParser

# Define the examples
examples = [
    {"input": "The quality is terrible; it broke after one use", "output": "negative"},
    {"input": "Fast shipping and careful packaging, very satisfied", "output": "positive"},
    {"input": "It's okay, nothing special", "output": "neutral"},
    {"input": "A bit pricey, but it really works well", "output": "positive"},
]

# Template for a single example
example_prompt = ChatPromptTemplate.from_messages([
    ("human", "{input}"),
    ("ai", "{output}"),
])

# Create the few-shot prompt
few_shot_prompt = FewShotChatMessagePromptTemplate(
    example_prompt=example_prompt,
    examples=examples,
)

# Combine into the full prompt
final_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a sentiment analysis expert. Classify each user review as positive, negative, or neutral."),
    few_shot_prompt,
    ("human", "{input}"),
])

chain = final_prompt | model | StrOutputParser()
result = chain.invoke({"input": "Beautiful packaging, but shipping was far too slow"})
print(result)  # likely "neutral" (model-dependent)
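A few-shot chat template essentially expands each example into a human/AI message pair placed between the system message and the real question. A stdlib sketch of that expansion, using plain `(role, text)` tuples instead of LangChain message objects (`build_few_shot_messages` is a hypothetical name):

```python
from typing import Dict, List, Tuple

Message = Tuple[str, str]  # (role, content)

def build_few_shot_messages(
    system: str, examples: List[Dict[str, str]], user_input: str
) -> List[Message]:
    """Expand examples into alternating human/ai turns around the real input."""
    messages: List[Message] = [("system", system)]
    for ex in examples:
        messages.append(("human", ex["input"]))
        messages.append(("ai", ex["output"]))
    messages.append(("human", user_input))
    return messages

demo_examples = [
    {"input": "Broke after one use", "output": "negative"},
    {"input": "Fast shipping, very satisfied", "output": "positive"},
]
msgs = build_few_shot_messages("Classify the review.", demo_examples, "It's okay")
for role, content in msgs:
    print(f"{role}: {content}")
```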

4.5 Dynamic Few-Shot (Selecting Examples by Similarity)

from langchain_core.prompts import (
    ChatPromptTemplate,
    FewShotChatMessagePromptTemplate,
)
from langchain_core.example_selectors import SemanticSimilarityExampleSelector
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma

# Automatically select the most relevant examples by semantic similarity
example_selector = SemanticSimilarityExampleSelector.from_examples(
    examples=examples,
    embeddings=OpenAIEmbeddings(),
    vectorstore_cls=Chroma,
    k=2,                   # pick the 2 most relevant examples per call
    input_keys=["input"],  # embed only the review text, not the label
)

dynamic_few_shot = FewShotChatMessagePromptTemplate(
    example_prompt=example_prompt,
    example_selector=example_selector,
)

final_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a sentiment analysis expert."),
    dynamic_few_shot,
    ("human", "{input}"),
])

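4.6 Prompt Template Best Practices

# A typical agent system prompt structure
SYSTEM_PROMPT = """You are a {role} in the {department} department.

## Your responsibilities
{responsibilities}

## Working rules
{rules}

## Current time
{current_date}

## Notes
- Answer only from the provided tools and data
- Say so clearly when you are unsure
- Break complex problems into steps
"""

prompt = ChatPromptTemplate.from_messages([
    ("system", SYSTEM_PROMPT),
    MessagesPlaceholder("messages"),
])

# Fill in the variables at call time
formatted_prompt = prompt.invoke({
    "department": "Technical Support",
    "role": "senior engineer",
    "responsibilities": "- Answer users' technical questions\n- Provide code examples\n- Troubleshoot system failures",
    "rules": "- Prefer official documentation\n- Provide runnable code\n- Cite sources",
    "current_date": "2026-06-05",
    "messages": [HumanMessage(content="My service fails to start with an error. What should I do?")],
})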

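5. Structured Output

5.1 with_structured_output (Recommended)

This is the recommended way to get structured output in v1.0. Each integration picks a suitable default method under the hood (for example, native JSON Schema output for recent OpenAI models, or tool calling), and you can override it with the method argument:

from pydantic import BaseModel, Field
from typing import List, Optional, Literal

# Define the output structure
class ProductReview(BaseModel):
    product_name: str = Field(description="Product name")
    rating: float = Field(ge=1, le=5, description="Rating from 1 to 5")
    sentiment: Literal["positive", "neutral", "negative"]
    summary: str = Field(description="Review summary, under 50 words")
    pros: List[str] = Field(description="List of pros")
    cons: List[str] = Field(description="List of cons")
    recommend: bool = Field(description="Whether the product is recommended")

# Bind the schema in one line
model = ChatOpenAI(model="gpt-4o")
structured_model = model.with_structured_output(ProductReview)

# Invoke; the result is a Pydantic object
result = structured_model.invoke(
    "The iPhone 15 Pro Max takes stunning photos and the battery lasts a bit longer "
    "than the previous generation, but it is far too expensive and charging is only average."
)

print(f"Product: {result.product_name}")
print(f"Rating: {result.rating}/5")
print(f"Sentiment: {result.sentiment}")
print(f"Recommended: {'yes' if result.recommend else 'no'}")
print(f"Pros: {', '.join(result.pros)}")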
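Whatever the method, the final step of structured output is the same: parse the model's JSON and validate it against the schema, rejecting values such as an out-of-range rating. The sketch below imitates that step with `json` and a dataclass instead of Pydantic; the raw JSON string stands in for a model reply, and `Review` and `parse_review` are hypothetical names.

```python
import json
from dataclasses import dataclass
from typing import List

ALLOWED_SENTIMENTS = {"positive", "neutral", "negative"}

@dataclass
class Review:
    product_name: str
    rating: float
    sentiment: str
    pros: List[str]

    def __post_init__(self) -> None:
        # Mirror Field(ge=1, le=5) and the Literal[...] constraint
        if not 1 <= self.rating <= 5:
            raise ValueError(f"rating out of range: {self.rating}")
        if self.sentiment not in ALLOWED_SENTIMENTS:
            raise ValueError(f"invalid sentiment: {self.sentiment}")

def parse_review(raw: str) -> Review:
    """Parse a JSON reply and validate it; unexpected keys raise TypeError."""
    return Review(**json.loads(raw))

reply = '{"product_name": "iPhone 15 Pro Max", "rating": 4, "sentiment": "positive", "pros": ["camera"]}'
review = parse_review(reply)
print(review.product_name, review.rating)  # iPhone 15 Pro Max 4
```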

5.2 The TypedDict Approach

from typing import TypedDict, List, Optional

class ContactInfo(TypedDict):
    name: str
    company: str
    title: str
    email: Optional[str]
    phone: Optional[str]
    skills: List[str]

structured_model = model.with_structured_output(ContactInfo)

result = structured_model.invoke(
    "Zhang San is a senior Java engineer at Alibaba who specializes in microservices "
    "and distributed systems; his email is zhangsan@example.com"
)
print(result)
# Example output (actual values depend on the model):
# {'name': 'Zhang San', 'company': 'Alibaba', 'title': 'Senior Java Engineer',
#  'email': 'zhangsan@example.com', 'phone': None,
#  'skills': ['Microservices', 'Distributed systems', 'Java']}

5.3 Nested Structures

from pydantic import BaseModel, Field
from typing import List, Literal

class Ingredient(BaseModel):
    name: str = Field(description="Ingredient name")
    amount: str = Field(description="Amount")
    unit: str = Field(description="Unit (g / ml / pieces, etc.)")

class Step(BaseModel):
    order: int = Field(description="Step number")
    instruction: str = Field(description="Instruction")
    duration_min: int = Field(description="Estimated time (minutes)")

class NutritionalInfo(BaseModel):
    calories: int = Field(description="Calories")
    protein_g: float = Field(description="Protein (g)")
    carbs_g: float = Field(description="Carbohydrates (g)")
    fat_g: float = Field(description="Fat (g)")

class Recipe(BaseModel):
    """A complete recipe"""
    dish_name: str
    cuisine: str
    difficulty: Literal["easy", "medium", "hard"]
    prep_time_min: int
    cook_time_min: int
    servings: int
    ingredients: List[Ingredient]
    steps: List[Step]
    nutrition: NutritionalInfo
    tips: List[str] = Field(description="Cooking tips")

structured_model = model.with_structured_output(Recipe)
recipe = structured_model.invoke("Give me an authentic recipe for red-braised pork belly (hong shao rou) for 4 people")

print(f"Dish: {recipe.dish_name}")
print(f"Difficulty: {recipe.difficulty}")
print(f"Ingredients: {len(recipe.ingredients)}")
print(f"Steps: {len(recipe.steps)}")
print(f"Calories: {recipe.nutrition.calories} kcal")

5.4 Structured Output with create_agent

from langchain.agents import create_agent
from langchain.agents.structured_output import ToolStrategy
from pydantic import BaseModel, Field
from typing import List

class TravelPlan(BaseModel):
    destination: str = Field(description="Recommended destination")
    duration_days: int = Field(description="Suggested number of days")
    budget_estimate: float = Field(description="Estimated total cost (CNY)")
    highlights: List[str] = Field(description="Must-see attractions")
    tips: List[str] = Field(description="Travel tips")

def search_travel_info(query: str) -> str:
    """Search for travel information"""
    return f"Travel data for '{query}': ..."

agent = create_agent(
    model="openai:gpt-4o",
    tools=[search_travel_info],
    system_prompt="You are a travel planner. Recommend the best destination for the user's needs.",
    response_format=ToolStrategy(TravelPlan),
)

result = agent.invoke({
    "messages": [{
        "role": "user",
        "content": "I'd like a kid-friendly seaside city for a 5-day holiday on a budget of about 5,000 CNY."
    }]
})

plan = result["structured_response"]
print(f"Recommendation: {plan.destination}, {plan.duration_days} days, about {plan.budget_estimate} CNY")

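5.5 Choosing Between JSON Mode and Tool Calling

# with_structured_output uses a provider-specific default method;
# pass method= to override it

# Option 1: tool / function calling
structured_model = model.with_structured_output(
    ProductReview,
    method="function_calling",  # set explicitly
)

# Option 2: JSON mode (OpenAI models such as GPT-4o / GPT-4-turbo)
structured_model = model.with_structured_output(
    ProductReview,
    method="json_mode",  # the prompt itself must describe the expected JSON keys
)

# Option 3: native JSON Schema structured output
structured_model = model.with_structured_output(
    ProductReview,
    method="json_schema",  # the default for ChatOpenAI
)

# Option 4: provider default (recommended)
structured_model = model.with_structured_output(ProductReview)

# Strategy selection at the agent level: create_agent(response_format=Schema)
# - model supports native structured output → ProviderStrategy
# - otherwise → ToolStrategy (tool calling)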

5.6 Complex Structures with Enums

from enum import Enum
from pydantic import BaseModel, Field
from typing import List, Optional

class BugSeverity(str, Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"

class BugReport(BaseModel):
    title: str = Field(description="Bug title")
    severity: BugSeverity = Field(description="Severity")
    description: str = Field(description="Detailed description")
    steps_to_reproduce: List[str] = Field(description="Steps to reproduce")
    expected_behavior: str = Field(description="Expected behavior")
    actual_behavior: str = Field(description="Actual behavior")
    affected_version: str = Field(description="Affected version")
    suggested_fix: Optional[str] = Field(default=None, description="Suggested fix")

structured_model = model.with_structured_output(BugReport)

bug = structured_model.invoke(
    "The login page crashes outright when the user enters a password longer than "
    "100 characters and returns a 500 error. Any long password containing special "
    "characters triggers it. Version v2.3.1. "
    "It should return a friendly error message instead of crashing."
)
print(f"Bug: {bug.title}")
print(f"Severity: {bug.severity.value}")
print(f"Steps to reproduce: {len(bug.steps_to_reproduce)}")

6. LCEL: The LangChain Expression Language

6.1 The Pipe Operator |

LCEL chains components with the | (pipe) operator; data flows from left to right:

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4o")
prompt = ChatPromptTemplate.from_template("Translate the following text into {target_lang}:\n{text}")
parser = StrOutputParser()

# LCEL chain: compose with |
chain = prompt | model | parser

# Equivalent to
# from langchain_core.runnables import RunnableSequence
# chain = RunnableSequence(prompt, model, parser)

result = chain.invoke({
    "target_lang": "French",
    "text": "The weather is lovely today, perfect for an outing.",
})
print(result)

# Data flow:
# {"target_lang": "French", "text": "..."}
#   → prompt → ChatPromptValue
#   → model → AIMessage
#   → parser → str
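The `|` operator works because Runnables implement Python's `__or__`, and the resulting sequence feeds each step the previous step's output. A minimal sketch of that mechanism with a hypothetical `Step` class and fake components (LangChain's `RunnableSequence` adds batching, streaming, async, and callbacks on top):

```python
from typing import Any, Callable, List

class Step:
    """Hypothetical minimal runnable: one or more functions applied in order."""

    def __init__(self, *funcs: Callable[[Any], Any]):
        self.funcs: List[Callable[[Any], Any]] = list(funcs)

    def __or__(self, other: "Step") -> "Step":
        # a | b builds a new sequence: a's functions, then b's
        return Step(*self.funcs, *other.funcs)

    def invoke(self, value: Any) -> Any:
        for func in self.funcs:
            value = func(value)  # each step receives the previous output
        return value

fake_prompt = Step(lambda d: f"Translate into {d['target_lang']}: {d['text']}")
fake_model = Step(lambda p: {"content": p.upper()})  # returns a message-like dict
fake_parser = Step(lambda msg: msg["content"])        # extracts the string

chain = fake_prompt | fake_model | fake_parser
print(chain.invoke({"target_lang": "French", "text": "hello"}))
# TRANSLATE INTO FRENCH: HELLO
```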

6.2 RunnablePassthrough

from langchain_core.runnables import RunnablePassthrough

# Basic usage: pass the input through unchanged
passthrough = RunnablePassthrough()
print(passthrough.invoke("hello"))  # "hello"

# assign(): add computed fields to a dict input (the input must be a dict)
chain = RunnablePassthrough.assign(
    word_count=lambda x: len(x["text"].split()),
    is_long=lambda x: len(x["text"].split()) > 100,
)

result = chain.invoke({"text": "Hello world, this is a test sentence."})
print(result)
# {'text': 'Hello world, this is a test sentence.', 'word_count': 7, 'is_long': False}

# Use in RAG (retriever, format_docs, prompt, model, parser are defined elsewhere)
chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | model
    | parser
)

# Call it with just the question string
answer = chain.invoke("What is machine learning?")

6.3 RunnableParallel: Running Steps in Parallel

from langchain_core.runnables import RunnableParallel

# Run several tasks at the same time (the *_prompt templates are defined elsewhere)
chain = RunnableParallel(
    translation=translation_prompt | model | parser,
    summary=summary_prompt | model | parser,
    keywords=keywords_prompt | model | parser,
)

result = chain.invoke({"text": "This is a piece of text to process."})
print(result["translation"])
print(result["summary"])
print(result["keywords"])

# Parallel analysis (a real-world example; the *_chain runnables are placeholders)
analysis_chain = RunnableParallel(
    sentiment=sentiment_chain,
    entities=ner_chain,
    categories=classification_chain,
    key_phrases=keyword_chain,
)

article_analysis = analysis_chain.invoke({
    "text": "Apple released a new iPhone today, and its stock rose 3%..."
})
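RunnableParallel runs every branch on the same input and collects the results into a dict keyed by branch name; for synchronous calls, LangChain runs the branches on a thread pool. A stdlib sketch of the same behavior, where `run_parallel` and the branch functions are hypothetical stand-ins for chains:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict

def run_parallel(branches: Dict[str, Callable[[Any], Any]], value: Any) -> Dict[str, Any]:
    """Run every branch on the same input concurrently; return results by name."""
    with ThreadPoolExecutor(max_workers=len(branches) or 1) as pool:
        futures = {name: pool.submit(fn, value) for name, fn in branches.items()}
        return {name: fut.result() for name, fut in futures.items()}

branches = {
    "length": lambda d: len(d["text"]),
    "upper": lambda d: d["text"].upper(),
    "words": lambda d: d["text"].split(),
}
result = run_parallel(branches, {"text": "text to process"})
print(result)
# {'length': 15, 'upper': 'TEXT TO PROCESS', 'words': ['text', 'to', 'process']}
```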

6.4 RunnableLambda

from langchain_core.runnables import RunnableLambda

def count_chars(text: str) -> int:
    return len(text)

def format_count(count: int) -> str:
    return f"The text has {count} characters"

# Wrap the functions with RunnableLambda
count_chain = RunnableLambda(count_chars) | RunnableLambda(format_count)

result = count_chain.invoke("Hello World!")
print(result)  # "The text has 12 characters"

# Use it directly as a step in a pipeline
chain = prompt | model | parser | RunnableLambda(count_chars)

# Use it as a decorator
@RunnableLambda
def to_uppercase(text: str) -> str:
    return text.upper()

chain = prompt | model | parser | to_uppercase

6.5 RunnableBranch: Conditional Routing

from langchain_core.runnables import RunnableBranch

# Choose a processing branch based on conditions (the *_chain runnables are defined elsewhere)
branch = RunnableBranch(
    (lambda x: "math" in x["subject"], math_chain),
    (lambda x: "programming" in x["subject"], coding_chain),
    (lambda x: "English" in x["subject"], english_chain),
    # Default branch (used when no condition matches)
    general_chain,
)

result = branch.invoke({
    "subject": "programming question",
    "question": "How can I speed up Python code?",
})

# In a more complex workflow
content_router = RunnableBranch(
    (lambda x: x["type"] == "image", image_processing_chain),
    (lambda x: x["type"] == "audio", audio_processing_chain),
    (lambda x: x["type"] == "video", video_processing_chain),
    text_processing_chain,
)
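RunnableBranch evaluates its (condition, runnable) pairs in order, runs the first runnable whose condition is true, and otherwise falls back to the default. A pure-Python sketch of that first-match rule, with `route` and the handlers as hypothetical stand-ins:

```python
from typing import Any, Callable, Sequence, Tuple

Condition = Callable[[Any], bool]
Handler = Callable[[Any], Any]

def route(branches: Sequence[Tuple[Condition, Handler]], default: Handler, value: Any) -> Any:
    """Run the handler of the first matching condition, else the default."""
    for condition, handler in branches:
        if condition(value):
            return handler(value)
    return default(value)

branches = [
    (lambda x: "math" in x["subject"], lambda x: "math chain"),
    (lambda x: "programming" in x["subject"], lambda x: "coding chain"),
]

def general(x: Any) -> str:
    return "general chain"

print(route(branches, general, {"subject": "programming question"}))  # coding chain
print(route(branches, general, {"subject": "history"}))               # general chain
```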

6.6 RunnableConfig: Runtime Configuration

from langchain_core.runnables import RunnableConfig

# Pass configuration at run time
# (configurable values only take effect for fields exposed through
#  configurable_fields() / configurable_alternatives())
config = RunnableConfig(
    configurable={
        "model_name": "gpt-4o",
        "temperature": 0.3,
        "language": "zh-CN",
    },
    metadata={"user_id": "user-123", "session_id": "sess-abc"},
    tags=["production", "v2.0"],
    max_concurrency=5,
)

result = chain.invoke({"text": "hello"}, config=config)

# .with_config() sets default configuration
configured_chain = chain.with_config(
    run_name="TranslationPipeline",
    tags=["translation", "v1.0.0"],
    metadata={"team": "ai-platform"},
)

7. The Runnable Protocol in Depth

7.1 A Unified Invocation Interface

Every component (prompts, models, tools, chains, agents) implements the Runnable protocol:

# === Synchronous calls ===
# invoke: wait for the complete result
result = chain.invoke({"text": "hello"})

# batch: process several inputs concurrently
results = chain.batch([
    {"text": "first input"},
    {"text": "second input"},
    {"text": "third input"},
])

# stream: stream the output
for chunk in chain.stream({"text": "tell me a story"}):
    print(chunk, end="", flush=True)

# batch_as_completed: yields (index, output) pairs as each input finishes
for idx, result in chain.batch_as_completed([
    {"text": "input 1"},
    {"text": "input 2"},
]):
    print(f"Done #{idx}: {result}")

# === Asynchronous calls (must run inside an event loop) ===
import asyncio

async def main():
    # ainvoke: asynchronous invoke
    result = await chain.ainvoke({"text": "hello"})

    # abatch: asynchronous batch
    results = await chain.abatch([
        {"text": "first"}, {"text": "second"}
    ])

    # astream: asynchronous streaming
    async for chunk in chain.astream({"text": "hello"}):
        print(chunk, end="", flush=True)

    # abatch_as_completed: asynchronous batch, (index, output) in completion order
    async for idx, result in chain.abatch_as_completed([
        {"text": "first"}, {"text": "second"}
    ]):
        print(f"Done #{idx}: {result}")

asyncio.run(main())
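Because results arrive in completion order rather than input order, batch_as_completed yields `(index, output)` pairs so each result can be matched back to its input. The stdlib sketch below reproduces that contract with `concurrent.futures.as_completed`; `batch_as_completed` here is a hypothetical function, and the sleep times only simulate calls with different latencies.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterator, List, Tuple

def batch_as_completed(
    func: Callable[[Any], Any], inputs: List[Any]
) -> Iterator[Tuple[int, Any]]:
    """Yield (input index, result) pairs as soon as each call finishes."""
    with ThreadPoolExecutor(max_workers=len(inputs) or 1) as pool:
        future_to_index = {pool.submit(func, item): i for i, item in enumerate(inputs)}
        for future in as_completed(future_to_index):
            yield future_to_index[future], future.result()

def slow_upper(item: Tuple[str, float]) -> str:
    text, delay = item
    time.sleep(delay)  # simulate calls with different latencies
    return text.upper()

pairs = list(batch_as_completed(slow_upper, [("first", 0.3), ("second", 0.05)]))
print(pairs)  # the faster second input usually appears first
```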

7.2 .assign(): Adding Fields Dynamically

from langchain_core.runnables import RunnableLambda, RunnablePassthrough

# Build up a richer result step by step
# (translate() and summarize() are placeholder helpers defined elsewhere)
chain = (
    RunnablePassthrough.assign(
        # Step 1: basic analysis
        translated=RunnableLambda(lambda x: translate(x["text"])),
        word_count=RunnableLambda(lambda x: len(x["text"].split())),
    )
    | RunnablePassthrough.assign(
        # Step 2: build on the earlier results
        summary=RunnableLambda(
            lambda x: summarize(x["translated"])
        ),
        is_long_text=RunnableLambda(
            lambda x: x["word_count"] > 200
        ),
    )
)

result = chain.invoke({"text": "This is a sample text for demonstration."})
print(result.keys())
# dict_keys(['text', 'translated', 'word_count', 'summary', 'is_long_text'])

7.3 .map(): Mapping Over a List

# .map() applies the chain to each element of a list input
individual_chain = prompt | model | parser

# Process a list
batch_chain = individual_chain.map()

results = batch_chain.invoke([
    {"topic": "machine learning"},
    {"topic": "deep learning"},
    {"topic": "reinforcement learning"},
])

# Equivalent to calling batch yourself
results = individual_chain.batch([
    {"topic": "machine learning"},
    {"topic": "deep learning"},
    {"topic": "reinforcement learning"},
])

7.4 .pick(): Selecting Fields

from langchain_core.runnables import RunnableLambda, RunnablePassthrough

# .pick() is an instance method available on any Runnable;
# with a single key it returns that key's value
chain = (
    RunnablePassthrough().pick("question")  # extract only the question field
    | retriever
)

# Equivalent to
chain = RunnableLambda(lambda x: x["question"]) | retriever

# Common in RAG
chain = (
    {
        "context": RunnablePassthrough().pick("user_query") | retriever,
        "question": RunnablePassthrough().pick("user_query"),
        "language": RunnablePassthrough().pick("target_language"),
    }
    | prompt
    | model
    | parser
)

result = chain.invoke({
    "user_query": "What is ML?",
    "target_language": "Chinese",
    "user_id": "123",      # not passed downstream
    "session_id": "abc",   # not passed downstream
})

7.5 Custom Runnables

from langchain_core.runnables import Runnable, RunnableConfig
from typing import Iterator, Optional

class TextAnalysisRunnable(Runnable[str, dict]):
    """A custom text-analysis Runnable"""

    def __init__(self, min_word_count: int = 10):
        self.min_word_count = min_word_count

    def invoke(self, input: str, config: Optional[RunnableConfig] = None, **kwargs) -> dict:
        words = input.split()
        return {
            "original": input,
            "word_count": len(words),
            "char_count": len(input),
            "is_too_short": len(words) < self.min_word_count,
        }

    async def ainvoke(self, input: str, config: Optional[RunnableConfig] = None, **kwargs) -> dict:
        return self.invoke(input, config)

    def stream(self, input: str, config: Optional[RunnableConfig] = None, **kwargs) -> Iterator[dict]:
        yield self.invoke(input, config)

# Use it directly
analyzer = TextAnalysisRunnable(min_word_count=20)
result = analyzer.invoke("This is a short sentence.")
print(result)

# Integrate it into an LCEL chain
chain = prompt | model | parser | analyzer

7.6 .with_fallbacks(): Fault Tolerance

from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

primary = ChatOpenAI(model="gpt-4o")
backup1 = ChatAnthropic(model="claude-sonnet-4-6")
backup2 = ChatOpenAI(model="gpt-4o-mini")

# Fall back automatically, in order
robust_model = primary.with_fallbacks([backup1, backup2])

chain = prompt | robust_model | parser

# Even if the primary model is unavailable, the chain switches to a backup
result = chain.invoke({"input": "Explain what AI is"})

7.7 .with_retry(): Automatic Retries

# Add retries with exponential backoff to a chain
robust_chain = chain.with_retry(
    retry_if_exception_type=(TimeoutError, ConnectionError),
    wait_exponential_jitter=True,  # exponential backoff with random jitter
    stop_after_attempt=3,          # at most 3 attempts in total (first try + 2 retries)
    exponential_jitter_params={"initial": 1, "max": 30},  # wait at most 30 s between attempts
)

result = robust_chain.invoke({"input": "hello"})
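with_retry() is based on exponential backoff with jitter: the wait roughly doubles after each failure, is capped at a maximum, and gets a random component so that many clients do not retry in lockstep. Note that the attempt limit counts the first call too. The stdlib sketch below shows that loop; `retry_with_backoff` and its parameters are hypothetical, and `sleep` is injectable so the demo runs instantly.

```python
import random
import time
from typing import Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")

def retry_with_backoff(
    func: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (TimeoutError, ConnectionError),
    stop_after_attempt: int = 3,
    initial: float = 1.0,
    max_wait: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func; on a listed error, wait (capped exponential backoff + jitter) and retry."""
    if stop_after_attempt < 1:
        raise ValueError("stop_after_attempt must be at least 1")
    attempt = 1
    while True:
        try:
            return func()
        except retry_on:
            if attempt >= stop_after_attempt:
                raise  # out of attempts: surface the last error
            base = min(initial * 2 ** (attempt - 1), max_wait)  # 1 s, 2 s, 4 s, ... capped
            sleep(base + random.uniform(0, 1))  # jitter desynchronizes concurrent clients
            attempt += 1

attempts: List[int] = []
waits: List[float] = []

def flaky() -> str:
    """Fails twice with a transient error, then succeeds."""
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("temporary failure")
    return "ok"

result = retry_with_backoff(flaky, sleep=waits.append)
print(result, len(attempts), len(waits))  # ok 3 2
```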

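8. Defining and Using Tools

8.1 Plain Functions (the Simplest Approach in v1.0)

v1.0 greatly simplifies tool definitions: the recommended approach is a plain Python function with type hints and a docstring:

# The simplest tool definition: plain function + type hints + docstring
def get_weather(city: str) -> str:
    """Get the current weather for a city. Always call this tool when the user asks about the weather.

    Args:
        city: City name, e.g. 'Beijing', 'Shanghai', 'Shenzhen'
    """
    weather_db = {
        "Beijing": "Sunny, 25°C, 40% humidity, wind force 2",
        "Shanghai": "Cloudy, 28°C, 65% humidity, wind force 3",
        "Shenzhen": "Showers, 30°C, 80% humidity, wind force 1",
    }
    return weather_db.get(city, f"No weather data found for {city}")

def calculate(expression: str) -> float:
    """Evaluate a math expression. Supports basic arithmetic and common math functions.

    Args:
        expression: A valid math expression, e.g. '2+2', 'sin(pi/2)', 'sqrt(16)*3'
    """
    import math
    # Demo only: eval with an empty __builtins__ is NOT a real sandbox;
    # never use it on untrusted input
    allowed = {"__builtins__": {}, **math.__dict__}
    return eval(expression, allowed)

def search_web(query: str) -> str:
    """Search the internet for up-to-date information. Use it for real-time data, news, or fact-checking.

    Args:
        query: Search keywords
    """
    # In a real project, call a search API here
    return f"Search results for '{query}': ..."

# Pass the functions straight to create_agent
from langchain.agents import create_agent

agent = create_agent(
    model="openai:gpt-4o",
    tools=[get_weather, calculate, search_web],
    system_prompt="You are an all-purpose assistant.",
)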
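The `calculate` tool above relies on `eval`, and an empty `__builtins__` dict is not a real sandbox. If a tool may receive untrusted input, one alternative is to parse the expression with the `ast` module and evaluate only an allowlist of node types. A minimal sketch of that swapped-in technique; `safe_calculate` is a hypothetical name, and it supports only numbers, `+ - * / ** %`, unary signs, and a few `math` functions and constants.

```python
import ast
import math
import operator
from typing import Any, Callable, Dict

_BIN_OPS: Dict[type, Callable[[Any, Any], Any]] = {
    ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
    ast.Div: operator.truediv, ast.Pow: operator.pow, ast.Mod: operator.mod,
}
_FUNCS: Dict[str, Callable[..., float]] = {
    "sqrt": math.sqrt, "sin": math.sin, "cos": math.cos, "log": math.log,
}
_CONSTS: Dict[str, float] = {"pi": math.pi, "e": math.e}

def safe_calculate(expression: str) -> float:
    """Evaluate a math expression using only allowlisted AST nodes."""

    def _eval(node: ast.AST) -> float:
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if (isinstance(node, ast.Constant) and isinstance(node.value, (int, float))
                and not isinstance(node.value, bool)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
            return _BIN_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and isinstance(node.op, (ast.USub, ast.UAdd)):
            value = _eval(node.operand)
            return -value if isinstance(node.op, ast.USub) else value
        if isinstance(node, ast.Name) and node.id in _CONSTS:
            return _CONSTS[node.id]
        if (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
                and node.func.id in _FUNCS and not node.keywords):
            return _FUNCS[node.func.id](*(_eval(arg) for arg in node.args))
        # Anything else (attribute access, imports, other names) is rejected
        raise ValueError(f"unsupported expression element: {ast.dump(node)}")

    return _eval(ast.parse(expression, mode="eval"))

print(safe_calculate("sqrt(16)*3"))  # 12.0
print(safe_calculate("2+2"))         # 4
```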

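8.2 The @tool Decorator

from langchain_core.tools import tool

@tool
def send_email(to: str, subject: str, body: str) -> str:
    """Send an email. Use only after confirming the user's intent.

    Args:
        to: Recipient email address
        subject: Email subject
        body: Email body
    """
    # Actual sending logic goes here
    return f"Email sent to {to}, subject: {subject}"

@tool
def query_database(sql: str) -> str:
    """Run a SQL query. Only SELECT statements are supported.

    Args:
        sql: SQL query
    """
    # Naive check; not a substitute for a read-only database account
    if not sql.strip().upper().startswith("SELECT"):
        return "Error: only SELECT queries are supported"
    # Actual query logic goes here
    return "Query results: ..."

# Inspect the tool metadata
print(send_email.name)         # "send_email"
print(send_email.description)  # the docstring (by default including the Args section)
print(send_email.args_schema)  # generated Pydantic model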
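Plain functions can serve as tools because the name, description, and argument schema can all be derived from the function itself. The sketch below derives a JSON-Schema-like spec with `inspect` and `typing.get_type_hints`; it only approximates the idea (LangChain builds a Pydantic model instead), and `function_to_tool_spec`, its simple type mapping, and `send_email_demo` are hypothetical.

```python
import inspect
from typing import Any, Callable, Dict, List, get_type_hints

_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}

def function_to_tool_spec(func: Callable[..., Any]) -> Dict[str, Any]:
    """Build a name/description/parameters spec from a signature and docstring."""
    hints = get_type_hints(func)
    properties: Dict[str, Any] = {}
    required: List[str] = []
    for name, param in inspect.signature(func).parameters.items():
        properties[name] = {"type": _JSON_TYPES.get(hints.get(name), "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default value → the model must supply it
    doc = inspect.getdoc(func) or ""
    return {
        "name": func.__name__,
        "description": doc.split("\n\n")[0],  # first paragraph only, in this sketch
        "parameters": {"type": "object", "properties": properties, "required": required},
    }

def send_email_demo(to: str, subject: str, body: str, urgent: bool = False) -> str:
    """Send an email. Use only after confirming the user's intent.

    Args:
        to: Recipient email address
    """
    return f"sent to {to}"

spec = function_to_tool_spec(send_email_demo)
print(spec["name"], spec["parameters"]["required"])
# send_email_demo ['to', 'subject', 'body']
```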

8.3 StructuredTool (Pydantic Schema)

from pydantic import BaseModel, Field
from langchain_core.tools import StructuredTool, tool

class FlightSearchInput(BaseModel):
    """Flight search parameters"""
    origin: str = Field(description="Departure city, e.g. 'Beijing', 'Shanghai'")
    destination: str = Field(description="Arrival city")
    date: str = Field(description="Departure date, format YYYY-MM-DD")
    passengers: int = Field(default=1, description="Number of passengers")

def search_flights(
    origin: str, destination: str, date: str, passengers: int = 1
) -> str:
    """Search for flights"""
    return (
        f"{origin} to {destination}, departing {date}, {passengers} passenger(s)\n"
        f"Found 5 flights; lowest fare ¥680"
    )

flight_tool = StructuredTool.from_function(
    func=search_flights,
    name="search_flights",
    description="Search for flights on a given date and route",
    args_schema=FlightSearchInput,
)

# Or use the @tool decorator with args_schema
@tool(args_schema=FlightSearchInput)
def search_flights_v2(
    origin: str, destination: str, date: str, passengers: int = 1
) -> str:
    """Search for flights on a given date and route"""
    return f"Flight search results from {origin} to {destination}: ..."

8.4 Subclassing BaseTool

from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field
from typing import Type

class CodeReviewInput(BaseModel):
    code: str = Field(description="Code to review")
    language: str = Field(description="Programming language")

class CodeReviewTool(BaseTool):
    name: str = "code_review"
    description: str = "Reviews code quality and checks for potential bugs, performance problems, and security risks."
    args_schema: Type[BaseModel] = CodeReviewInput

    def _run(self, code: str, language: str, **kwargs) -> str:
        # Synchronous review logic
        issues = []
        if "eval(" in code or "exec(" in code:
            issues.append("⚠️ Security: uses dangerous eval/exec")
        if "TODO" in code or "FIXME" in code:
            issues.append("⚠️ Quality: contains unfinished TODO/FIXME")
        if not issues:
            return "✅ Code review passed; no issues found"
        return "\n".join(issues)

    async def _arun(self, code: str, language: str, **kwargs) -> str:
        return self._run(code, language)

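8.5 Tool Error Handling

from langchain_core.tools import ToolException, tool

@tool
def api_caller(endpoint: str) -> str:
    """Call an external API."""
    try:
        if not endpoint:
            raise ValueError("endpoint must not be empty")
        # Simulated API call
        if "timeout" in endpoint:
            raise TimeoutError("API request timed out")
        return "API response: {'status': 'ok'}"
    except Exception as e:
        # handle_tool_error only applies to ToolException, so wrap every failure in one
        raise ToolException(f"Tool execution failed [{endpoint}]: {e}") from e

# Configure the error-handling policy (choose one of the three options)
api_caller.handle_tool_error = True  # send the exception message back to the model
# Or a fixed custom message
api_caller.handle_tool_error = (
    "Tool call failed. Please retry later or try a different query."
)

# Or a callback function that formats the error
def error_handler(error: ToolException) -> str:
    return f"[Error code: 500] {error}"

api_caller.handle_tool_error = error_handler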
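To make the three `handle_tool_error` options concrete, here is a minimal stdlib-only sketch of how such a policy can turn a `ToolException` into the text the model receives. It is a simplified model of the behavior, not LangChain's source: `ToolException` is a local stand-in class and `resolve_tool_error` is a hypothetical helper.

```python
class ToolException(Exception):
    """Local stand-in for langchain_core.tools.ToolException (illustration only)."""


def resolve_tool_error(handle_tool_error, error: ToolException) -> str:
    """Map a ToolException to the message the model sees, per policy (simplified)."""
    if handle_tool_error is False or handle_tool_error is None:
        raise error  # policy off: the exception propagates to the caller
    if handle_tool_error is True:
        # Use the exception's own message, or a generic note if it has none
        return str(error) or "Tool execution error"
    if isinstance(handle_tool_error, str):
        return handle_tool_error  # the same fixed message for every failure
    if callable(handle_tool_error):
        return handle_tool_error(error)  # custom formatter decides the text
    raise ValueError(f"Unsupported handle_tool_error: {handle_tool_error!r}")


err = ToolException("Tool failed [timeout]: API request timed out")
print(resolve_tool_error(True, err))
print(resolve_tool_error("Please retry later.", err))
print(resolve_tool_error(lambda e: f"[Error code: 500] {e}", err))
```

Note that the string policy hides the underlying cause from the model, while the `True` and callable policies pass it along.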

8.6 Combining Tools and Prebuilt Tools

# Commonly used prebuilt tools from langchain-community
# pip install wikipedia arxiv
from langchain_community.tools import (
    WikipediaQueryRun,
    ArxivQueryRun,
)
from langchain_community.utilities import (
    WikipediaAPIWrapper,
    ArxivAPIWrapper,
)

wikipedia = WikipediaQueryRun(
    api_wrapper=WikipediaAPIWrapper(lang="zh", top_k_results=3)
)

arxiv = ArxivQueryRun(
    api_wrapper=ArxivAPIWrapper(top_k_results=5)
)

# Tavily search tool
# pip install tavily-python   (reads the TAVILY_API_KEY environment variable)
# Newer releases deprecate this class in favor of TavilySearch from the langchain-tavily package
from langchain_community.tools.tavily_search import TavilySearchResults

tavily = TavilySearchResults(
    max_results=5,
    search_depth="advanced",  # basic | advanced
)

# Combine all tools into one list
tools = [
    get_weather,
    calculate,
    search_web,
    wikipedia,
    arxiv,
    tavily,
]

8.7 Streaming Output from Tools

from langchain.agents import create_agent
from langgraph.config import get_stream_writer

def long_running_tool(data_source: str) -> str:
    """Process a large amount of data; slow, so it reports progress in real time."""
    writer = get_stream_writer()

    writer(f"🔍 Connecting to data source: {data_source}...")
    # connect...

    writer("📥 Downloading data...")
    # download...

    writer("🔄 Processing data...")
    # process...

    writer("✅ Done!")
    return "Processing result: ..."

# Receive the progress messages with stream_mode="custom"
agent = create_agent(
    model="openai:gpt-4o",
    tools=[long_running_tool],
)

for mode, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Process the sales_data data source"}]},
    stream_mode=["messages", "custom"],
):
    if mode == "custom":
        print(f"[progress] {chunk}")
    elif mode == "messages":
        token, metadata = chunk  # "messages" mode yields (message_chunk, metadata) pairs
        print(token.content, end="", flush=True)

9. create_agent: The Unified Agent Entry Point

9.1 create_agent Basics

create_agent() is the single recommended way to create an agent in v1.0. It replaces all of the older agent constructors:

from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver

# Minimal agent (~10 lines of code)
agent = create_agent(
    model="openai:gpt-4o",
    tools=[get_weather, calculate, search_web],
    system_prompt="You are a versatile AI assistant.",
)

# Invoke
result = agent.invoke({
    "messages": [{"role": "user", "content": "What's the weather like in Beijing today?"}]
})

# Get the final answer (the last message)
final_message = result["messages"][-1]
print(final_message.content)

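9.2 create_agent Parameters

from langchain.agents import create_agent
from langchain.agents.middleware import (
    ModelCallLimitMiddleware,
    SummarizationMiddleware,
    ToolRetryMiddleware,
)
from langchain.agents.structured_output import ToolStrategy
from langgraph.checkpoint.memory import InMemorySaver

# OutputSchema (a Pydantic model) and CustomContext (e.g. a dataclass) are placeholders you define
agent = create_agent(
    # === Core parameters ===
    model="openai:gpt-4o",           # model string or ChatModel instance
    tools=[get_weather, calculate],  # list of tools
    system_prompt="You are...",      # agent persona / system instructions

    # === Persistence ===
    checkpointer=InMemorySaver(),    # LangGraph checkpointer

    # === Middleware ===
    middleware=[                     # list of middleware (onion model)
        SummarizationMiddleware(model="openai:gpt-4o-mini"),
        ToolRetryMiddleware(max_retries=3),
        ModelCallLimitMiddleware(run_limit=10),
    ],

    # === Structured output ===
    response_format=ToolStrategy(OutputSchema),  # expected output structure

    # === Custom context ===
    context_schema=CustomContext,    # schema of the custom runtime context

    # === Interrupt control ===
    interrupt_before=["tools"],      # pause before tools run
    interrupt_after=["tools"],       # pause after tools run
)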

9.3 The Agent Execution Loop

create_agent runs on LangGraph under the hood. Its execution loop looks like this:

User Input
  ↓
[Middleware: before_agent]
  ↓
Agent loop (repeats until the model stops requesting tools):
  1. [before_model] → [wrap_model_call] → LLM call
  2. [after_model]
  3. Tool calls requested?
     ├─ yes → [wrap_tool_call] → run the tools → back to step 1 with the results
     └─ no  → exit the loop
  ↓
[Middleware: after_agent]
  ↓
Final Result
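The loop can be simulated without any LLM. In this sketch a list of scripted replies stands in for the model and a dict of plain functions stands in for the tools; `Step` and `run_agent_loop` are hypothetical names, and the middleware hooks are left out so that only the model → tools → model cycle remains.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Step:
    """One scripted model reply: tool calls to make, or a final answer."""
    tool_calls: list = field(default_factory=list)  # [(tool_name, kwargs), ...]
    answer: Optional[str] = None


def run_agent_loop(scripted_replies, tools: dict, max_model_calls: int = 10):
    """Minimal model -> tools -> model loop (a toy, not create_agent itself)."""
    history = []
    replies = iter(scripted_replies)
    for _ in range(max_model_calls):
        reply = next(replies)              # "call the LLM" (here: next scripted reply)
        history.append(("ai", reply))
        if not reply.tool_calls:           # no tools requested: leave the loop
            return reply.answer, history
        for name, kwargs in reply.tool_calls:
            history.append(("tool", tools[name](**kwargs)))  # run tool, record result
        # loop back so the "model" sees the tool results on its next turn
    raise RuntimeError("model call limit reached")


tools = {"multiply": lambda a, b: a * b}
replies = [
    Step(tool_calls=[("multiply", {"a": 158, "b": 23})]),
    Step(answer="158 * 23 = 3634"),
]
answer, history = run_agent_loop(replies, tools)
print(answer)  # 158 * 23 = 3634
```

The `max_model_calls` guard plays the same role as a call-limit middleware: without it, a model that keeps requesting tools would loop forever.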

9.4 The Agent Response Format

result = agent.invoke({
    "messages": [
        {"role": "user", "content": "Check the weather in Beijing and calculate 158*23"}
    ]
})

# Structure of result
print(result.keys())  # typically dict_keys(['messages'])

# Iterate over every message (human, AI, tool) produced in the run
for i, msg in enumerate(result["messages"]):
    print(f"\n[{i}] {type(msg).__name__}")
    if msg.content:
        # content can be a string or a list of content blocks, depending on the provider
        print(f"  content: {str(msg.content)[:100]}...")
    if getattr(msg, "tool_calls", None):
        for tc in msg.tool_calls:
            print(f"  tool: {tc['name']}({tc['args']})")

# Access the structured response (present only when response_format is set)
if "structured_response" in result:
    print(f"Structured output: {result['structured_response']}")

9.5 Multi-turn Conversations

from langgraph.checkpoint.memory import InMemorySaver

agent = create_agent(
    model="openai:gpt-4o",
    tools=[get_weather, calculate],
    checkpointer=InMemorySaver(),
)

# thread_id keeps separate conversations apart
config = {"configurable": {"thread_id": "conversation-001"}}

# Turn 1
result = agent.invoke(
    {"messages": [{"role": "user", "content": "My name is Xiao Ming and I live in Beijing."}]},
    config=config,
)
print(result["messages"][-1].content)

# Turn 2: the agent remembers the earlier context
result = agent.invoke(
    {"messages": [{"role": "user", "content": "What's my name? Which city am I in?"}]},
    config=config,
)
print(result["messages"][-1].content)

# New conversation: use a different thread_id
new_config = {"configurable": {"thread_id": "conversation-002"}}
result3 = agent.invoke(
    {"messages": [{"role": "user", "content": "What's my name?"}]},
    config=new_config,
)
print(result3["messages"][-1].content)
# The agent says it doesn't know, because this is a brand-new conversation
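What the checkpointer contributes here is, at its core, history keyed by `thread_id`. The toy class below is a hypothetical stand-in (a real checkpointer stores full graph-state snapshots, not message strings), but it shows why a new `thread_id` starts with no memory.

```python
from collections import defaultdict


class ToyCheckpointer:
    """Hypothetical stand-in for a checkpointer: one message list per thread_id."""

    def __init__(self):
        self._threads = defaultdict(list)

    def load(self, config: dict) -> list:
        return self._threads[config["configurable"]["thread_id"]]

    def append(self, config: dict, *messages: str) -> None:
        self._threads[config["configurable"]["thread_id"]].extend(messages)


saver = ToyCheckpointer()
cfg1 = {"configurable": {"thread_id": "conversation-001"}}
cfg2 = {"configurable": {"thread_id": "conversation-002"}}

saver.append(cfg1, "user: My name is Xiao Ming.", "ai: Nice to meet you!")
saver.append(cfg1, "user: What's my name?")
print(len(saver.load(cfg1)))  # 3: the earlier turn is still there
print(saver.load(cfg2))       # []: a new thread starts empty
```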

9.6 Multiple Model Providers

# OpenAI
agent_openai = create_agent(
    model="openai:gpt-4o",
    tools=[...],
)

# Anthropic Claude
agent_claude = create_agent(
    model="anthropic:claude-sonnet-4-6",
    tools=[...],
)

# Google Gemini
agent_gemini = create_agent(
    model="google_genai:gemini-2.0-flash",  # provider prefix for langchain-google-genai
    tools=[...],
)

# DeepSeek
agent_deepseek = create_agent(
    model="deepseek:deepseek-chat",
    tools=[...],
)

# Pass a ChatModel instance directly
from langchain_openai import ChatOpenAI
custom_model = ChatOpenAI(model="gpt-4o", temperature=0.3)
agent = create_agent(model=custom_model, tools=[...])

9.7 Agents vs. LCEL

# LCEL fits fixed, linear pipelines
chain = prompt | model | parser
result = chain.invoke({"input": "hello"})

# create_agent fits dynamic decision-making and tool calling
agent = create_agent(model="openai:gpt-4o", tools=[...])
result = agent.invoke({"messages": [{"role": "user", "content": "..."}]})

# Rule of thumb:
# - no tools, fixed flow → LCEL
# - tool calls and dynamic decisions → create_agent
# - complex branching / loops / multiple agents → LangGraph StateGraph

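10. The Middleware System in Detail

10.1 What Middleware Is

Middleware is the most important extension mechanism in v1.0. Following an onion model, it injects logic at each stage of the agent lifecycle:

                    ┌──────────────────────┐
                    │  Middleware 1        │
                    │  ┌────────────────┐  │
                    │  │  Middleware 2  │  │
                    │  │  ┌──────────┐  │  │
                    │  │  │   Core   │  │  │  ← agent core loop
                    │  │  └──────────┘  │  │
                    │  └────────────────┘  │
                    └──────────────────────┘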

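10.2 Middleware Hooks at a Glance

Hook             Type   When it runs                       Typical uses
before_agent     Node   Once, before the agent starts      Load context, validate input
before_model     Node   Before every LLM call              Trim messages, redact data, inject prompts
wrap_model_call  Wrap   Around every LLM call              Retries, fallbacks, dynamic tool selection
after_model      Node   After every LLM response           Output validation, safety guardrails
wrap_tool_call   Wrap   Around every tool execution        Retries, mocking, approvals, redaction
after_agent      Node   Once, after the agent finishes     Save results, clean up resources

Node hooks run as steps of the agent graph and may return a state update (or None); Wrap hooks receive the request plus a handler and decide whether, when, and how often to call it.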

10.3 SummarizationMiddleware

from langchain.agents.middleware import SummarizationMiddleware

agent = create_agent(
    model="openai:gpt-4o",
    tools=[...],
    checkpointer=InMemorySaver(),
    middleware=[
        SummarizationMiddleware(
            model="openai:gpt-4o-mini",     # model that writes the summary (a small model is recommended)
            max_tokens_before_summary=2000,   # summarize automatically once history exceeds this many tokens
        ),
    ],
)

# In long conversations the history is compressed automatically, keeping the context window under control
for i in range(50):
    result = agent.invoke(
        {"messages": [{"role": "user", "content": f"Question {i}..."}]},
        config={"configurable": {"thread_id": "long-chat"}},
    )

10.4 ToolRetryMiddleware

from langchain.agents.middleware import ToolRetryMiddleware

agent = create_agent(
    model="openai:gpt-4o",
    tools=[api_caller, query_database],
    middleware=[
        ToolRetryMiddleware(
            max_retries=3,            # 最大重试次数
            backoff_factor=2.0,       # 指数退避因子
            initial_delay=1.0,        # 初始延迟(秒)
            max_delay=60.0,           # 最大延迟(秒)
            jitter=True,              # 添加随机抖动
            retry_on=[TimeoutError, ConnectionError],  # 触发重试的异常
            on_failure="continue",    # continue | error | fallback
        ),
    ],
)
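The four backoff parameters combine into a delay schedule. The sketch below assumes the common capped exponential formula `initial_delay * backoff_factor ** attempt`, limited to `max_delay`, with jitter modeled as a random factor in [0.75, 1.25); the middleware's exact formula and jitter range may differ, and `retry_delays` is a hypothetical helper.

```python
import random


def retry_delays(max_retries=3, backoff_factor=2.0, initial_delay=1.0,
                 max_delay=60.0, jitter=False, rng=random.random):
    """Delay (seconds) before each retry under capped exponential backoff (assumed formula)."""
    delays = []
    for attempt in range(max_retries):
        delay = min(initial_delay * backoff_factor ** attempt, max_delay)
        if jitter:
            # Spread retries out so many clients don't retry in lockstep
            delay *= 0.75 + 0.5 * rng()
        delays.append(delay)
    return delays


print(retry_delays())               # [1.0, 2.0, 4.0]
print(retry_delays(max_retries=8))  # grows until it hits the 60-second cap
```

With the settings above, three retries add at most about 7 seconds of waiting; raising `max_retries` mostly adds attempts at the `max_delay` cap.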

10.5 ModelCallLimitMiddleware / ToolCallLimitMiddleware

from langchain.agents.middleware import (
    ModelCallLimitMiddleware,
    ToolCallLimitMiddleware,
)

agent = create_agent(
    model="openai:gpt-4o",
    tools=[get_weather, calculate, search_web],
    middleware=[
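        # Limit model calls
        ModelCallLimitMiddleware(
            run_limit=10,          # at most 10 LLM calls per run
            thread_limit=100,      # at most 100 per thread (counted across runs via the checkpointer)
            exit_behavior="end",   # end = stop gracefully | error = raise an exception
        ),
        # Limit how often a specific tool may be used
        ToolCallLimitMiddleware(
            tool_name="search_web",
            run_limit=5,           # at most 5 searches per run
            exit_behavior="end",
        ),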
    ],
)

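10.6 PIIMiddleware (Redacting Sensitive Data)

from langchain.agents.middleware import PIIMiddleware

agent = create_agent(
    model="openai:gpt-4o",
    tools=[send_email, search_web],
    middleware=[
        # Email addresses → redacted automatically
        PIIMiddleware(
            pii_type="email",
            strategy="redact",       # redact | mask | block | hash
            apply_to_input=True,     # redact user input
            apply_to_output=True,    # redact model output
            apply_to_tool_results=True,  # redact tool results
        ),
        # Phone numbers → masked. "phone" is not a built-in type (the built-ins are
        # email, credit_card, ip, mac_address, url), so a regex is supplied; this one
        # assumes mainland-China mobile numbers
        PIIMiddleware(
            pii_type="phone",
            detector=r"(?<!\d)1[3-9]\d{9}(?!\d)",
            strategy="mask",  # partially masks the matched value
        ),
        # Credit card numbers → blocked
        PIIMiddleware(
            pii_type="credit_card",
            strategy="block",  # raises an error instead of processing the request
        ),
        # Custom type with its own regex: 18-digit PRC resident ID numbers
        PIIMiddleware(
            pii_type="cn_id_card",
            strategy="redact",
            detector=r"(?<!\d)\d{17}[\dXx](?!\d)",
        ),
    ],
)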
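The four strategies differ only in what replaces a detected match. This stdlib-only sketch applies them to email addresses; the regex, the `[REDACTED_EMAIL]` placeholder, the keep-last-four-characters masking, the hash format and the `PIIBlocked` exception are all illustrative assumptions, not PIIMiddleware's exact output.

```python
import hashlib
import re

EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")


class PIIBlocked(Exception):
    """Raised by the 'block' strategy (hypothetical exception name)."""


def apply_strategy(text: str, pattern: re.Pattern, strategy: str, label: str = "EMAIL") -> str:
    """Illustrative redact / mask / hash / block; output formats are assumptions."""
    if strategy == "block":
        if pattern.search(text):
            raise PIIBlocked(f"{label} detected")
        return text  # nothing sensitive found: pass through unchanged

    def replace(m: re.Match) -> str:
        value = m.group(0)
        if strategy == "redact":
            return f"[REDACTED_{label}]"
        if strategy == "mask":
            return "*" * (len(value) - 4) + value[-4:]  # keep the last 4 characters
        if strategy == "hash":
            digest = hashlib.sha256(value.encode()).hexdigest()[:8]
            return f"<{label.lower()}_hash:{digest}>"  # stable, non-reversible token
        raise ValueError(f"unknown strategy: {strategy}")

    return pattern.sub(replace, text)


msg = "Contact me at alice@example.com"
print(apply_strategy(msg, EMAIL, "redact"))  # Contact me at [REDACTED_EMAIL]
print(apply_strategy(msg, EMAIL, "mask"))
print(apply_strategy(msg, EMAIL, "hash"))
```

Hashing keeps equal values equal (useful for joins or deduplication) without exposing them, which is the main reason to prefer it over plain redaction.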

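10.7 HumanInTheLoopMiddleware

from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command

agent = create_agent(
    model="openai:gpt-4o",
    tools=[send_email, transfer_money, delete_record],
    checkpointer=InMemorySaver(),  # interrupts need a checkpointer to save the paused state
    middleware=[
        HumanInTheLoopMiddleware(
            interrupt_on={
                # Sending email → requires approval
                "send_email": {
                    "allowed_decisions": ["approve", "edit", "reject"],
                },
                # Money transfer → requires approval
                "transfer_money": {
                    "allowed_decisions": ["approve", "reject"],
                },
                # Tools not listed here (delete_record) run without approval
            },
        ),
    ],
)

config = {"configurable": {"thread_id": "hitl-demo"}}

# The agent pauses when it reaches send_email and waits for a human decision
result = agent.invoke(
    {"messages": [{"role": "user", "content": "Email the weekly report to my manager"}]},
    config=config,
)
print(result["__interrupt__"])  # details of the pending tool call(s)

# Approve:
agent.invoke(
    Command(resume={"decisions": [{"type": "approve"}]}),
    config=config,
)
# Or, instead, reject (with an optional explanation for the model):
agent.invoke(
    Command(resume={"decisions": [{"type": "reject", "message": "Do not send this email."}]}),
    config=config,
)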

10.8 ModelFallbackMiddleware

from langchain.agents.middleware import ModelFallbackMiddleware
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

agent = create_agent(
    model="openai:gpt-4o",  # primary model
    tools=[...],
    middleware=[
        # Fallbacks only, tried in order when the primary model call fails
        ModelFallbackMiddleware(
            ChatAnthropic(model="claude-sonnet-4-6"),  # first fallback
            ChatOpenAI(model="gpt-4o-mini"),           # second fallback
        ),
    ],
)
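Fallback amounts to trying each model in order and returning the first success. In this sketch plain functions stand in for chat models (`call_with_fallbacks` and the `(name, callable)` pairs are hypothetical), and the primary always raises so that the first fallback answers.

```python
def call_with_fallbacks(prompt: str, models: list):
    """Try each (name, callable) in order; return (name, reply) from the first success."""
    errors = []
    for name, call in models:
        try:
            return name, call(prompt)
        except Exception as exc:  # real code would catch provider/network errors only
            errors.append(f"{name}: {exc}")
    raise RuntimeError("all models failed: " + "; ".join(errors))


def flaky_primary(prompt):
    raise TimeoutError("upstream timeout")


models = [
    ("gpt-4o", flaky_primary),                            # primary (the agent's model)
    ("claude-sonnet-4-6", lambda p: f"answer to {p!r}"),  # first fallback
    ("gpt-4o-mini", lambda p: "not reached"),             # second fallback
]
print(call_with_fallbacks("hi", models))  # ('claude-sonnet-4-6', "answer to 'hi'")
```

Picking fallbacks from a different provider, as in the configuration above, guards against a whole-provider outage rather than just one model.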

10.9 ContextEditingMiddleware

from langchain.agents.middleware import ClearToolUsesEdit, ContextEditingMiddleware

agent = create_agent(
    model="openai:gpt-4o",
    tools=[...],
    middleware=[
        ContextEditingMiddleware(
            edits=[
                # Clear old tool results automatically to keep the context clean
                ClearToolUsesEdit(
                    trigger=50_000,   # start clearing once the context exceeds 50K tokens
                    keep=3,           # keep the 3 most recent tool results
                ),
            ],
        ),
    ],
)
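Clearing old tool results can be sketched as follows. Messages are plain dicts, and tokens are approximated as `len(text) // 4`, an assumption made only to keep the example self-contained; real middleware counts tokens properly. `clear_old_tool_results` is a hypothetical helper that mirrors the `trigger` and `keep` idea.

```python
def clear_old_tool_results(messages, trigger_tokens, keep, placeholder="[cleared]",
                           count_tokens=None):
    """Once history exceeds trigger_tokens, blank out all but the `keep` newest tool results."""
    count_tokens = count_tokens or (lambda text: len(text) // 4)  # rough approximation
    total = sum(count_tokens(m["content"]) for m in messages)
    if total <= trigger_tokens:
        return messages  # under budget: leave history untouched
    tool_idx = [i for i, m in enumerate(messages) if m["role"] == "tool"]
    to_clear = set(tool_idx[:-keep]) if keep else set(tool_idx)
    return [
        {**m, "content": placeholder} if i in to_clear else m
        for i, m in enumerate(messages)
    ]


history = [{"role": "user", "content": "q"}]
for _ in range(5):
    history.append({"role": "tool", "content": "x" * 400})  # ~100 "tokens" each
history.append({"role": "assistant", "content": "done"})

edited = clear_old_tool_results(history, trigger_tokens=300, keep=3)
print([m["content"][:9] for m in edited if m["role"] == "tool"])
```

Only the tool payloads are replaced; the messages themselves stay in place, so the conversation structure the model sees is unchanged.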

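10.10 Custom Middleware

from dataclasses import dataclass
from langchain.agents.middleware import (
    AgentMiddleware,
    ModelRequest,
    dynamic_prompt,
    wrap_tool_call,
)

# Option 1: subclass AgentMiddleware (full control)
class LoggingMiddleware(AgentMiddleware):
    """Custom middleware that logs every agent step"""

    def before_model(self, state, runtime):
        # state is a dict-like AgentState, so access it as state["messages"]
        print(f"[{self.name}] about to call the LLM, messages so far: {len(state['messages'])}")
        return None  # no state update

    def after_model(self, state, runtime):
        last_msg = state["messages"][-1]
        print(f"[{self.name}] LLM responded: {str(last_msg.content)[:100]}...")
        if getattr(last_msg, "tool_calls", None):
            print(f"[{self.name}] requested {len(last_msg.tool_calls)} tool call(s)")
        return None

    def wrap_tool_call(self, request, handler):
        print(f"[{self.name}] calling tool: {request.tool_call['name']}")
        result = handler(request)  # run the actual tool
        print(f"[{self.name}] tool returned: {str(result)[:100]}...")
        return result

# Option 2: decorators (lightweight, one hook each)
# Mutating state["messages"] in place does not reach the model, so to add per-user
# context to every LLM call, build the system prompt dynamically instead
@dataclass
class UserContext:
    user_id: str

@dynamic_prompt
def inject_user_context(request: ModelRequest) -> str:
    """Rebuild the system prompt with the user's info before each LLM call"""
    user_context = load_user_context(request.runtime.context.user_id)  # hypothetical helper
    base = "You are a helpful assistant."
    return f"{base}\nUser info: {user_context}" if user_context else base

@wrap_tool_call
def retry_on_failure(request, handler):
    """Custom tool-retry logic"""
    for attempt in range(3):
        try:
            return handler(request)
        except Exception as e:
            if attempt == 2:
                raise
            print(f"Retry {attempt+1}/3: {e}")

agent = create_agent(
    model="openai:gpt-4o",
    tools=[...],
    context_schema=UserContext,
    middleware=[
        LoggingMiddleware(),
        inject_user_context,
        retry_on_failure,
    ],
)

# The user id is supplied per call through context=
agent.invoke(
    {"messages": [{"role": "user", "content": "Hi"}]},
    context=UserContext(user_id="user-123"),
)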

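10.11 Middleware Execution Order

# before_* hooks: run in registration order (the first registered runs first)
# after_* hooks: run in reverse registration order (the last registered runs first)
# wrap_* hooks: nested, so the first middleware wraps all later ones

# M1, M2, M3 stand for middleware instances
agent = create_agent(
    model="openai:gpt-4o",
    tools=[...],
    middleware=[
        M1,  # before: first  | wrap: outermost | after: last
        M2,  # before: second | wrap: middle    | after: second
        M3,  # before: last   | wrap: innermost | after: first
    ],
)
# Execution order:
# M1.before_model → M2.before_model → M3.before_model
# → M1.wrap → M2.wrap → M3.wrap → actual LLM call
# → M3.wrap returns → M2.wrap returns → M1.wrap returns
# → M3.after_model → M2.after_model → M1.after_model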
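These ordering rules can be checked with a toy pipeline that logs each hook. This is a simplified model of the composition (middleware are plain name strings, and `build_pipeline` and `make_wrap` are hypothetical helpers), not the middleware runtime itself.

```python
def make_wrap(name, inner, log):
    """One onion layer: `name` runs code before and after calling `inner`."""
    def wrapped(request):
        log.append(f"{name}.wrap enter")
        result = inner(request)
        log.append(f"{name}.wrap exit")
        return result
    return wrapped


def build_pipeline(middlewares, call_llm, log):
    """Compose before / wrap / after hooks following the ordering rules (simplified)."""
    handler = call_llm
    for name in reversed(middlewares):  # wrap inside-out, so the first ends up outermost
        handler = make_wrap(name, handler, log)

    def run(request):
        for name in middlewares:            # before_*: registration order
            log.append(f"{name}.before_model")
        response = handler(request)
        for name in reversed(middlewares):  # after_*: reverse registration order
            log.append(f"{name}.after_model")
        return response
    return run


log = []


def fake_llm(request):
    log.append("LLM")
    return f"echo: {request}"


run = build_pipeline(["M1", "M2", "M3"], fake_llm, log)
print(run("hello"))  # echo: hello
print(log)
```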

11. Document Loaders

11.1 TextLoader

from langchain_community.document_loaders import TextLoader

loader = TextLoader("document.txt", encoding="utf-8")
documents = loader.load()

for doc in documents:
    print(f"Source: {doc.metadata['source']}")
    print(f"Content preview: {doc.page_content[:200]}...")

11.2 PyPDFLoader

# pip install pypdf
from langchain_community.document_loaders import PyPDFLoader

loader = PyPDFLoader("report.pdf")
documents = loader.load()

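print(f"{len(documents)} pages in total")
for i, page in enumerate(documents):
    print(f"\nPage {i+1}: {page.page_content[:200]}...")
    print(f"  metadata: {page.metadata}")

# Async loading (top-level await works in notebooks; in a script, call it inside an async function)
docs = await loader.aload()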

11.3 WebBaseLoader

from langchain_community.document_loaders import WebBaseLoader

# A single web page
loader = WebBaseLoader("https://docs.langchain.com/")
docs = loader.load()

# Multiple web pages
urls = [
    "https://docs.langchain.com/oss/python/langchain/overview",
    "https://docs.langchain.com/oss/python/langgraph/overview",
]
loader = WebBaseLoader(
    urls,
    requests_per_second=2,  # rate limit
    continue_on_failure=True,  # a failed page does not stop the others
)
docs = loader.load()
print(f"Loaded {len(docs)} pages")

11.4 Loading CSV and JSON

# CSV: one Document per row
from langchain_community.document_loaders import CSVLoader

loader = CSVLoader(
    file_path="data.csv",
    encoding="utf-8",
    source_column="title",  # this column's value becomes metadata["source"]
)
docs = loader.load()

# JSON (requires: pip install jq)
from langchain_community.document_loaders import JSONLoader

loader = JSONLoader(
    file_path="data.json",
    jq_schema=".messages[].content",  # jq expression selecting the fields to extract
    text_content=True,
)
docs = loader.load()

11.5 DirectoryLoader

from langchain_community.document_loaders import (
    DirectoryLoader,
    TextLoader,
    PythonLoader,
)

# Load every .py file under the directory (recursively)
loader = DirectoryLoader(
    path="./src/",
    glob="**/*.py",
    loader_cls=TextLoader,
    loader_kwargs={"encoding": "utf-8"},
    show_progress=True,
    use_multithreading=True,
)
docs = loader.load()
print(f"Loaded {len(docs)} files")

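11.6 Loading from a Database

# pip install sqlalchemy psycopg2-binary
from langchain_community.document_loaders import SQLDatabaseLoader
from langchain_community.utilities import SQLDatabase

db = SQLDatabase.from_uri("postgresql://user:pass@localhost/db")
loader = SQLDatabaseLoader(
    query="""
        SELECT id, title, content, created_at
        FROM articles
        WHERE status = 'published'
        ORDER BY created_at DESC
    """,
    db=db,
)
docs = loader.load()  # one Document per result row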

11.7 The Document Object

from langchain_core.documents import Document

# Create
doc = Document(
    page_content="The full text of the document...",
    metadata={
        "source": "handbook.pdf",
        "page": 42,
        "author": "Zhang San",
        "created_at": "2026-01-15",
        "tags": ["tutorial", "python"],
    },
)

# Common operations
print(doc.page_content[:200])
print(doc.metadata["source"])

# JSON round trip (Document is a Pydantic model)
json_str = doc.model_dump_json()
restored = Document.model_validate_json(json_str)

12. Text Splitters

12.1 RecursiveCharacterTextSplitter (the most common choice)

from langchain_text_splitters import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,           # maximum characters per chunk
    chunk_overlap=50,         # characters shared by adjacent chunks
    length_function=len,      # function used to measure length
    separators=[              # separator priority (coarse to fine)
        "\n\n", "\n", "。", ";", ",", " ", ""
    ],
    add_start_index=True,     # record each chunk's start offset in its metadata
)

# Split raw text (long_text is any long string)
chunks = splitter.split_text(long_text)

# Split a list of Documents
split_docs = splitter.split_documents(documents)

for i, doc in enumerate(split_docs[:5]):
    print(f"\n--- Chunk {i+1} ({len(doc.page_content)} chars) ---")
    print(doc.page_content[:150])
    print(f"  start index: {doc.metadata.get('start_index')}")
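The recursive idea (try the coarsest separator first, re-split any piece that is still too long with the next separator, then greedily merge small pieces) can be sketched in a few lines. This simplified version has no overlap and keeps each separator attached to the preceding piece so that joining the chunks reproduces the input; LangChain's splitter also handles overlap and whitespace differently. `recursive_split` is a hypothetical helper.

```python
def recursive_split(text, chunk_size, separators=("\n\n", "\n", ". ", " ", "")):
    """Simplified recursive character splitting (no overlap), for illustration only."""
    if len(text) <= chunk_size or not separators:
        return [text]  # small enough, or no separators left to try
    sep, finer = separators[0], separators[1:]
    if sep == "":
        # Last resort: hard cut every chunk_size characters
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    parts = text.split(sep)
    # Keep each separator attached to the piece before it, so nothing is lost
    pieces = [p + sep for p in parts[:-1]] + [parts[-1]]
    chunks, current = [], ""
    for piece in pieces:
        if not piece:
            continue
        if len(piece) > chunk_size:
            # Still too long: flush the buffer, then split this piece more finely
            if current:
                chunks.append(current)
                current = ""
            chunks.extend(recursive_split(piece, chunk_size, finer))
        elif len(current) + len(piece) <= chunk_size:
            current += piece  # greedily merge small pieces up to chunk_size
        else:
            chunks.append(current)
            current = piece
    if current:
        chunks.append(current)
    return chunks


text = "Para one. Short.\n\nPara two is a bit longer than the others."
for chunk in recursive_split(text, chunk_size=20):
    print(repr(chunk))
```

The first paragraph fits and stays whole; the second does not, so it falls through to word-level splitting.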

12.2 TokenTextSplitter

from langchain_text_splitters import TokenTextSplitter

splitter = TokenTextSplitter(
    chunk_size=200,           # measured in tokens
    chunk_overlap=30,         # overlapping tokens
    encoding_name="cl100k_base",  # GPT-4 encoding (requires tiktoken)
    # or name the model directly:
    # model_name="gpt-4o",
)

chunks = splitter.split_text(long_text)
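Token-based splitting with overlap is a sliding window over token ids: each window starts `chunk_size - chunk_overlap` tokens after the previous one. In the sketch below a list of integers stands in for tiktoken output; `token_windows` is a hypothetical helper.

```python
def token_windows(tokens: list, chunk_size: int, chunk_overlap: int) -> list:
    """Sliding windows of chunk_size tokens, consecutive windows sharing chunk_overlap tokens."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("need 0 <= chunk_overlap < chunk_size")
    step = chunk_size - chunk_overlap
    windows = []
    for start in range(0, len(tokens), step):
        windows.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):
            break  # this window already reaches the end
    return windows


tokens = list(range(10))  # stand-in for token ids
for w in token_windows(tokens, chunk_size=4, chunk_overlap=1):
    print(w)
# [0, 1, 2, 3]
# [3, 4, 5, 6]
# [6, 7, 8, 9]
```

With the settings above (200 tokens, 30 overlap) each new chunk therefore advances by 170 tokens.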

12.3 Code Splitters

from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
    Language,
)

# Split Python code
python_splitter = RecursiveCharacterTextSplitter.from_language(
    language=Language.PYTHON,
    chunk_size=500,
    chunk_overlap=50,
)

python_code = """
class DatabaseService:
    def __init__(self, connection_string: str):
        self.conn = create_engine(connection_string)

    def get_users(self, limit: int = 100) -> List[User]:
        with self.conn.connect() as session:
            return session.query(User).limit(limit).all()

    def create_user(self, data: dict) -> User:
        user = User(**data)
        with self.conn.connect() as session:
            session.add(user)
            session.commit()
        return user
"""
chunks = python_splitter.split_text(python_code)
for i, chunk in enumerate(chunks):
    print(f"Chunk {i+1}: {len(chunk)} chars")
    print(chunk[:100])
    print("---")

# Supported languages include
# Language.PYTHON, Language.JS, Language.TS, Language.GO,
# Language.RUST, Language.JAVA, Language.CPP, Language.HTML, and more

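12.4 SemanticChunker

# pip install langchain-experimental
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings

# Smart splitting based on embedding similarity
splitter = SemanticChunker(
    embeddings=OpenAIEmbeddings(),
    breakpoint_threshold_type="percentile",  # percentile | standard_deviation | interquartile | gradient
    breakpoint_threshold_amount=90,
    # For Chinese text also pass sentence_split_regex=r"(?<=[。!?])"; the default
    # regex only splits after . ? ! followed by whitespace
)

chunks = splitter.split_text(long_text)
# Semantically related adjacent sentences end up in the same chunk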

12.5 HTML Splitter

from langchain_text_splitters import HTMLHeaderTextSplitter, RecursiveCharacterTextSplitter

html_splitter = HTMLHeaderTextSplitter(
    headers_to_split_on=[  # (tag, metadata key)
        ("h1", "Header 1"),
        ("h2", "Header 2"),
        ("h3", "Header 3"),
    ],
)

html_docs = html_splitter.split_text_from_file("page.html")

# Split further with RecursiveCharacterTextSplitter
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=30)
final_docs = text_splitter.split_documents(html_docs)

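12.6 Best Practices for Splitting Chinese Text

from langchain_text_splitters import RecursiveCharacterTextSplitter

def create_chinese_splitter(chunk_size=500, chunk_overlap=50):
    """Splitter tuned for Chinese text"""
    return RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=[
            "\n\n",     # paragraph
            "\n",       # line break
            "。",       # Chinese full stop
            ";",       # Chinese semicolon
            "!", "?", # Chinese exclamation and question marks
            ",",       # Chinese comma
            "、",       # enumeration comma (dunhao)
            " ",        # space
            "",         # character level
        ],
    )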

13. Embeddings and Vector Stores

13.1 Text Embeddings

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(
    model="text-embedding-3-small",  # good cost/performance
    # model="text-embedding-3-large",  # higher accuracy
    dimensions=1536,  # optional: request shorter vectors (1536 is already the default for -small)
)

# Embed a single text
text = "LangChain is a framework for building LLM applications."
vector = embeddings.embed_query(text)
print(f"Vector dimension: {len(vector)}")

# Embed several texts
texts = [
    "LangChain is used to build LLM applications.",
    "PyTorch is a deep learning framework.",
    "FastAPI is used to build web APIs.",
]
vectors = embeddings.embed_documents(texts)
print(f"Generated {len(vectors)} vectors, each with {len(vectors[0])} dimensions")

# Similarity
import numpy as np

def cosine_similarity(a, b):
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

v1 = embeddings.embed_query("machine learning")
v2 = embeddings.embed_query("deep learning")
v3 = embeddings.embed_query("The weather is nice today")

print(f"ML vs DL: {cosine_similarity(v1, v2):.4f}")       # related topics: noticeably higher
print(f"ML vs weather: {cosine_similarity(v1, v3):.4f}")  # unrelated: noticeably lower
# (absolute values depend on the embedding model)

13.2 The Chroma Vector Database

# pip install chromadb langchain-chroma
from langchain_chroma import Chroma

# Create from documents
vectorstore = Chroma.from_documents(
    documents=split_docs,
    embedding=embeddings,
    persist_directory="./chroma_db",
    collection_name="knowledge_base",
)

# Create from texts
vectorstore = Chroma.from_texts(
    texts=["Text 1", "Text 2", "Text 3"],
    embedding=embeddings,
    metadatas=[
        {"source": "doc1", "page": 1},
        {"source": "doc2", "page": 2},
        {"source": "doc3", "page": 3},
    ],
)

# Persistence is automatic when persist_directory is set
# Next time, load the collection directly
vectorstore = Chroma(
    persist_directory="./chroma_db",
    embedding_function=embeddings,
    collection_name="knowledge_base",
)

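13.3 Vector Search Methods

# 1. Basic similarity search
docs = vectorstore.similarity_search("What is machine learning?", k=3)

# 2. Search with scores (for Chroma the score is a distance: lower means more similar)
results = vectorstore.similarity_search_with_score("query text", k=5)
for doc, score in results:
    print(f"Score: {score:.4f} | Content: {doc.page_content[:80]}")

# 3. Search by vector
query_vector = embeddings.embed_query("query text")
docs = vectorstore.similarity_search_by_vector(query_vector, k=5)

# 4. MMR diversity search (balances relevance and diversity)
docs = vectorstore.max_marginal_relevance_search(
    "query text",
    k=5,           # number of results returned
    fetch_k=20,    # fetch 20 candidates first
    lambda_mult=0.5,  # 0 = maximum diversity, 1 = maximum relevance
)

# 5. Search with a metadata filter (Chroma needs $and to combine several conditions)
docs = vectorstore.similarity_search(
    "query text",
    k=5,
    filter={"$and": [{"source": "handbook.pdf"}, {"page": {"$gte": 10}}]},
)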
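MMR picks results greedily: each step takes the candidate with the best trade-off between relevance to the query and similarity to what has already been picked, weighted by `lambda_mult`. The sketch uses tiny hand-made vectors in place of real embeddings; `cosine` and `mmr` are illustrative helpers, not the vector store's internals.

```python
import math


def cosine(a, b):
    """Cosine similarity of two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def mmr(query, candidates, k, lambda_mult=0.5):
    """Greedy Maximal Marginal Relevance; returns indices into `candidates`.

    Each step maximizes
        lambda_mult * sim(query, d) - (1 - lambda_mult) * max(sim(d, s) for s already selected)
    """
    selected, remaining = [], list(range(len(candidates)))

    def score(i):
        relevance = cosine(query, candidates[i])
        redundancy = max((cosine(candidates[i], candidates[j]) for j in selected), default=0.0)
        return lambda_mult * relevance - (1 - lambda_mult) * redundancy

    while remaining and len(selected) < k:
        best = max(remaining, key=score)
        selected.append(best)
        remaining.remove(best)
    return selected


query = [1.0, 1.0, 0.0]
candidates = [
    [1.0, 0.9, 0.0],  # most relevant
    [1.0, 0.9, 0.1],  # near-duplicate of candidate 0
    [0.0, 1.0, 0.0],  # less relevant, but different
]
print(mmr(query, candidates, k=2))                   # [0, 2]: near-duplicate skipped
print(mmr(query, candidates, k=2, lambda_mult=1.0))  # [0, 1]: pure relevance ranking
```

`fetch_k` in the vector-store call corresponds to the size of `candidates` here: MMR only reranks the pre-fetched pool.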

13.4 FAISS

# pip install faiss-cpu   (CPU build)
# pip install faiss-gpu   (GPU build)
from langchain_community.vectorstores import FAISS

# Create
vectorstore = FAISS.from_documents(split_docs, embeddings)
vectorstore.save_local("./faiss_index")

# Load
vectorstore = FAISS.load_local(
    "./faiss_index",
    embeddings,
    allow_dangerous_deserialization=True,  # loading uses pickle: only for index files you trust
)

# Merge another index into this one
other_store = FAISS.from_documents(other_docs, embeddings)
vectorstore.merge_from(other_store)

13.5 The Retriever Interface

from langchain_core.runnables import RunnablePassthrough

# as_retriever: wrap a VectorStore in the standard Retriever interface
retriever = vectorstore.as_retriever(
    search_type="similarity",  # similarity | mmr | similarity_score_threshold
    search_kwargs={
        "k": 5,
        # "score_threshold": 0.7,       # only for similarity_score_threshold
        # "fetch_k": 20,                 # only for mmr
        # "lambda_mult": 0.5,            # only for mmr
        # "filter": {"source": "doc1"},  # metadata filter
    },
)

# Retrievers implement the Runnable interface
docs = retriever.invoke("query text")

# Usable directly in LCEL (format_docs, prompt, model and parser as in the RAG pipeline of 14.1)
chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | model
    | parser
)

14. RAG: Retrieval-Augmented Generation

14.1 A Complete RAG Pipeline

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

# 1. Load documents
loader = TextLoader("knowledge.txt", encoding="utf-8")
docs = loader.load()

# 2. Split
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
chunks = splitter.split_documents(docs)

# 3. Create the vector store
vectorstore = Chroma.from_documents(chunks, OpenAIEmbeddings())

# 4. Create the retriever
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})

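# 5. Build the prompt
template = """Answer the question using the context below. If the context does not contain the answer, say "I couldn't find the answer in the provided material."

Context:
{context}

Question: {question}

Please give a detailed and accurate answer:"""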

prompt = ChatPromptTemplate.from_template(template)

# 6. Create the model
model = ChatOpenAI(model="gpt-4o", temperature=0)

# 7. Build the RAG chain
def format_docs(docs):
    return "\n\n---\n\n".join(
        f"[{i+1}] {d.page_content}"
        for i, d in enumerate(docs)
    )

rag_chain = (
    {
        "context": retriever | format_docs,
        "question": RunnablePassthrough(),
    }
    | prompt
    | model
    | StrOutputParser()
)

# 8. Query
answer = rag_chain.invoke("What is machine learning?")
print(answer)

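14.2 RAG with Source Citations

template = """Answer the question using the numbered context below. Cite your sources in the answer using the format [Source X].

Context:
{context}

Question: {question}

Detailed answer (cite [Source X]):"""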

prompt = ChatPromptTemplate.from_template(template)

# Return the full structure: the answer plus its sources
def rag_with_sources(question: str):
    docs = retriever.invoke(question)
    context = "\n\n---\n\n".join(
        f"[Source {i+1}] (from: {d.metadata.get('source', 'unknown')})\n{d.page_content}"
        for i, d in enumerate(docs)
    )
    answer = (prompt | model | StrOutputParser()).invoke({
        "context": context, "question": question
    })

    return {
        "question": question,
        "answer": answer,
        "sources": [
            {
                "index": i+1,
                "content": d.page_content[:300],
                "metadata": d.metadata,
            }
            for i, d in enumerate(docs)
        ],
    }

result = rag_with_sources("Explain how deep learning works")
print(f"Answer: {result['answer']}\n")
print("Sources:")
for src in result["sources"]:
    print(f"  [{src['index']}] {src['metadata'].get('source')} - {src['content'][:60]}...")

14.3 Conversational RAG

from langchain_core.prompts import MessagesPlaceholder

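# Step 1: rewrite the user's question so it stands on its own
contextualize_prompt = ChatPromptTemplate.from_messages([
    ("system", "Given the chat history, rewrite the user's question as a standalone, self-contained question."),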
    MessagesPlaceholder("chat_history"),
    ("human", "{question}"),
])

contextualize_chain = contextualize_prompt | model | StrOutputParser()

# Step 2: RAG question answering
qa_prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer the question based on the context.\n\nContext:\n{context}"),
    MessagesPlaceholder("chat_history"),
    ("human", "{question}"),
])

# The full chain
def build_conversational_rag():
    def _contextualize(inputs):
        if inputs.get("chat_history"):
            return contextualize_chain.invoke(inputs)
        return inputs["question"]

    return (
        RunnablePassthrough.assign(
            context=lambda x: format_docs(retriever.invoke(_contextualize(x)))
        )
        | qa_prompt
        | model
        | StrOutputParser()
    )

conv_rag = build_conversational_rag()

# Multi-turn conversation
from langchain_core.messages import HumanMessage, AIMessage

history = []
q1 = "What is Python?"
a1 = conv_rag.invoke({"question": q1, "chat_history": history})
history.extend([HumanMessage(content=q1), AIMessage(content=a1)])

q2 = "What are its pros and cons?"  # "its" refers to Python
a2 = conv_rag.invoke({"question": q2, "chat_history": history})
print(a2)

14.4 RAG with create_agent

# Build a RAG agent with create_agent
from langchain.agents import create_agent
from langchain.tools import tool

@tool
def search_knowledge_base(query: str) -> str:
    """Search the internal knowledge base. Use it to look up documents, manuals, or internal material."""
    docs = retriever.invoke(query)
    if not docs:
        return "No relevant documents found"
    return "\n\n".join(
        f"[{d.metadata.get('source')}]\n{d.page_content}"
        for d in docs
    )

agent = create_agent(
    model="openai:gpt-4o",
    tools=[search_knowledge_base],
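    system_prompt="""You are a Q&A assistant grounded in a knowledge base.
    - Always search the knowledge base before answering
    - Cite your sources
    - If the knowledge base has no relevant information, say so explicitly""",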
)

result = agent.invoke({
    "messages": [{"role": "user", "content": "What is the company's remote-work policy?"}]
})

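15. Advanced RAG Strategies

15.1 Multi-Query Retrieval

# In v1.0 the legacy retrievers ship in the langchain-classic package (pip install langchain-classic)
from langchain_classic.retrievers import MultiQueryRetriever
from langchain_openai import ChatOpenAI

# Automatically generate queries from several angles and merge the retrieved results
multi_retriever = MultiQueryRetriever.from_llm(
    llm=ChatOpenAI(model="gpt-4o", temperature=0.5),
    retriever=base_retriever,  # any existing retriever, e.g. vectorstore.as_retriever()
)  # the default prompt asks the LLM for 3 alternative queries

# Original query: "How can I improve code quality?"
# Example generated queries:
#   "What are the best practices for improving code quality?"
#   "How can code review and testing improve code quality?"
#   "How is code quality measured, and how can it be improved?"

docs = multi_retriever.invoke("How can I improve code quality?")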

15.2 Self-Query Retrieval

# pip install lark
from langchain_classic.retrievers import SelfQueryRetriever
from langchain_classic.chains.query_constructor.base import AttributeInfo

# Describe the documents' metadata fields
metadata_field_info = [
    AttributeInfo(name="source", description="Document source", type="string"),
    AttributeInfo(name="date", description="Publication date", type="string"),
    AttributeInfo(name="author", description="Author", type="string"),
    AttributeInfo(name="category", description="Category", type="string"),
]

self_query_retriever = SelfQueryRetriever.from_llm(
    llm=model,
    vectorstore=vectorstore,
    document_contents="A collection of technical documents and tutorials",
    metadata_field_info=metadata_field_info,
)

# Query: "Documents in the Python category written by Zhang San after 2025"
# The LLM turns it into a metadata filter plus a search query for precise retrieval
docs = self_query_retriever.invoke("Documents in the Python category written by Zhang San after 2025")

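15.3 Parent Document Retriever

from langchain_classic.retrievers import ParentDocumentRetriever
from langchain_core.stores import InMemoryStore
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Search over small chunks, but return the larger parent chunks they belong to
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=50)

store = InMemoryStore()  # holds the parent chunks

retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,  # ideally an empty store reserved for the child chunks
    docstore=store,
    child_splitter=child_splitter,
    parent_splitter=parent_splitter,
    search_kwargs={"k": 3},
)

retriever.add_documents(documents)
# Returns parent chunks (up to 2000 characters) instead of the small 400-character child chunks
docs = retriever.invoke("What is machine learning?")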

15.4 Contextual Compression

from langchain_classic.retrievers import ContextualCompressionRetriever
from langchain_classic.retrievers.document_compressors import LLMChainExtractor

# Use an LLM to extract only the parts of each document relevant to the query
compressor = LLMChainExtractor.from_llm(model)

compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=retriever,
)

# Retrieved documents are compressed automatically, with irrelevant content removed
compressed = compression_retriever.invoke("What is Python's GIL?")
for doc in compressed:
    print(f"[{len(doc.page_content)} chars after compression] {doc.page_content[:150]}...")

15.5 Ensemble Retrieval

from langchain_classic.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever  # pip install rank_bm25

# BM25 keyword retriever. It tokenizes on whitespace by default, so for Chinese
# text pass a word segmenter via preprocess_func (for example jieba.lcut)
bm25_retriever = BM25Retriever.from_documents(documents)
bm25_retriever.k = 5

# Semantic retriever
semantic_retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

# Combine the two
ensemble = EnsembleRetriever(
    retrievers=[bm25_retriever, semantic_retriever],
    weights=[0.3, 0.7],  # BM25 30% + semantic 70%
)

docs = ensemble.invoke("Python programming basics tutorial")
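EnsembleRetriever is documented as merging rankings with weighted Reciprocal Rank Fusion. The sketch below assumes the usual form, where each list contributes `weight / (c + rank)` with 1-based ranks and `c = 60`; document ids are plain strings here, whereas the real retriever deduplicates `Document` objects, and `weighted_rrf` is a hypothetical helper.

```python
from collections import defaultdict


def weighted_rrf(ranked_lists, weights, c=60):
    """Weighted Reciprocal Rank Fusion: score(d) = sum_i w_i / (c + rank_i(d)), rank from 1."""
    if len(ranked_lists) != len(weights):
        raise ValueError("need one weight per ranked list")
    scores = defaultdict(float)
    for docs, w in zip(ranked_lists, weights):
        for rank, doc in enumerate(docs, start=1):
            scores[doc] += w / (c + rank)
    # Highest fused score first; sorted() is stable, so ties keep first-seen order
    return sorted(scores, key=scores.get, reverse=True)


bm25_hits = ["doc_A", "doc_B", "doc_C"]      # keyword ranking (hypothetical ids)
semantic_hits = ["doc_C", "doc_D", "doc_A"]  # embedding ranking
print(weighted_rrf([bm25_hits, semantic_hits], weights=[0.3, 0.7]))
```

Because only ranks enter the formula, BM25 scores and cosine similarities never have to be put on a common scale, which is the main appeal of RRF for hybrid search.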

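15.6 HyDE (Hypothetical Document Embeddings)

# HyDE: first generate a hypothetical answer, then retrieve with that answer's embedding
hyde_prompt = ChatPromptTemplate.from_template(
    "Write a short passage answering the question below (it need not be fully accurate; it is only used for vector search):\n{question}"
)

hyde_chain = hyde_prompt | model | StrOutputParser()

def hyde_retrieve(question: str, k: int = 5):
    # 1. Generate a hypothetical answer
    hypothetical_doc = hyde_chain.invoke({"question": question})

    # 2. Retrieve with the hypothetical answer (similarity_search embeds it internally)
    docs = vectorstore.similarity_search(hypothetical_doc, k=k)
    return docs

# Useful when questions and documents are phrased very differently
docs = hyde_retrieve("How do I make Python run faster?")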

15.7 RAG-Fusion

def rag_fusion(question: str, num_queries: int = 4, top_k: int = 5, rrf_k: int = 60):
    """RAG-Fusion: multiple queries + Reciprocal Rank Fusion (RRF)"""
    # 1. Generate several related queries (ChatPromptTemplate and StrOutputParser imported as in 15.6)
    query_prompt = ChatPromptTemplate.from_template(
        "Generate {num} search queries for the question below, each from a different angle, one per line:\n{question}"
    )
    queries_str = (query_prompt | model | StrOutputParser()).invoke({
        "question": question, "num": num_queries,
    })
    queries = [q.strip() for q in queries_str.split("\n") if q.strip()]

    # 2. Retrieve for each query
    all_results = []
    for q in queries:
        docs = retriever.invoke(q)
        all_results.append(docs)

    # 3. Reciprocal Rank Fusion: score(d) = sum over lists of 1 / (rrf_k + rank), ranks starting at 1
    fused = {}
    for doc_list in all_results:
        for rank, doc in enumerate(doc_list, start=1):
            key = doc.page_content[:200]  # simplified de-duplication key
            if key not in fused:
                fused[key] = {"doc": doc, "score": 0.0}
            fused[key]["score"] += 1 / (rrf_k + rank)

    # 4. Sort by fused score and return the top_k documents
    ranked = sorted(fused.values(), key=lambda x: x["score"], reverse=True)
    return [item["doc"] for item in ranked[:top_k]]
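The function above splits the model's reply on newlines, so list markers such as "1." or "-" end up inside the queries, and a repeated query is searched twice. A small hypothetical helper like `parse_queries` below cleans that up; it also keeps the original question in the query set, a common RAG-Fusion variant that is an assumption here rather than something the code above does.

```python
import re

# Leading list markers such as "1.", "2)", "3、", "-", "*", "•" plus the spaces after them
_LIST_MARKER = re.compile(r"^\s*(?:\d+[.)、]|[-*•])\s*")


def parse_queries(raw: str, original: str, limit: int = 4) -> list[str]:
    """Turn an LLM's 'one query per line' reply into a clean, de-duplicated list.

    The original question comes first so it always takes part in the fusion;
    at most `limit` generated queries are kept after it.
    """
    queries = [original.strip()]
    seen = {queries[0]}
    for line in raw.splitlines():
        q = _LIST_MARKER.sub("", line).strip()
        if q and q not in seen:
            seen.add(q)
            queries.append(q)
    return queries[: limit + 1]


reply = "1. Python speed tips\n2) PyPy vs CPython\n- Python speed tips\n\n• profiling Python code"
print(parse_queries(reply, "How do I make Python faster?"))
# ['How do I make Python faster?', 'Python speed tips', 'PyPy vs CPython', 'profiling Python code']
```

Inside `rag_fusion`, `queries = parse_queries(queries_str, question, num_queries)` would replace the list comprehension.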

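15.8 Step-Back Retrieval

# Generate a more general "step-back" question to help retrieval
step_back_prompt = ChatPromptTemplate.from_template(
    "For the specific question below, write a more general, higher-level 'step-back' question "
    "that can be used to retrieve background knowledge:\nSpecific question: {question}\nStep-back question:"
)

step_back_chain = step_back_prompt | model | StrOutputParser()

def step_back_retrieve(question: str, k: int = 3):
    back_question = step_back_chain.invoke({"question": question})

    # Retrieve separately for the original question and the step-back question
    original_docs = retriever.invoke(question)
    back_docs = retriever.invoke(back_question)

    # Merge and de-duplicate. Hits for the original question come first, so with a small k
    # the step-back results may be cut off; raise k or interleave the lists if you need both
    seen = set()
    merged = []
    for doc in original_docs + back_docs:
        key = doc.page_content[:200]
        if key not in seen:
            seen.add(key)
            merged.append(doc)
    return merged[:k]

# Example:
# Specific question: "How do I force-restart an iPhone 15?"
# Step-back question: "What are the general troubleshooting methods for mobile devices?"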

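16. LangGraph State Graph Basics

16.1 Overview

LangGraph is the stateful graph-orchestration framework of the LangChain ecosystem. In v1.0, create_agent is itself built on LangGraph.

  • StateGraph: models a workflow as a state machine made of nodes and edges
  • Checkpointer: automatically saves a snapshot of the state after every step
  • Streaming: supports several streaming modes
  • HITL: native support for human intervention (interrupts)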

16.2 Your First StateGraph

from typing import TypedDict, Annotated, List
import operator
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage, HumanMessage

# 1. Define the state; operator.add makes node updates append to `messages` instead of replacing it
class AgentState(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]

# 2. Define the node functions (assumes `model` is a chat model from earlier sections)
def chatbot(state: AgentState):
    """LLM node: call the model"""
    response = model.invoke(state["messages"])
    return {"messages": [response]}

# 3. Build the graph
graph = StateGraph(AgentState)

graph.add_node("chatbot", chatbot)
graph.set_entry_point("chatbot")
graph.add_edge("chatbot", END)

# 4. Compile
app = graph.compile()

# 5. Run
result = app.invoke({
    "messages": [HumanMessage(content="Hello!")]
})
print(result["messages"][-1].content)
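The `Annotated[List[BaseMessage], operator.add]` annotation tells LangGraph how to merge a node's return value into the existing state: for that key, old and new values are combined with `operator.add` (list concatenation) instead of the new value overwriting the old one. That is why `chatbot` returns only the new message. The sketch below mimics this merge rule in plain Python; `DemoState` and `apply_update` are hypothetical, and this is a simplified model, not LangGraph's actual channel implementation.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class DemoState(TypedDict):
    messages: Annotated[list, operator.add]  # reducer: concatenate lists
    step: str                                # no reducer: last write wins


def apply_update(state_cls, state: dict, update: dict) -> dict:
    """Merge a node's partial update into the state using per-key reducers (simplified)."""
    hints = get_type_hints(state_cls, include_extras=True)
    new_state = dict(state)
    for key, value in update.items():
        # Annotated[...] carries its extra arguments in __metadata__; plain types have none
        metadata = getattr(hints[key], "__metadata__", ())
        reducer = metadata[0] if metadata else None
        if reducer is not None and key in new_state:
            new_state[key] = reducer(new_state[key], value)
        else:
            new_state[key] = value
    return new_state


state = {"messages": ["user: hi"], "step": "start"}
state = apply_update(DemoState, state, {"messages": ["ai: hello"], "step": "chatbot"})
print(state)  # {'messages': ['user: hi', 'ai: hello'], 'step': 'chatbot'}
```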

16.3 Agent + Tool-Execution Graph

from langchain_core.messages import ToolMessage
from langgraph.graph import StateGraph, END

class AgentState(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]

# Bind the tools to the model (get_weather and calculate are @tool functions defined earlier)
tools = [get_weather, calculate]
tools_by_name = {t.name: t for t in tools}
model_with_tools = model.bind_tools(tools)

def agent_node(state: AgentState):
    """Agent decision node"""
    response = model_with_tools.invoke(state["messages"])
    return {"messages": [response]}

def tool_executor(state: AgentState):
    """Tool-execution node (langgraph.prebuilt.ToolNode provides a ready-made version)"""
    last_msg = state["messages"][-1]
    results = []
    for tc in last_msg.tool_calls:
        # Look the tool up by name; report unknown tools back to the model
        # instead of failing or reusing the previous iteration's output
        tool = tools_by_name.get(tc["name"])
        output = tool.invoke(tc["args"]) if tool else f"Error: unknown tool {tc['name']}"
        results.append(ToolMessage(
            content=str(output),
            tool_call_id=tc["id"],
        ))
    return {"messages": results}

def should_continue(state: AgentState) -> str:
    """Routing: run the tools if the model requested any, otherwise finish"""
    last_msg = state["messages"][-1]
    if getattr(last_msg, "tool_calls", None):
        return "tools"
    return END

# Build the graph
workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_executor)
workflow.set_entry_point("agent")
workflow.add_conditional_edges("agent", should_continue, {
    "tools": "tools",
    END: END,
})
workflow.add_edge("tools", "agent")  # after the tools run, return to the agent: this closes the loop

app = workflow.compile()

# Run
result = app.invoke({
    "messages": [HumanMessage(content="What's the weather in Beijing? Also, compute 156*23.")]
})
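The conditional edge plus the `tools -> agent` edge form a loop: the model either requests tool calls (go to `tools`, then back to `agent`) or answers directly (go to `END`). The plain-Python sketch below traces that loop with a scripted stand-in for the model, so the control flow can be followed without an API key. `run_agent_loop`, `scripted_model`, `calculate` and the dict message format are hypothetical; `max_steps` plays the role that LangGraph's `recursion_limit` plays in a real graph.

```python
from typing import Callable


def run_agent_loop(model_step: Callable[[list], dict], tools: dict[str, Callable],
                   user_input: str, max_steps: int = 10) -> list:
    """Simplified agent -> tools -> agent cycle.

    model_step: stands in for model_with_tools.invoke; returns {"content": ..., "tool_calls": [...]}
    tools: tool name -> plain Python callable taking keyword arguments
    """
    messages = [{"role": "user", "content": user_input}]
    for _ in range(max_steps):
        ai = model_step(messages)                   # "agent" node
        messages.append({"role": "assistant", **ai})
        if not ai.get("tool_calls"):                # should_continue -> END
            return messages
        for call in ai["tool_calls"]:               # "tools" node
            fn = tools.get(call["name"])
            output = fn(**call["args"]) if fn else f"Error: unknown tool {call['name']}"
            messages.append({"role": "tool", "tool_call_id": call["id"], "content": str(output)})
    raise RuntimeError(f"agent did not finish within {max_steps} steps")


def scripted_model(messages: list) -> dict:
    # First turn: ask for a tool call; once a tool result is present, answer with it
    if messages[-1]["role"] == "user":
        return {"content": "", "tool_calls": [
            {"name": "calculate", "args": {"expression": "156*23"}, "id": "call_1"}]}
    return {"content": f"The result is {messages[-1]['content']}", "tool_calls": []}


def calculate(expression: str) -> int:
    a, b = expression.split("*")
    return int(a) * int(b)


history = run_agent_loop(scripted_model, {"calculate": calculate}, "What is 156*23?")
print(history[-1]["content"])  # The result is 3588
```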

16.4 Checkpointer Persistence

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver  # pip install langgraph-checkpoint-sqlite

# Development and testing
checkpointer = InMemorySaver()

# Production (SQLite)
import sqlite3
conn = sqlite3.connect("agent_state.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)

# Pass the checkpointer at compile time
app = workflow.compile(checkpointer=checkpointer)

# thread_id isolates conversations from each other
config = {"configurable": {"thread_id": "user-001"}}

# Multi-turn conversation: the state is persisted automatically between calls
app.invoke({"messages": [HumanMessage(content="Remember: I like Python")]}, config)
app.invoke({"messages": [HumanMessage(content="What did I just say I like?")]}, config)
# The agent can answer "You like Python"

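16.5 Time Travel

# Inspect the state history (most recent checkpoint first)
history = list(app.get_state_history(config))

# Inspect the latest state
current_state = app.get_state(config)
print(f"Current values: {current_state.values}")
print(f"Next node(s): {current_state.next}")

# Pick an earlier checkpoint: history[0] is the latest, history[3] is three checkpoints back
target_state = history[3]

# Replay from that checkpoint: its config carries the checkpoint_id, so execution resumes
# from there and forks a new branch instead of overwriting the existing history
result = app.invoke(None, target_state.config)

# To change the state before continuing, fork with update_state on the old checkpoint.
# update_state applies reducers, so with operator.add the given messages are appended, not replaced
# new_config = app.update_state(target_state.config, {"messages": [HumanMessage(content="...")]})
# result = app.invoke(None, new_config)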

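16.6 Custom Streaming Modes

# LangGraph offers several stream modes, including "values", "updates", "messages", "custom", and "debug".
# Passing a list of modes makes stream() yield (mode, chunk) tuples
for mode, chunk in app.stream(
    {"messages": [HumanMessage(content="Check the weather for me")]},
    stream_mode=["updates", "messages", "custom"],
    config=config,
):
    if mode == "updates":
        print(f"[State update] nodes: {list(chunk.keys())}")
    elif mode == "messages":
        msg, metadata = chunk  # an LLM token chunk plus metadata about where it came from
        print(f"[Message] {metadata['langgraph_node']}: {msg.content}", end="")
    elif mode == "custom":
        print(f"[Custom] {chunk}")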

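17. Multi-Agent Collaboration and the Supervisor Pattern

17.1 create_supervisor Basics

# pip install langgraph-supervisor
from langchain_openai import ChatOpenAI
from langgraph_supervisor import create_supervisor
# create_react_agent is deprecated in LangGraph v1 in favor of langchain.agents.create_agent,
# but it still works and is what the langgraph-supervisor examples build on
from langgraph.prebuilt import create_react_agent

# Create the specialist agents (add, multiply, divide, search_web, wikipedia are tools defined earlier)
math_agent = create_react_agent(
    model="openai:gpt-4o",
    tools=[add, multiply, divide],
    name="math_expert",
)

research_agent = create_react_agent(
    model="openai:gpt-4o",
    tools=[search_web, wikipedia],
    name="research_expert",
)

writer_agent = create_react_agent(
    model="openai:gpt-4o",
    tools=[],
    name="writer",
)

# Create the supervisor
supervisor = create_supervisor(
    agents=[research_agent, math_agent, writer_agent],
    model=ChatOpenAI(model="gpt-4o"),
    prompt=(
        "You are a team supervisor. Assign each task to the most suitable expert. "
        "When the task is done, combine the experts' results into a final answer."
    ),
    output_mode="last_message",
)

# Compile and run
app = supervisor.compile()

result = app.invoke({
    "messages": [{
        "role": "user",
        "content": "Research the latest AI trends, analyze the key figures, and write a 200-word brief."
    }]
})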

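17.2 Advanced Supervisor Configuration

from langgraph.checkpoint.memory import InMemorySaver

def before_supervisor(state):
    print("Supervisor is thinking...")
    # A pre-model hook returns a state update; llm_input_messages feeds the model
    # for this call without rewriting the stored message history
    return {"llm_input_messages": state["messages"]}

def after_supervisor(state):
    print("Supervisor has decided")
    return None  # no state update

supervisor = create_supervisor(
    agents=[research_agent, math_agent, writer_agent],
    model=ChatOpenAI(model="gpt-4o"),
    prompt="You are a team supervisor...",

    # Parallel tool calls: allow handing work to several agents at once
    parallel_tool_calls=True,

    # Output mode
    output_mode="last_message",  # "last_message" | "full_history"

    # Custom prefix for the generated handoff tools
    handoff_tool_prefix="transfer_to_",

    # Run logic before / after the supervisor's model call
    pre_model_hook=before_supervisor,
    post_model_hook=after_supervisor,

    # How agent names are shown to the model
    include_agent_name="inline",  # embed the agent name in the message content
)

app = supervisor.compile(checkpointer=InMemorySaver())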

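17.3 Custom Multi-Agent Workflow

import operator
from typing import Annotated, List, TypedDict

from langchain_core.messages import BaseMessage
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import create_react_agent

class MultiAgentState(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]
    next_agent: str  # informational here; routing is done by the edges below
    task_result: dict

# Create the agents (model, search_web, calculate from earlier sections)
researcher = create_react_agent(model, [search_web], name="researcher")
analyst = create_react_agent(model, [calculate], name="analyst")

def research_node(state):
    result = researcher.invoke({"messages": state["messages"]})
    # The sub-agent returns the whole history; keep only the new messages,
    # otherwise operator.add would append the existing ones a second time
    new_messages = result["messages"][len(state["messages"]):]
    return {"messages": new_messages, "next_agent": "analyst"}

def analyst_node(state):
    result = analyst.invoke({"messages": state["messages"]})
    new_messages = result["messages"][len(state["messages"]):]
    return {"messages": new_messages, "next_agent": END}

def quality_check(state):
    """Quality gate: if the analyst asks for more information, send the task back to the researcher"""
    last_msg = state["messages"][-1]
    if "need more information" in str(last_msg.content):
        return "research"
    return END

# Build the workflow
workflow = StateGraph(MultiAgentState)
workflow.add_node("research", research_node)
workflow.add_node("analyst", analyst_node)
workflow.set_entry_point("research")
workflow.add_edge("research", "analyst")
workflow.add_conditional_edges("analyst", quality_check, {
    "research": "research",
    END: END,
})

# The research -> analyst loop is bounded by LangGraph's recursion_limit (settable via config)
app = workflow.compile()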

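17.4 Swarm Pattern (Peer-to-Peer Handoffs)

from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent

# Swarm: agents hand off to each other directly, with no central supervisor.
# Simplified sketch: these handoff tools only return a message. To actually transfer control,
# a handoff tool returns Command(goto=<agent>, graph=Command.PARENT); the langgraph-swarm
# package (create_handoff_tool, create_swarm) wires this up for you.
def create_swarm_agent(name, tools, handoff_targets):
    system_prompt = (
        f"You are {name}. You handle tasks related to {name}. "
        "If a question is outside your expertise, use a handoff tool to pass it to the right expert."
    )

    # One handoff tool per target; the factory function binds each target_name correctly
    def make_handoff(target_name):
        @tool(f"transfer_to_{target_name}")
        def handoff(reason: str) -> str:
            """Hand the task off to another expert."""
            return f"Task handed off to {target_name}. Reason: {reason}"
        return handoff

    handoff_tools = [make_handoff(target) for target in handoff_targets]

    return create_react_agent(
        model="openai:gpt-4o",
        tools=tools + handoff_tools,
        name=name,
        prompt=system_prompt,  # `prompt` replaces the deprecated `state_modifier` parameter
    )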

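17.5 Comparing Multi-Agent Patterns

Pattern | Coordination | Best suited for | Number of agents
Supervisor | Central LLM routing | Well-defined tasks that can be delegated | 3-7
Swarm | Direct handoffs between agents | Dynamic collaboration | No fixed limit
Hierarchical | Nested supervisors | Complex organizational structures | 10+
Sequential | Fixed pipeline | Linear processing flows | 2-5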

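18. Memory and Persistence

18.1 Memory Strategies in v1.0

In v1.0 an agent is stateless on its own. Short-term (per-thread) memory comes from a LangGraph checkpointer, and long-term memory across threads comes from a store (see 18.5):

Strategy | How it works | Trade-offs
Full history | Keep every message | No information loss, but token usage grows without bound
Summary memory | SummarizationMiddleware compresses old messages automatically | Stable token usage; details may be lost
Window memory | Keep only the last N messages | Simple to implement; early context is lost
Vector memory | Store memories in a vector store and inject the relevant ones | Supports long-term, cross-session memory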
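Window memory is the simplest of the four, but one detail matters with tool-calling agents: a naive cut can leave a tool result at the start of the window without the assistant message that requested it, which chat APIs typically reject. The sketch below (hypothetical `window_messages`, dict messages) widens the window backwards in that case and keeps a leading system message. For real `BaseMessage` lists, `langchain_core.messages.trim_messages` provides this kind of trimming.

```python
def window_messages(messages: list[dict], max_messages: int) -> list[dict]:
    """Window memory: keep a leading system message plus the last `max_messages` messages.

    If the cut lands on a tool result, the window is widened backwards so the
    assistant message that requested the tool call stays with it.
    """
    system = messages[:1] if messages and messages[0]["role"] == "system" else []
    rest = messages[len(system):]
    start = max(len(rest) - max_messages, 0)
    while 0 < start < len(rest) and rest[start]["role"] == "tool":
        start -= 1
    return system + rest[start:]


msgs = [
    {"role": "system", "content": "You are helpful."},
    {"role": "user", "content": "hi"},
    {"role": "assistant", "content": "hello"},
    {"role": "user", "content": "weather?"},
    {"role": "assistant", "content": "", "tool_calls": [{"id": "c1"}]},
    {"role": "tool", "content": "sunny", "tool_call_id": "c1"},
    {"role": "assistant", "content": "It is sunny."},
]
print([m["role"] for m in window_messages(msgs, 2)])
# ['system', 'assistant', 'tool', 'assistant']
```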

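18.2 Checkpointer Backends

# 1. In-memory (development and testing)
from langgraph.checkpoint.memory import InMemorySaver
checkpointer = InMemorySaver()

# 2. SQLite (lightweight production); pip install langgraph-checkpoint-sqlite
# SqliteSaver expects a file path, not a SQLAlchemy-style "sqlite:///" URL, and
# from_conn_string() is a context manager; for a long-lived saver, pass a connection instead
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver(sqlite3.connect("./agent.db", check_same_thread=False))

# 3. Postgres (recommended for production); pip install langgraph-checkpoint-postgres
from langgraph.checkpoint.postgres import PostgresSaver
DB_URI = "postgresql://user:pass@localhost:5432/agentdb"
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()  # create the checkpoint tables on first use
    # compile and run the graph inside this block

# 4. MongoDB; pip install langgraph-checkpoint-mongodb
from langgraph.checkpoint.mongodb import MongoDBSaver
with MongoDBSaver.from_conn_string("mongodb://localhost:27017", "agentdb") as checkpointer:
    ...  # compile and run the graph inside this block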

18.3 Complete Example: Agent + Checkpointer

import sqlite3
from langchain.agents import create_agent
from langgraph.checkpoint.sqlite import SqliteSaver

checkpointer = SqliteSaver(sqlite3.connect("./agent_memory.db", check_same_thread=False))

agent = create_agent(
    model="openai:gpt-4o",
    tools=[get_weather, search_web],
    checkpointer=checkpointer,
    system_prompt="You are an AI assistant with memory. Remember what the user told you earlier.",
)

# A user's first session
config_user1 = {"configurable": {"thread_id": "user-alice"}}
result = agent.invoke(
    {"messages": [{"role": "user", "content": "My name is Alice and I live in Shanghai."}]},
    config=config_user1,
)

# The same user comes back later: the agent still has the conversation
result = agent.invoke(
    {"messages": [{"role": "user", "content": "What's my name, and which city am I in?"}]},
    config=config_user1,
)
print(result["messages"][-1].content)
# e.g. "Your name is Alice and you're in Shanghai."

# A different user: fully isolated
config_user2 = {"configurable": {"thread_id": "user-bob"}}
result = agent.invoke(
    {"messages": [{"role": "user", "content": "What did I say earlier?"}]},
    config=config_user2,
)
# e.g. "This is our first conversation; you haven't told me anything yet."

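18.4 Summary Memory (SummarizationMiddleware)

from langchain.agents import create_agent
from langchain.agents.middleware import SummarizationMiddleware
from langgraph.checkpoint.memory import InMemorySaver

agent = create_agent(
    model="openai:gpt-4o",
    tools=[...],  # replace with your tools
    checkpointer=InMemorySaver(),
    middleware=[
        # Parameter names as in LangChain 1.0; later 1.x releases changed them
        # (e.g. to trigger=/keep=), so check the version you have installed
        SummarizationMiddleware(
            model="openai:gpt-4o-mini",
            max_tokens_before_summary=2000,  # summarize once the history exceeds 2000 tokens
            messages_to_keep=5,  # keep the 5 most recent messages verbatim
        ),
    ],
)

# Long conversations are managed automatically: old messages are summarized, recent ones stay intact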

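18.5 Long-Term Memory (Store)

# Checkpointer = short-term memory (conversation history within one thread)
# Store = long-term memory (knowledge persisted across threads)

from langchain_core.runnables import RunnableConfig
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()

# Read and write long-term memory inside a graph node.
# LangGraph injects `store` when the graph is compiled with graph.compile(store=store)
def node_with_memory(state, config: RunnableConfig, *, store: BaseStore):
    user_id = config["configurable"]["user_id"]

    # Read every memory under the user's namespace (prefix search)
    memories = store.search(("users", user_id))

    # ... processing logic ...

    # Save a new memory: (namespace, key, value)
    store.put(
        ("users", user_id, "preferences"),
        "preferences",
        {"language": "Chinese", "expertise": "beginner"},
    )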
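Store data is addressed by a namespace tuple plus a key, and `store.search(("users", user_id))` returns everything whose namespace starts with that prefix; that is why memories saved under `("users", user_id, "preferences")` are found by it. The dict-backed `TinyStore` below is a hypothetical stand-in that models only this addressing scheme; the real `InMemoryStore` returns item objects and also supports filters, pagination and optional semantic search.

```python
from typing import Any

Namespace = tuple[str, ...]


class TinyStore:
    """Dict-backed stand-in for a LangGraph store: values live under (namespace, key)."""

    def __init__(self) -> None:
        self._data: dict[tuple[Namespace, str], Any] = {}

    def put(self, namespace: Namespace, key: str, value: dict) -> None:
        self._data[(namespace, key)] = value

    def get(self, namespace: Namespace, key: str):
        return self._data.get((namespace, key))

    def search(self, namespace_prefix: Namespace) -> list[tuple[Namespace, str, dict]]:
        """Return every (namespace, key, value) whose namespace starts with the prefix."""
        n = len(namespace_prefix)
        return [
            (ns, key, value)
            for (ns, key), value in self._data.items()
            if ns[:n] == namespace_prefix  # element-wise tuple match, not string prefix
        ]


demo_store = TinyStore()
demo_store.put(("users", "alice", "preferences"), "preferences", {"language": "Chinese"})
demo_store.put(("users", "alice", "facts"), "city", {"value": "Shanghai"})
demo_store.put(("users", "bob", "facts"), "city", {"value": "Beijing"})
print(len(demo_store.search(("users", "alice"))))  # 2
```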

18.6 Memory Management Best Practices

class MemoryManager:
    """Unified access to short-term and long-term memory"""

    def __init__(self, checkpointer, store):
        self.checkpointer = checkpointer
        self.store = store

    def get_conversation_history(self, thread_id: str):
        """Get the conversation history (short-term memory)"""
        config = {"configurable": {"thread_id": thread_id}}
        # checkpointer.get() returns the raw checkpoint (a dict), not a StateSnapshot,
        # so the state lives under "channel_values" rather than .values
        checkpoint = self.checkpointer.get(config)
        if not checkpoint:
            return []
        return checkpoint["channel_values"].get("messages", [])

    def get_user_memories(self, user_id: str):
        """Get the user's long-term memories"""
        return self.store.search(("users", user_id))

    def save_user_fact(self, user_id: str, key: str, value: dict):
        """Save a fact about the user (store values must be dicts)"""
        self.store.put(
            ("users", user_id, "facts"),
            key, value
        )

    def build_context(self, thread_id: str, user_id: str):
        """Assemble the full context"""
        history = self.get_conversation_history(thread_id)
        memories = self.get_user_memories(user_id)

        context = {
            "conversation_history": history[-10:],  # the 10 most recent messages
            "user_memories": memories,
        }
        return context

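19. Streaming Output in Depth

19.1 LCEL Streaming

# Synchronous streaming (prompt, model and parser from earlier sections)
chain = prompt | model | parser

for chunk in chain.stream({"text": "Describe the history of artificial intelligence"}):
    print(chunk, end="", flush=True)

# Asynchronous streaming
import asyncio

async def stream_chain():
    async for chunk in chain.astream({"text": "Introduce deep learning"}):
        print(chunk, end="", flush=True)

asyncio.run(stream_chain())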

19.2 Agent Streaming Modes

# Agents expose the same stream modes as any LangGraph graph
agent = create_agent(model="openai:gpt-4o", tools=[get_weather, calculate])
inputs = {"messages": [{"role": "user", "content": "What's the weather in Beijing?"}]}

# 1. messages mode: token-by-token streaming plus metadata
for mode, chunk in agent.stream(inputs, stream_mode=["messages"]):
    msg, metadata = chunk
    node = metadata["langgraph_node"]
    if hasattr(msg, "content") and msg.content:
        print(f"[{node}] {msg.content}", end="", flush=True)

# 2. updates mode: the state update produced by each node
for mode, chunk in agent.stream(inputs, stream_mode=["updates"]):
    print(f"Node update: {chunk}")

# 3. custom mode: progress messages emitted by tools (see 19.5)
for mode, chunk in agent.stream(inputs, stream_mode=["custom"]):
    print(f"[Tool progress] {chunk}")

# 4. Combine several modes
for mode, chunk in agent.stream(
    inputs,
    stream_mode=["messages", "updates", "custom"],
):
    if mode == "messages":
        msg, meta = chunk
        print(msg.content, end="", flush=True)
    elif mode == "updates":
        print(f"\n[State change] {list(chunk.keys())}")
    elif mode == "custom":
        print(f"\n[Progress] {chunk}")

19.3 astream_events (Detailed Events)

import asyncio

# astream_events yields the most detailed execution events. It is async-only, so run it
# inside a coroutine (a top-level `async for` only works in notebooks)
async def show_events():
    async for event in chain.astream_events({"text": "hello"}, version="v2"):
        kind = event["event"]
        name = event.get("name", "unknown")

        if kind == "on_chat_model_start":
            print(f"\n[Model start] {name}")
        elif kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                print(content, end="", flush=True)
        elif kind == "on_chat_model_end":
            print("\n[Model end]")
        elif kind == "on_tool_start":
            print(f"\n[Tool start] {name}")
        elif kind == "on_tool_end":
            # The output may be a ToolMessage rather than a string, so convert before slicing
            print(f"\n[Tool end] {name}: {str(event['data'].get('output', ''))[:100]}")
        elif kind == "on_chain_start":
            print(f"\n[Chain start] {name}")
        elif kind == "on_chain_end":
            print(f"\n[Chain end] {name}")

asyncio.run(show_events())

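19.4 FastAPI SSE Streaming Service

import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

# Assumes `agent` was created with a checkpointer, so thread_id keeps per-conversation history
app = FastAPI()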

class ChatRequest(BaseModel):
    message: str
    thread_id: str = "default"

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    async def generate():
        config = {"configurable": {"thread_id": req.thread_id}}
        async for mode, chunk in agent.astream(
            {"messages": [{"role": "user", "content": req.message}]},
            stream_mode=["messages", "custom"],
            config=config,
        ):
            if mode == "messages":
                msg, _ = chunk
                content = msg.content if hasattr(msg, "content") else ""
                if content:
                    yield f"data: {json.dumps({'type': 'text', 'content': content})}\n\n"
            elif mode == "custom":
                yield f"data: {json.dumps({'type': 'progress', 'content': chunk})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
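The endpoint above builds each SSE frame inline. Moving the framing into a small helper keeps it consistent and makes it testable without a running agent. `sse_event` and `parse_sse` are hypothetical names; the `[DONE]` sentinel follows the convention used above (popularized by OpenAI's streaming API) and is not part of the SSE specification itself.

```python
import json


def sse_event(payload) -> str:
    """Frame one Server-Sent Events message: a single `data:` line plus a blank line."""
    # json.dumps escapes newlines inside strings, so the payload always fits on one line;
    # ensure_ascii=False keeps Chinese text readable instead of \uXXXX escapes
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"


def parse_sse(stream_text: str) -> list:
    """Client-side counterpart: decode frames produced by sse_event plus the [DONE] sentinel."""
    events = []
    for frame in stream_text.split("\n\n"):
        if frame.startswith("data: "):
            data = frame[len("data: "):]
            events.append(data if data == "[DONE]" else json.loads(data))
    return events


frame = sse_event({"type": "text", "content": "你好"})
print(repr(frame))  # 'data: {"type": "text", "content": "你好"}\n\n'
```

Inside `generate()`, `yield sse_event({"type": "text", "content": content})` would replace the hand-built f-strings.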

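19.5 Streaming Progress from Tools

from langgraph.config import get_stream_writer

def data_analysis_tool(dataset: str) -> str:
    """Analyze a large dataset and return an analysis report."""
    # Must be called while the tool runs inside the agent/graph
    writer = get_stream_writer()

    writer(f"📊 Starting analysis of {dataset}...")
    # Step 1
    writer("  ├─ [1/4] Loading data...")
    # load...

    # Step 2
    writer("  ├─ [2/4] Cleaning data...")
    # clean...

    # Step 3
    writer("  ├─ [3/4] Running statistics...")
    # analyze...

    # Step 4
    writer("  └─ [4/4] Generating report...")
    # report...

    writer("✅ Analysis complete!")
    return "Analysis report: ..."

# Register the tool with the agent, then consume the progress messages
agent = create_agent(model="openai:gpt-4o", tools=[data_analysis_tool])

for mode, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Analyze the sales_2025 dataset"}]},
    stream_mode=["messages", "custom"],
):
    if mode == "custom":
        print(f"\n{chunk}")
    elif mode == "messages":
        msg, _ = chunk
        if hasattr(msg, "content") and msg.content:
            print(msg.content, end="", flush=True)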

20. Human-in-the-Loop

20.1 interrupt_before / interrupt_after

from langgraph.checkpoint.memory import InMemorySaver

# create_agent supports static breakpoints; a checkpointer is required to save the paused run
agent = create_agent(
    model="openai:gpt-4o",
    tools=[send_email, transfer_money],
    checkpointer=InMemorySaver(),
    interrupt_before=["tools"],  # pause before any tool runs
)
config = {"configurable": {"thread_id": "session-123"}}

# Runs until just before the tools node, then pauses
result = agent.invoke(
    {"messages": [{"role": "user", "content": "Send a leave-request email to admin@company.com"}]},
    config=config,
)

# Inspect the paused state
state = agent.get_state(config)
print(f"Paused before: {state.next}")  # ('tools',)

# Approve: a static breakpoint is resumed by invoking with None
# (Command(resume=...) is for dynamic interrupt() calls, see 20.3)
agent.invoke(None, config=config)

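20.2 HumanInTheLoopMiddleware

from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command

agent = create_agent(
    model="openai:gpt-4o",
    tools=[send_email, delete_record, transfer_money],
    checkpointer=InMemorySaver(),  # interrupts need a checkpointer to store the paused run
    middleware=[
        HumanInTheLoopMiddleware(
            interrupt_on={
                "send_email": {
                    "allowed_decisions": ["approve", "edit", "reject"],
                },
                "delete_record": {
                    "allowed_decisions": ["approve", "reject"],
                },
                "transfer_money": {
                    # A reviewer's reason can be returned with a reject decision ("message" field)
                    "allowed_decisions": ["approve", "reject"],
                },
            },
        ),
    ],
)

# Run until a guarded tool call pauses the agent
config = {"configurable": {"thread_id": "approval-001"}}
agent.invoke({"messages": [{"role": "user", "content": "Email the weekly report to boss@company.com"}]}, config)

# Resume with ONE of the following; "decisions" holds one entry per pending tool call, in order

# Approve
agent.invoke(Command(resume={"decisions": [{"type": "approve"}]}), config)

# Edit, then execute (give the full argument set the tool should run with)
agent.invoke(Command(resume={"decisions": [{
    "type": "edit",
    "edited_action": {"name": "send_email", "args": {"to": "new_email@company.com"}},
}]}), config)

# Reject, with a reason that is passed back to the model
agent.invoke(Command(resume={"decisions": [{
    "type": "reject",
    "message": "Unauthorized operation",
}]}), config)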

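20.3 Custom HITL Workflow

# Custom HITL in plain LangGraph
import operator
from typing import Annotated, List, TypedDict

from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command

class WorkflowState(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]
    approval_status: str

def process_node(state):
    """Processing node"""
    # ... processing logic ...
    return {"messages": [AIMessage(content="Operation prepared")], "approval_status": "pending"}

def review_node(state):
    """Review node: pause here and wait for a human"""
    # interrupt() pauses the graph until it is resumed; the value passed to
    # Command(resume=...) becomes its return value. On resume the node runs again
    # from the top, so keep any code before interrupt() free of side effects
    decision = interrupt("Please review this operation")
    return {"approval_status": decision}

def finalize_node(state):
    # Runs whatever the decision was; route on approval_status with a
    # conditional edge if a rejection should stop the workflow
    return {"messages": [AIMessage(content="Operation completed")]}

# Build the graph
workflow = StateGraph(WorkflowState)
workflow.add_node("process", process_node)
workflow.add_node("review", review_node)
workflow.add_node("finalize", finalize_node)

workflow.set_entry_point("process")
workflow.add_edge("process", "review")
workflow.add_edge("review", "finalize")
workflow.add_edge("finalize", END)

app = workflow.compile(checkpointer=InMemorySaver())

# Execution pauses when it reaches review_node
config = {"configurable": {"thread_id": "workflow-001"}}
app.invoke({"messages": [HumanMessage(content="Archive last year's records")]}, config)

# Resume after the human review
app.invoke(Command(resume="approved"), config)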

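20.4 HITL Approval UI Example

from fastapi import FastAPI
from langgraph.types import Command

# A separate FastAPI instance, since `app` was reused for the LangGraph workflow in 20.3
api = FastAPI()

# Backend: list pending approvals
@api.get("/pending-approvals")
async def get_pending():
    """Return every session paused and waiting for approval"""
    pending = []
    # Find every thread currently paused on an interrupt
    # (e.g. track thread IDs yourself and inspect agent.get_state(config))
    # ...
    return pending

@api.post("/approve/{thread_id}")
async def approve(thread_id: str, decision: dict):
    """Record the decision and resume execution"""
    config = {"configurable": {"thread_id": thread_id}}
    # ainvoke avoids blocking the event loop while the agent runs
    await agent.ainvoke(
        Command(resume=decision),
        config=config,
    )
    return {"status": "resumed", "thread_id": thread_id}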

21. Callback System and Monitoring

21.1 A Basic Callback Handler

from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult

class LoggingCallback(BaseCallbackHandler):
    """Custom callback that logs every operation"""

    def on_llm_start(self, serialized, prompts, **kwargs):
        # Chat models fall back to this hook when on_chat_model_start is not implemented
        print(f"[LLM Start] prompts: {len(prompts)}")
        for i, p in enumerate(prompts):
            print(f"  prompt {i+1}: {str(p)[:100]}...")

    def on_llm_end(self, response: LLMResult, **kwargs):
        # llm_output may be None depending on the provider
        usage = (response.llm_output or {}).get("token_usage", {})
        print(f"[LLM End] Tokens: {usage.get('total_tokens', '?')}")

    def on_llm_error(self, error, **kwargs):
        print(f"[LLM Error] {error}")

    def on_tool_start(self, serialized, input_str, **kwargs):
        name = (serialized or {}).get("name", "unknown")
        print(f"[Tool Start] {name}: {input_str[:80]}")

    def on_tool_end(self, output, **kwargs):
        print(f"[Tool End] {str(output)[:200]}")

    def on_chain_start(self, serialized, inputs, **kwargs):
        # serialized can be None for some runnables; the run name may also arrive in kwargs
        name = kwargs.get("name") or (serialized or {}).get("name", "Unknown")
        print(f"[Chain Start] {name}")

    def on_chain_end(self, outputs, **kwargs):
        # outputs is not always a dict (e.g. a plain string from StrOutputParser)
        keys = list(outputs.keys()) if isinstance(outputs, dict) else type(outputs).__name__
        print(f"[Chain End] {keys}")

# Usage
callback = LoggingCallback()
result = chain.invoke({"input": "hello"}, config={"callbacks": [callback]})

21.2 A Cost-Tracking Callback

class CostTracker(BaseCallbackHandler):
    """Track the cost of API calls"""

    # USD per 1M tokens; prices change, so check the providers' current price lists
    PRICING = {
        "gpt-4o": {"prompt": 2.50, "completion": 10.00},
        "gpt-4o-mini": {"prompt": 0.15, "completion": 0.60},
        "claude-sonnet-4-6": {"prompt": 3.00, "completion": 15.00},
    }

    def __init__(self):
        self.calls = 0
        self.prompt_tokens = 0
        self.completion_tokens = 0
        self.total_cost = 0.0

    def on_llm_end(self, response, **kwargs):
        self.calls += 1
        # llm_output may be None, and this reads OpenAI-style keys; other providers may differ
        llm_output = response.llm_output or {}
        usage = llm_output.get("token_usage", {})
        prompt_t = usage.get("prompt_tokens", 0)
        completion_t = usage.get("completion_tokens", 0)

        self.prompt_tokens += prompt_t
        self.completion_tokens += completion_t

        # Unknown model names (including dated ones such as "gpt-4o-2024-08-06")
        # fall back to gpt-4o pricing with this exact lookup
        model = llm_output.get("model_name", "unknown")
        price = self.PRICING.get(model, {"prompt": 2.50, "completion": 10.00})
        cost = (prompt_t / 1_000_000) * price["prompt"] + \
               (completion_t / 1_000_000) * price["completion"]
        self.total_cost += cost

    def report(self):
        return {
            "calls": self.calls,
            "prompt_tokens": self.prompt_tokens,
            "completion_tokens": self.completion_tokens,
            "total_tokens": self.prompt_tokens + self.completion_tokens,
            "estimated_cost_usd": round(self.total_cost, 6),
        }

tracker = CostTracker()
chain.invoke({"input": "hello"}, config={"callbacks": [tracker]})
print(tracker.report())
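An exact `PRICING.get(model)` lookup misses whenever the provider reports a dated model name (for example `gpt-4o-2024-08-06`), and the tracker then silently applies its default price. A longest-prefix match fixes that and also keeps `gpt-4o-mini-...` from matching the `gpt-4o` entry. The helpers below are a hypothetical sketch using the same per-1M-token prices as the table above; verify prices against the providers' current lists.

```python
# USD per 1M tokens, same figures as CostTracker.PRICING above
PRICING_PER_1M = {
    "gpt-4o": {"prompt": 2.50, "completion": 10.00},
    "gpt-4o-mini": {"prompt": 0.15, "completion": 0.60},
}
DEFAULT_PRICE = {"prompt": 2.50, "completion": 10.00}


def lookup_price(model_name: str, pricing=PRICING_PER_1M, default=None):
    """Longest-prefix match: 'gpt-4o-mini-2024-07-18' maps to 'gpt-4o-mini', not 'gpt-4o'."""
    matches = [name for name in pricing if model_name.startswith(name)]
    if not matches:
        return default
    return pricing[max(matches, key=len)]


def estimate_cost(model_name: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Estimated cost in USD for one call."""
    price = lookup_price(model_name, default=DEFAULT_PRICE)
    return (prompt_tokens * price["prompt"] + completion_tokens * price["completion"]) / 1_000_000


print(lookup_price("gpt-4o-mini-2024-07-18"))  # {'prompt': 0.15, 'completion': 0.6}
```

In `CostTracker.on_llm_end`, `price = lookup_price(model, self.PRICING, default=DEFAULT_PRICE)` would replace the exact lookup.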

21.3 LangSmith Integration

# Setting environment variables is enough to enable automatic tracing
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"            # newer SDKs also accept LANGSMITH_TRACING
os.environ["LANGCHAIN_API_KEY"] = "ls_..."              # or LANGSMITH_API_KEY
os.environ["LANGCHAIN_PROJECT"] = "my-production-app"  # or LANGSMITH_PROJECT

# Every run is now traced to LangSmith automatically
result = agent.invoke({
    "messages": [{"role": "user", "content": "hello"}]
})

# Custom run name, metadata, and tags
from langchain_core.tracers.context import tracing_v2_enabled

with tracing_v2_enabled(project_name="experiment"):
    result = chain.invoke({"input": "test"}, config={
        "run_name": "exp-42-eval",
        "metadata": {"experiment_id": "exp-42"},
        "tags": ["evaluation", "v2"],
    })

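21.4 Request Caching

from langchain_core.caches import InMemoryCache
from langchain_community.cache import SQLiteCache  # SQLiteCache lives in langchain-community
from langchain_core.globals import set_llm_cache

# In-memory cache
set_llm_cache(InMemoryCache())

# Persistent SQLite cache
set_llm_cache(SQLiteCache(database_path=".langchain_cache.db"))

# An identical request (same prompt and same model parameters) is served from the cache without an API call
response1 = model.invoke("What is machine learning?")  # API call
response2 = model.invoke("What is machine learning?")  # cache hit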

22. Production Best Practices

22.1 Configuration Management

from dataclasses import dataclass, field
from typing import List
import os

@dataclass
class AppConfig:
    """Application configuration"""
    # LLM (default_factory reads the environment when AppConfig() is created, not at import time)
    model_name: str = field(default_factory=lambda: os.getenv("LLM_MODEL", "gpt-4o"))
    model_provider: str = field(default_factory=lambda: os.getenv("LLM_PROVIDER", "openai"))
    temperature: float = 0.0
    max_tokens: int = 4096

    # Retrieval
    chunk_size: int = 500
    chunk_overlap: int = 50
    retriever_k: int = 5
    embedding_model: str = "text-embedding-3-small"

    # Agent
    max_iterations: int = 15
    max_execution_time: int = 60  # seconds

    # Rate limiting
    requests_per_second: float = 5.0
    max_concurrency: int = 10

    # Persistence
    checkpoint_db_url: str = "sqlite:///./agent.db"
    chroma_persist_dir: str = "./chroma_db"

    # Caching
    cache_enabled: bool = True
    cache_db_path: str = "./llm_cache.db"

    # Security
    pii_detection_enabled: bool = True
    sensitive_tools: List[str] = field(default_factory=lambda: [
        "send_email", "delete_record", "transfer_money"
    ])
    require_approval: bool = True

config = AppConfig()

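22.2 A Production-Grade Agent Factory

import sqlite3

from langchain.agents import create_agent
from langchain.agents.middleware import (
    HumanInTheLoopMiddleware,
    ModelCallLimitMiddleware,
    ModelFallbackMiddleware,
    PIIMiddleware,
    SummarizationMiddleware,
    ToolCallLimitMiddleware,
)
from langchain.chat_models import init_chat_model
from langgraph.checkpoint.sqlite import SqliteSaver

class ProductionAgent:
    """Production-grade agent wrapper"""

    def __init__(self, config: AppConfig):
        self.config = config

        # Initialize the model
        self.model = init_chat_model(
            f"{config.model_provider}:{config.model_name}",
            temperature=config.temperature,
            max_tokens=config.max_tokens,
        )

        # Initialize the checkpointer; SqliteSaver needs a file path, so strip a "sqlite:///" prefix
        db_path = config.checkpoint_db_url.removeprefix("sqlite:///")
        self.checkpointer = SqliteSaver(sqlite3.connect(db_path, check_same_thread=False))

        # Build the middleware stack
        self.middleware = self._build_middleware()

        # Create the agent
        self.agent = create_agent(
            model=self.model,
            tools=self._get_tools(),
            checkpointer=self.checkpointer,
            middleware=self.middleware,
            system_prompt=self._get_system_prompt(),
        )

    def _get_tools(self):
        # Placeholder: return the tool functions your application exposes
        return []

    def _get_system_prompt(self) -> str:
        # Placeholder: load from configuration or a prompt registry in a real app
        return "You are a helpful assistant."

    def _build_middleware(self):
        middleware = []

        # Redact or block sensitive information
        if self.config.pii_detection_enabled:
            middleware.append(PIIMiddleware("email", strategy="redact"))
            middleware.append(PIIMiddleware("credit_card", strategy="block"))

        # Call limits
        middleware.append(ModelCallLimitMiddleware(run_limit=15))
        middleware.append(ToolCallLimitMiddleware(
            tool_name="search_web", run_limit=10
        ))

        # Summarization to keep the history compact
        middleware.append(SummarizationMiddleware(
            model="openai:gpt-4o-mini",
            max_tokens_before_summary=3000,
        ))

        # Sensitive tools require approval
        if self.config.require_approval:
            middleware.append(HumanInTheLoopMiddleware(
                interrupt_on={
                    tool: {"allowed_decisions": ["approve", "reject"]}
                    for tool in self.config.sensitive_tools
                },
            ))

        # Fallback: list only the backup models; they are tried in order if the primary model fails
        middleware.append(ModelFallbackMiddleware(
            init_chat_model("openai:gpt-4o-mini"),
        ))

        return middleware

    def invoke(self, user_id: str, session_id: str, message: str):
        config = {"configurable": {"thread_id": f"{user_id}:{session_id}"}}
        return self.agent.invoke(
            {"messages": [{"role": "user", "content": message}]},
            config=config,
        )

    def stream(self, user_id: str, session_id: str, message: str):
        config = {"configurable": {"thread_id": f"{user_id}:{session_id}"}}
        return self.agent.stream(
            {"messages": [{"role": "user", "content": message}]},
            stream_mode=["messages", "custom"],
            config=config,
        )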

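22.3 Error-Handling Strategy

import logging

from tenacity import (
    retry, stop_after_attempt, wait_exponential,
    retry_if_exception_type,
)

logger = logging.getLogger("langchain-app")

# Retry only transient errors. Provider SDKs raise their own exception types
# (e.g. openai.APITimeoutError), so list the ones your client actually raises.
# Most chat model integrations also retry internally (see their max_retries parameter).
@retry(
    retry=retry_if_exception_type((TimeoutError, ConnectionError)),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    stop=stop_after_attempt(3),
)
def robust_invoke(chain, inputs, config=None):
    try:
        return chain.invoke(inputs, config=config)
    except Exception as e:
        # Log the failure, then re-raise so tenacity can decide whether to retry
        logger.error(f"Invocation failed: {e}", extra={
            "inputs": str(inputs)[:200],
            "error_type": type(e).__name__,
        })
        raise

# Graceful degradation: return a fallback message instead of propagating the error
def safe_invoke(chain, inputs, fallback="Sorry, the service is temporarily unavailable."):
    try:
        return chain.invoke(inputs)
    except Exception:
        logger.exception("Invocation failed; returning the fallback message")
        return fallback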
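`tenacity` implements the retry policy declaratively. The dependency-free sketch below spells out a similar policy (three attempts, exponentially growing waits capped at 30 s) and adds "full jitter", a random wait between zero and the current cap that the decorator above does not use, so that many clients do not retry in lockstep. `call_with_backoff` is a hypothetical helper; the injectable `sleep` exists only to make it testable.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    fn: Callable[[], T],
    retry_on: tuple = (TimeoutError, ConnectionError),
    attempts: int = 3,
    base_delay: float = 2.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying transient errors with capped exponential backoff and full jitter.

    Errors not listed in `retry_on` propagate immediately; after the last attempt
    the final error is re-raised.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise
            # Cap grows 2s, 4s, 8s, ... up to max_delay; the actual wait is random below it
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, delay))
    raise AssertionError("unreachable")
```

Usage: `call_with_backoff(lambda: chain.invoke(inputs))`, passing the exception types your model client actually raises as `retry_on`.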

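22.4 Logging and Observability

import functools
import json
import logging
import time
from datetime import datetime

# Structured logging
logger = logging.getLogger("langchain-app")
logger.setLevel(logging.INFO)

class StructuredLogHandler(logging.Handler):
    def emit(self, record):
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
        }
        # Extra fields passed as logger.info(..., extra={"extra_data": {...}})
        if hasattr(record, "extra_data"):
            log_entry.update(record.extra_data)
        print(json.dumps(log_entry, ensure_ascii=False))

logger.addHandler(StructuredLogHandler())  # the handler has no effect until it is attached

# Performance monitoring
def monitor_latency(func):
    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        start = time.perf_counter()  # monotonic clock, suited to measuring durations
        try:
            return func(*args, **kwargs)
        finally:
            # Logged even when the call raises
            duration = (time.perf_counter() - start) * 1000
            logger.info(f"{func.__name__} took {duration:.0f}ms")
    return wrapper

@monitor_latency
def answer_question(question: str):
    return agent.invoke({
        "messages": [{"role": "user", "content": question}]
    })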

22.5 Security Best Practices

import re

class SecurityMiddleware:
    """Security helpers (plain static functions, not a LangChain agent middleware)"""

    # 1. Input validation
    @staticmethod
    def validate_input(message: str, max_length: int = 10000):
        if len(message) > max_length:
            raise ValueError(f"Input too long (>{max_length} characters)")
        # further checks...

    # 2. Prompt-injection screening of user input. Keyword patterns are easy to bypass,
    #    so treat this as a first filter, not a complete defense
    @staticmethod
    def sanitize_system_prompt(prompt: str) -> str:
        dangerous_patterns = [
            r"忽略.*指令",          # Chinese for "ignore ... instructions"
            r"ignore.*instruction",
            r"system:\s*",
        ]
        for pattern in dangerous_patterns:
            if re.search(pattern, prompt, re.IGNORECASE):
                raise ValueError("Potential prompt injection detected")
        return prompt

    # 3. Tool access control (unknown roles get no tools)
    @staticmethod
    def validate_tool_access(user_role: str, tool_name: str) -> bool:
        TOOL_PERMISSIONS = {
            "admin": ["send_email", "delete_record", "transfer_money"],
            "user": ["search_web", "get_weather", "calculate"],
            "guest": ["search_web"],
        }
        allowed = TOOL_PERMISSIONS.get(user_role, [])
        return tool_name in allowed
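Checking permissions when a tool is called works, but the model still sees, and may try, tools the user is not allowed to use. A stricter option is to filter the tool list per role before building the agent, so forbidden tools are never offered to the model at all. The sketch below is hypothetical: it assumes plain-function or LangChain tool objects and a role table like the one above, with admins additionally granted the everyday tools (an assumption of this sketch).

```python
from typing import Callable, Iterable

# Hypothetical role table; unlike the one above, admins also get the everyday tools
TOOL_PERMISSIONS: dict[str, set[str]] = {
    "admin": {"send_email", "delete_record", "transfer_money",
              "search_web", "get_weather", "calculate"},
    "user": {"search_web", "get_weather", "calculate"},
    "guest": {"search_web"},
}


def tool_name(tool) -> str:
    # LangChain tool objects expose .name; plain functions have __name__
    return getattr(tool, "name", None) or getattr(tool, "__name__", "")


def tools_for_role(role: str, tools: Iterable[Callable]) -> list:
    """Keep only the tools this role may use; unknown roles get nothing (deny by default)."""
    allowed = TOOL_PERMISSIONS.get(role, set())
    return [t for t in tools if tool_name(t) in allowed]


def search_web(query: str) -> str:
    return f"results for {query}"


def transfer_money(amount: float) -> str:
    return f"transferred {amount}"


print([tool_name(t) for t in tools_for_role("guest", [search_web, transfer_money])])  # ['search_web']
```

The filtered list would then be passed as `tools=` when creating the agent for that user's session.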

22.6 Health Checks and Monitoring

from fastapi import FastAPI, Response
from fastapi.responses import JSONResponse
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest

app = FastAPI()

# FastAPI health-check endpoint
@app.get("/health")
async def health_check():
    checks = {}

    # Check the LLM connection (a real, billed model call; cache or rate-limit this check)
    try:
        await model.ainvoke("ping")
        checks["llm"] = "healthy"
    except Exception as e:
        checks["llm"] = f"unhealthy: {e}"

    # Check the vector database
    try:
        await vectorstore.asimilarity_search("test", k=1)
        checks["vectorstore"] = "healthy"
    except Exception as e:
        checks["vectorstore"] = f"unhealthy: {e}"

    # Check the checkpointer
    try:
        checkpointer.get({"configurable": {"thread_id": "health-check"}})
        checks["checkpointer"] = "healthy"
    except Exception as e:
        checks["checkpointer"] = f"unhealthy: {e}"

    all_healthy = all(v == "healthy" for v in checks.values())
    status_code = 200 if all_healthy else 503

    return JSONResponse(
        content={"status": "ok" if all_healthy else "degraded", "checks": checks},
        status_code=status_code,
    )

# Metrics collection
request_count = Counter("agent_requests_total", "Total agent requests")
request_duration = Histogram("agent_request_duration_seconds", "Request duration")
tool_call_count = Counter("tool_calls_total", "Total tool calls", ["tool_name"])

@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

23. v0.x → v1.0 Migration Guide

23.1 Import Change Cheat Sheet

# ===== Models =====
# OLD (v0.x)
from langchain.chat_models import ChatOpenAI
from langchain.llms import OpenAI  # deprecated

# NEW (v1.0)
from langchain.chat_models import init_chat_model  # unified entry point
from langchain_openai import ChatOpenAI

# ===== Agent =====
# OLD (v0.x)
from langchain.agents import initialize_agent, AgentExecutor
from langgraph.prebuilt import create_react_agent

# NEW (v1.0)
from langchain.agents import create_agent  # the single recommended entry point

# ===== Chains =====
# OLD (v0.x)
from langchain.chains import LLMChain, ConversationChain
from langchain.chains import RetrievalQA

# NEW (v1.0)
# Use an LCEL pipeline or create_agent
chain = prompt | model | parser

# ===== Memory =====
# OLD (v0.x)
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain

# NEW (v1.0)
# Use a LangGraph checkpointer (SqliteSaver requires: pip install langgraph-checkpoint-sqlite)
from langgraph.checkpoint.sqlite import SqliteSaver

# ===== Legacy compatibility =====
# If you still need the old Chains/Agents:
# pip install langchain-classic
from langchain_classic.chains import LLMChain

23.2 Typical Migration Scenarios

# === Scenario 1: simple conversation chain ===
# OLD
from langchain.chains import ConversationChain
chain = ConversationChain(llm=model, memory=memory)
result = chain.run("Hello")

# NEW: LCEL (stateless; for multi-turn memory, see Scenario 4)
chain = prompt | model | parser
result = chain.invoke({"input": "Hello"})

# NEW: create_agent
from langchain.agents import create_agent
agent = create_agent(model="openai:gpt-4o")
result = agent.invoke({"messages": [{"role": "user", "content": "Hello"}]})

# === Scenario 2: RAG chain ===
# OLD
from langchain.chains import RetrievalQA
qa = RetrievalQA.from_chain_type(llm=model, retriever=retriever)
result = qa.run("question")

# NEW: LCEL
from langchain_core.runnables import RunnablePassthrough

def format_docs(docs):
    # Join the retrieved documents into one context string
    return "\n\n".join(d.page_content for d in docs)

rag = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt | model | parser
)
result = rag.invoke("question")

# === Scenario 3: agent with tools ===
# OLD
from langchain.agents import initialize_agent, AgentType
agent = initialize_agent(tools, model, AgentType.ZERO_SHOT_REACT_DESCRIPTION)
result = agent.run("task")

# NEW
from langchain.agents import create_agent
agent = create_agent(model="openai:gpt-4o", tools=tools)
result = agent.invoke({"messages": [{"role": "user", "content": "task"}]})
# The final answer is the last message in the returned state
print(result["messages"][-1].content)

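# === Scenario 4: memory ===
# OLD
from langchain.memory import ConversationBufferMemory
memory = ConversationBufferMemory(return_messages=True)

# NEW
from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver
checkpointer = InMemorySaver()
agent = create_agent(
    model="openai:gpt-4o",
    checkpointer=checkpointer,
)
# Calls that share a thread_id share conversation history
config = {"configurable": {"thread_id": "user-123"}}
agent.invoke({"messages": [{"role": "user", "content": "My name is Alice"}]}, config)
result = agent.invoke({"messages": [{"role": "user", "content": "What is my name?"}]}, config)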

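23.3 Migration Checklist

# ✅ Checklist
MIGRATION_CHECKLIST = """
1. [ ] Remove all langchain.chains imports
       → replace with LCEL | pipelines or create_agent

2. [ ] Remove AgentExecutor / initialize_agent
       → replace with create_agent

3. [ ] Remove create_react_agent (deprecated)
       → replace with create_agent

4. [ ] Remove RunnableWithMessageHistory
       → replace with a LangGraph checkpointer

5. [ ] Remove ConversationBufferMemory and the other memory classes
       → replace with a checkpointer + middleware

6. [ ] Check every import path
       → langchain.llms → langchain.chat_models
       → langchain.chains → LCEL or langchain-classic

7. [ ] Use init_chat_model to create models uniformly
       → instead of importing each provider's ChatModel separately

8. [ ] Use with_structured_output for structured output
       → instead of assorted OutputParser combinations

9. [ ] Use content_blocks for cross-provider response handling
       → instead of provider-specific response parsing logic

10. [ ] Add the langchain-classic dependency (if migrating incrementally)
"""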
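Items 1 to 6 of the checklist can be partly automated. Below is a rough stdlib-only sketch, assuming legacy usage appears literally in the source text. The `LEGACY_PATTERNS` table and the `find_legacy_usage` helper are hypothetical. They cover only the names in this guide and will miss dynamic or aliased imports.

```python
import re

# Hypothetical map: legacy pattern -> suggested v1.0 replacement (taken from the checklist)
LEGACY_PATTERNS: dict[str, str] = {
    r"^\s*from\s+langchain\.chains\b": "LCEL pipeline, create_agent, or langchain_classic.chains",
    r"^\s*from\s+langchain\.llms\b": "langchain.chat_models.init_chat_model",
    r"^\s*from\s+langchain\.memory\b": "LangGraph checkpointer",
    r"\b(AgentExecutor|initialize_agent)\b": "langchain.agents.create_agent",
    r"\bcreate_react_agent\b": "langchain.agents.create_agent",
    r"\bRunnableWithMessageHistory\b": "LangGraph checkpointer",
}

def find_legacy_usage(source: str) -> list[tuple[int, str, str]]:
    """Return (line number, offending line, suggestion) for each legacy usage found."""
    hits = []
    for lineno, line in enumerate(source.splitlines(), start=1):
        for pattern, suggestion in LEGACY_PATTERNS.items():
            if re.search(pattern, line):
                hits.append((lineno, line.strip(), suggestion))
                break  # one suggestion per line is enough
    return hits

sample = """from langchain.chains import LLMChain
from langchain_core.prompts import ChatPromptTemplate
from langchain.agents import initialize_agent, AgentType
"""
```

To use it, run the helper over each `.py` file (for example via `pathlib.Path.rglob("*.py")`) and work through the hits.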

Appendix A: Common Code Snippets

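A.1 Translation Chain

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

translate_prompt = ChatPromptTemplate.from_template(
    "Translate the following {source} text into {target}:\n{text}"
)
translate_chain = translate_prompt | model | StrOutputParser()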

A.2 Summarization Chain

summarize_prompt = ChatPromptTemplate.from_template(
    "Summarize the following content in at most {max_words} words:\n\n{text}\n\nSummary:"
)
summarize_chain = summarize_prompt | model | StrOutputParser()

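A.3 Code Review Chain

review_prompt = ChatPromptTemplate.from_template("""
Review the following {language} code and analyze it from these angles:
1. Potential bugs
2. Performance issues
3. Security risks
4. Code style

```{language}
{code}
```

Review report:""")
review_chain = review_prompt | model | StrOutputParser()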

A.4 Entity Extraction

from typing import List

from pydantic import BaseModel, Field

class Entities(BaseModel):
    people: List[str] = Field(description="List of person names")
    organizations: List[str] = Field(description="List of organization names")
    locations: List[str] = Field(description="List of place names")
    dates: List[str] = Field(description="List of dates")

extractor = model.with_structured_output(Entities)
entities = extractor.invoke("In June 2026, Zhang San attended Alibaba's tech conference in Beijing.")

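A.5 Text Classification

from enum import Enum
from typing import List

from pydantic import BaseModel, Field

class Category(str, Enum):
    TECH = "Technology"
    SPORTS = "Sports"
    FINANCE = "Finance"
    ENTERTAINMENT = "Entertainment"

class Classification(BaseModel):
    category: Category
    confidence: float = Field(ge=0, le=1)
    keywords: List[str]

classifier = model.with_structured_output(Classification)
result = classifier.invoke("Apple released a new generation of MacBook Pro powered by the M4 chip.")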
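The schema does more than describe the desired output: Pydantic validates whatever comes back. This model-free sketch assumes Pydantic v2 (`model_validate`). It feeds hand-written dicts through the same `Classification` class to show what is accepted and what is rejected. The sample payloads are made up.

```python
from enum import Enum
from typing import List

from pydantic import BaseModel, Field, ValidationError

class Category(str, Enum):
    TECH = "Technology"
    SPORTS = "Sports"
    FINANCE = "Finance"
    ENTERTAINMENT = "Entertainment"

class Classification(BaseModel):
    category: Category
    confidence: float = Field(ge=0, le=1)
    keywords: List[str]

# A well-formed payload: the category string is coerced into the Enum member
ok = Classification.model_validate(
    {"category": "Technology", "confidence": 0.93, "keywords": ["Apple", "MacBook Pro", "M4"]}
)

def is_valid(payload: dict) -> bool:
    """Return True if the payload satisfies the schema's constraints."""
    try:
        Classification.model_validate(payload)
        return True
    except ValidationError:
        return False
```

A confidence outside [0, 1] or a category not in the Enum raises `ValidationError`. This is why the `Field(ge=0, le=1)` bounds and the Enum are worth declaring.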

Appendix B: FAQ

Q: How does create_agent differ from the old create_react_agent?
A: create_agent is the unified entry point in v1.0. It runs on LangGraph and natively supports middleware, checkpointers, and structured output. create_react_agent has been deprecated as of v1.2.

Q: When should I use LCEL, and when create_agent?
A: Use LCEL (| pipelines) for fixed flows, create_agent when you need tool calling and dynamic decisions, and StateGraph for complex workflows.

Q: Which checkpointer should I choose?
A: InMemorySaver for development, SqliteSaver for lightweight production, and PostgresSaver for full production deployments.

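Q: How can I reduce token costs?
A: (1) Enable LLM caching. (2) Use SummarizationMiddleware. (3) Use a small model for summarization and a large model for reasoning. (4) Shorten prompts. (5) Use a cheaper model for structured output.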

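Q: How do I migrate a v0.x project incrementally?
A: Install the langchain-classic compatibility package, replace the old imports with from langchain_classic.chains import LLMChain, and then gradually move to LCEL and create_agent.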

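Q: What is the difference between content_blocks and .content?
A: .content returns the raw content as the provider sent it (a string or a provider-specific list of blocks), so its format varies by provider. .content_blocks returns a typed list of standard blocks that is consistent across providers. It covers types such as text, reasoning, and tool_call, with citations carried as annotations on text blocks.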


Suggested Learning Path

  1. Start with LCEL and create_agent
  2. Master with_structured_output and content_blocks
  3. Understand the middleware system
  4. Dig into LangGraph's StateGraph and checkpointers
  5. Learn multi-agent orchestration (Supervisor/Swarm)

This document is based on the official LangChain v1.0 documentation and is continuously updated.
