最近在做一个智能客服项目,客户对响应速度和成本控制要求很高。传统的基于云API的方案,延迟和费用都让人头疼。经过一番折腾,我们最终选择了 LangGraphOllama 这套组合拳,成功搭建了一个高性能、低成本的本地化智能客服系统。今天就来分享一下我们的架构设计和实战经验。

智能客服系统架构示意图

一、 为什么选择 LangGraph + Ollama?

在项目初期,我们评估了市面上几种主流方案。

1. 传统方案的痛点 传统的智能客服,要么是基于规则引擎(响应快但死板),要么是直接调用云端大模型API(灵活但成本高、延迟不可控)。特别是后者,存在几个核心问题:

  • 延迟高:网络往返加上模型推理,用户经常要等好几秒。
  • 成本不可控:按Token计费,对话量一大,账单非常吓人。
  • 数据隐私:客户对话数据要出域,存在合规风险。
  • 扩展性差:业务逻辑(如查询订单、转人工)和对话逻辑耦合在一起,难以维护和迭代。

2. 技术选型对比 为了解决这些问题,我们决定将工作流编排和模型推理拆开,并尽可能本地化。

  • 工作流编排:LangGraph vs. Airflow/Kubeflow Airflow和Kubeflow是优秀的工作流调度工具,但它们更偏向于数据处理和机器学习流水线,对于需要实时、有状态、多轮对话的客服场景,显得过于“重型”和复杂。LangGraph 是LangChain团队推出的库,专为构建有状态的、由LLM驱动的智能体(Agent) 和工作流而生。它的核心优势在于:

    • 轻量级与易用性:用Python定义状态图和节点,API非常直观,开发效率高。
    • 内置状态管理:自动维护对话的上下文状态(State),省去了自己用字典或数据库维护的麻烦。
    • 灵活的流程控制:支持条件分支、循环、并行等,能轻松实现“先识别意图,再决定下一步”的复杂对话逻辑。
  • 模型部署:Ollama vs. 云API 直接调用GPT-4等云API,简单但昂贵。Ollama 允许我们在自己的服务器上一键拉取和运行开源大模型(如Llama 3、Mistral、Qwen等)。它的优势显而易见:

    • 零网络延迟:模型就在本地,推理速度极快。
    • 固定成本:一次性投入硬件,对话量再大也没有额外Token费用。
    • 数据安全:所有数据都在内网流转。
    • 模型量化支持:Ollama支持GGUF等量化格式,可以在消费级显卡甚至CPU上流畅运行7B/8B参数级别的模型,性价比超高。

二、 核心架构设计与实现

我们的智能客服核心是一个由LangGraph编排的有向无环图(DAG)。每个用户会话对应一个图实例,状态在其中流转。

1. 系统状态(State)设计 这是LangGraph的核心,定义了在整个对话流程中需要传递和更新的数据。

from typing import TypedDict, List, Annotated
from langgraph.graph.message import add_messages
import operator

class State(TypedDict):
    """定义对话状态机所需的所有状态字段。"""
    # 用户当前输入的问题
    user_input: str
    # 历史对话消息列表,LangGraph提供了便捷的合并操作符
    messages: Annotated[List, add_messages]
    # 识别出的用户意图,如“查询订单”、“产品咨询”、“投诉”
    intent: str
    # 从用户输入中提取的关键实体,如订单号、产品名
    entities: dict
    # 当前对话所处的节点或阶段
    current_step: str
    # 从知识库或数据库查询到的结果
    query_result: str
    # 最终返回给用户的回复
    final_response: str

2. 构建对话工作流图(Graph) 我们将一次对话处理分解为几个顺序执行的节点(Node)。

from langgraph.graph import StateGraph, END
import asyncio
from typing import Any

# 初始化工作流构建器
workflow = StateGraph(State)

# 节点1:意图识别与实体提取
def intent_classification(state: State) -> State:
    """
    使用Ollama本地模型识别用户意图并提取关键实体。

    Args:
        state: 当前对话状态。

    Returns:
        State: 更新了`intent`和`entities`字段的状态。
    """
    # 这里简化了Prompt工程,实际应用需要精心设计
    prompt = f"""
    请分析以下用户输入的意图,并从输入中提取关键信息。
    可选意图:查询订单、产品咨询、操作指南、投诉建议、其他。
    用户输入:{state['user_input']}

    请以JSON格式回复,包含`intent`和`entities`两个字段。
    """
    # 调用本地Ollama模型(例如:llama3:8b)
    # 实际调用需要用到ollama的Python库或异步HTTP客户端
    response = call_ollama_model(prompt, model="llama3:8b")
    # 解析response,更新state
    parsed_response = parse_json_response(response)
    state['intent'] = parsed_response.get('intent', '其他')
    state['entities'] = parsed_response.get('entities', {})
    state['current_step'] = 'intent_classified'
    return state

