SSE 断连重连导致的上下文错位
【今日问题】
聊天产品里,用户用着用着,大模型回复到一半,网络抖了一下,SSE 连接断了。UI 上自动重连后,为什么 LLM "忘了" 自己刚才说到哪儿了?
【真实场景】
某 SaaS 客服 Agent 上线第 2 个月,工程师做了流式输出(逐字打字效果)。某天客服反馈:
"用户问了一个长问题,AI 正在回复,突然页面卡住。等 2 秒后,AI 又开始回复,但接上的内容跟前面完全对不上——上半句在讲退款流程,下半句突然跳到物流时效,中间逻辑完全断裂。"
后台日志里也能看到诡异现象:同一 conversation_id 下出现了两个 parent_message_id 不同的并发请求,LLM 收到的上下文被截断到某个中间状态,于是基于"半句话"继续胡编。
这就是典型的 SSE 断连重连导致的上下文错位。这类问题在 2025–2026 年的 AI Chat 产品里频繁出现(ChatGPT、Claude、通义千问都有过类似 bug report)。
【思考盲区】
写流式输出时,90% 的开发者会踩下面这些坑:
-
"SSE 重连 = 再发一次请求":前端
EventSource默认断连后会自动重连,但重连时只携带Last-Event-ID,不会把会话状态重新发一次。后端拿到Last-Event-ID不知道从哪儿继续。 -
"对话历史在前端维护就够了":把 messages 数组放在 React state 里,刷新一下就丢一半;或者断网时 React state 没及时持久化,重连后用了一个旧版本的上下文。
-
"Token 流和事件流是同一条流":把业务事件(工具调用、用户中断、上下文压缩)和 LLM token 流挤在同一个 SSE 连接里,断连时业务事件丢失,但 token 还在继续生成,造成状态不一致。
-
"服务端是无状态的,客户端重试就行":LLM 是有状态的(自带 KV cache、对话历史),客户端重试不会自动恢复 LLM 那边的内部状态,只会从你给的 messages 重新生成,于是出现"AI 不记得自己说过什么"。
【逐步拆解】
一、现象复现(最小示例)
下面这段代码模拟了一个最常见的"翻车"现场:
# 错误示范:flask_sse_demo.py
from flask import Flask, Response, stream_with_context
import time, json
app = Flask(__name__)
@app.route("/chat/stream")
def chat_stream():
def generate():
# 假设用户问了一个长问题,LLM 准备流式输出 50 个 token
for i in range(50):
time.sleep(0.1) # 模拟生成耗时
yield f"data: {json.dumps({'token': f'片段{i}'})}\n\n"
if i == 25:
# 假设此时网络抖动,SSE 中断
raise ConnectionError("client disconnected")
return Response(stream_with_context(generate()), mimetype="text/event-stream")
前端用 EventSource 监听,断网 3 秒后浏览器自动重连:
// 前端
const es = new EventSource("/chat/stream?conv_id=abc123");
es.onmessage = (e) => console.log("收到:", JSON.parse(e.data).token);
// 重连后,es 收到的事件从 lastEventId 开始,
// 但服务端 generate() 已经重启,从头再来,
// 用户看到两段不连贯的内容
现象:用户看到「片段0-片段25」→ 黑屏 3 秒 → 「片段0-片段25」又来一遍 → 然后可能接上「片段26-片段49」。上下文看着是断的,实际是重复的。
二、根因分析(比喻 + 技术解释)
我们用一个比喻来理解这件事:
SSE 流式输出就像快递员送货。 快递员(SSE)从仓库(LLM)把 50 个包裹(token)送到你家(浏览器)。 走到第 25 个包裹时,他摔了一跤(网络断了)。 快递公司说"没事,站起来继续送"——但他从哪儿捡起第 25 个包裹?
如果他不记得自己送过哪几个,就只能从仓库重新搬一遍(重复生成);
如果他有签收单(lastEventId),最多只能知道"我送到了 25 号",但没法把 25 号之后的内容从 LLM 那边要回来——因为 LLM 的 KV cache 跟这次连接是绑定的。
技术上,SSE(Server-Sent Events)是一种单向、基于 HTTP 长轮询的协议。它有这些先天限制:
| 限制 | 后果 |
|---|---|
| HTTP 1.1 长连接,服务端无法主动感知客户端断连(要等下一次写入失败) | 断连时服务端还在傻傻生成 token,浪费算力 |
| 断连后服务端状态全部清空(generate() 退出,局部变量销毁) | 重连是从零开始,LLM KV cache 无法续上 |
浏览器重连只发 Last-Event-ID,不带业务上下文 |
后端要靠这个 ID 去"猜"从哪儿继续,但往往猜错 |
| HTTP/1.1 一个连接一次请求,无法做真正的"双向流" | 想中途修改上下文(比如用户补充信息)做不到 |
根本原因:流式输出的「生成状态」(LLM 那边)、「传输状态」(SSE 连接)、「业务状态」(对话历史)是三套独立的状态机,但被错误地当成了一回事。断连时它们三者不同步,就出现了上下文错位。
三、解决方案(三种,对比优劣)
方案 A:「幂等重放 + 断点续传」
服务端用 Last-Event-ID 续传已生成的内容,不再调用 LLM,直接从缓存/数据库读出后续 token。
# 方案 A 核心:断点续传
import redis
r = redis.Redis()
def stream_with_resume(conv_id: str, last_event_id: str):
# 1. 查缓存里这个会话已经生成到哪个 event id
cache_key = f"stream:{conv_id}"
if last_event_id:
# 跳过已经发送过的
cursor = int(last_event_id)
else:
cursor = 0
# 2. 从 cursor 开始,把后续 token 重新发一遍(幂等)
while True:
token = r.lindex(cache_key, cursor)
if token is None:
break
yield f"id: {cursor}\ndata: {token}\n\n"
cursor += 1
-
✅ 优点:实现简单,断网重连对用户透明
-
❌ 缺点:重连后整段重新发送,如果 LLM 已经生成到第 1000 个 token,重连要等很久;且 LLM 的算力被白白浪费(因为 LLM 那边其实已经停了)
方案 B:「会话状态外置 + 客户端持久化」(推荐 ⭐)
把对话历史从"绑在 SSE 连接上"改成"绑在 conversation_id 上",服务端用 conversation_id 作为状态隔离的 key,重连后用同一个 conversation_id 恢复 LLM 调用。
# 方案 B 核心:状态与传输解耦
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
import uuid
class ConversationManager:
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4o", streaming=True)
self.history_store = {} # 生产用 Redis
def get_history(self, conv_id: str):
return self.history_store.get(conv_id, [])
def append_message(self, conv_id: str, msg):
self.history_store.setdefault(conv_id, []).append(msg)
# 持久化到 Redis,TTL 设 24h
# r.setex(f"conv:{conv_id}", 86400, json.dumps(messages))
def stream_reply(self, conv_id: str, user_input: str, last_event_id: int = 0):
# 1. 加载完整历史(与 SSE 连接无关)
history = self.get_history(conv_id)
history.append(HumanMessage(content=user_input))
# 2. 流式生成
full_reply = ""
for chunk in self.llm.stream(history):
full_reply += chunk.content
yield f"id: {len(full_reply)}\ndata: {chunk.content}\n\n"
# 3. 写入历史(供下次断连恢复)
self.append_message(conv_id, AIMessage(content=full_reply))
前端用 EventSource 配合重连机制:
// 前端:把消息列表与传输解耦
class ChatClient {
constructor(convId) {
this.convId = convId;
this.messages = JSON.parse(localStorage.getItem(`msgs:${convId}`) || "[]");
this.lastEventId = 0;
}
async send(userInput) {
// 1. 立刻把用户消息持久化
this.messages.push({ role: "user", content: userInput });
localStorage.setItem(`msgs:${this.convId}`, JSON.stringify(this.messages));
// 2. 开启流(带 lastEventId 实现断点续传)
const url = new URL("/chat/stream", location.origin);
url.searchParams.set("conv_id", this.convId);
if (this.lastEventId) url.searchParams.set("last_event_id", this.lastEventId);
const es = new EventSource(url);
es.addEventListener("message", (e) => {
this.lastEventId = parseInt(e.lastEventId);
// 实时更新 UI...
});
es.addEventListener("done", () => {
es.close();
// 把 AI 回复写进历史
this.messages.push({ role: "assistant", content: this.assembledReply });
localStorage.setItem(`msgs:${this.convId}`, JSON.stringify(this.messages));
});
}
}
-
✅ 优点:彻底解决上下文错位,重连后用同一个
conv_id自动续上;前端可中途刷新页面也不丢上下文 -
❌ 缺点:需要维护
conversation_id状态机,前端要做消息持久化
方案 C:「WebSocket 替代 SSE + 服务端会话」(适合长连接场景)
直接放弃 SSE,用 WebSocket 做真正的双向流,服务端用内存/Redis 维护会话状态。
-
✅ 优点:WebSocket 双向通信,可中途打断、改上下文
-
❌ 缺点:运维复杂度高(企业网/防火墙不友好),不适合 AI Chat 通用场景
对比总结:
| 维度 | A. 幂等重放 | B. 状态外置(推荐) | C. WebSocket |
|---|---|---|---|
| 实现复杂度 | 低 | 中 | 高 |
| 重连体验 | 一般(重新发整段) | 好(续传) | 好(双向流) |
| 资源浪费 | 高(LLM 重复生成) | 低 | 低 |
| 适合场景 | 短回答(<500 token) | 通用 AI Chat | 实时协作/Agent |
四、代码示例(完整可运行 FastAPI 版)
把方案 B 跑起来:
# 完整 demo:sse_chat_resumable.py
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
import json, asyncio, os
app = FastAPI()
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True, api_key=os.getenv("OPENAI_API_KEY"))
# 生产环境换成 Redis
CONV_STORE = {}
def load_history(conv_id: str):
return CONV_STORE.setdefault(conv_id, [
SystemMessage(content="你是一个友好的助手。")
])
def save_history(conv_id: str, messages):
CONV_STORE[conv_id] = messages
@app.get("/chat/stream")
async def stream_chat(conv_id: str, user_input: str, last_event_id: int = 0):
history = load_history(conv_id)
history.append(HumanMessage(content=user_input))
async def event_generator():
full_reply = ""
event_id = last_event_id
try:
async for chunk in llm.astream(history):
token = chunk.content
full_reply += token
event_id += 1
yield f"id: {event_id}\ndata: {json.dumps({'token': token}, ensure_ascii=False)}\n\n"
await asyncio.sleep(0.01) # 让出事件循环
except Exception as e:
# 客户端断连时,异常会传到这
print(f"客户端断连,但已生成 {len(full_reply)} 字符,保留到历史")
finally:
# 关键:不管有没有断,都要把回复写入历史
if full_reply:
history.append(AIMessage(content=full_reply))
save_history(conv_id, history)
yield "event: done\ndata: {}\n\n"
return StreamingResponse(event_generator(), media_type="text/event-stream")
测试断点续传:
# 终端 1:启动服务 uvicorn sse_chat_resumable:app --reload # 终端 2:用 curl 模拟断连 curl -N "http://localhost:8000/chat/stream?conv_id=demo1&user_input=讲个长笑话" # 看到一半按 Ctrl+C 断开 # 再连时带 last_event_id,服务端从该位置继续
【避坑指南】
-
❌ 不要把 messages 数组只放在 React state:刷新即丢,断网即丢。必须用 localStorage / IndexedDB 持久化(或用 TanStack Query 的 persistor)。
-
❌ 不要用 SSE 传业务事件:工具调用、用户中断、上下文压缩这些事件单独走 WebSocket 或 HTTP,否则 SSE 一断,业务事件就全丢了。
-
❌ 不要在重连时复用旧的
eventSource对象:EventSource重连是自动的,但状态在闭包里的(比如lastEventId、已渲染 token)需要你自己管理。建议重连时关闭旧 es,新建一个,把状态显式传过去。 -
❌ 不要在服务端用全局变量缓存 LLM 状态:多用户并发会串。必须用
conversation_id隔离,缓存键加conv_id前缀。 -
⚠️ 一定要在
finally块里保存历史:流式输出抛异常时(常见于客户端断连),full_reply里已经有部分内容,必须写进历史,否则下次重连从空开始。 -
⚠️
last_event_id要用单调递增的数字,不要用 UUID:服务端拿到数字才知道"上次发到第几个 token",UUID 没法做断点续传。 -
⚠️ 客户端 EventSource 默认重试间隔是 3 秒,生产环境要调:设置
es.retry = 1000(1 秒)能显著改善弱网体验。
【你来做一做】
任务:在上面的 FastAPI 示例基础上,加一个「断点续传」的端点。
需求:
-
新增
GET /chat/resume?conv_id=xxx&last_event_id=N端点 -
从
CONV_STORE加载该会话的最后一条 AI 回复 -
跳过前 N 个字符,只返回第 N+1 个字符之后的内容
-
思考:这种"重发"的方式在长回答(>2000 token)时会有什么问题?你会怎么优化?(提示:考虑「服务端把 LLM 的 token 缓存到 Redis,用 stream id 做索引」)
完成后,把答案贴在评论区,我们一起 review 代码!
📚 延伸阅读
-
MDN: EventSource 自动重连机制
-
LangChain 文档:ChatModel.astream 的状态管理
更多推荐
所有评论(0)