LangChain Series · Part 11: From Demo to Launch, Production Deployment in Practice

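🎯 Audience: engineers who have finished building a LangChain application and want to deploy it as a production-ready service
⏱️ Reading time: about 50 minutes
💬 This article shows how to wrap a LangChain application with FastAPI, stream output over SSE, handle concurrency with async code, set up retries and fallbacks, and control token costs, then covers caching, rate limiting, session storage, structured logging, graceful shutdown, and deployment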


1. The Gap Between a Demo and Production

A LangChain demo that runs on your laptop and a production-ready service are separated by a set of engineering problems:

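| Dimension | Demo | Production |
|---|---|---|
| Concurrency | Single user, sequential execution | Many users at once; requires async handling |
| Response experience | Wait for the complete result | Streaming output; users watch the answer being generated |
| Error handling | An exception crashes the program | Retries, fallbacks, standardized error responses |
| Cost control | Token usage ignored | Usage monitored to prevent overspending |
| Timeouts | Wait indefinitely | Request timeout protection |
| Secret management | Hard-coded or kept in a .env file | Injected via environment variables; keys rotated |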

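This article tackles these problems one by one and ends with a LangChain service skeleton that can serve as the starting point for a production deployment.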


2. Choosing the Tech Stack

  Production Stack:

  ┌─────────────────────────────────────────────┐
  │  Client (Browser / Mobile / Other Service)  │
  └──────────────────┬──────────────────────────┘
                     │ HTTP / SSE
  ┌──────────────────▼──────────────────────────┐
  │  FastAPI  (async, streaming, validation)    │
  └──────────────────┬──────────────────────────┘
                     │
  ┌──────────────────▼──────────────────────────┐
  │  LangChain / LangGraph  (async chains)      │
  └──────────────────┬──────────────────────────┘
                     │
  ┌──────────────────▼──────────────────────────┐
  │  OpenAI / Anthropic / Other LLM Provider    │
  └─────────────────────────────────────────────┘
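  • FastAPI: a high-performance asynchronous Python web framework with native async/await support; its StreamingResponse can serve SSE (Server-Sent Events) streams
  • SSE (Server-Sent Events): an HTTP-based standard for pushing a stream of data from server to client; lighter than WebSocket and a good fit for one-way streaming output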

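📝 SSE (Server-Sent Events) is an HTTP-based server-push technique. The server keeps the connection open and continuously sends text events in the form data: ...\n\n, which the client reads with the EventSource API or as a plain HTTP stream. Note that EventSource can only send GET requests, so POST endpoints like the ones in this article are consumed as a plain HTTP stream instead. Streaming LLM output is one of the most typical SSE use cases.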
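The event format is simple enough to produce and parse by hand. Below is a minimal sketch, assuming each event carries a single JSON `data:` line, which is what the endpoints in this article emit; `format_sse_event` and `SSEBuffer` are hypothetical helper names, not part of FastAPI or LangChain. The buffer matters because a single network read can contain half an event or several events at once.

```python
import json


def format_sse_event(payload: dict) -> str:
    """Serialize one payload as an SSE event: a 'data:' line plus a blank line."""
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"


class SSEBuffer:
    """Reassemble SSE events from arbitrary text chunks.

    Simplified: only single-line 'data: ' fields are handled (what
    format_sse_event produces); 'event:', 'id:' and comment lines are ignored.
    """

    def __init__(self) -> None:
        self._pending = ""

    def feed(self, chunk: str) -> list:
        """Add a chunk and return the payloads of all events completed so far."""
        self._pending += chunk
        parts = self._pending.split("\n\n")
        # The last part has no terminating blank line yet; keep it for the next chunk.
        self._pending = parts.pop()
        events = []
        for raw_event in parts:
            for line in raw_event.splitlines():
                if line.startswith("data: "):
                    events.append(json.loads(line[len("data: "):]))
        return events


if __name__ == "__main__":
    wire = format_sse_event({"token": "Hel", "done": False}) + format_sse_event(
        {"token": "lo", "done": True}
    )
    buffer = SSEBuffer()
    # Split at an arbitrary position to mimic two separate network reads.
    print(buffer.feed(wire[:10]))  # the first event is still incomplete here
    print(buffer.feed(wire[10:]))  # both events are complete now
```

Splitting only on the blank-line separator, rather than parsing each chunk as it arrives, is what keeps a client from calling `json.loads` on half an event.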

pip install fastapi uvicorn langchain-openai langchain-community python-dotenv httpx tiktoken

3. Basic Structure: A Non-Streaming Endpoint

Start with the simplest non-streaming version to validate the endpoint structure:

# app.py
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

load_dotenv()

# --- Request/response models ---

class ChatRequest(BaseModel):
    question: str
    session_id: str = "default"

class ChatResponse(BaseModel):
    answer: str
    session_id: str

# --- Chain initialization ---
# Initialized at module level so the model instance is not re-created on every request

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a professional LangChain technical assistant. Answer concisely and accurately, with code examples where appropriate."),
    ("human", "{question}"),
])

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
chain = prompt | llm | StrOutputParser()

# --- FastAPI application ---

app = FastAPI(title="LangChain API", version="1.0.0")

@app.get("/health")
def health_check():
    """Health check endpoint that load balancers use to probe the service"""
    return {"status": "ok"}

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    try:
        # ainvoke is the async version of invoke and does not block the event loop
        answer = await chain.ainvoke({"question": request.question})
        return ChatResponse(answer=answer, session_id=request.session_id)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
uvicorn app:app --reload --port 8000

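💡 Every LangChain Runnable has async counterparts: ainvoke, astream, and abatch. Inside async def endpoints in an async framework like FastAPI, use these async methods; a synchronous call such as invoke() would block the event loop while it waits for the LLM, leaving the service unable to handle other requests concurrently. (FastAPI runs plain def endpoints in a thread pool, so the problem is specific to blocking calls made inside async def.)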
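The cost of a blocking call inside a coroutine is easy to measure. The sketch below uses `time.sleep` and `asyncio.sleep` as hypothetical stand-ins for a synchronous SDK call and an awaitable call such as `ainvoke`: two concurrent "requests" take roughly twice as long when the call blocks. It also shows `asyncio.to_thread` (Python 3.9+) as one way to offload an API that only exists in sync form.

```python
import asyncio
import time
from typing import Awaitable, Callable, Dict


def blocking_llm_call(seconds: float) -> str:
    """Stand-in for a synchronous SDK call such as a sync .invoke()."""
    time.sleep(seconds)
    return "done"


async def async_llm_call(seconds: float) -> str:
    """Stand-in for an awaitable call such as .ainvoke()."""
    await asyncio.sleep(seconds)
    return "done"


async def blocking_in_coroutine() -> str:
    # Anti-pattern: the sync call freezes the whole event loop while it runs.
    return blocking_llm_call(0.2)


async def offloaded_to_thread() -> str:
    # Workaround when only a sync API exists: run it in a worker thread.
    return await asyncio.to_thread(blocking_llm_call, 0.2)


async def run_two(make_call: Callable[[], Awaitable[str]]) -> float:
    """Run two 'requests' concurrently and return the elapsed wall-clock seconds."""
    start = time.perf_counter()
    await asyncio.gather(make_call(), make_call())
    return time.perf_counter() - start


async def main() -> Dict[str, float]:
    return {
        "async": await run_two(lambda: async_llm_call(0.2)),
        "blocking": await run_two(blocking_in_coroutine),
        "to_thread": await run_two(offloaded_to_thread),
    }


if __name__ == "__main__":
    # Expect roughly 0.2s for "async" and "to_thread", and roughly 0.4s for "blocking".
    print(asyncio.run(main()))
```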


4. Streaming Output: An SSE Implementation

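Streaming is the expected experience for an LLM service: users don't have to wait for the complete response and can watch it being generated in real time.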

from fastapi.responses import StreamingResponse
import asyncio
import json

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """
    Streaming endpoint that returns an SSE data stream.

    SSE format: each message starts with "data: " and ends with "\n\n".
    Clients read it with EventSource or as a plain HTTP stream.
    """
    async def generate():
        try:
            # astream yields the generated output incrementally, roughly token by token
            async for chunk in chain.astream({"question": request.question}):
                if chunk:
                    # Standard SSE format
                    data = json.dumps({"token": chunk, "done": False},
                                      ensure_ascii=False)
                    yield f"data: {data}\n\n"

            # Send the end-of-stream signal
            yield f"data: {json.dumps({'token': '', 'done': True})}\n\n"

        except Exception as e:
            error_data = json.dumps({"error": str(e), "done": True})
            yield f"data: {error_data}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable Nginx buffering so events are pushed immediately
        },
    )

Client example (Python):

import asyncio
import json

import httpx

async def receive_stream(question: str):
    async with httpx.AsyncClient(timeout=60.0) as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/chat/stream",
            json={"question": question},
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    data = json.loads(line[6:])
                    if data.get("done"):
                        print()
                        break
                    print(data["token"], end="", flush=True)

asyncio.run(receive_stream("What is a Checkpointer in LangGraph?"))

Client example (JavaScript, browser):

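// Browser example: run as a module script served from the same origin as the API.
// The <pre id="output"> element is a hypothetical target for rendering tokens.
const output = document.getElementById('output');

const response = await fetch('/chat/stream', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ question: 'What is LCEL?' }),
});

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';       // holds a partial event until its "\n\n" arrives
let finished = false;

while (!finished) {
  const { done, value } = await reader.read();
  if (done) break;

  // { stream: true } keeps multi-byte characters that span two chunks intact
  buffer += decoder.decode(value, { stream: true });
  const events = buffer.split('\n\n');
  buffer = events.pop();  // the last piece may be an incomplete event

  for (const event of events) {
    if (!event.startsWith('data: ')) continue;
    const data = JSON.parse(event.slice(6));
    if (data.error) console.error(data.error);
    if (data.done) {
      finished = true;  // also stops the outer read loop
      break;
    }
    output.textContent += data.token;
  }
}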

5. Retries and Fallbacks

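LLM API calls hit a range of transient errors: rate limiting (429), gateway timeouts (504), and temporary unavailability (503). A production service must handle them.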

5.1 LangChain's Built-in Retries

from langchain_openai import ChatOpenAI

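# with_retry: automatic retries at the LangChain level
# (the OpenAI client inside ChatOpenAI also retries on its own; see its max_retries parameter)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0).with_retry(
    retry_if_exception_type=(Exception,),  # exceptions that trigger a retry; consider narrowing to rate-limit/connection errors
    stop_after_attempt=3,                  # at most 3 attempts in total: the first call plus up to 2 retries
    wait_exponential_jitter=True,          # exponential backoff plus random jitter to avoid retry storms
)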

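📝 Exponential backoff is a retry strategy where the wait before each retry grows exponentially (1s → 2s → 4s). Adding random jitter keeps many clients from retrying at the same moment and producing a burst of requests.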
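To make the arithmetic concrete, here is a small sketch that computes the waits for a given number of attempts. LangChain's `with_retry` delegates the actual waiting to the tenacity library, so treat `backoff_delays` (a hypothetical helper) and its `initial`, `maximum`, and `jitter` defaults as an illustration of the strategy, not LangChain's exact schedule.

```python
import random
from typing import List, Optional


def backoff_delays(
    attempts: int,
    initial: float = 1.0,
    maximum: float = 60.0,
    jitter: float = 1.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the wait in seconds before each retry.

    `attempts` counts every try, so there are attempts - 1 waits.
    Wait n is min(initial * 2**n, maximum) plus uniform jitter in [0, jitter].
    """
    if rng is None:
        rng = random.Random()
    delays = []
    for n in range(attempts - 1):
        base = min(initial * 2 ** n, maximum)         # exponential growth, capped
        delays.append(base + rng.uniform(0, jitter))  # jitter spreads clients apart
    return delays


if __name__ == "__main__":
    print(backoff_delays(4, jitter=0.0))  # the plain 1s, 2s, 4s schedule
    print(backoff_delays(4))              # the same schedule with random jitter
```

Without jitter, every client that failed at the same instant would come back at exactly the same 1s, 2s, 4s marks; the random offset is what breaks up that synchronization.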

5.2 Model Fallback: Switching from Primary to Backup

When the primary model is unavailable, switch to a backup model automatically:

from langchain_openai import ChatOpenAI

# Primary model
primary_llm = ChatOpenAI(model="gpt-4o", temperature=0)

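# Backup model (cheaper, somewhat less capable). A backup from another provider,
# e.g. ChatAnthropic from the langchain-anthropic package, would also cover provider-wide outages.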
fallback_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# with_fallbacks: switch to the backup automatically when the primary fails
llm_with_fallback = primary_llm.with_fallbacks(
    [fallback_llm],
    exceptions_to_handle=(Exception,),
)

chain = prompt | llm_with_fallback | StrOutputParser()
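Conceptually, `with_fallbacks` tries each runnable in order and moves on only when the raised exception matches `exceptions_to_handle`. The plain-asyncio sketch below illustrates that control flow; it is not LangChain's implementation, and `call_with_fallbacks`, `flaky_primary`, and `backup` are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Sequence, Tuple, Type

ModelCall = Callable[[str], Awaitable[str]]


async def call_with_fallbacks(
    candidates: Sequence[ModelCall],
    question: str,
    handled: Tuple[Type[BaseException], ...] = (Exception,),
) -> str:
    """Try each model in order; fall through only on exceptions listed in `handled`."""
    if not candidates:
        raise ValueError("at least one model is required")
    last_error: Optional[BaseException] = None
    for call in candidates:
        try:
            return await call(question)
        except handled as exc:
            last_error = exc  # remember the failure, then try the next model
    # Every candidate failed with a handled error: surface the last one.
    raise last_error


async def flaky_primary(question: str) -> str:
    raise TimeoutError("primary model unavailable")  # simulated outage


async def backup(question: str) -> str:
    return f"backup answer to: {question}"


if __name__ == "__main__":
    print(asyncio.run(call_with_fallbacks([flaky_primary, backup], "What is LCEL?")))
```

Passing a narrower tuple, such as only rate-limit and connection errors, keeps malformed requests (which would fail on every model) from being sent to the backup needlessly.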

5.3 Request Timeout Protection

import asyncio

from fastapi import HTTPException

async def chat_with_timeout(question: str, timeout: float = 30.0) -> str:
    """Invoke the chain with timeout protection"""
    try:
        # wait_for bounds the whole chain call (including any retries) and cancels it on timeout.
        # ChatOpenAI also accepts a timeout argument that applies to each individual HTTP request.
        return await asyncio.wait_for(
            chain.ainvoke({"question": question}),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        raise HTTPException(
            status_code=504,
            detail=f"Request timed out ({timeout}s); please retry later or simplify the question",
        )
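One detail worth knowing: when `asyncio.wait_for` times out, it cancels the wrapped coroutine instead of leaving it running in the background. The sketch below makes that visible with a hypothetical `slow_chain` standing in for `chain.ainvoke`; the `cancelled` list exists only so the cancellation can be observed.

```python
import asyncio
from typing import List

cancelled: List[str] = []


async def slow_chain(question: str, delay: float) -> str:
    """Hypothetical stand-in for chain.ainvoke that takes `delay` seconds."""
    try:
        await asyncio.sleep(delay)
    except asyncio.CancelledError:
        cancelled.append(question)  # record that wait_for cancelled this call
        raise  # always re-raise so the cancellation completes
    return f"answer: {question}"


async def ask(question: str, delay: float, timeout: float) -> str:
    try:
        return await asyncio.wait_for(slow_chain(question, delay), timeout=timeout)
    except asyncio.TimeoutError:
        # By the time we get here, slow_chain has already been cancelled.
        return "timeout"


if __name__ == "__main__":
    print(asyncio.run(ask("fast", delay=0.01, timeout=1.0)))
    print(asyncio.run(ask("slow", delay=1.0, timeout=0.05)))
    print(cancelled)
```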

5.4 A Complete Error-Handling Layer

from fastapi import Request
from fastapi.responses import JSONResponse
import logging

logger = logging.getLogger(__name__)

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
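    """Global exception handler that standardizes the error response format"""
    logger.error(f"Unhandled exception: {exc}", exc_info=True)
    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "detail": str(exc) if app.debug else "Please contact the administrator",
        },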
    )

6. Concurrency Control and Batch Requests

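LLM APIs usually enforce rate limits (such as OpenAI's RPM/TPM limits on requests and tokens per minute). Sending unbounded concurrent requests will trigger 429 errors.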

6.1 Limiting Concurrency with a Semaphore

import asyncio

# Semaphore: caps how many LLM calls are in flight at the same time
llm_semaphore = asyncio.Semaphore(10)  # at most 10 concurrent LLM calls

async def call_llm_with_semaphore(question: str) -> str:
    async with llm_semaphore:
        return await chain.ainvoke({"question": question})

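# Replaces the /chat endpoint from section 3: Starlette matches routes in registration order, so keep only one definition
@app.post("/chat", response_model=ChatResponse)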
async def chat(request: ChatRequest):
    answer = await call_llm_with_semaphore(request.question)
    return ChatResponse(answer=answer, session_id=request.session_id)

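📝 A semaphore is a basic concurrency-control primitive that limits how many coroutines can execute a section of code at the same time. asyncio.Semaphore(10) lets at most 10 coroutines hold it simultaneously; an 11th coroutine waits until one of them releases it.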
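A quick way to convince yourself that the semaphore really caps concurrency is to count how many calls are inside it at once. In this sketch `run_with_limit` and `fake_llm_call` are hypothetical names, and the "LLM call" just sleeps; with 50 tasks and a limit of 10, the observed peak is 10.

```python
import asyncio


async def run_with_limit(n_tasks: int, limit: int) -> int:
    """Run n_tasks simulated LLM calls behind a Semaphore; return the peak concurrency."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def fake_llm_call() -> None:
        nonlocal active, peak
        async with semaphore:  # waits here once `limit` calls are in flight
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # simulated network latency
            active -= 1

    await asyncio.gather(*(fake_llm_call() for _ in range(n_tasks)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(run_with_limit(n_tasks=50, limit=10)))
```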

6.2 Batch Requests

from langchain_core.runnables import RunnableConfig

@app.post("/chat/batch")
async def chat_batch(requests: list[ChatRequest]):
    """Handle several questions in one request, more efficiently than sequential calls"""
    questions = [{"question": r.question} for r in requests]

    # abatch processes all inputs concurrently; max_concurrency caps the parallelism
    answers = await chain.abatch(
        questions,
        config=RunnableConfig(max_concurrency=5),
    )

    return [
        ChatResponse(answer=ans, session_id=req.session_id)
        for req, ans in zip(requests, answers)
    ]

7. Token Usage Monitoring and Cost Control

Token charges are the main variable cost of an LLM application, and without monitoring it is easy to overspend.

7.1 Recording Token Usage per Call

from langchain_core.callbacks import BaseCallbackHandler
from langchain_openai import ChatOpenAI

# In-process list for demonstration; in production, ship usage to your metrics or logging system
token_usage_log = []

class TokenUsageCallback(BaseCallbackHandler):
    """Record the token usage of each LLM call"""

    def on_llm_end(self, response, **kwargs):
        # llm_output may be None (e.g. for some streaming calls), so fall back to an empty dict
        usage = (response.llm_output or {}).get("token_usage", {})
        token_usage_log.append({
            "prompt_tokens": usage.get("prompt_tokens", 0),
            "completion_tokens": usage.get("completion_tokens", 0),
            "total_tokens": usage.get("total_tokens", 0),
        })

# Callbacks passed to the constructor fire on every call this model makes
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0,
    callbacks=[TokenUsageCallback()],
)

7.2 Estimating Request Cost

# Prices in USD per 1M tokens at the time of writing; check the provider's current pricing
PRICING = {
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "gpt-4o":      {"input": 2.50, "output": 10.00},
}

def estimate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Estimate the USD cost of one call"""
    if model not in PRICING:
        return 0.0
    price = PRICING[model]
    cost = (prompt_tokens * price["input"] + completion_tokens * price["output"]) / 1_000_000
    return round(cost, 6)

# Return usage information from the endpoint
@app.post("/chat/with-usage")
async def chat_with_usage(request: ChatRequest):
    from langchain_community.callbacks import get_openai_callback

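    # get_openai_callback aggregates token counts of OpenAI calls made inside this block (cb.total_cost is also available)
    with get_openai_callback() as cb: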
        answer = await chain.ainvoke({"question": request.question})

    cost = estimate_cost("gpt-4o-mini", cb.prompt_tokens, cb.completion_tokens)

    return {
        "answer": answer,
        "usage": {
            "prompt_tokens": cb.prompt_tokens,
            "completion_tokens": cb.completion_tokens,
            "total_tokens": cb.total_tokens,
            "estimated_cost_usd": cost,
        },
    }
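The per-call records collected by `TokenUsageCallback` in 7.1 can be rolled up with the same pricing table. The sketch below is a hypothetical `summarize_usage` helper; it assumes records shaped like the `token_usage_log` entries above and reuses the illustrative prices from `PRICING`, so verify them against the provider's current pricing. As a worked example, 1,200 input and 600 output tokens on gpt-4o-mini cost (1200 × 0.15 + 600 × 0.60) / 1,000,000 = 0.00054 USD.

```python
from typing import Dict, List

# USD per 1M tokens, the same illustrative figures as PRICING above.
PRICING: Dict[str, Dict[str, float]] = {
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "gpt-4o": {"input": 2.50, "output": 10.00},
}


def summarize_usage(records: List[Dict[str, int]], model: str) -> dict:
    """Aggregate token counts from usage records and estimate their cost in USD."""
    prompt_tokens = sum(r.get("prompt_tokens", 0) for r in records)
    completion_tokens = sum(r.get("completion_tokens", 0) for r in records)
    price = PRICING.get(model)
    if price is None:
        cost = 0.0  # unknown model: report tokens, but no cost estimate
    else:
        cost = (prompt_tokens * price["input"]
                + completion_tokens * price["output"]) / 1_000_000
    return {
        "calls": len(records),
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "estimated_cost_usd": round(cost, 6),
    }


if __name__ == "__main__":
    log = [
        {"prompt_tokens": 1000, "completion_tokens": 500},
        {"prompt_tokens": 200, "completion_tokens": 100},
    ]
    print(summarize_usage(log, "gpt-4o-mini"))
```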

7.3 Limiting Input Length

import tiktoken

def count_tokens(text: str, model: str = "gpt-4o-mini") -> int:
    """Count the tokens in a piece of text"""
    enc = tiktoken.encoding_for_model(model)
    return len(enc.encode(text))

MAX_INPUT_TOKENS = 2000  # maximum number of tokens allowed in user input

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    token_count = count_tokens(request.question)
    if token_count > MAX_INPUT_TOKENS:
        raise HTTPException(
            status_code=400,
            detail=f"Input too long ({token_count} tokens); please keep it within {MAX_INPUT_TOKENS} tokens",
        )

    answer = await chain.ainvoke({"question": request.question})
    return ChatResponse(answer=answer, session_id=request.session_id)

8. A Complete Production Service Skeleton

Combine all of the components above into one complete, production-ready service:

# production_app.py
import asyncio
import json
import logging
import os
from contextlib import asynccontextmanager

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field

load_dotenv()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ─── Configuration ───────────────────────────────────────────────────────────

class Settings:
    model: str = os.getenv("LLM_MODEL", "gpt-4o-mini")
    max_concurrency: int = int(os.getenv("MAX_CONCURRENCY", "10"))
    request_timeout: float = float(os.getenv("REQUEST_TIMEOUT", "30.0"))
    max_input_tokens: int = int(os.getenv("MAX_INPUT_TOKENS", "2000"))
    env: str = os.getenv("APP_ENV", "development")

settings = Settings()

# ─── Models and chain ────────────────────────────────────────────────────────

prompt = ChatPromptTemplate.from_messages([
    (
        "system",
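        "You are a professional LangChain technical assistant.\n"
        "Answer concisely and accurately, include code examples for technical questions, and keep answers under 300 words."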
    ),
    ("human", "{question}"),
])

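# Primary model + fallback model + retries
# Note: with_retry retries invoke/batch calls only; streaming calls (astream) are not retried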
primary_llm = ChatOpenAI(model=settings.model, temperature=0).with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True,
)
# If LLM_MODEL is also gpt-4o-mini, point the fallback at a different model or provider
fallback_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
llm = primary_llm.with_fallbacks([fallback_llm])

chain = prompt | llm | StrOutputParser()

# Concurrency semaphore
semaphore = asyncio.Semaphore(settings.max_concurrency)

# ─── Request/response models ─────────────────────────────────────────────────

class ChatRequest(BaseModel):
    question: str = Field(..., min_length=1, max_length=1000, description="User question")
    session_id: str = Field(default="default", description="Session ID")

class ChatResponse(BaseModel):
    answer: str
    session_id: str

# ─── FastAPI application ─────────────────────────────────────────────────────

@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info(f"Service started, model: {settings.model}, env: {settings.env}")
    yield
    logger.info("Service stopped")

app = FastAPI(
    title="LangChain Production API",
    version="1.0.0",
    lifespan=lifespan,
)

# ─── Global exception handling ───────────────────────────────────────────────

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    logger.error(f"Unhandled exception [{request.method} {request.url}]: {exc}", exc_info=True)
    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "detail": str(exc) if settings.env == "development" else "Please try again later",
        },
    )

# ─── Endpoints ───────────────────────────────────────────────────────────────

@app.get("/health")
def health():
    return {"status": "ok", "model": settings.model, "env": settings.env}

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    async with semaphore:
        try:
            answer = await asyncio.wait_for(
                chain.ainvoke({"question": request.question}),
                timeout=settings.request_timeout,
            )
            return ChatResponse(answer=answer, session_id=request.session_id)
        except asyncio.TimeoutError:
            raise HTTPException(status_code=504, detail="Request timed out; please simplify the question and retry")

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        try:
            async with semaphore:
                async for chunk in chain.astream({"question": request.question}):
                    if chunk:
                        data = json.dumps(
                            {"token": chunk, "done": False}, ensure_ascii=False
                        )
                        yield f"data: {data}\n\n"
            yield f"data: {json.dumps({'token': '', 'done': True})}\n\n"
        except Exception as e:
            logger.error(f"Streaming error: {e}", exc_info=True)
            yield f"data: {json.dumps({'error': str(e), 'done': True})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )

# Start the service
uvicorn production_app:app --host 0.0.0.0 --port 8000 --workers 4

9. Caching: Don't Call the LLM Again for the Same Question

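LLM calls are the most expensive step in the whole pipeline. Calling the model again for the same question wastes money and adds latency. Caching addresses this: exact-match caching for identical inputs, and semantic caching for near-duplicates.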

9.1 Exact-Match Caching (InMemoryCache)

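Suitable for development and single-process deployments; only identical inputs (same prompt, same model parameters) hit the cache: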

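from langchain_core.globals import set_llm_cache
from langchain_core.messages import HumanMessage
from langchain_community.cache import InMemoryCache
from langchain_openai import ChatOpenAI

# Set a global LLM cache; from now on every LLM call checks the cache first
set_llm_cache(InMemoryCache())

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# First call: the request actually goes to the LLM
result1 = llm.invoke([HumanMessage(content="What is a vector database?")])

# Second call with identical input: served from the cache, no LLM call
result2 = llm.invoke([HumanMessage(content="What is a vector database?")])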

9.2 Semantic Caching (RedisSemanticCache)

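Exact-match caching only hits on identical input. Semantic caching matches by vector similarity, so "What is a vector database?" and "Explain what a vector database is" can hit the same cache entry: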

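from langchain_core.globals import set_llm_cache
from langchain_openai import OpenAIEmbeddings
from langchain_community.cache import RedisSemanticCache

# Semantic cache: prompts with similar embeddings share one cache entry.
# Requires a Redis server with vector search support (e.g. Redis Stack).
semantic_cache = RedisSemanticCache(
    redis_url="redis://localhost:6379",
    embedding=OpenAIEmbeddings(model="text-embedding-3-small"),
    # In langchain_community this value is applied as a vector *distance* threshold,
    # so smaller is stricter. 0.1 is an illustrative starting point; tune it on your data.
    score_threshold=0.1,
)
set_llm_cache(semantic_cache)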

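⚠️ A semantic cache needs an extra embedding call per lookup, which has its own cost. It pays off where questions repeat often (knowledge-base Q&A, FAQs); for creative generation tasks where every request differs, the benefit is close to zero. A threshold that is too loose can also return a cached answer to a question that is similar but not the same.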
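Because the threshold semantics are easy to get backwards, here is the underlying comparison in plain Python. Redis vector search reports a cosine distance (1 minus cosine similarity), where 0 means the same direction; in langchain_community's RedisSemanticCache, `score_threshold` is compared against that distance, so smaller values are stricter (check the behavior of the version you install). The helpers and toy 2-D vectors below are hypothetical; real embeddings have hundreds or thousands of dimensions.

```python
import math
from typing import Sequence


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """1 - cosine similarity: 0.0 for the same direction, 1.0 for orthogonal vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norms = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norms


def is_cache_hit(query: Sequence[float], cached: Sequence[float], distance_threshold: float) -> bool:
    """Reuse a cached answer only when the two prompts are close enough."""
    return cosine_distance(query, cached) <= distance_threshold


if __name__ == "__main__":
    query = [1.0, 0.0]
    paraphrase = [0.99, 0.141]  # nearly the same direction, distance about 0.01
    related = [0.6, 0.8]        # related but clearly different, distance about 0.4
    for threshold in (0.1, 0.95):
        print(threshold, is_cache_hit(query, paraphrase, threshold),
              is_cache_hit(query, related, threshold))
```

With a distance threshold of 0.95, even the merely related prompt counts as a hit; with 0.1 only the close paraphrase does.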

9.3 Redis Cache (Multi-Process Production Environments)

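InMemoryCache lives inside a single process, so in a multi-process deployment each process keeps its own cache and hit rates drop. In production, use Redis for a cache shared across processes: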

pip install redis langchain-community

import os

import redis
from langchain_community.cache import RedisCache
from langchain_core.globals import set_llm_cache

redis_client = redis.Redis(
    host=os.getenv("REDIS_HOST", "localhost"),
    port=int(os.getenv("REDIS_PORT", "6379")),
    decode_responses=True,
)

set_llm_cache(RedisCache(redis_=redis_client, ttl=3600))  # cache entries expire after 1 hour

10. Rate Limiting: Preventing API Abuse

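A production LLM service must cap how often each user can call it; otherwise a single user's burst of requests can exhaust the API quota or blow up the bill.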

10.1 Sliding-Window Rate Limiting with Redis

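pip install redis

import time
import uuid

import redis

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

def check_rate_limit(
    user_id: str,
    max_requests: int = 20,
    window_seconds: int = 60,
) -> tuple[bool, int]:
    """
    Sliding-window rate limiting.

    Returns:
        (allowed, remaining): whether the request is allowed, and how many
        requests remain in the current window
    """
    key = f"rate_limit:{user_id}"
    now = time.time()
    window_start = now - window_seconds

    pipe = redis_client.pipeline()
    # Remove entries that have fallen out of the window
    pipe.zremrangebyscore(key, 0, window_start)
    # Record this request; a unique member keeps simultaneous requests from overwriting each other
    pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
    # Count the requests inside the window
    pipe.zcard(key)
    # Expire the key so idle users don't leave data behind
    pipe.expire(key, window_seconds)
    results = pipe.execute()

    count = results[2]
    remaining = max(0, max_requests - count)
    # Rejected requests are recorded too, so a client that keeps hammering stays blocked
    allowed = count <= max_requests

    return allowed, remaining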
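The same sliding-window logic can be exercised without Redis, which is handy in unit tests. The sketch below is a hypothetical single-process `InMemorySlidingWindowLimiter`: a deque of timestamps per user plays the role of the sorted set, and an injectable clock makes the window deterministic. It is not shared across workers, so it does not replace the Redis version in a multi-process deployment.

```python
import time
from collections import deque
from typing import Callable, Deque, Dict, Tuple


class InMemorySlidingWindowLimiter:
    """Single-process sliding-window limiter mirroring check_rate_limit.

    Not shared across processes; intended for tests and local development.
    """

    def __init__(
        self,
        max_requests: int,
        window_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock
        self._hits: Dict[str, Deque[float]] = {}

    def check(self, user_id: str) -> Tuple[bool, int]:
        now = self.clock()
        hits = self._hits.setdefault(user_id, deque())
        # Drop timestamps at or before the window start (like ZREMRANGEBYSCORE 0..start).
        window_start = now - self.window_seconds
        while hits and hits[0] <= window_start:
            hits.popleft()
        hits.append(now)   # record this request even if it gets rejected (like ZADD)
        count = len(hits)  # requests in the window, including this one (like ZCARD)
        return count <= self.max_requests, max(0, self.max_requests - count)


if __name__ == "__main__":
    fake_now = [0.0]
    limiter = InMemorySlidingWindowLimiter(2, 60, clock=lambda: fake_now[0])
    print([limiter.check("alice") for _ in range(3)])  # the third call exceeds the limit
    fake_now[0] = 61.0  # the earlier hits have slid out of the window
    print(limiter.check("alice"))
```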

Integrating rate limiting into FastAPI:

from fastapi import Header
from fastapi.responses import JSONResponse

@app.post("/chat", response_model=ChatResponse)
async def chat(
    request: ChatRequest,
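    # User ID from the X-User-Id request header. Clients can set any header value,
    # so in production derive the ID from an authenticated token instead.
    x_user_id: str = Header(default="anonymous"),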
):
    # Synchronous Redis call; redis.asyncio avoids blocking the event loop under load
    allowed, remaining = check_rate_limit(
        user_id=x_user_id,
        max_requests=20,    # at most 20 requests per minute
        window_seconds=60,
    )

    if not allowed:
        return JSONResponse(
            status_code=429,
            content={"error": "Too many requests; please retry later"},
            headers={"Retry-After": "60", "X-RateLimit-Remaining": "0"},
        )

    async with semaphore:
        answer = await asyncio.wait_for(
            chain.ainvoke({"question": request.question}),
            timeout=settings.request_timeout,
        )

    return ChatResponse(answer=answer, session_id=request.session_id)

11. Session Management for Multi-Turn Conversations

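The multi-turn conversations in parts 8 and 9 of this series kept chat_history in memory, so the state is lost when the process restarts. In production, session history must be persisted.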

11.1 Redis-Backed Session Storage

import json

import redis
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage

class RedisSessionStore:
    """Persist conversation history in Redis"""

    def __init__(self, redis_client: redis.Redis, ttl: int = 3600):
        self.redis = redis_client
        self.ttl = ttl   # session expiry in seconds; idle sessions are cleaned up automatically

    def _key(self, session_id: str) -> str:
        return f"session:{session_id}"

    def get_history(self, session_id: str) -> list[BaseMessage]:
        """Fetch the session history"""
        raw = self.redis.get(self._key(session_id))
        if not raw:
            return []
        messages = []
        for item in json.loads(raw):
            if item["type"] == "human":
                messages.append(HumanMessage(content=item["content"]))
            elif item["type"] == "ai":
                messages.append(AIMessage(content=item["content"]))
        return messages

    def append(self, session_id: str, human: str, ai: str):
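        """Append one conversation turn.

        Read-modify-write: concurrent requests on the same session can overwrite each other.
        """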
        history = self.get_history(session_id)
        history.extend([
            HumanMessage(content=human),
            AIMessage(content=ai),
        ])
        # Keep only the last 20 turns (40 messages) so the context doesn't grow too long
        if len(history) > 40:
            history = history[-40:]
        serialized = [
            {"type": "human" if isinstance(m, HumanMessage) else "ai",
             "content": m.content}
            for m in history
        ]
        self.redis.setex(self._key(session_id), self.ttl, json.dumps(serialized))

    def clear(self, session_id: str):
        """Clear the session"""
        self.redis.delete(self._key(session_id))

session_store = RedisSessionStore(redis_client)
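The trimming and serialization logic in `append` can also be pulled out of Redis and the LangChain message classes so it can be tested on its own. In this sketch, `append_turn` and `MAX_TURNS` are hypothetical names; the function takes the JSON string stored under `session:{session_id}` and returns the new one, keeping the last `max_turns` turns.

```python
import json
from typing import Optional

MAX_TURNS = 20  # one turn = one human message + one AI message


def append_turn(raw: Optional[str], human: str, ai: str, max_turns: int = MAX_TURNS) -> str:
    """Append a human/AI pair to serialized history and keep the last max_turns turns."""
    history = json.loads(raw) if raw else []
    history.append({"type": "human", "content": human})
    history.append({"type": "ai", "content": ai})
    # Slicing by an even number of messages never splits a human/AI pair,
    # so the trimmed history still starts with a human message.
    history = history[-2 * max_turns:]
    return json.dumps(history, ensure_ascii=False)


if __name__ == "__main__":
    raw = None
    for i in range(25):
        raw = append_turn(raw, f"question {i}", f"answer {i}")
    history = json.loads(raw)
    print(len(history), history[0])
```

Because the whole list is read, modified, and written back, two concurrent requests on the same session can still lose a turn; if that matters, a Redis list updated with RPUSH plus LTRIM, or a WATCH/MULTI transaction, avoids the race.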

11.2 A Multi-Turn Endpoint with Session Memory

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

# Add a chat_history placeholder to the prompt
multi_turn_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a professional LangChain technical assistant. Remember the conversation context and stay consistent across turns."),
    MessagesPlaceholder(variable_name="chat_history", optional=True),
    ("human", "{question}"),
])

multi_turn_chain = multi_turn_prompt | llm | StrOutputParser()

@app.post("/chat/session", response_model=ChatResponse)
async def chat_with_session(request: ChatRequest):
    session_id = request.session_id

    # Load the history from Redis
    history = session_store.get_history(session_id)

    async with semaphore:
        answer = await asyncio.wait_for(
            multi_turn_chain.ainvoke({
                "question": request.question,
                "chat_history": history,
            }),
            timeout=settings.request_timeout,
        )

    # Append this turn to Redis
    session_store.append(session_id, request.question, answer)

    return ChatResponse(answer=answer, session_id=session_id)

@app.delete("/chat/session/{session_id}")
async def clear_session(session_id: str):
    """Clear the history of the given session"""
    session_store.clear(session_id)
    return {"message": f"Session {session_id} cleared"}

12. Structured Logging

In production, logs should be emitted as JSON rather than human-readable plain text, so that log pipelines such as ELK or Loki can parse and filter them.

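import json
import logging
import time
from datetime import datetime, timezone

# Attribute names every LogRecord has; anything else on a record came from `extra=`
_STANDARD_ATTRS = set(vars(logging.LogRecord("", 0, "", 0, "", (), None))) | {"message", "asctime"}

class JSONFormatter(logging.Formatter):
    """Format log records as JSON for ingestion by a log platform"""

    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        # Append extra fields (such as trace_id or user_id)
        for key, value in record.__dict__.items():
            if key not in _STANDARD_ATTRS and not key.startswith("_"):
                log_data[key] = value
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        # default=str keeps non-JSON-serializable extras from crashing the formatter
        return json.dumps(log_data, ensure_ascii=False, default=str)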

def setup_logging():
    handler = logging.StreamHandler()
    handler.setFormatter(JSONFormatter())
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    root_logger.handlers = [handler]

setup_logging()
logger = logging.getLogger(__name__)

Injecting per-request context during request handling:

import uuid
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """Generate a trace_id for every request and log how long it took"""

    async def dispatch(self, request: Request, call_next):
        trace_id = str(uuid.uuid4())[:8]
        start_time = time.time()

        # Store trace_id on the request state so route handlers can reference it
        request.state.trace_id = trace_id

        logger.info(
            "Request started",
            extra={
                "trace_id": trace_id,
                "method": request.method,
                "path": request.url.path,
            },
        )

        response = await call_next(request)
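        # For a StreamingResponse this measures the time until the response starts, not until the stream ends
        duration_ms = round((time.time() - start_time) * 1000, 2)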

        logger.info(
            "Request completed",
            extra={
                "trace_id": trace_id,
                "status_code": response.status_code,
                "duration_ms": duration_ms,
            },
        )

        response.headers["X-Trace-Id"] = trace_id
        return response

app.add_middleware(RequestLoggingMiddleware)
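`request.state.trace_id` is only reachable where the `request` object is. A common complement is a `contextvars.ContextVar` plus a logging filter, so every log line written while a request is handled carries its trace_id automatically. The sketch below assumes the middleware's `dispatch` would set `trace_id_var` before `call_next` and reset it afterwards (values set there are expected to be visible downstream; verify this with your Starlette version). `trace_id_var`, `TraceIdFilter`, and `fake_request` are hypothetical names.

```python
import asyncio
import contextvars
import logging

# Holds the current request's trace_id; "-" outside of any request.
trace_id_var = contextvars.ContextVar("trace_id", default="-")


class TraceIdFilter(logging.Filter):
    """Copy the current trace_id onto every record passing through the handler."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.trace_id = trace_id_var.get()
        return True  # annotate records, never drop them


async def fake_request(trace_id: str, log: logging.Logger) -> None:
    """Stand-in for one request: set the id (as dispatch would), then log."""
    token = trace_id_var.set(trace_id)
    try:
        await asyncio.sleep(0.01)  # let the other request interleave
        log.info("handled request")
    finally:
        trace_id_var.reset(token)


async def main(log: logging.Logger) -> None:
    # gather runs each coroutine as its own task with a copied context,
    # so the two trace_ids never leak into each other.
    await asyncio.gather(fake_request("req-a", log), fake_request("req-b", log))


if __name__ == "__main__":
    handler = logging.StreamHandler()
    handler.addFilter(TraceIdFilter())
    handler.setFormatter(logging.Formatter("%(trace_id)s %(message)s"))
    demo_logger = logging.getLogger("trace-demo")
    demo_logger.addHandler(handler)
    demo_logger.setLevel(logging.INFO)
    asyncio.run(main(demo_logger))
```

Attached to the handler that uses the JSONFormatter above, the filter's `trace_id` attribute is emitted as an extra field without any change to the log calls themselves.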

13. Graceful Shutdown and Health Checks

13.1 Graceful Shutdown

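When the process receives a SIGTERM signal (for example during a Kubernetes rolling update), it should let in-flight requests finish before exiting rather than cutting them off. Uvicorn already stops accepting new connections on SIGTERM and waits for open requests before running the lifespan shutdown phase; the counter below adds an application-level safety net and records how many requests were still running: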

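import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

# Number of requests currently being processed
active_requests = 0
shutting_down = False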

@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info(f"Service started, model: {settings.model}, env: {settings.env}")

    yield  # the service runs while suspended here

    # Shutdown phase
    global shutting_down
    shutting_down = True
    logger.info("Starting graceful shutdown; waiting for in-flight requests to finish...")

    # Wait for in-flight requests to finish (at most 30 seconds: 300 × 0.1s)
    for _ in range(300):
        if active_requests == 0:
            break
        await asyncio.sleep(0.1)

    logger.info(f"Service stopped; unfinished requests remaining: {active_requests}")

app = FastAPI(title="LangChain Production API", version="1.0.0", lifespan=lifespan)

@app.middleware("http")
async def track_requests(request: Request, call_next):
    global active_requests
    if shutting_down:
        return JSONResponse(status_code=503, content={"error": "Service is restarting; please retry later"})

    active_requests += 1
    try:
        return await call_next(request)
    finally:
        active_requests -= 1
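The polling loop in `lifespan` works, but the same idea can be expressed with an `asyncio.Event` that is set whenever the in-flight count drops to zero, which avoids both polling and `global` statements. This is a hypothetical `InFlightTracker` sketch, not part of FastAPI; create it inside the running event loop (for example in `lifespan`), because older Python versions bind `asyncio.Event` to an event loop when it is constructed.

```python
import asyncio
import contextlib
from typing import Tuple


class InFlightTracker:
    """Count in-flight requests and wait, with a deadline, for them to finish."""

    def __init__(self) -> None:
        self.active = 0
        self.shutting_down = False
        self._idle = asyncio.Event()
        self._idle.set()  # no requests yet, so we start out idle

    async def __aenter__(self) -> "InFlightTracker":
        self.active += 1
        self._idle.clear()
        return self

    async def __aexit__(self, *exc_info) -> None:
        self.active -= 1
        if self.active == 0:
            self._idle.set()

    async def drain(self, timeout: float) -> int:
        """Stop admitting work, wait up to `timeout` seconds, return requests still running."""
        self.shutting_down = True
        with contextlib.suppress(asyncio.TimeoutError):
            await asyncio.wait_for(self._idle.wait(), timeout)
        return self.active


async def demo() -> Tuple[int, int]:
    tracker = InFlightTracker()

    async def request(duration: float) -> None:
        async with tracker:  # a middleware would wrap call_next like this
            await asyncio.sleep(duration)

    quick = asyncio.create_task(request(0.05))
    await asyncio.sleep(0)  # let the task enter the tracker
    left_after_quick = await tracker.drain(timeout=1.0)
    await quick

    stuck = asyncio.create_task(request(10.0))
    await asyncio.sleep(0)
    left_after_stuck = await tracker.drain(timeout=0.05)
    stuck.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await stuck
    return left_after_quick, left_after_stuck


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the middleware, the check becomes `if tracker.shutting_down: return 503`, followed by `async with tracker: return await call_next(request)`; the lifespan shutdown step then calls `await tracker.drain(30)`.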

13.2 Tiered Health Checks

Production health checks should be split into two levels:

@app.get("/health/live")
def liveness():
    """
    Liveness probe: is the process alive?
    Kubernetes uses this to decide whether to restart the container.
    A simple 200 is enough; don't run expensive checks here.
    """
    if shutting_down:
        raise HTTPException(status_code=503, detail="Service is shutting down")
    return {"status": "alive"}

@app.get("/health/ready")
async def readiness():
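    """
    Readiness probe: is the service ready to receive traffic?
    Kubernetes uses this to decide whether to route traffic to this instance.
    Checks dependencies such as Redis; the LLM API is reported but not probed.
    """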
    checks = {}

    # Check the Redis connection
    try:
        redis_client.ping()
        checks["redis"] = "ok"
    except Exception as e:
        checks["redis"] = f"error: {e}"

    # LLM API reachability is not probed here, to keep the probe fast; only the configured model is reported
    checks["llm_model"] = settings.model

    # Ready unless some check reported an error
    all_ok = not any(str(v).startswith("error") for v in checks.values())

    if not all_ok:
        raise HTTPException(
            status_code=503,
            detail={"status": "not ready", "checks": checks},
        )

    return {"status": "ready", "checks": checks}

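💡 Point Kubernetes' livenessProbe and readinessProbe at /health/live and /health/ready respectively. A failing liveness probe triggers a container restart; a failing readiness probe only stops traffic from being routed to the instance, without restarting it. The two serve different purposes and must not be mixed up.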
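A readiness probe should answer quickly even when a dependency hangs; otherwise the probe itself times out. One way to get that, sketched below, is to run each dependency check concurrently under its own timeout. `run_checks`, `is_ready`, and the fake checks are hypothetical names; a synchronous client call such as `redis_client.ping` could be passed in as `lambda: asyncio.to_thread(redis_client.ping)`.

```python
import asyncio
from typing import Awaitable, Callable, Dict

Check = Callable[[], Awaitable[object]]


async def run_checks(checks: Dict[str, Check], timeout: float = 1.0) -> Dict[str, str]:
    """Run dependency checks concurrently, each bounded by its own timeout."""

    async def run_one(check: Check) -> str:
        try:
            await asyncio.wait_for(check(), timeout)
            return "ok"
        except asyncio.TimeoutError:
            return "error: timeout"
        except Exception as exc:  # report the failure instead of crashing the probe
            return f"error: {exc}"

    names = list(checks)
    results = await asyncio.gather(*(run_one(checks[name]) for name in names))
    return dict(zip(names, results))


def is_ready(results: Dict[str, str]) -> bool:
    return all(status == "ok" for status in results.values())


# Hypothetical fake dependencies for the demo below.
async def healthy() -> bool:
    return True


async def hanging() -> None:
    await asyncio.sleep(5)


async def refusing() -> None:
    raise ConnectionError("connection refused")


if __name__ == "__main__":
    results = asyncio.run(
        run_checks({"redis": healthy, "llm": hanging, "db": refusing}, timeout=0.05)
    )
    print(results, is_ready(results))
```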


14. Deployment Considerations

14.1 Multi-Process Deployment

pip install gunicorn

gunicorn production_app:app \
  --worker-class uvicorn.workers.UvicornWorker \
  --workers 4 \
  --bind 0.0.0.0:8000 \
  --timeout 120 \
  --graceful-timeout 30     # time allowed for graceful shutdown

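⚠️ --workers 4 starts 4 independent processes, each with its own event loop and semaphore, so the effective concurrency limit per instance is workers × MAX_CONCURRENCY (multiplied again by the number of replicas). If you need a global limit (for example to stay within the API's rate limit), implement it in Redis rather than with an in-process asyncio.Semaphore.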

14.2 Secret Management

# ❌ Never store keys in code or in container images
OPENAI_API_KEY = "sk-xxxxxxxx"

# ✅ In production, inject them from an external secret manager
# Kubernetes Secret:
#   kubectl create secret generic llm-secrets \
#     --from-literal=OPENAI_API_KEY=sk-xxx
# AWS Secrets Manager / GCP Secret Manager / Vault work the same way

14.3 Dockerfile

FROM python:3.11-slim

WORKDIR /app

# Copy the dependency file first to take advantage of Docker layer caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

ENV PYTHONUNBUFFERED=1
ENV APP_ENV=production

EXPOSE 8000

# Run as a non-root user for better security
RUN useradd -m appuser && chown -R appuser /app
USER appuser

CMD ["gunicorn", "production_app:app", \
     "--worker-class", "uvicorn.workers.UvicornWorker", \
     "--workers", "2", \
     "--bind", "0.0.0.0:8000", \
     "--timeout", "120", \
     "--graceful-timeout", "30", \
     "--access-logfile", "-", \
     "--error-logfile", "-"]

14.4 Kubernetes Deployment Snippet

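# deployment.yaml: key configuration (names and labels are illustrative)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langchain-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langchain-api
  template:
    metadata:
      labels:
        app: langchain-api  # must match spec.selector.matchLabels
    spec:
      # Must cover the preStop sleep (5s) plus gunicorn's --graceful-timeout (30s); the default is 30s
      terminationGracePeriodSeconds: 45
      containers:
        - name: langchain-api
          image: your-registry/langchain-api:latest
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: llm-secrets
                  key: OPENAI_API_KEY
            - name: APP_ENV
              value: production
            - name: REDIS_HOST
              value: redis-service
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health/live
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 15
          readinessProbe:
            httpGet:
              path: /health/ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 10
          lifecycle:
            preStop:
              exec:
                # Delay SIGTERM briefly so load balancers stop routing new traffic to this pod
                # before gunicorn begins its graceful shutdown
                command: ["/bin/sh", "-c", "sleep 5"]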

15. The Complete Production Service Skeleton (Integrated Version)

Combine every component in this article into one complete file that can serve as the starting point for a project:

# production_app.py: complete integrated version
import asyncio
import json
import logging
import os
import time
import uuid
from contextlib import asynccontextmanager

import redis
from dotenv import load_dotenv
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from langchain_community.cache import RedisCache
from langchain_core.globals import set_llm_cache
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from starlette.middleware.base import BaseHTTPMiddleware

load_dotenv()

# ─── Configuration ───────────────────────────────────────────────────────────

class Settings:
    model: str = os.getenv("LLM_MODEL", "gpt-4o-mini")
    max_concurrency: int = int(os.getenv("MAX_CONCURRENCY", "10"))
    request_timeout: float = float(os.getenv("REQUEST_TIMEOUT", "30.0"))
    max_input_tokens: int = int(os.getenv("MAX_INPUT_TOKENS", "2000"))
    env: str = os.getenv("APP_ENV", "development")
    redis_host: str = os.getenv("REDIS_HOST", "localhost")
    redis_port: int = int(os.getenv("REDIS_PORT", "6379"))
    rate_limit_max: int = int(os.getenv("RATE_LIMIT_MAX", "20"))
    rate_limit_window: int = int(os.getenv("RATE_LIMIT_WINDOW", "60"))

settings = Settings()

# ─── Logging ─────────────────────────────────────────────────────────────────

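# Attribute names every LogRecord has; anything else on a record came from `extra=`
_STANDARD_ATTRS = set(vars(logging.LogRecord("", 0, "", 0, "", (), None))) | {"message", "asctime"}

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            # Use the record's creation time rather than the time of formatting
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(record.created)),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
        }
        for key, value in record.__dict__.items():
            if key not in _STANDARD_ATTRS and not key.startswith("_"):
                log_data[key] = value
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_data, ensure_ascii=False, default=str)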

handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logging.basicConfig(level=logging.INFO, handlers=[handler])
logger = logging.getLogger(__name__)

# ─── Redis ────────────────────────────────────────────────────────────────────

redis_client = redis.Redis(
    host=settings.redis_host,
    port=settings.redis_port,
    decode_responses=True,
)
set_llm_cache(RedisCache(redis_=redis_client, ttl=3600))

# ─── Models and chain ────────────────────────────────────────────────────────

primary_llm = ChatOpenAI(model=settings.model, temperature=0).with_retry(
    stop_after_attempt=3, wait_exponential_jitter=True,
)
fallback_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
llm = primary_llm.with_fallbacks([fallback_llm])

single_turn_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a professional LangChain technical assistant. Answer concisely and accurately, with code examples for technical questions."),
    ("human", "{question}"),
])

multi_turn_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a professional LangChain technical assistant. Remember the conversation context and stay consistent across turns."),
    MessagesPlaceholder(variable_name="chat_history", optional=True),
    ("human", "{question}"),
])

parser = StrOutputParser()
chain = single_turn_prompt | llm | parser
session_chain = multi_turn_prompt | llm | parser

semaphore = asyncio.Semaphore(settings.max_concurrency)

# ─── Request/response models ─────────────────────────────────────────────────

class ChatRequest(BaseModel):
    question: str = Field(..., min_length=1, max_length=1000)
    session_id: str = Field(default="default")

class ChatResponse(BaseModel):
    answer: str
    session_id: str

# ─── Session store ───────────────────────────────────────────────────────────

class RedisSessionStore:
    def __init__(self, client: redis.Redis, ttl: int = 3600):
        self.redis = client
        self.ttl = ttl

    def _key(self, sid: str) -> str:
        return f"session:{sid}"

    def get_history(self, sid: str):
        raw = self.redis.get(self._key(sid))
        if not raw:
            return []
        messages = []
        for item in json.loads(raw):
            cls = HumanMessage if item["type"] == "human" else AIMessage
            messages.append(cls(content=item["content"]))
        return messages

    def append(self, sid: str, human: str, ai: str):
        history = self.get_history(sid)
        history.extend([HumanMessage(content=human), AIMessage(content=ai)])
        if len(history) > 40:
            history = history[-40:]
        serialized = [
            {"type": "human" if isinstance(m, HumanMessage) else "ai",
             "content": m.content}
            for m in history
        ]
        self.redis.setex(self._key(sid), self.ttl, json.dumps(serialized))

    def clear(self, sid: str):
        self.redis.delete(self._key(sid))

session_store = RedisSessionStore(redis_client)

# ─── Rate limiting ───────────────────────────────────────────────────────────

def check_rate_limit(user_id: str) -> tuple[bool, int]:
    key = f"rate_limit:{user_id}"
    now = time.time()
    window_start = now - settings.rate_limit_window
    pipe = redis_client.pipeline()
    pipe.zremrangebyscore(key, 0, window_start)
    pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})  # unique member, so two requests with the same timestamp both count
    pipe.zcard(key)
    pipe.expire(key, settings.rate_limit_window)
    results = pipe.execute()
    count = results[2]
    return count <= settings.rate_limit_max, max(0, settings.rate_limit_max - count)

# ─── Lifespan and middleware ─────────────────────────────────────────────────

active_requests = 0
shutting_down = False

@asynccontextmanager
async def lifespan(app: FastAPI):
    global shutting_down
    logger.info("Service starting", extra={"model": settings.model, "env": settings.env})
    yield
    shutting_down = True
    # Wait up to 30 s (300 × 0.1 s) for in-flight requests to finish
    for _ in range(300):
        if active_requests == 0:
            break
        await asyncio.sleep(0.1)
    logger.info("Service shutdown complete")

app = FastAPI(title="LangChain Production API", version="1.0.0", lifespan=lifespan)

class RequestLoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        global active_requests
        if shutting_down:
            return JSONResponse(status_code=503, content={"error": "Service is restarting"})
        trace_id = str(uuid.uuid4())[:8]
        request.state.trace_id = trace_id
        start = time.time()
        active_requests += 1
        try:
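            # For a StreamingResponse, call_next returns as soon as the response headers are ready,
            # so duration_ms and active_requests do not include the time spent streaming the body
            response = await call_next(request)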
            response.headers["X-Trace-Id"] = trace_id
            logger.info("Request", extra={
                "trace_id": trace_id,
                "path": request.url.path,
                "status": response.status_code,
                "duration_ms": round((time.time() - start) * 1000, 2),
            })
            return response
        finally:
            active_requests -= 1

app.add_middleware(RequestLoggingMiddleware)

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    logger.error("Unhandled exception", exc_info=True,
                 extra={"path": str(request.url)})
    return JSONResponse(
        status_code=500,
        content={"error": "Internal server error",
                 "detail": str(exc) if settings.env == "development" else "Please try again later"},
    )

# ─── Endpoints ───────────────────────────────────────────────────────────────

@app.get("/health/live")
def liveness():
    if shutting_down:
        raise HTTPException(status_code=503, detail="shutting down")
    return {"status": "alive"}

@app.get("/health/ready")
def readiness():
    try:
        redis_client.ping()
    except Exception as e:
        raise HTTPException(status_code=503, detail={"redis": str(e)})
    return {"status": "ready"}

@app.post("/chat", response_model=ChatResponse)
async def chat(
    request: ChatRequest,
    x_user_id: str = Header(default="anonymous"),
):
    allowed, remaining = await asyncio.to_thread(check_rate_limit, x_user_id)  # sync Redis call off the event loop
    if not allowed:
        return JSONResponse(
            status_code=429,
            content={"error": "Too many requests, please try again later"},
            headers={"Retry-After": str(settings.rate_limit_window),
                     "X-RateLimit-Remaining": "0"},
        )
    async with semaphore:
        try:
            answer = await asyncio.wait_for(
                chain.ainvoke({"question": request.question}),
                timeout=settings.request_timeout,
            )
        except asyncio.TimeoutError:
            raise HTTPException(status_code=504, detail="Request timed out; try simplifying the question and retry")
    return ChatResponse(answer=answer, session_id=request.session_id)

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        try:
            async with semaphore:
                async for chunk in chain.astream({"question": request.question}):
                    if chunk:
                        yield f"data: {json.dumps({'token': chunk, 'done': False}, ensure_ascii=False)}\n\n"
            yield f"data: {json.dumps({'token': '', 'done': True})}\n\n"
        except Exception as e:
            logger.error("Stream error", exc_info=True)
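            detail = str(e) if settings.env == "development" else "Generation failed, please try again later"
            yield f"data: {json.dumps({'error': detail, 'done': True}, ensure_ascii=False)}\n\n"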
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )

@app.post("/chat/session", response_model=ChatResponse)
async def chat_session(request: ChatRequest):
    history = await asyncio.to_thread(session_store.get_history, request.session_id)
    async with semaphore:
        try:
            answer = await asyncio.wait_for(
                session_chain.ainvoke({
                    "question": request.question,
                    "chat_history": history,
                }),
                timeout=settings.request_timeout,
            )
        except asyncio.TimeoutError:
            raise HTTPException(status_code=504, detail="Request timed out")
    await asyncio.to_thread(session_store.append, request.session_id, request.question, answer)
    return ChatResponse(answer=answer, session_id=request.session_id)

@app.delete("/chat/session/{session_id}")
def clear_session(session_id: str):
    session_store.clear(session_id)
    return {"message": f"Session {session_id} cleared"}
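check_rate_limit above implements a sliding-window log: every request becomes a timestamped member of a sorted set, members older than the window are trimmed, and the remaining count is compared with the limit. To exercise that logic without a Redis server, here is a minimal in-memory sketch that mirrors the same ZREMRANGEBYSCORE → ZADD → ZCARD sequence with a sorted Python list. The class SlidingWindowLimiter and its explicit `now` argument are hypothetical, added only so time can be controlled in a test.

```python
import bisect


class SlidingWindowLimiter:
    """In-memory sliding-window log mirroring check_rate_limit's Redis pipeline.

    Illustrative only: state lives in one process, so it cannot replace Redis.
    """

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window = window_seconds
        self.log: dict = {}  # user_id -> sorted list of request timestamps

    def check(self, user_id: str, now: float) -> tuple:
        timestamps = self.log.setdefault(user_id, [])
        # ZREMRANGEBYSCORE key 0 window_start: drop entries at or before the window start
        del timestamps[: bisect.bisect_right(timestamps, now - self.window)]
        # ZADD: record this request (timestamps arrive in increasing order, so the list stays sorted)
        timestamps.append(now)
        # ZCARD: count the requests still inside the window
        count = len(timestamps)
        return count <= self.max_requests, max(0, self.max_requests - count)


limiter = SlidingWindowLimiter(max_requests=3, window_seconds=60)
print([limiter.check("alice", t)[0] for t in (0, 1, 2, 3)])  # [True, True, True, False]
print(limiter.check("alice", 61.5))  # (True, 0): the entries at t=0 and t=1 have expired
```

Because the request is recorded before the limit is compared (as in the Redis pipeline), a client that keeps retrying while blocked also keeps its own window full; whether that is desirable is a policy choice.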

# Start: gunicorn manages 4 Uvicorn worker processes
gunicorn production_app:app \
  --worker-class uvicorn.workers.UvicornWorker \
  --workers 4 \
  --bind 0.0.0.0:8000 \
  --timeout 120 \
  --graceful-timeout 30
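The `.with_retry(...)` plus `.with_fallbacks([...])` composition in the listing amounts to: retry the primary model a few times with growing, randomized waits, and only if it still fails, pass the same input to the fallback model. The stdlib sketch below models that control flow with plain functions so it can be tested offline. `call_with_retry_and_fallback`, its parameters and the delay formula are assumptions for illustration; LangChain's own retry wrapper delegates the waiting to the tenacity library and differs in detail.

```python
import random
import time
from typing import Callable, Optional, Sequence, TypeVar

T = TypeVar("T")


def call_with_retry_and_fallback(
    primary: Callable[[str], T],
    fallbacks: Sequence[Callable[[str], T]],
    question: str,
    attempts: int = 3,
    base_delay: float = 0.1,
    max_delay: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry `primary` with exponential backoff plus jitter, then try each fallback once."""
    last_error: Optional[Exception] = None
    for attempt in range(attempts):
        try:
            return primary(question)
        except Exception as exc:  # real code should retry only transient errors
            last_error = exc
            if attempt < attempts - 1:
                # exponential growth (0.1, 0.2, 0.4, ...) capped at max_delay, plus random jitter
                delay = min(max_delay, base_delay * 2 ** attempt) + random.uniform(0, base_delay)
                sleep(delay)
    for fallback in fallbacks:
        try:
            return fallback(question)
        except Exception as exc:
            last_error = exc
    raise RuntimeError("primary and all fallbacks failed") from last_error


calls = {"n": 0}

def flaky_primary(q: str) -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return f"primary: {q}"

def backup(q: str) -> str:
    return f"backup: {q}"

def always_down(q: str) -> str:
    raise TimeoutError("down")

print(call_with_retry_and_fallback(flaky_primary, [backup], "hi", sleep=lambda s: None))  # primary: hi
print(call_with_retry_and_fallback(always_down, [backup], "hi", sleep=lambda s: None))    # backup: hi
```

The injectable `sleep` parameter exists only so the demo runs instantly. In production you would normally retry only on transient errors (timeouts, 429, 5xx); LangChain's `with_retry` exposes this through its `retry_if_exception_type` parameter.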

16. Common Pitfalls and Best Practices

Pitfall 1: Calling the synchronous invoke inside an async function

# ❌ Blocks the whole event loop: while it waits, no other request is served, so concurrency effectively drops to zero
answer = chain.invoke({"question": request.question})

# ✅ Use the async version
answer = await chain.ainvoke({"question": request.question})
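The effect is easy to measure. In the sketch below, `time.sleep` stands in for a synchronous network call such as `chain.invoke` (an assumption for illustration): three such calls inside coroutines run one after another because each one blocks the event loop, while wrapping the same call in `asyncio.to_thread` (Python 3.9+) lets them overlap. For LangChain chains the right fix is still `ainvoke`; `to_thread` is the fallback for libraries that only offer a synchronous API.

```python
import asyncio
import time


def blocking_call(seconds: float) -> None:
    time.sleep(seconds)  # stands in for a synchronous HTTP request


async def handler_blocking(seconds: float) -> None:
    blocking_call(seconds)  # ❌ freezes the event loop for the whole call


async def handler_offloaded(seconds: float) -> None:
    await asyncio.to_thread(blocking_call, seconds)  # ✅ runs in a worker thread


async def timed(handler, n: int = 3, seconds: float = 0.2) -> float:
    start = time.perf_counter()
    await asyncio.gather(*(handler(seconds) for _ in range(n)))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(timed(handler_blocking))
offloaded_elapsed = asyncio.run(timed(handler_offloaded))
print(f"blocking:  {blocking_elapsed:.2f}s")   # roughly 0.6s: the three calls run serially
print(f"offloaded: {offloaded_elapsed:.2f}s")  # roughly 0.2s: the three calls overlap
```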

Pitfall 2: Re-initializing the model on every request

# ❌ Creating a new ChatOpenAI per request sets up a new HTTP client and connection pool every time, which is costly
@app.post("/chat")
async def chat(request: ChatRequest):
    llm = ChatOpenAI(model="gpt-4o-mini")   # don't initialize it here
    ...

# ✅ Initialize once at module level and reuse the connection pool
llm = ChatOpenAI(model="gpt-4o-mini")       # created once, globally

Pitfall 3: Forgetting to disable Nginx buffering on streaming endpoints

# ❌ Nginx buffers proxied responses by default, so streaming is lost: the user receives nothing until the full response is ready
return StreamingResponse(generate(), media_type="text/event-stream")

# ✅ Explicitly disable buffering for this response
return StreamingResponse(
    generate(),
    media_type="text/event-stream",
    headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
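A quick way to confirm that events really arrive one by one is to consume the stream from a client. Below is a minimal parser for exactly the event shape /chat/stream emits (`data: {json}` followed by a blank line, ending with `done: true` or an `error` field). The function name `iter_sse_tokens` is hypothetical, and it deliberately ignores other SSE fields such as `event:`, `id:` and `retry:`. In a real client the lines would come from a streaming HTTP response, for example httpx's `Response.iter_lines()`; here a literal string stands in.

```python
import json
from typing import Iterable, Iterator


def iter_sse_tokens(lines: Iterable[str]) -> Iterator[str]:
    """Yield tokens from `data: {...}` events until an event with done=True arrives."""
    data_lines = []
    for line in lines:
        line = line.rstrip("\r\n")
        if line.startswith("data:"):
            # per the SSE format, one optional space after the colon is not part of the value
            value = line[5:]
            data_lines.append(value[1:] if value.startswith(" ") else value)
            continue
        if line == "" and data_lines:
            # a blank line ends the event; multiple data lines are joined with "\n"
            event = json.loads("\n".join(data_lines))
            data_lines = []
            if "error" in event:
                raise RuntimeError(event["error"])
            if event.get("done"):
                return
            yield event["token"]


raw = (
    'data: {"token": "Lang", "done": false}\n\n'
    'data: {"token": "Chain", "done": false}\n\n'
    'data: {"token": "", "done": true}\n\n'
)
print("".join(iter_sse_tokens(raw.splitlines())))  # LangChain
```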

Pitfall 4: asyncio.Semaphore does not work across processes

# ❌ A Semaphore only works within a single process
# gunicorn --workers 4 starts 4 processes, each with its own independent Semaphore(10)
# so the real concurrency cap is 40, not 10
semaphore = asyncio.Semaphore(10)

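# ✅ Put global concurrency control in Redis, using SETNX or a Lua script for distributed limiting
# The in-process Semaphore only limits concurrency inside one process; it cannot span processes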
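Inside one process the Semaphore does work, as this sketch shows by recording the peak number of coroutines inside the guarded block; `fake_llm_call` and its short sleep are placeholders for a real model call. The guarantee covers only coroutines that share this Semaphore object, and every gunicorn worker imports the module separately and creates its own, which is how four workers turn a limit of 10 into 40.

```python
import asyncio


async def run_with_cap(limit: int, n_requests: int) -> int:
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_llm_call() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # placeholder for awaiting the model
            in_flight -= 1

    await asyncio.gather(*(fake_llm_call() for _ in range(n_requests)))
    return peak


print(asyncio.run(run_with_cap(limit=10, n_requests=50)))  # 10
```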

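Pitfall 5: Misconfigured threshold for the semantic cache

# In langchain_community's RedisSemanticCache, score_threshold is a maximum vector
# *distance* (smaller = stricter), not a minimum similarity.
# With cosine distance: distance = 1 - similarity.
from langchain_community.cache import RedisSemanticCache
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
REDIS_URL = "redis://localhost:6379"

# ❌ Too loose (distance 0.3, i.e. similarity 0.7): unrelated questions share answers, producing off-topic replies
RedisSemanticCache(redis_url=REDIS_URL, embedding=embeddings, score_threshold=0.3)

# ❌ Too strict (distance 0.01, i.e. similarity 0.99): the cache almost never hits and is effectively useless
RedisSemanticCache(redis_url=REDIS_URL, embedding=embeddings, score_threshold=0.01)

# ✅ Similarity 0.93~0.96 (distance 0.04~0.07) is a reasonable range from experience; validate it on real data before launch
RedisSemanticCache(redis_url=REDIS_URL, embedding=embeddings, score_threshold=0.05)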
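Whether a threshold is a similarity or a distance reverses its meaning, so it is worth checking the arithmetic. With cosine distance, distance = 1 - cosine similarity, and a distance threshold t accepts a cached entry only when its similarity is at least 1 - t. The sketch below uses toy 3-dimensional vectors in place of real embeddings (an assumption for illustration; real embeddings have far more dimensions) to show how a paraphrase and an unrelated question fare under three thresholds.

```python
import math


def cosine_similarity(a: list, b: list) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def cache_hit(query_vec: list, cached_vec: list, distance_threshold: float) -> bool:
    """Hit when the cosine distance (1 - similarity) is within the threshold."""
    distance = 1.0 - cosine_similarity(query_vec, cached_vec)
    return distance <= distance_threshold


cached = [1.0, 0.0, 0.0]
paraphrase = [0.98, 0.2, 0.0]  # similarity ≈ 0.98, distance ≈ 0.02
unrelated = [0.7, 0.71, 0.0]   # similarity ≈ 0.70, distance ≈ 0.30

for t in (0.3, 0.05, 0.01):
    print(t, cache_hit(paraphrase, cached, t), cache_hit(unrelated, cached, t))
# 0.3 True True      -> too loose: the unrelated question also hits
# 0.05 True False    -> only the paraphrase hits
# 0.01 False False   -> too strict: even the paraphrase misses
```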

Pitfall 6: Unbounded session history makes the context too long

# ❌ History grows without bound and eventually exceeds the model's context window
history.append(HumanMessage(...))
history.append(AIMessage(...))

# ✅ Keep only the last N rounds and truncate in the storage layer
if len(history) > 40:      # keep at most 20 rounds (40 messages)
    history = history[-40:]
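Because RedisSessionStore.append always adds a human/AI pair, cutting to the last 40 messages never splits a turn. The worked example below (plain tuples stand in for HumanMessage/AIMessage, and the helper name `append_round` is hypothetical) simulates 25 rounds against the 40-message cap: the first 5 rounds are dropped and the history starts at round 6 with a human message.

```python
MAX_MESSAGES = 40  # 20 rounds × 2 messages


def append_round(history: list, human: str, ai: str) -> list:
    history = history + [("human", human), ("ai", ai)]
    return history[-MAX_MESSAGES:]  # keep only the most recent messages


history = []
for i in range(1, 26):
    history = append_round(history, f"question {i}", f"answer {i}")

print(len(history))  # 40
print(history[0])    # ('human', 'question 6')
print(history[-1])   # ('ai', 'answer 25')
```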

17. Summary

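Stage | Key implementation | Core value
Async endpoints | ainvoke / astream | Does not block the event loop; supports high concurrency
Streaming output | StreamingResponse + SSE | Users see the generation in real time
Retry | .with_retry(stop_after_attempt=3) | Transparently handles transient API errors
Fallback | .with_fallbacks([fallback model]) | Switches automatically when the primary model fails
Timeout protection | asyncio.wait_for(timeout=N) | Keeps slow requests from dragging down the service
Concurrency control | In-process Semaphore + Redis distributed limiting | Avoids hitting the provider's API rate limit
Caching | RedisCache / RedisSemanticCache | Repeated questions skip the LLM call, cutting cost
Rate limiting | Redis sliding window | Stops a single user from abusing the service and protects API quota
Session management | chat_history persisted in Redis | Multi-turn context survives across processes and restarts
Structured logging | JSON output + trace_id | Easy for log platforms to collect, filter, and alert on
Health checks | Separate liveness and readiness probes | Zero-downtime Kubernetes rolling updates
Graceful shutdown | lifespan waits for in-flight requests | Avoids losing in-progress requests during updates
Cost monitoring | get_openai_callback + token accounting | Quantifies the cost of each call and surfaces abnormal usage early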

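🎯 Shipping an LLM application is fundamentally an engineering exercise: turning "it runs" into "it runs reliably". Retries handle sporadic errors, fallbacks handle system failures, caching cuts cost, rate limiting prevents abuse, and session management preserves the user experience. Each of these is standard equipment in production, and none can be skipped.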


End of the Series

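This completes all eleven installments of the LangChain series. From the first article, "Why not just call the API directly?", to this final one, "From Demo to Production", the series has covered: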

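Stage | Articles | Topics
Laying the foundation | Parts 1-3 | LangChain basics, LCEL, prompt engineering
Building RAG | Parts 4-6 | RAG basics, advanced RAG, RAG evaluation
Building agents | Parts 7-9 | Tools, Agent, LangGraph
Production in practice | Parts 10-11 | LangSmith monitoring, production deployment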

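🎯 A framework is only a tool; solving real problems is the goal. The value of this stack is realized only when engineers put it to work in a concrete product.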
