LangChain Series, Part 11: From Demo to Production, a Hands-On Deployment Guide
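🎯 Audience: engineers who have built a LangChain application and want to deploy it as a production-ready service
⏱️ Reading time: about 50 minutes
💬 This article covers wrapping a LangChain application with FastAPI, streaming output over SSE, handling concurrency and async, configuring retries and fallbacks, and controlling token cost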
1. The Gap Between a Demo and Production
Between a LangChain demo that runs locally and a service that is fit for production lies a set of engineering problems:
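| Dimension | Demo | Production |
|---|---|---|
| Concurrency | One user, sequential execution | Many simultaneous users; requires async handling |
| Response experience | Wait for the complete result | Streaming output; users see generation in real time |
| Error handling | An exception crashes the program | Retries, fallbacks, normalized error responses |
| Cost control | Token usage ignored | Usage monitored to prevent overspending |
| Timeouts | Waits indefinitely | Request timeout protection |
| Secret management | Hard-coded or in a .env file | Injected via environment variables; key rotation |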
This article works through these problems one by one and ends with a deployable LangChain service skeleton.
2. Choosing the Stack
Production Stack:
┌─────────────────────────────────────────────┐
│ Client (Browser / Mobile / Other Service) │
└──────────────────┬──────────────────────────┘
│ HTTP / SSE
┌──────────────────▼──────────────────────────┐
│ FastAPI (async, streaming, validation) │
└──────────────────┬──────────────────────────┘
│
┌──────────────────▼──────────────────────────┐
│ LangChain / LangGraph (async chains) │
└──────────────────┬──────────────────────────┘
│
┌──────────────────▼──────────────────────────┐
│ OpenAI / Anthropic / Other LLM Provider │
└─────────────────────────────────────────────┘
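- FastAPI: one of the fastest async web frameworks in the Python ecosystem, with native async/await support; SSE responses can be served through its StreamingResponse
- SSE (Server-Sent Events): a standard mechanism for a server to push a stream of events to a client over plain HTTP; lighter than WebSocket and a good fit for one-way streaming output
📝 SSE (Server-Sent Events) is an HTTP-based server push technique. The server keeps the connection open and keeps sending text events in the form "data: ...\n\n"; the client reads them through the EventSource API or as an ordinary HTTP stream. The browser EventSource API only issues GET requests, so the POST endpoints in this article are read as a plain HTTP stream. Streaming LLM output is one of the most typical uses of SSE.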
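To make the wire format concrete, here is a minimal standard-library sketch of how one event is framed and read back. The helper names format_sse and parse_sse are hypothetical, introduced only for illustration, and the parser handles just the single-line "data:" frames used in this article, not the full SSE grammar.

```python
import json
from typing import Iterator


def format_sse(payload: dict) -> str:
    """Serialize one event as an SSE frame: 'data: <json>' followed by a blank line."""
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"


def parse_sse(stream: str) -> Iterator[dict]:
    """Yield the JSON payload of every complete 'data:' frame in the text."""
    for frame in stream.split("\n\n"):
        if frame.startswith("data: "):
            yield json.loads(frame[len("data: "):])


# Two frames as a server would send them: one token, then the end-of-stream signal
wire = format_sse({"token": "Hello", "done": False}) + format_sse({"token": "", "done": True})
events = list(parse_sse(wire))
print(events)
```

The blank line is what separates events, which is why a client must buffer until it has seen "\n\n" before parsing a frame.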
pip install fastapi uvicorn langchain-openai python-dotenv
3. Basic Structure: A Non-Streaming Endpoint
Start with the simplest non-streaming version to validate the API shape:
# app.py
from contextlib import asynccontextmanager
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
load_dotenv()
# --- 请求/响应模型 ---
class ChatRequest(BaseModel):
question: str
session_id: str = "default"
class ChatResponse(BaseModel):
answer: str
session_id: str
# --- Chain 初始化 ---
# 在模块级别初始化,避免每次请求都重新创建模型实例
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个专业的 LangChain 技术助手,回答简洁准确,适当提供代码示例。"),
("human", "{question}"),
])
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
chain = prompt | llm | StrOutputParser()
# --- FastAPI 应用 ---
app = FastAPI(title="LangChain API", version="1.0.0")
@app.get("/health")
def health_check():
"""健康检查接口,用于负载均衡器探活"""
return {"status": "ok"}
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
try:
# ainvoke 是 invoke 的异步版本,不阻塞事件循环
answer = await chain.ainvoke({"question": request.question})
return ChatResponse(answer=answer, session_id=request.session_id)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
uvicorn app:app --reload --port 8000
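💡 Every LangChain Runnable has async counterparts: ainvoke, astream, abatch. Inside an async def handler in FastAPI, use these; a blocking invoke call occupies the event loop, and the worker cannot serve any other request until it returns.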
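The effect of a blocking call on the event loop can be observed without any LLM. The sketch below is a standard-library illustration only: time.sleep stands in for a synchronous chain.invoke, asyncio.sleep stands in for await chain.ainvoke, and count_heartbeats is a hypothetical helper that counts how often a background task gets to run in the meantime.

```python
import asyncio
import time


async def count_heartbeats(work) -> int:
    """Run `work` while a background task tries to tick every 10 ms; return the tick count."""
    ticks = 0

    async def heartbeat():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    task = asyncio.create_task(heartbeat())
    await work()
    task.cancel()
    return ticks


async def blocking_work():
    time.sleep(0.2)  # stands in for a synchronous chain.invoke(...)


async def cooperative_work():
    await asyncio.sleep(0.2)  # stands in for await chain.ainvoke(...)


blocked_ticks = asyncio.run(count_heartbeats(blocking_work))
cooperative_ticks = asyncio.run(count_heartbeats(cooperative_work))
print(blocked_ticks, cooperative_ticks)
```

During the blocking call the heartbeat never runs at all, which is what happens to every other request on the same worker; with the awaited version it keeps ticking.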
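4. Streaming Output with SSE
Streaming is the expected experience for an LLM service: users see the text as it is generated instead of waiting for the complete response.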
from fastapi.responses import StreamingResponse
import asyncio
import json
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
"""
流式输出接口,返回 SSE 格式的数据流。
SSE 格式:每条消息以 "data: " 开头,以 "\n\n" 结尾
客户端用 EventSource 或读取 HTTP 流接收
"""
async def generate():
try:
# astream 逐 token 返回生成结果
async for chunk in chain.astream({"question": request.question}):
if chunk:
# SSE 标准格式
data = json.dumps({"token": chunk, "done": False},
ensure_ascii=False)
yield f"data: {data}\n\n"
# 发送结束信号
yield f"data: {json.dumps({'token': '', 'done': True})}\n\n"
except Exception as e:
error_data = json.dumps({"error": str(e), "done": True})
yield f"data: {error_data}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # 关闭 Nginx 缓冲,确保实时推送
},
)
Client example (Python):
import asyncio
import json

import httpx


async def receive_stream(question: str):
    async with httpx.AsyncClient(timeout=60.0) as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/chat/stream",
            json={"question": question},
        ) as response:
            response.raise_for_status()
            async for line in response.aiter_lines():
                # Each SSE event arrives as a "data: {...}" line followed by a blank line
                if not line.startswith("data: "):
                    continue
                data = json.loads(line[len("data: "):])
                if "error" in data:
                    raise RuntimeError(data["error"])
                if data.get("done"):
                    print()
                    break
                print(data["token"], end="", flush=True)


asyncio.run(receive_stream("What is a LangGraph Checkpointer?"))
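Client example (JavaScript, browser):
const response = await fetch('/chat/stream', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ question: 'What is LCEL?' }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
let finished = false;
while (!finished) {
  const { done, value } = await reader.read();
  if (done) break;
  // stream: true keeps multi-byte characters intact across chunk boundaries
  buffer += decoder.decode(value, { stream: true });
  // A network chunk may end mid-event, so only complete events are parsed
  const events = buffer.split('\n\n');
  buffer = events.pop();
  for (const event of events) {
    if (!event.startsWith('data: ')) continue;
    const data = JSON.parse(event.slice(6));
    if (data.done) {
      finished = true;
      break;
    }
    console.log(data.token); // or append to the DOM
  }
}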
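5. Retries and Fallbacks
LLM API calls run into transient errors: rate limiting (429), gateway timeouts (504), temporary unavailability (503). A production service has to handle them.
5.1 Built-in Retries in LangChain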
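from langchain_openai import ChatOpenAI

# with_retry: automatic retries at the LangChain Runnable level
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0).with_retry(
    retry_if_exception_type=(Exception,),  # exception types that trigger a retry; narrow this to transient errors in production
    stop_after_attempt=3,                  # at most 3 attempts in total, including the first call
    wait_exponential_jitter=True,          # exponential backoff plus random jitter, to avoid retry storms
)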
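📝 Exponential backoff is a retry strategy in which the wait before each retry grows exponentially (1s → 2s → 4s); adding random jitter keeps many clients from retrying at the same instant and creating a request spike.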
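The schedule described above can be written out in a few lines. This is a generic sketch of exponential backoff with additive jitter, not the exact schedule LangChain uses internally; backoff_delays and its parameters (base, cap, max_jitter, seed) are hypothetical names chosen for the illustration.

```python
import random
from typing import List, Optional


def backoff_delays(attempts: int, base: float = 1.0, cap: float = 30.0,
                   max_jitter: float = 1.0, seed: Optional[int] = None) -> List[float]:
    """Waits (in seconds) between attempts: base * 2**n, capped, plus random jitter."""
    rng = random.Random(seed)
    delays = []
    for n in range(attempts - 1):  # there is no wait after the final attempt
        exponential = min(cap, base * 2 ** n)
        delays.append(exponential + rng.uniform(0, max_jitter))
    return delays


# Four attempts means three waits: roughly 1s, 2s and 4s, each plus up to 1s of jitter
delays = backoff_delays(attempts=4, seed=42)
print([round(d, 2) for d in delays])
```

The cap matters for long outages: without it, the tenth retry would already wait more than eight minutes.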
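5.2 Model Fallback: Primary and Backup
When the primary model is unavailable, switch to a backup model automatically:
from langchain_openai import ChatOpenAI

# Primary model
primary_llm = ChatOpenAI(model="gpt-4o", temperature=0)
# Backup model (cheaper, somewhat less capable).
# Both models here come from the same provider, so this only covers model-level failures;
# to survive a provider outage, use a model from another provider as the backup.
fallback_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# with_fallbacks: switch to the backup when the primary raises
llm_with_fallback = primary_llm.with_fallbacks(
    [fallback_llm],
    exceptions_to_handle=(Exception,),
)
chain = prompt | llm_with_fallback | StrOutputParser()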
5.3 Request Timeout Protection
import asyncio

from fastapi import HTTPException


async def chat_with_timeout(question: str, timeout: float = 30.0) -> str:
    """Invoke the chain with a timeout."""
    try:
        # wait_for cancels the pending call once the timeout elapses
        return await asyncio.wait_for(
            chain.ainvoke({"question": question}),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        raise HTTPException(
            status_code=504,
            detail=f"Request timed out ({timeout}s); retry later or simplify the question",
        )
5.4 A Complete Error-Handling Layer
from fastapi import Request
from fastapi.responses import JSONResponse
import logging
logger = logging.getLogger(__name__)
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
"""全局异常处理,统一错误响应格式"""
logger.error(f"未处理异常: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"error": "服务内部错误",
"detail": str(exc) if app.debug else "请联系管理员",
},
)
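6. Concurrency Control and Connection Pooling
LLM APIs usually enforce rate limits (such as OpenAI's RPM/TPM limits). Unbounded concurrent requests will trigger 429 errors.
6.1 Limiting Concurrency with a Semaphore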
import asyncio
from typing import Optional
# 信号量:限制同时进行的 LLM 调用数量
llm_semaphore = asyncio.Semaphore(10) # 最多 10 个并发 LLM 调用
async def call_llm_with_semaphore(question: str) -> str:
async with llm_semaphore:
return await chain.ainvoke({"question": question})
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
answer = await call_llm_with_semaphore(request.question)
return ChatResponse(answer=answer, session_id=request.session_id)
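📝 A semaphore is a basic concurrency primitive that caps how many coroutines may run a section of code at once. asyncio.Semaphore(10) lets at most 10 coroutines hold it at the same time; the 11th waits until one of them releases it.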
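The cap can be verified with a small standard-library experiment. In this sketch fake_llm_call is a hypothetical stand-in for chain.ainvoke (it just sleeps), and run_demo records the highest number of calls that were ever in flight at the same time.

```python
import asyncio


async def run_demo(limit: int = 3, tasks: int = 10):
    """Launch `tasks` fake LLM calls behind a semaphore and record peak concurrency."""
    sem = asyncio.Semaphore(limit)
    running = 0
    peak = 0

    async def fake_llm_call(i: int) -> int:
        nonlocal running, peak
        async with sem:  # at most `limit` coroutines get past this line at once
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulated network latency
            running -= 1
            return i

    results = await asyncio.gather(*(fake_llm_call(i) for i in range(tasks)))
    return peak, results


peak, results = asyncio.run(run_demo())
print(peak, results)
```

All ten calls complete, but never more than three run concurrently; the others queue at the semaphore, which is the behavior wanted in front of a rate-limited API.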
6.2 Batch Requests
from langchain_core.runnables import RunnableConfig
@app.post("/chat/batch")
async def chat_batch(requests: list[ChatRequest]):
"""批量处理多个问题,比串行调用更高效"""
questions = [{"question": r.question} for r in requests]
# abatch 并发处理所有请求,max_concurrency 控制并发数
answers = await chain.abatch(
questions,
config=RunnableConfig(max_concurrency=5),
)
return [
ChatResponse(answer=ans, session_id=req.session_id)
for req, ans in zip(requests, answers)
]
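7. Token Usage Monitoring and Cost Control
Token fees are the main variable cost of an LLM application and are easy to overrun without monitoring.
7.1 Recording Token Usage per Call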
from langchain_core.callbacks.base import BaseCallbackHandler
from langchain_openai import ChatOpenAI

# Collected usage records; in production, send these to a metrics system instead of a list
token_usage_log = []


class TokenUsageCallback(BaseCallbackHandler):
    """Record the token usage of every LLM call."""

    def on_llm_end(self, response, **kwargs):
        # llm_output can be None (for example on streamed calls), so guard before reading it
        usage = (response.llm_output or {}).get("token_usage") or {}
        token_usage_log.append({
            "prompt_tokens": usage.get("prompt_tokens", 0),
            "completion_tokens": usage.get("completion_tokens", 0),
            "total_tokens": usage.get("total_tokens", 0),
        })


llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0,
    callbacks=[TokenUsageCallback()],
)
7.2 Estimating Request Cost
# Prices in USD per 1M tokens at the time of writing; check the provider's current pricing
PRICING = {
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"gpt-4o": {"input": 2.50, "output": 10.00},
}
def estimate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
"""估算本次调用的美元成本"""
if model not in PRICING:
return 0.0
price = PRICING[model]
cost = (prompt_tokens * price["input"] + completion_tokens * price["output"]) / 1_000_000
return round(cost, 6)
# 在接口中返回用量信息
@app.post("/chat/with-usage")
async def chat_with_usage(request: ChatRequest):
from langchain_community.callbacks import get_openai_callback
with get_openai_callback() as cb:
answer = await chain.ainvoke({"question": request.question})
cost = estimate_cost("gpt-4o-mini", cb.prompt_tokens, cb.completion_tokens)
return {
"answer": answer,
"usage": {
"prompt_tokens": cb.prompt_tokens,
"completion_tokens": cb.completion_tokens,
"total_tokens": cb.total_tokens,
"estimated_cost_usd": cost,
},
}
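A worked example helps to size the numbers. The sketch below applies the same formula to the gpt-4o-mini prices from the PRICING table; the token counts and the monthly traffic figure are hypothetical values picked for the illustration, and request_cost is a name introduced here.

```python
# Prices in USD per 1M tokens, copied from the PRICING table above (gpt-4o-mini)
INPUT_PRICE = 0.15
OUTPUT_PRICE = 0.60


def request_cost(prompt_tokens: int, completion_tokens: int) -> float:
    """Cost of one call in USD: tokens times price per token, summed over input and output."""
    return (prompt_tokens * INPUT_PRICE + completion_tokens * OUTPUT_PRICE) / 1_000_000


# Hypothetical request: 1200 prompt tokens and 350 completion tokens
per_request = request_cost(prompt_tokens=1200, completion_tokens=350)
# Hypothetical traffic: 100,000 such requests per month
per_month = per_request * 100_000
print(f"{per_request:.5f} USD per request, {per_month:.2f} USD per month")
```

Here the input side contributes 1200 × 0.15 = 180 and the output side 350 × 0.60 = 210, so one request costs 390 / 1,000,000 = 0.00039 USD, or about 39 USD per month at that traffic. Output tokens dominate even though there are far fewer of them, which is why capping response length is an effective cost lever.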
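7.3 Limiting Input Length
import tiktoken


def count_tokens(text: str, model: str = "gpt-4o-mini") -> int:
    """Count the tokens in a piece of text."""
    try:
        enc = tiktoken.encoding_for_model(model)
    except KeyError:
        # Model name unknown to the installed tiktoken: fall back to a general-purpose encoding (approximate count)
        enc = tiktoken.get_encoding("cl100k_base")
    return len(enc.encode(text))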
MAX_INPUT_TOKENS = 2000 # 限制用户输入的最大 Token 数
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
token_count = count_tokens(request.question)
if token_count > MAX_INPUT_TOKENS:
raise HTTPException(
status_code=400,
detail=f"输入过长({token_count} tokens),请控制在 {MAX_INPUT_TOKENS} tokens 以内",
)
answer = await chain.ainvoke({"question": request.question})
return ChatResponse(answer=answer, session_id=request.session_id)
8. A Production Service Skeleton
Combining the components introduced so far into a single service:
# production_app.py
import asyncio
import json
import logging
import os
from contextlib import asynccontextmanager
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
load_dotenv()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ─── 配置 ────────────────────────────────────────────────────────────────────
class Settings:
model: str = os.getenv("LLM_MODEL", "gpt-4o-mini")
max_concurrency: int = int(os.getenv("MAX_CONCURRENCY", "10"))
request_timeout: float = float(os.getenv("REQUEST_TIMEOUT", "30.0"))
max_input_tokens: int = int(os.getenv("MAX_INPUT_TOKENS", "2000"))
env: str = os.getenv("APP_ENV", "development")
settings = Settings()
# ─── 模型与 Chain ─────────────────────────────────────────────────────────────
prompt = ChatPromptTemplate.from_messages([
(
"system",
"你是一个专业的 LangChain 技术助手。\n"
"回答简洁准确,技术问题附代码示例,回答不超过 300 字。"
),
("human", "{question}"),
])
# 主模型 + 备用模型 + 重试
primary_llm = ChatOpenAI(model=settings.model, temperature=0).with_retry(
stop_after_attempt=3,
wait_exponential_jitter=True,
)
fallback_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
llm = primary_llm.with_fallbacks([fallback_llm])
chain = prompt | llm | StrOutputParser()
# 并发信号量
semaphore = asyncio.Semaphore(settings.max_concurrency)
# ─── 请求/响应模型 ────────────────────────────────────────────────────────────
class ChatRequest(BaseModel):
question: str = Field(..., min_length=1, max_length=1000, description="用户问题")
session_id: str = Field(default="default", description="会话 ID")
class ChatResponse(BaseModel):
answer: str
session_id: str
# ─── FastAPI 应用 ─────────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info(f"服务启动,模型:{settings.model},环境:{settings.env}")
yield
logger.info("服务关闭")
app = FastAPI(
title="LangChain Production API",
version="1.0.0",
lifespan=lifespan,
)
# ─── 全局异常处理 ─────────────────────────────────────────────────────────────
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
logger.error(f"未处理异常 [{request.method} {request.url}]: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"error": "服务内部错误",
"detail": str(exc) if settings.env == "development" else "请稍后重试",
},
)
# ─── 接口 ─────────────────────────────────────────────────────────────────────
@app.get("/health")
def health():
return {"status": "ok", "model": settings.model, "env": settings.env}
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
async with semaphore:
try:
answer = await asyncio.wait_for(
chain.ainvoke({"question": request.question}),
timeout=settings.request_timeout,
)
return ChatResponse(answer=answer, session_id=request.session_id)
except asyncio.TimeoutError:
raise HTTPException(status_code=504, detail="请求超时,请简化问题后重试")
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
async def generate():
try:
async with semaphore:
async for chunk in chain.astream({"question": request.question}):
if chunk:
data = json.dumps(
{"token": chunk, "done": False}, ensure_ascii=False
)
yield f"data: {data}\n\n"
yield f"data: {json.dumps({'token': '', 'done': True})}\n\n"
except Exception as e:
logger.error(f"流式输出异常: {e}", exc_info=True)
yield f"data: {json.dumps({'error': str(e), 'done': True})}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
# 启动服务
uvicorn production_app:app --host 0.0.0.0 --port 8000 --workers 4
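9. Caching: Don't Call the LLM Twice for the Same Question
The LLM call is the most expensive step in the whole pipeline. Calling it again for a question that was already answered wastes money and adds latency. Caching addresses this.
9.1 Exact-Match Cache (InMemoryCache)
Suited to development and single-process deployments; only an identical input hits the cache: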
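from langchain_community.cache import InMemoryCache
from langchain_core.globals import set_llm_cache
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI

# Set a global LLM cache; every subsequent LLM call goes through it
set_llm_cache(InMemoryCache())
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# First call: an actual request to the LLM
result1 = llm.invoke([HumanMessage(content="What is a vector database?")])
# Second call with identical input: served from the cache, no LLM request
result2 = llm.invoke([HumanMessage(content="What is a vector database?")])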
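9.2 Semantic Cache (RedisSemanticCache)
An exact-match cache only hits on identical input. A semantic cache matches by vector similarity, so "what is a vector database" and "can you explain what a vector database is" can share one cache entry:
from langchain_community.cache import RedisSemanticCache
from langchain_core.globals import set_llm_cache
from langchain_openai import OpenAIEmbeddings

# Semantic cache: similar questions share one entry, matched by embedding vectors
semantic_cache = RedisSemanticCache(
    redis_url="redis://localhost:6379",
    embedding=OpenAIEmbeddings(model="text-embedding-3-small"),
    # In langchain_community this value is applied as a vector distance threshold (default 0.2):
    # the smaller it is, the closer two questions must be to share an entry.
    # Verify the semantics against your installed version before tuning it.
    score_threshold=0.05,
)
set_llm_cache(semantic_cache)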
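⚠️ A semantic cache needs an extra embedding call per lookup, which has its own cost. It pays off when questions repeat often (knowledge-base Q&A, FAQ); for creative generation where every request differs, the benefit is close to zero.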
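The matching rule behind a semantic cache can be sketched without Redis or an embedding model. The following is a toy in-memory illustration, not how RedisSemanticCache is implemented: ToySemanticCache is a hypothetical class, the three-dimensional vectors are hand-written stand-ins for real embeddings, and the threshold is expressed as a minimum cosine similarity (libraries may express it as a maximum distance instead).

```python
import math
from typing import List, Optional, Tuple


def cosine_similarity(a: List[float], b: List[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm


class ToySemanticCache:
    """In-memory stand-in: returns a stored answer when the query vector is similar enough."""

    def __init__(self, min_similarity: float = 0.95):
        self.min_similarity = min_similarity
        self.entries: List[Tuple[List[float], str]] = []

    def store(self, vector: List[float], answer: str) -> None:
        self.entries.append((vector, answer))

    def lookup(self, vector: List[float]) -> Optional[str]:
        best_answer, best_score = None, self.min_similarity
        for cached_vector, answer in self.entries:
            score = cosine_similarity(vector, cached_vector)
            if score >= best_score:  # keep the closest entry above the threshold
                best_answer, best_score = answer, score
        return best_answer


cache = ToySemanticCache(min_similarity=0.95)
cache.store([0.9, 0.1, 0.0], "A vector database stores and searches embeddings.")
hit = cache.lookup([0.88, 0.12, 0.01])   # a rephrasing: nearly the same direction
miss = cache.lookup([0.0, 0.2, 0.9])     # an unrelated question
print(hit, miss)
```

Raising min_similarity reduces wrong answers served from the cache at the price of fewer hits; the right value depends on the embedding model and has to be tuned on real questions.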
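9.3 Redis Cache (Multi-Process Production)
InMemoryCache lives inside one process, so in a multi-process deployment each process has its own cache and the hit rate is low. Production should use Redis for a cache shared across processes: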
pip install redis langchain-community
from langchain_community.cache import RedisCache
import redis
redis_client = redis.Redis(
host=os.getenv("REDIS_HOST", "localhost"),
port=int(os.getenv("REDIS_PORT", "6379")),
decode_responses=True,
)
set_llm_cache(RedisCache(redis_=redis_client, ttl=3600)) # 缓存 1 小时
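10. Rate Limiting: Protecting the API from Abuse
A production LLM service has to limit how often a single user can call it; otherwise one user's burst of requests can exhaust the API quota or inflate the bill.
10.1 Sliding-Window Rate Limiting with Redis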
pip install redis
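import time
import uuid

import redis

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)


def check_rate_limit(
    user_id: str,
    max_requests: int = 20,
    window_seconds: int = 60,
) -> tuple[bool, int]:
    """
    Sliding-window rate limit.
    Returns:
        (allowed, remaining): whether the request is allowed, and how many requests are left in the current window
    """
    key = f"rate_limit:{user_id}"
    now = time.time()
    window_start = now - window_seconds
    # Unique member, so two requests with the same timestamp are both counted
    member = f"{now}:{uuid.uuid4().hex}"
    pipe = redis_client.pipeline()
    # Drop records that fell out of the window
    pipe.zremrangebyscore(key, 0, window_start)
    # Record this request's timestamp
    pipe.zadd(key, {member: now})
    # Count the requests inside the window
    pipe.zcard(key)
    # Expire the key so idle users do not accumulate in Redis
    pipe.expire(key, window_seconds)
    results = pipe.execute()
    count = results[2]
    allowed = count <= max_requests
    if not allowed:
        # Rejected requests should not consume quota, otherwise a client that keeps retrying never recovers
        redis_client.zrem(key, member)
    remaining = max(0, max_requests - count)
    return allowed, remaining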
Integrating the limiter into FastAPI:
import asyncio

from fastapi import Header
from fastapi.responses import JSONResponse


@app.post("/chat", response_model=ChatResponse)
async def chat(
    request: ChatRequest,
    # User ID read from a request header; in production derive it from an authenticated identity, since clients can forge this header
    x_user_id: str = Header(default="anonymous"),
):
    # check_rate_limit uses the synchronous Redis client, so run it in a worker thread to keep the event loop free
    allowed, remaining = await asyncio.to_thread(
        check_rate_limit,
        user_id=x_user_id,
        max_requests=20,  # at most 20 requests per minute
        window_seconds=60,
    )
    if not allowed:
        return JSONResponse(
            status_code=429,
            content={"error": "Too many requests, please retry later"},
            headers={"Retry-After": "60", "X-RateLimit-Remaining": "0"},
        )
    async with semaphore:
        answer = await asyncio.wait_for(
            chain.ainvoke({"question": request.question}),
            timeout=settings.request_timeout,
        )
    return ChatResponse(answer=answer, session_id=request.session_id)
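The sliding-window rule from section 10.1 can also be exercised without Redis, which is convenient for unit tests. The class below is a single-process sketch under that assumption, not a replacement for the Redis version; InMemorySlidingWindow and its injectable clock parameter are hypothetical names introduced for the illustration.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, Tuple


class InMemorySlidingWindow:
    """Per-user sliding window kept in process memory (single process only)."""

    def __init__(self, max_requests: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, user_id: str) -> Tuple[bool, int]:
        now = self.clock()
        hits = self._hits[user_id]
        # Drop timestamps that have left the window
        while hits and hits[0] <= now - self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False, 0  # rejected requests are not recorded
        hits.append(now)
        return True, self.max_requests - len(hits)


# A fake clock makes the behavior deterministic
fake_now = [0.0]
limiter = InMemorySlidingWindow(max_requests=2, window_seconds=60, clock=lambda: fake_now[0])
first = limiter.allow("alice")
second = limiter.allow("alice")
third = limiter.allow("alice")         # over the limit inside the window
fake_now[0] = 61.0                     # the earlier requests have now expired
after_window = limiter.allow("alice")
print(first, second, third, after_window)
```

With several worker processes each one would hold its own counters, which is why the article keeps the production state in Redis.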
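11. Session Management for Multi-Turn Conversations
The multi-turn conversations shown in parts 8 and 9 of this series keep chat_history in memory, so the state is lost when the process restarts. Production needs the session history persisted.
11.1 Redis-Backed Session Storage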
import json
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage
class RedisSessionStore:
"""将对话历史持久化到 Redis"""
def __init__(self, redis_client: redis.Redis, ttl: int = 3600):
self.redis = redis_client
self.ttl = ttl # 会话过期时间(秒),超时自动清理
def _key(self, session_id: str) -> str:
return f"session:{session_id}"
def get_history(self, session_id: str) -> list[BaseMessage]:
"""获取会话历史"""
raw = self.redis.get(self._key(session_id))
if not raw:
return []
messages = []
for item in json.loads(raw):
if item["type"] == "human":
messages.append(HumanMessage(content=item["content"]))
elif item["type"] == "ai":
messages.append(AIMessage(content=item["content"]))
return messages
def append(self, session_id: str, human: str, ai: str):
"""追加一轮对话"""
history = self.get_history(session_id)
history.extend([
HumanMessage(content=human),
AIMessage(content=ai),
])
# 只保留最近 20 轮,防止 context 过长
if len(history) > 40:
history = history[-40:]
serialized = [
{"type": "human" if isinstance(m, HumanMessage) else "ai",
"content": m.content}
for m in history
]
self.redis.setex(self._key(session_id), self.ttl, json.dumps(serialized))
def clear(self, session_id: str):
"""清除会话"""
self.redis.delete(self._key(session_id))
session_store = RedisSessionStore(redis_client)
11.2 A Multi-Turn Endpoint with Session Memory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
# Prompt 中加入 chat_history 占位符
multi_turn_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个专业的 LangChain 技术助手,记住对话上下文,前后保持连贯。"),
MessagesPlaceholder(variable_name="chat_history", optional=True),
("human", "{question}"),
])
multi_turn_chain = multi_turn_prompt | llm | StrOutputParser()
@app.post("/chat/session", response_model=ChatResponse)
async def chat_with_session(request: ChatRequest):
session_id = request.session_id
# 从 Redis 取出历史
history = session_store.get_history(session_id)
async with semaphore:
answer = await asyncio.wait_for(
multi_turn_chain.ainvoke({
"question": request.question,
"chat_history": history,
}),
timeout=settings.request_timeout,
)
# 将本轮对话追加到 Redis
session_store.append(session_id, request.question, answer)
return ChatResponse(answer=answer, session_id=session_id)
@app.delete("/chat/session/{session_id}")
async def clear_session(session_id: str):
"""清除指定会话的历史"""
session_store.clear(session_id)
return {"message": f"会话 {session_id} 已清除"}
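12. Structured Logging
Production logs should be emitted as JSON so that collectors such as ELK or Loki can parse and filter them, rather than as free-form text written for human readers.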
import json
import logging
import time
from datetime import datetime, timezone

# Attribute names every LogRecord carries; anything else was passed in via extra=
_STANDARD_ATTRS = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__) | {"message", "asctime"}


class JSONFormatter(logging.Formatter):
    """Format log records as JSON for log collection platforms."""

    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        # Attach extra fields (such as trace_id, user_id)
        for key, value in record.__dict__.items():
            if key not in _STANDARD_ATTRS and not key.startswith("_"):
                log_data[key] = value
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        # default=str keeps a non-serializable extra value from breaking logging
        return json.dumps(log_data, ensure_ascii=False, default=str)


def setup_logging():
    handler = logging.StreamHandler()
    handler.setFormatter(JSONFormatter())
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    root_logger.handlers = [handler]


setup_logging()
logger = logging.getLogger(__name__)
Injecting context into request handling:
import uuid
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
class RequestLoggingMiddleware(BaseHTTPMiddleware):
"""为每个请求生成 trace_id,记录请求耗时"""
async def dispatch(self, request: Request, call_next):
trace_id = str(uuid.uuid4())[:8]
start_time = time.time()
# 将 trace_id 注入请求状态,方便在路由函数中引用
request.state.trace_id = trace_id
logger.info(
"Request started",
extra={
"trace_id": trace_id,
"method": request.method,
"path": request.url.path,
},
)
response = await call_next(request)
duration_ms = round((time.time() - start_time) * 1000, 2)
logger.info(
"Request completed",
extra={
"trace_id": trace_id,
"status_code": response.status_code,
"duration_ms": duration_ms,
},
)
response.headers["X-Trace-Id"] = trace_id
return response
app.add_middleware(RequestLoggingMiddleware)
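13. Graceful Shutdown and Health Checks
13.1 Graceful Shutdown
When the process receives SIGTERM (for example during a Kubernetes rolling update), it should wait for in-flight requests to finish before exiting rather than cut them off: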
import signal
import asyncio
# 记录当前正在处理的请求数
active_requests = 0
shutting_down = False
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info(f"服务启动,模型:{settings.model},环境:{settings.env}")
yield # 服务运行期间
# 进入关闭阶段
global shutting_down
shutting_down = True
logger.info("开始优雅关闭,等待进行中的请求完成...")
# 等待所有进行中的请求完成(最多等 30 秒)
for _ in range(300):
if active_requests == 0:
break
await asyncio.sleep(0.1)
logger.info(f"服务关闭,剩余未完成请求:{active_requests}")
app = FastAPI(title="LangChain Production API", version="1.0.0", lifespan=lifespan)
@app.middleware("http")
async def track_requests(request: Request, call_next):
global active_requests
if shutting_down:
return JSONResponse(status_code=503, content={"error": "服务正在重启,请稍后重试"})
active_requests += 1
try:
return await call_next(request)
finally:
active_requests -= 1
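13.2 Tiered Health Checks
Production health checks should be split into two levels:
@app.get("/health/live")
def liveness():
    """
    Liveness probe: is the process alive?
    Kubernetes uses it to decide whether to restart the container.
    Return a plain 200 without complex checks. A failing liveness probe restarts
    the container, so the shutdown state is reported by the readiness probe instead.
    """
    return {"status": "alive"}


@app.get("/health/ready")
async def readiness():
    """
    Readiness probe: is the service ready to receive traffic?
    Kubernetes uses it to decide whether to route traffic to this instance.
    Checks whether dependencies (Redis, LLM API) are available.
    """
    if shutting_down:
        raise HTTPException(status_code=503, detail="shutting down")
    checks = {}
    # Check the Redis connection (synchronous client, so run it off the event loop)
    try:
        await asyncio.to_thread(redis_client.ping)
        checks["redis"] = "ok"
    except Exception as e:
        checks["redis"] = f"error: {e}"
    # Probing the LLM API is optional; doing it on every check adds latency and cost
    checks["llm_model"] = settings.model
    all_ok = not any(v.startswith("error") for v in checks.values())
    if not all_ok:
        raise HTTPException(
            status_code=503,
            detail={"status": "not ready", "checks": checks},
        )
    return {"status": "ready", "checks": checks}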
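💡 In Kubernetes, point livenessProbe and readinessProbe at /health/live and /health/ready respectively. A failing liveness probe triggers a container restart; a failing readiness probe only stops traffic from being routed to the Pod and does not restart it. The two serve different purposes and should not be mixed up.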
14. Deployment Notes
14.1 Multi-Process Deployment
pip install gunicorn
gunicorn production_app:app \
--worker-class uvicorn.workers.UvicornWorker \
--workers 4 \
--bind 0.0.0.0:8000 \
--timeout 120 \
--graceful-timeout 30 # 优雅关闭等待时间
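⚠️ --workers 4 starts 4 independent processes, each with its own event loop and its own semaphore, so the effective concurrency ceiling is workers × MAX_CONCURRENCY. For a global limit (for example to stay under an API rate limit), implement it in Redis rather than with an in-process asyncio.Semaphore.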
14.2 Secret Management
# ❌ Never store secrets in code or in the image
OPENAI_API_KEY = "sk-xxxxxxxx"
# ✅ In production, inject them from an external secret manager
# Kubernetes Secret:
# kubectl create secret generic llm-secrets \
# --from-literal=OPENAI_API_KEY=sk-xxx
# AWS Secrets Manager / GCP Secret Manager / Vault work the same way
14.3 Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 先复制依赖文件,利用 Docker 层缓存
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
ENV PYTHONUNBUFFERED=1
ENV APP_ENV=production
EXPOSE 8000
# 使用非 root 用户运行,提升安全性
RUN useradd -m appuser && chown -R appuser /app
USER appuser
CMD ["gunicorn", "production_app:app", \
"--worker-class", "uvicorn.workers.UvicornWorker", \
"--workers", "2", \
"--bind", "0.0.0.0:8000", \
"--timeout", "120", \
"--graceful-timeout", "30", \
"--access-logfile", "-", \
"--error-logfile", "-"]
14.4 Kubernetes Deployment Fragment
# deployment.yaml: key configuration only
apiVersion: apps/v1
kind: Deployment
spec:
replicas: 3
template:
spec:
containers:
- name: langchain-api
image: your-registry/langchain-api:latest
env:
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: llm-secrets
key: OPENAI_API_KEY
- name: APP_ENV
value: production
- name: REDIS_HOST
value: redis-service
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "1000m"
livenessProbe:
httpGet:
path: /health/live
port: 8000
initialDelaySeconds: 10
periodSeconds: 15
readinessProbe:
httpGet:
path: /health/ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 10
lifecycle:
preStop:
exec:
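# Delay SIGTERM for a few seconds so the Pod is removed from Service endpoints before gunicorn begins its graceful shutdown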
command: ["/bin/sh", "-c", "sleep 5"]
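15. The Complete Production Skeleton (Integrated Version)
All components from this article combined into one file that can serve as a project starting point: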
# production_app.py — 完整整合版
import asyncio
import json
import logging
import os
import time
import uuid
from contextlib import asynccontextmanager
import redis
from dotenv import load_dotenv
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from langchain_community.cache import RedisCache
from langchain_core.globals import set_llm_cache
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from starlette.middleware.base import BaseHTTPMiddleware
load_dotenv()
# ─── 配置 ────────────────────────────────────────────────────────────────────
class Settings:
model: str = os.getenv("LLM_MODEL", "gpt-4o-mini")
max_concurrency: int = int(os.getenv("MAX_CONCURRENCY", "10"))
request_timeout: float = float(os.getenv("REQUEST_TIMEOUT", "30.0"))
max_input_tokens: int = int(os.getenv("MAX_INPUT_TOKENS", "2000"))
env: str = os.getenv("APP_ENV", "development")
redis_host: str = os.getenv("REDIS_HOST", "localhost")
redis_port: int = int(os.getenv("REDIS_PORT", "6379"))
rate_limit_max: int = int(os.getenv("RATE_LIMIT_MAX", "20"))
rate_limit_window: int = int(os.getenv("RATE_LIMIT_WINDOW", "60"))
settings = Settings()
# ─── 日志 ─────────────────────────────────────────────────────────────────────
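# Attribute names present on every LogRecord; anything else came from extra=
_STANDARD_ATTRS = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__) | {"message", "asctime"}


class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(record.created)),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
        }
        for key, value in record.__dict__.items():
            if key not in _STANDARD_ATTRS and not key.startswith("_"):
                log_data[key] = value
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        # default=str keeps a non-serializable extra value from breaking logging
        return json.dumps(log_data, ensure_ascii=False, default=str)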
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logging.basicConfig(level=logging.INFO, handlers=[handler])
logger = logging.getLogger(__name__)
# ─── Redis ────────────────────────────────────────────────────────────────────
redis_client = redis.Redis(
host=settings.redis_host,
port=settings.redis_port,
decode_responses=True,
)
set_llm_cache(RedisCache(redis_=redis_client, ttl=3600))
# ─── 模型与 Chain ─────────────────────────────────────────────────────────────
primary_llm = ChatOpenAI(model=settings.model, temperature=0).with_retry(
stop_after_attempt=3, wait_exponential_jitter=True,
)
fallback_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
llm = primary_llm.with_fallbacks([fallback_llm])
single_turn_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个专业的 LangChain 技术助手,回答简洁准确,技术问题附代码示例。"),
("human", "{question}"),
])
multi_turn_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个专业的 LangChain 技术助手,记住对话上下文,前后保持连贯。"),
MessagesPlaceholder(variable_name="chat_history", optional=True),
("human", "{question}"),
])
parser = StrOutputParser()
chain = single_turn_prompt | llm | parser
session_chain = multi_turn_prompt | llm | parser
semaphore = asyncio.Semaphore(settings.max_concurrency)
# ─── 请求/响应模型 ────────────────────────────────────────────────────────────
class ChatRequest(BaseModel):
question: str = Field(..., min_length=1, max_length=1000)
session_id: str = Field(default="default")
class ChatResponse(BaseModel):
answer: str
session_id: str
# ─── 会话存储 ─────────────────────────────────────────────────────────────────
class RedisSessionStore:
def __init__(self, client: redis.Redis, ttl: int = 3600):
self.redis = client
self.ttl = ttl
def _key(self, sid: str) -> str:
return f"session:{sid}"
def get_history(self, sid: str):
raw = self.redis.get(self._key(sid))
if not raw:
return []
messages = []
for item in json.loads(raw):
cls = HumanMessage if item["type"] == "human" else AIMessage
messages.append(cls(content=item["content"]))
return messages
def append(self, sid: str, human: str, ai: str):
history = self.get_history(sid)
history.extend([HumanMessage(content=human), AIMessage(content=ai)])
if len(history) > 40:
history = history[-40:]
serialized = [
{"type": "human" if isinstance(m, HumanMessage) else "ai",
"content": m.content}
for m in history
]
self.redis.setex(self._key(sid), self.ttl, json.dumps(serialized))
def clear(self, sid: str):
self.redis.delete(self._key(sid))
session_store = RedisSessionStore(redis_client)
# ─── 速率限制 ─────────────────────────────────────────────────────────────────
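def check_rate_limit(user_id: str) -> tuple[bool, int]:
    key = f"rate_limit:{user_id}"
    now = time.time()
    window_start = now - settings.rate_limit_window
    # Unique member so requests with identical timestamps are counted separately
    member = f"{now}:{uuid.uuid4().hex}"
    pipe = redis_client.pipeline()
    pipe.zremrangebyscore(key, 0, window_start)
    pipe.zadd(key, {member: now})
    pipe.zcard(key)
    pipe.expire(key, settings.rate_limit_window)
    results = pipe.execute()
    count = results[2]
    allowed = count <= settings.rate_limit_max
    if not allowed:
        # Rejected requests should not consume quota
        redis_client.zrem(key, member)
    return allowed, max(0, settings.rate_limit_max - count)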
# ─── 生命周期与中间件 ─────────────────────────────────────────────────────────
active_requests = 0
shutting_down = False
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("Service starting", extra={"model": settings.model, "env": settings.env})
yield
global shutting_down
shutting_down = True
for _ in range(300):
if active_requests == 0:
break
await asyncio.sleep(0.1)
logger.info("Service shutdown complete")
app = FastAPI(title="LangChain Production API", version="1.0.0", lifespan=lifespan)
class RequestLoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
global active_requests
if shutting_down:
return JSONResponse(status_code=503, content={"error": "服务正在重启"})
trace_id = str(uuid.uuid4())[:8]
request.state.trace_id = trace_id
start = time.time()
active_requests += 1
try:
response = await call_next(request)
response.headers["X-Trace-Id"] = trace_id
logger.info("Request", extra={
"trace_id": trace_id,
"path": request.url.path,
"status": response.status_code,
"duration_ms": round((time.time() - start) * 1000, 2),
})
return response
finally:
active_requests -= 1
app.add_middleware(RequestLoggingMiddleware)
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
logger.error("Unhandled exception", exc_info=True,
extra={"path": str(request.url)})
return JSONResponse(
status_code=500,
content={"error": "服务内部错误",
"detail": str(exc) if settings.env == "development" else "请稍后重试"},
)
# ─── 接口 ─────────────────────────────────────────────────────────────────────
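@app.get("/health/live")
def liveness():
    # Liveness stays 200 during shutdown; a failing liveness probe would make Kubernetes restart the container
    return {"status": "alive"}


@app.get("/health/ready")
def readiness():
    # Report "not ready" while shutting down so traffic is routed elsewhere
    if shutting_down:
        raise HTTPException(status_code=503, detail="shutting down")
    try:
        redis_client.ping()
    except Exception as e:
        raise HTTPException(status_code=503, detail={"redis": str(e)})
    return {"status": "ready"}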
@app.post("/chat", response_model=ChatResponse)
async def chat(
    request: ChatRequest,
    x_user_id: str = Header(default="anonymous"),
):
    # check_rate_limit uses the synchronous Redis client; run it in a worker thread so the event loop stays free
    allowed, remaining = await asyncio.to_thread(check_rate_limit, x_user_id)
    if not allowed:
        return JSONResponse(
            status_code=429,
            content={"error": "Too many requests, please retry later"},
            headers={"Retry-After": str(settings.rate_limit_window),
                     "X-RateLimit-Remaining": "0"},
        )
    async with semaphore:
        try:
            answer = await asyncio.wait_for(
                chain.ainvoke({"question": request.question}),
                timeout=settings.request_timeout,
            )
        except asyncio.TimeoutError:
            raise HTTPException(status_code=504, detail="Request timed out; simplify the question and retry")
    return ChatResponse(answer=answer, session_id=request.session_id)


@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        try:
            async with semaphore:
                async for chunk in chain.astream({"question": request.question}):
                    if chunk:
                        yield f"data: {json.dumps({'token': chunk, 'done': False}, ensure_ascii=False)}\n\n"
            yield f"data: {json.dumps({'token': '', 'done': True})}\n\n"
        except Exception as e:
            logger.error("Stream error", exc_info=True)
            # As in the global handler, expose error details only in development.
            message = str(e) if settings.env == "development" else "Please try again later"
            yield f"data: {json.dumps({'error': message, 'done': True})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )


@app.post("/chat/session", response_model=ChatResponse)
async def chat_session(request: ChatRequest):
    history = session_store.get_history(request.session_id)
    async with semaphore:
        try:
            answer = await asyncio.wait_for(
                session_chain.ainvoke({
                    "question": request.question,
                    "chat_history": history,
                }),
                timeout=settings.request_timeout,
            )
        except asyncio.TimeoutError:
            raise HTTPException(status_code=504, detail="Request timed out")
    # Persist the turn only after the model has answered successfully.
    session_store.append(request.session_id, request.question, answer)
    return ChatResponse(answer=answer, session_id=request.session_id)


@app.delete("/chat/session/{session_id}")
def clear_session(session_id: str):
    session_store.clear(session_id)
    return {"message": f"Session {session_id} cleared"}
# Start the service
gunicorn production_app:app \
    --worker-class uvicorn.workers.UvicornWorker \
    --workers 4 \
    --bind 0.0.0.0:8000 \
    --timeout 120 \
    --graceful-timeout 30
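To check the streaming endpoint once the service is up, a client has to split the response into SSE events and decode each `data:` payload. The following is a minimal sketch of that parsing step in plain Python. The helper names `parse_sse_events` and `collect_answer` are hypothetical, and the sample input only mimics the `token` / `done` payloads that `/chat/stream` emits above.

```python
import json
from typing import Iterable, Iterator


def parse_sse_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield the decoded JSON payload of each SSE event.

    `lines` is an iterable of text lines without trailing newlines.
    A blank line terminates an event.
    """
    data_parts: list[str] = []
    for line in lines:
        if line == "":
            if data_parts:
                yield json.loads("\n".join(data_parts))
                data_parts = []
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The SSE format allows one optional space after the colon.
            data_parts.append(value[1:] if value.startswith(" ") else value)
        # Other SSE fields (event:, id:, retry:) are ignored in this sketch.
    if data_parts:  # stream ended without a final blank line
        yield json.loads("\n".join(data_parts))


def collect_answer(lines: Iterable[str]) -> str:
    """Concatenate tokens until the server signals completion or an error."""
    tokens: list[str] = []
    for event in parse_sse_events(lines):
        if "error" in event:
            raise RuntimeError(event["error"])
        if event.get("done"):
            break
        tokens.append(event.get("token", ""))
    return "".join(tokens)


# Hand-written sample input in the shape produced by /chat/stream.
sample_stream = [
    'data: {"token": "Hello", "done": false}',
    "",
    'data: {"token": ", world", "done": false}',
    "",
    'data: {"token": "", "done": true}',
    "",
]
answer = collect_answer(sample_stream)
print(answer)
```

With a real HTTP client, the same functions can be fed the line iterator of a streamed POST response (for example, the `iter_lines()` method of an httpx response) instead of the sample list.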
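16. Common Pitfalls and Best Practices
Pitfall 1: Calling the synchronous invoke inside an async function
# ❌ Blocks the whole event loop: every other request on this worker stalls until the call returns
answer = chain.invoke({"question": request.question})
# ✅ Use the async variant
answer = await chain.ainvoke({"question": request.question})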
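The effect is easy to reproduce without an LLM. In the sketch below, `time.sleep` stands in for a synchronous `chain.invoke` and `asyncio.sleep` for `chain.ainvoke`; all function names are hypothetical. It also shows `asyncio.to_thread` as a fallback for cases where only a synchronous API is available.

```python
import asyncio
import time


def blocking_call() -> str:
    """Stand-in for a synchronous chain.invoke(): blocks the calling thread."""
    time.sleep(0.2)
    return "done"


async def non_blocking_call() -> str:
    """Stand-in for chain.ainvoke(): yields control to the loop while waiting."""
    await asyncio.sleep(0.2)
    return "done"


async def handler_blocking() -> str:
    return blocking_call()  # the event loop is stuck for the full 0.2 s


async def handler_async() -> str:
    return await non_blocking_call()


async def handler_threaded() -> str:
    # Fallback for sync-only code: run it in a worker thread.
    return await asyncio.to_thread(blocking_call)


async def timed(handler, n: int = 5) -> float:
    """Run n concurrent calls of handler and return the elapsed wall time."""
    start = time.perf_counter()
    await asyncio.gather(*(handler() for _ in range(n)))
    return time.perf_counter() - start


async def main() -> dict[str, float]:
    return {
        "blocking": await timed(handler_blocking),
        "async": await timed(handler_async),
        "threaded": await timed(handler_threaded),
    }


timings = asyncio.run(main())
print({name: round(seconds, 2) for name, seconds in timings.items()})
```

Five "concurrent" blocking handlers run one after another and take roughly five times as long as a single call, while the async and threaded variants overlap their waits.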
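Pitfall 2: Re-initializing the model on every request
# ❌ Creates a new ChatOpenAI per request, so each one pays for setting up a fresh HTTP connection pool
@app.post("/chat")
async def chat(request: ChatRequest):
    llm = ChatOpenAI(model="gpt-4o-mini")  # do not initialize here
    ...
# ✅ Initialize at module level and reuse the connection pool
llm = ChatOpenAI(model="gpt-4o-mini")  # once, globally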
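Pitfall 3: Forgetting to disable Nginx buffering for streaming endpoints
# ❌ Nginx buffers proxied responses by default, so streaming disappears: the user receives nothing until the full response is ready
return StreamingResponse(generate(), media_type="text/event-stream")
# ✅ Disable buffering explicitly
return StreamingResponse(
    generate(),
    media_type="text/event-stream",
    headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)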
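Pitfall 4: asyncio.Semaphore does not work across processes
# ❌ A Semaphore is only effective within a single process
# gunicorn --workers 4 starts 4 processes, each with its own Semaphore(10)
# The effective concurrency ceiling is 40, not 10
semaphore = asyncio.Semaphore(10)
# ✅ Enforce the global concurrency limit in Redis, using SETNX or a Lua script
# The in-process Semaphore only limits concurrency inside one worker; it cannot span processes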
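One possible shape for a cross-process limit is sketched below. Note the substitution: it uses a plain INCR/DECR counter rather than the SETNX or Lua approach mentioned above, because that is the shortest version that shows the idea. `GlobalConcurrencyLimiter`, `InMemoryStore`, and the key name are hypothetical; `InMemoryStore` exists only so the sketch runs without a Redis server, and a redis-py client (which offers `incr`, `decr`, and `expire`) would be passed instead in a real deployment.

```python
import contextlib
from typing import Iterator


class InMemoryStore:
    """Single-process stand-in for Redis, only to make the sketch runnable."""

    def __init__(self) -> None:
        self._data: dict[str, int] = {}

    def incr(self, name: str) -> int:
        self._data[name] = self._data.get(name, 0) + 1
        return self._data[name]

    def decr(self, name: str) -> int:
        self._data[name] = self._data.get(name, 0) - 1
        return self._data[name]

    def expire(self, name: str, time: int) -> bool:
        return True  # TTL is not simulated here


class GlobalConcurrencyLimiter:
    """Counter-based limit shared by every worker that talks to the same store."""

    def __init__(self, store, key: str, limit: int, ttl_seconds: int = 120) -> None:
        self.store = store
        self.key = key
        self.limit = limit
        self.ttl_seconds = ttl_seconds

    def try_acquire(self) -> bool:
        current = self.store.incr(self.key)  # atomic in Redis
        if current > self.limit:
            self.store.decr(self.key)  # give the slot back
            return False
        # Safety net: if a worker crashes without releasing, the key expires.
        self.store.expire(self.key, self.ttl_seconds)
        return True

    def release(self) -> None:
        self.store.decr(self.key)

    @contextlib.contextmanager
    def slot(self) -> Iterator[None]:
        if not self.try_acquire():
            raise RuntimeError("global concurrency limit reached")
        try:
            yield
        finally:
            self.release()


limiter = GlobalConcurrencyLimiter(InMemoryStore(), key="llm:inflight", limit=2)
results = [limiter.try_acquire() for _ in range(3)]  # third caller is rejected
limiter.release()                                    # one request finishes
after_release = limiter.try_acquire()                # a slot is free again
print(results, after_release)
```

The counter never admits more than `limit` holders, but the TTL is a coarse recovery mechanism; a Lua script that tracks individual holders would be more robust if workers can crash mid-request.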
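Pitfall 5: A poorly chosen similarity threshold for the semantic cache
# Other constructor arguments (Redis URL, embedding model) are omitted for brevity.
# The values below are written as similarity scores (higher = stricter).
# Some RedisSemanticCache versions treat this parameter as a vector distance
# (lower = stricter), so check which convention your installed version uses.
# ❌ Threshold too low (0.7): dissimilar questions share an answer, and users get irrelevant replies
RedisSemanticCache(score_threshold=0.7)
# ❌ Threshold too high (0.99): the cache almost never hits and is effectively useless
RedisSemanticCache(score_threshold=0.99)
# ✅ 0.93 to 0.96 is a reasonable range in practice; validate it on real data before launch
RedisSemanticCache(score_threshold=0.95)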
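Validating a threshold on real data can be as simple as labeling question pairs and counting how often each candidate threshold would reuse a wrong answer. The sketch below assumes similarity scores (higher = more similar) have already been computed with the embedding model; `LabeledPair`, `evaluate_threshold`, and the sample scores are made up for illustration and are not measurements.

```python
from dataclasses import dataclass


@dataclass
class LabeledPair:
    similarity: float   # similarity between a new question and a cached one
    same_answer: bool   # human label: may the cached answer be reused?


def evaluate_threshold(pairs: list[LabeledPair], threshold: float) -> dict[str, float]:
    """Report how a cache with this similarity threshold would behave on the pairs."""
    hits = [p for p in pairs if p.similarity >= threshold]
    wrong_hits = sum(1 for p in hits if not p.same_answer)
    reusable = sum(1 for p in pairs if p.same_answer)
    correct_hits = len(hits) - wrong_hits
    return {
        "hit_rate": len(hits) / len(pairs),
        # Share of cache hits that would return an answer to a different question.
        "wrong_hit_rate": wrong_hits / len(hits) if hits else 0.0,
        # Share of genuinely reusable answers the cache actually serves.
        "recall": correct_hits / reusable if reusable else 0.0,
    }


# Illustrative, hand-written sample; replace with pairs from production logs.
pairs = [
    LabeledPair(0.99, True), LabeledPair(0.97, True),
    LabeledPair(0.95, True), LabeledPair(0.94, False),
    LabeledPair(0.90, True), LabeledPair(0.82, False),
    LabeledPair(0.75, False), LabeledPair(0.71, False),
]
report = {t: evaluate_threshold(pairs, t) for t in (0.7, 0.95, 0.99)}
for threshold, metrics in report.items():
    print(threshold, metrics)
```

On this toy sample the loosest threshold serves wrong answers for half of its hits, while the strictest one misses most reusable answers, which is the trade-off the pitfall describes.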
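Pitfall 6: Unbounded session history makes the context too long
# ❌ The history grows without limit and eventually exceeds the model's context window
history.append(HumanMessage(...))
history.append(AIMessage(...))
# ✅ Keep only the most recent N turns, and truncate at the storage layer
if len(history) > 40:  # keep at most 20 turns (40 messages)
    history = history[-40:]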
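A plain slice can drop a system prompt or leave the history starting with an answer whose question was cut off. The sketch below handles both cases; `Message` and `trim_history` are hypothetical names, and a simple named tuple stands in for LangChain message objects so the example runs without dependencies.

```python
from typing import NamedTuple


class Message(NamedTuple):
    role: str      # "system", "human" or "ai"
    content: str


def trim_history(history: list[Message], max_messages: int = 40) -> list[Message]:
    """Keep all system messages plus at most max_messages recent others."""
    system = [m for m in history if m.role == "system"]
    rest = [m for m in history if m.role != "system"]
    rest = rest[-max_messages:] if max_messages > 0 else []
    # Do not start on a dangling AI reply whose question was trimmed away.
    while rest and rest[0].role == "ai":
        rest = rest[1:]
    return system + rest


# 1 system prompt followed by 30 question/answer turns (60 messages).
history = [Message("system", "You are a helpful assistant.")]
for i in range(30):
    history.append(Message("human", f"question {i}"))
    history.append(Message("ai", f"answer {i}"))

trimmed = trim_history(history, max_messages=40)
print(len(history), "->", len(trimmed))
```

Counting messages is only a proxy for context size; trimming by token count is stricter, and recent langchain-core releases appear to include a `trim_messages` helper for that purpose, so its documentation is worth checking before writing a custom version.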
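17. Summary
| Area | Key implementation | Core value |
|---|---|---|
| Async endpoints | ainvoke / astream | Does not block the event loop; supports high concurrency |
| Streaming output | StreamingResponse + SSE | Users see the answer as it is generated |
| Retry | .with_retry(stop_after_attempt=3) | Handles transient API errors transparently |
| Fallback | .with_fallbacks([backup model]) | Switches automatically when the primary model fails |
| Timeout protection | asyncio.wait_for(timeout=N) | Keeps slow requests from dragging the service down |
| Concurrency control | In-process Semaphore + Redis distributed limiting | Avoids hitting the provider's API rate limit |
| Caching | RedisCache / RedisSemanticCache | Identical questions do not call the LLM again, which cuts cost |
| Rate limiting | Redis sliding window | Prevents abuse by a single user and protects the API quota |
| Session management | chat_history persisted in Redis | Multi-turn context survives across processes and restarts |
| Structured logging | JSON output + trace_id | Easy for log platforms to collect, filter, and alert on |
| Health checks | Separate liveness and readiness probes | Supports zero-downtime rolling updates on Kubernetes |
| Graceful shutdown | lifespan waits for in-flight requests | Avoids dropping requests that are being processed during an update |
| Cost monitoring | get_openai_callback + token statistics | Quantifies the cost of each call and surfaces abnormal consumption early |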
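🎯 Taking an LLM application to production is, at heart, an engineering exercise: turning "it runs" into "it runs reliably". Retries absorb transient errors, fallbacks cover provider outages, caching cuts cost, rate limiting prevents abuse, and session management preserves the user experience. Each of these is standard equipment in a production environment, and none of them is optional.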
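Series Wrap-up
This concludes the eleven-part LangChain series. From the first article, "Why not call the API directly?", to this last one, "From Demo to Launch", the series has covered:
| Stage | Articles | Topics |
|---|---|---|
| Laying the foundation | Parts 1-3 | LangChain basics, LCEL, prompt engineering |
| Building RAG | Parts 4-6 | RAG basics, advanced RAG, RAG evaluation |
| Building agents | Parts 7-9 | Tools, Agent, LangGraph |
| Production practice | Parts 10-11 | LangSmith monitoring, production deployment |
🎯 A framework is only a tool; solving real problems is the goal. The value of this stack is realized at the moment an engineer applies it to a concrete product.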