# 节点2:路由决策
def router(state: State) -> str:
    """
    根据识别出的意图,决定下一步该进入哪个节点。

    Args:
        state: 当前对话状态。

    Returns:
        str: 下一个要执行的节点名称。
    """
    intent = state['intent']
    if intent == "查询订单":
        return "query_database"
    elif intent == "产品咨询":
        return "query_knowledge_base"
    elif intent == "操作指南":
        return "query_knowledge_base"
    elif intent == "投诉建议":
        return "transfer_to_human"
    else:
        return "general_chat"

# 节点3:查询知识库(RAG示例)
async def query_knowledge_base(state: State) -> State:
    """
    基于检索增强生成(RAG)从本地知识库中查找答案。
    包含错误重试机制。

    Args:
        state: 当前对话状态。

    Returns:
        State: 更新了`query_result`字段的状态。
    """
    query = state['user_input']
    max_retries = 3
    for attempt in range(max_retries):
        try:
            # 1. 将用户问题转换为向量(Embedding)
            query_vector = await get_embedding_async(query)
            # 2. 从向量数据库(如Chroma, Weaviate)进行相似度搜索
            # 这里假设有一个异步的搜索函数
            search_results = await vector_store.similarity_search_by_vector(
                query_vector, k=3
            )
            # 3. 将检索到的上下文与问题组合,发送给LLM生成最终答案
            context = "\n".join([doc.page_content for doc in search_results])
            rag_prompt = f"""基于以下上下文回答问题。如果上下文不包含答案,请礼貌告知你无法回答。
            上下文:{context}
            问题:{query}
            答案:"""
            # 使用带超时的异步调用
            answer = await asyncio.wait_for(
                call_ollama_model_async(rag_prompt, model="llama3:8b"),
                timeout=10.0 # 设置10秒超时
            )
            state['query_result'] = answer
            state['current_step'] = 'knowledge_queried'
            return state
        except asyncio.TimeoutError:
            print(f"知识库查询超时,第{attempt + 1}次重试...")
            if attempt == max_retries - 1:
                state['query_result'] = "系统繁忙,请稍后再试。"
                return state
        except Exception as e:
            print(f"知识库查询出错: {e}")
            if attempt == max_retries - 1:
                state['query_result'] = "查询知识库时发生错误。"
                return state
        await asyncio.sleep(1) # 重试前等待1秒

# 节点4:生成最终回复
def generate_response(state: State) -> State:
    """
    整合所有信息,生成最终的自然语言回复。

    Args:
        state: 当前对话状态。

    Returns:
        State: 更新了`final_response`字段的状态。
    """
    if state.get('query_result'):
        base = state['query_result']
    else:
        base = state['user_input']

    final_prompt = f"""
    你是一个专业的客服助手。请根据以下信息,生成一段友好、专业的回复。
    用户原始问题:{state['user_input']}
    已获取的信息:{base}
    请生成回复:
    """
    response = call_ollama_model(final_prompt, model="llama3:8b")
    state['final_response'] = response
    state['current_step'] = 'response_generated'
    return state

# 将各个节点添加到图中
workflow.add_node("classify_intent", intent_classification)
workflow.add_node("query_kb", query_knowledge_base) # 注意这里添加的是异步节点
workflow.add_node("generate_resp", generate_response)
# ... 添加其他节点,如 query_database, general_chat 等

# 设置边和路由
workflow.set_entry_point("classify_intent")
workflow.add_conditional_edges(
    "classify_intent",
    router, # 路由函数决定下一个节点
    {
        "query_database": "query_db_node", # 假设有查询数据库节点
        "query_knowledge_base": "query_kb",
        "general_chat": "generate_resp",
        "transfer_to_human": END # 直接结束,转人工
    }
)
workflow.add_edge("query_kb", "generate_resp")
workflow.add_edge("generate_resp", END)

# 编译图
app = workflow.compile()

3. Ollama模型选型与部署建议

  • 模型选择:对于客服场景,我们推荐Mistral 7BLlama 3 8B的指令微调版本(Instruct)。它们在理解指令、遵循格式和生成友好文本方面表现均衡。中文场景可以考虑Qwen 7B
  • 量化部署:使用Ollama的ollama pull命令拉取量化版模型,如llama3:8b-instruct-q4_K_Mq4_K_M表示4位量化,在保证精度损失很小的前提下,大幅降低显存占用(8B模型约需5-6GB显存),使得在RTX 4060等消费级显卡上部署成为可能。
  • 服务化:使用ollama serve在后台运行模型服务,并通过其提供的HTTP API(默认端口11434)进行调用。

三、 性能优化实战

架构搭好了,但要达到“生产级”和“毫秒级响应”,还需要一系列优化。

1. 异步IO处理高并发 智能客服是典型的IO密集型应用(网络、数据库、模型推理)。使用异步编程可以极大提升并发能力。

import aiohttp
from fastapi import FastAPI, BackgroundTasks
import json

app = FastAPI()
# 假设我们有一个全局的LangGraph应用实例 `compiled_app`

async def call_ollama_model_async(prompt: str, model: str) -> str:
    """异步调用Ollama模型的HTTP API。"""
    url = "http://localhost:11434/api/generate"
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False
    }
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=30)) as resp:
                result = await resp.json()
                return result.get("response", "")
        except aiohttp.ClientError as e:
            print(f"调用Ollama API失败: {e}")
            return ""

