限时福利领取


LangChain智能客服实战:如何基于直属库构建高效问答系统

背景痛点:传统客服的“慢”与“乱”

去年我在一家 SaaS 公司做客服中台,高峰期每天 3 万条工单。老系统用 MySQL 全文索引 + 正则匹配,平均响应 4.8 秒,命中率只有 42%。痛点集中在这几点:

  1. 关键词检索只能做字面匹配,用户口语化提问直接翻车
  2. 知识库 18 万条 FAQ,每次 like '%xxx%' 把 CPU 打满
  3. 大模型直接回答,幻觉严重,还慢——一次 GPT-4 调用 8 秒,用户早走了

老板一句话:能不能“秒回”且“不说错”?于是我们把目光投向 LangChain + 直属库(公司内部的 PostgreSQL 集群,延迟 < 5 ms)的 RAG 方案。

技术选型:直接 LLM vs RAG

维度 直接调用大模型 RAG + 直属库
延迟 2–8 s 200–600 ms
幻觉 低(可控)
数据新鲜度 训练截止日 实时同步
成本 按 1k tokens 计费 自建向量索引,几乎 0 增量
可解释 黑盒 返回出处,可审计

结论:客服场景要“快、准、省”,RAG 完胜。

核心实现:三步搞定“直属库 + LangChain”

1. 数据预处理:把 18 万条 FAQ 变成“向量块”

先写个并行清洗脚本,把数据库里 title、content 两列捞出来,清洗掉 HTML、连续换行,再按 512 token 滑动窗口切分,overlap 10% 防止截断语义。

# etl/chunk_worker.py
import re, os, json
from sqlalchemy import create_engine
from langchain.text_splitter import RecursiveCharacterTextSplitter

DSN = "postgresql+psycopg2://user:pwd@直属库:5432/crm"
engine = create_engine(D,SN, pool_pre_ping=True)

def clean(txt):
    txt = re.sub(r'<.*?>', '', txt)  # 去 HTML
    txt = re.sub(r'\s+', ' ', txt)
    return txt.strip()

def yield_rows():
    sql = "SELECT id, title, content FROM faq WHERE status='ONLINE'"
    for chunk in pd.read_sql(sql, engine, chunksize=5000):
        for _, row in chunk.iterrows():
            text = f"{row['title']}\n{row['content']}"
            yield row['id'], clean(text)

def build_chunks():
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=512,
        chunk_overlap=51,
        length_function=len,
        separators=["\n", "。", ";"]
    )
    for pk, text in yield_rows():
        for idx, sub in enumerate(splitter.split_text(text)):
            yield {"faq_id": pk, "chunk_id": idx, "text": sub}

if __name__ == "__main__":
    with open("chunks.jsonl", "w", encoding="utf8") as f:
        for c in build_chunks():
            f.write(json.dumps(c, ensure_ascii=False) + "\n")

用 GNU Parallel 把 18 万条拆 20 进程跑,10 分钟搞定。

2. 向量化与索引:直属库 pgvector 一步到位

直属库已装 pgvector 插件,直接建表:

CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE faq_emb (
    id SERIAL PRIMARY KEY,
    faq_id INT,
    chunk_id INT,
    text TEXT,
    emb VECTOR(1536)
);

Python 灌库:

# emb/insert.py
import openai, json, psycopg2
from pgvector.psycopg2 import register_vector

openai.api_key = os.getenv("OPENAI_KEY")
conn = psycopg2.connect(dbname="crm", user="user", password="pwd", host="直属库")
register_vector(conn)

def get_embedding(text):
    resp = openai.Embedding.create(input=text, model="text-embedding-ada-002")
    return resp['data'][0]['embedding']

cur = conn.cursor()
with open("chunks.jsonl", encoding="utf8") as f:
    for line in f:
        c = json.loads(line)
        emb = get_embedding(c["text"])
        cur.execute(
            "INSERT INTO faq_emb(faq_id,chunk_id,text,emb) VALUES %s",
            [(c["faq_id"], c["chunk_id"], c["text"], emb)]
        )
conn.commit()

建 IVFFlat 索引加速:

CREATE INDEX ON faq_emb USING ivfflat (emb vector_cosine_ops) WITH (lists = 100);

3. LangChain RetrievalQA:200 毫秒完成“检索 + 生成”

# bot/chain.py
from langchain.vectorstores.pgvector import PGVector
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate

CONNECTION_STRING = "postgresql+psycopg2://user:pwd@直属库:5432/crm"
COLLECTION = "faq_emb"

vectorstore = PGVector(
    connection_string=CONNECTION_STRING,
    collection_name=COLLECTION,
    embedding_function=OpenAIEmbeddings(model="text-embedding-ada-002")
)

prompt = PromptTemplate(
    input_variables=["context", "question"],
    template="""
你是公司客服机器人,只能使用以下上下文回答问题,禁止编造:
{context}
问题:{question}
"""
)

qa = RetrievalQA.from_chain_type(
    llm=ChatOpenAI(model="gpt-3.5-turbo", temperature=0),
    chain_type="stuff",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 3}),
    chain_type_kwargs={"prompt": prompt},
    return_source_documents=True
)

FastAPI 包一层:

# bot/api.py
from fastapi import FastAPI
app = FastAPI()

@app.post("/ask")
def ask(q: str):
    ans = qa({"query": q})
    return {"answer": ans["result"], "source": ans["source_documents"]}

压测结果:P99 580 ms,命中率 87%,老板终于笑了。

整体架构

性能优化:让“秒回”更稳

1. 并行建索引

上面 ETL 脚本开 20 进程,IO 打满但 CPU 还有余量,就把 embedding 请求改成异步:

import asyncio, aiohttp, openai
async def embed_many(texts):
    tasks = [openai.Embedding.acreate(input=t, model="ada-002") for t in texts]
    return await asyncio.gather(*tasks)

一次批量 100 条,QPS 提升 6 倍。

2. 缓存热问

客服 80% 问题集中在 200 个高频 FAQ。用 Redis 把“向量哈希 → 答案”缓存 5 分钟,命中后 20 ms 返回。哈希取法:

import hashlib
def qhash(q):
    return hashlib.blake2b(q.encode('utf8'), digest_size=8).hexdigest()

3. 超时重试

OpenAI 接口偶发 429,用 tenacity 装饰器:

from tenacity import retry, stop_after_attempt, wait_random_exponential
@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(5))
def get_embedding(text):
    ...

避坑指南:中文场景的小地雷

  1. 分词器选错,句子被拦腰斩断
    RecursiveCharacterTextSplitter 时把 separators 放中文标点:“\n。;,” 实测召回 +5%。

  2. 向量相似度阈值
    cosine < 0.78 基本胡言乱语,> 0.82 又太保守。用验证集画 PR 曲线,选 0.8 最平衡。

  3. 对话状态管理
    多轮场景要保留上文。把历史问答拼成“伪文档”再检索,否则用户追问“那怎么办”会断片。LangChain 的 ConversationalRetrievalQA 可接盘。

生产建议:监控 + 容灾

指标 采集方式 告警阈值
P99 延迟 FastAPI middleware > 1 s
检索命中率 日志统计 < 80%
幻觉率 随机 100 条人工抽检 > 5%
向量索引延迟 pg_stat_statements > 100 ms

容灾:直属库做主从 + 延迟从,向量表每日逻辑备份到对象存储;LLM 侧配置 fallback 到 Azure OpenAI,DNS 秒级切换。

监控看板

进阶思考

  1. 查询改写:先用 LLM 把口语问句改写成关键词组合,再向量检索,能否再提 10% 命中率?
  2. 分级召回:先走倒排索引粗排 1k 候选,再走向量精排 10,延迟能否压到 200 ms 以内?
  3. 私有化 embedding:用中文 BGE-large-en-v1.5 替代 OpenAI,成本归零,效果会不会掉?

把这三点跑通,也许就能从“秒回”进化到“毫秒回”了。祝你玩得开心,有问题评论区一起踩坑。

限时福利领取


Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