@app.post("/chat")
async def chat_endpoint(user_input: str, session_id: str):
    """
    处理用户聊天请求的异步端点。
    """
    # 1. 从Redis获取或初始化该session_id的对话状态
    initial_state = await redis.get(f"state:{session_id}")
    if not initial_state:
        initial_state = State(user_input=user_input, messages=[], ...)
    else:
        initial_state['user_input'] = user_input

    # 2. 异步执行LangGraph工作流
    # LangGraph的`ainvoke`是异步方法
    final_state = await app.ainvoke(initial_state)

    # 3. 将更新后的状态保存回Redis,并设置过期时间(如30分钟无活动则清除)
    await redis.setex(f"state:{session_id}", 1800, json.dumps(final_state))

    # 4. 返回最终回复
    return {"response": final_state['final_response']}

2. 对话状态的Redis缓存策略

  • 键设计state:{session_id}
  • 序列化:使用json.dumps/loadsmsgpack
  • 过期时间:根据业务设置,例如30分钟。避免内存无限增长。
  • 写回策略:每次ainvoke执行后,将整个状态写回。虽然有一定开销,但保证了状态的一致性,简化了逻辑。

3. 负载测试与瓶颈分析 使用locustwrk进行压力测试。

  • 关键指标
    • QPS(每秒查询率):在响应时间可接受(如P95 < 2s)的前提下,系统能处理的最大请求数。
    • 延迟(Latency):P50、P95、P99分位的响应时间。
  • 常见瓶颈及排查
    1. Ollama模型推理速度:这是主要瓶颈。监控GPU利用率。考虑升级GPU、使用更小的量化模型(如q4_0)、或部署多个Ollama实例做负载均衡。
    2. 向量数据库搜索:如果知识库很大,检索可能变慢。确保对embedding字段建立了向量索引,并限制返回结果数量(k=3通常足够)。
    3. 网络与序列化:异步框架和Redis调用本身也有开销。使用连接池、优化序列化协议(如用orjson替代json)。

性能监控仪表盘示意图

四、 避坑指南与进阶技巧

在实际部署中,我们踩过不少坑,这里总结一下。

1. 对话漂移问题 模型在长对话中可能会“忘记”最初的目标或上下文。

  • 解决方案:在State中明确维护一个conversation_goal(对话目标)字段,在每一轮路由时都进行核对。或者在messages历史中,定期由系统插入一个“总结当前对话核心问题”的提示,来锚定对话方向。

2. 模型冷启动优化 Ollama首次加载模型或长时间无请求后,第一次推理会特别慢。

  • 技巧
    • 预热:服务启动时,发送几个简单的预热请求给Ollama。
    • 保活:如果业务有低峰期,可以设置一个定时任务,每隔几分钟发送一个轻量级请求,保持模型处于“温热”状态。
    • 使用num_ctx参数:在Ollama的模型配置中合理设置上下文长度,太大会增加每次推理的开销。

3. 敏感信息过滤 客服可能接触到用户手机号、地址等信息。

  • 实践方案
    • 输入输出过滤:在状态进入LangGraph之前和最终回复输出之前,增加一个过滤节点。可以使用正则表达式或专门训练的小型NER模型来识别和脱敏(如替换为[PHONE])。
    • 日志脱敏:确保所有日志系统不会记录原始的敏感信息。
def sensitive_info_filter(state: State) -> State:
    """简易的敏感信息过滤函数。"""
    import re
    text = state['user_input']
    # 过滤手机号(简单示例)
    phone_pattern = r'1[3-9]\d{9}'
    filtered_text = re.sub(phone_pattern, '[PHONE]', text)
    state['user_input'] = filtered_text
    return state
# 将这个节点作为工作流的第一个节点
workflow.add_node("input_filter", sensitive_info_filter)
workflow.set_entry_point("input_filter")
workflow.add_edge("input_filter", "classify_intent")

五、 总结与思考

通过将LangGraph的流程编排能力和Ollama的本地化模型部署相结合,我们成功构建了一个响应迅速、成本可控、数据安全的智能客服系统。这套架构的核心优势在于解耦可控:业务逻辑被清晰地定义在状态图的各个节点中,模型服务完全掌握在自己手里。

当然,这套方案也在持续演进中。最后,留三个开放性问题供大家探讨:

  1. 图的可视化与调试:LangGraph的工作流在复杂后,如何能更直观地监控一次对话具体走了哪些节点,每个节点的输入输出是什么?是否需要集成像LangSmith这样的追踪工具?
  2. 模型的动态热更新:当我们需要将Ollama中的模型从llama3:8b升级到llama3:70b,或者切换为另一个新模型时,如何在不中断在线服务的情况下实现平滑过渡?
  3. 多模态扩展:未来的客服可能不只是文字,还需要处理用户上传的图片(如产品故障图)、语音消息。这套架构应该如何改造,以优雅地支持多模态输入和输出?

希望这篇笔记能为你构建自己的智能客服系统提供一些切实可行的思路。这条路我们还在继续探索,欢迎一起交流。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