最近在帮朋友的抖店做客服自动化改造,发现人工客服每天要处理大量重复咨询,比如“什么时候发货”、“怎么退货”、“有没有优惠券”等等,不仅响应慢,夜间还经常无人值守,白白流失订单。正好研究了一下低代码AI应用平台Dify,用它搭建了一个智能客服Agent,成功接入了抖店客服,实现了自动回复。整个过程踩了不少坑,也总结了一些优化经验,在这里分享给大家。

背景痛点:为什么需要智能客服?

抖店作为兴趣电商的核心阵地,客服压力主要来自几个方面:

  1. 高频重复咨询:超过60%的咨询集中在物流状态、退换货政策、优惠活动等几个固定问题上。人工客服重复回答,效率低下且容易因疲劳出错。
  2. 服务时间限制:人工客服通常只在白天工作,而电商用户咨询行为是全天候的,尤其是夜间和凌晨的咨询无法及时响应,直接影响转化率和用户体验。
  3. 人力成本攀升:随着店铺规模扩大,客服团队人力成本成为一笔不小的开支,且招聘和培训周期长。
  4. 服务标准不一:不同客服的业务熟练度和回答方式存在差异,难以保证服务质量的统一性。

基于这些痛点,一个能够理解用户意图、7x24小时稳定响应、且回答准确一致的智能客服Agent就成了一个很实际的解决方案。

客服工作场景示意

技术选型:为什么是Dify?

在搭建智能客服时,我们对比过几种主流方案:

  • 自研NLP模型:效果可控但开发周期长,需要专业的算法和标注团队,维护成本高。
  • 云厂商对话机器人(如阿里云、腾讯云):开箱即用,但意图识别和对话流程定制不够灵活,且与业务系统深度集成较复杂。
  • 开源Bot框架(如Rasa):灵活性极高,但需要从零开始构建NLU、对话管理、动作执行等全套模块,技术门槛不低。

最终选择Dify,主要基于以下几点考虑:

  1. 意图识别准确率:Dify底层可以灵活接入多种大模型(如GPT、文心一言等),利用大模型的强大语义理解能力,对于未在训练集中明确标注的、但语义相近的用户问法,也能有较好的识别效果。相比之下,传统基于规则或小规模训练集的方案泛化能力较弱。
  2. 多轮对话支持:Dify提供了可视化的对话流程编排工具,可以轻松设计包含“槽位填充”的多轮对话。例如,处理退货申请时,可以引导用户依次提供订单号、退货原因、商品图片等信息,逻辑清晰,易于维护。
  3. 快速集成与部署:Dify提供了标准的API和Webhook接口,方便与外部系统(如抖店开放平台)对接。其“应用即API”的理念,让我们可以专注于业务逻辑,而非底层架构。
  4. 持续学习与优化:Dify支持通过对话日志进行持续学习,可以不断优化意图识别模型和回答内容,让智能体越用越聪明。

核心实现:从零搭建到自动回复

1. Dify Agent创建与训练

首先在Dify上创建一个新的“助手”类型应用。

NLU数据集构建: 意图识别的质量取决于训练数据。我们针对抖店常见问题,整理了多个意图类别,每个意图提供多种不同的用户表达方式(utterances)。

例如,针对“查询物流”意图,我们准备了如下训练数据(格式为CSV,可导入Dify的数据集模块):

用户问题,意图
我的快递到哪了?,query_logistics
物流信息怎么查?,query_logistics
发货了吗?几天能到?,query_logistics
订单号XXXXXX走到哪了?,query_logistics

关键技巧是同义句扩充。对于同一个意图,尽可能多地收集用户可能的不同问法,包括口语化、简写、带错别字的情况。我们使用了简单的脚本,基于种子问题通过同义词替换、句式变换生成了一批增强数据。

在Dify的“提示词编排”环节,我们为每个意图设定了清晰、友好的回复模板,并引入系统指令来约束AI的回复风格,例如:“你是一名专业、亲切的电商客服,请用简短、明确的语句回答用户问题,避免冗长和无关信息。”

2. 抖店开放平台接口对接

抖店客服消息通过开放平台的“消息推送”接口接收,我们需要配置一个可公网访问的Webhook URL。

OAuth2.0鉴权: 调用抖店大部分API都需要先获取访问令牌(access_token)。以下是使用httpx库异步获取token的代码片段:

import httpx
from typing import Optional
import time

class DouDianAuth:
    def __init__(self, app_key: str, app_secret: str, shop_id: str):
        self.app_key = app_key
        self.app_secret = app_secret
        self.shop_id = shop_id
        self._access_token: Optional[str] = None
        self._token_expire_time: float = 0

    async def get_access_token(self) -> str:
        """获取抖店开放平台access_token,带内存缓存"""
        if self._access_token and time.time() < self._token_expire_time:
            return self._access_token

        url = "https://openapi.douyin.com/oauth2/access_token/"
        params = {
            "app_key": self.app_key,
            "app_secret": self.app_secret,
            "shop_id": self.shop_id,
            "grant_type": "authorization_code"
        }
        async with httpx.AsyncClient() as client:
            resp = await client.post(url, params=params)
            resp.raise_for_status()
            data = resp.json()

        if data.get("code") == 0:
            self._access_token = data["data"]["access_token"]
            # 令牌有效期通常为24小时,这里提前5分钟刷新
            self._token_expire_time = time.time() + data["data"]["expires_in"] - 300
            return self._access_token
        else:
            raise Exception(f"Failed to get access token: {data}")

消息接收与路由: 抖店会将用户消息以HTTP POST形式推送到我们配置的Webhook。我们需要实现一个消息路由分发器。

3. 消息路由与处理的完整Python实现

以下是一个基于FastAPI的完整消息处理示例,包含了异步处理、异常重试和基础限流。

from fastapi import FastAPI, Request, HTTPException, Depends
import httpx
import asyncio
from typing import Dict, Any
import json
import logging
from datetime import datetime
from .auth import DouDianAuth  # 导入上面的鉴权类
from .rate_limiter import TokenBucketLimiter  # 自定义限流器,下文会介绍

app = FastAPI()
logger = logging.getLogger(__name__)

# 初始化组件
auth_client = DouDianAuth(app_key="your_app_key", app_secret="your_secret", shop_id="your_shop_id")
dify_webhook_url = "https://api.dify.ai/v1/chat-messages"  # Dify应用API地址
dify_api_key = "your_dify_app_api_key"
limiter = TokenBucketLimiter(capacity=10, fill_rate=1)  # 限流:每秒最多处理10个请求

async def verify_douyin_signature(request: Request) -> Dict[str, Any]:
    """验证抖店消息签名(简化示例,实际需按抖店文档实现)"""
    body_bytes = await request.body()
    # 实际签名验证逻辑应在此处实现,使用app_secret等
    # 这里假设验证通过,直接解析JSON
    try:
        return json.loads(body_bytes)
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")

@app.post("/doudian/webhook")
async def handle_customer_message(payload: Dict[str, Any] = Depends(verify_douyin_signature)):
    """处理抖店推送的客服消息"""
    # 1. 基础校验与限流
    if not limiter.allow_request():
        logger.warning(f"Rate limit exceeded at {datetime.utcnow()}")
        raise HTTPException(status_code=429, detail="Too Many Requests")

    # 2. 提取关键信息
    try:
        msg_type = payload.get("type")
        if msg_type != "message":
            return {"code": 0, "msg": "ignore non-message event"}  # 非消息事件直接忽略

        user_open_id = payload["data"]["user_open_id"]
        content = payload["data"]["content"].strip()
        session_id = f"doudian_{user_open_id}"  # 用open_id构造会话ID,用于Dify的会话保持

        logger.info(f"Received message from {user_open_id}: {content}")

        # 3. 调用Dify API获取智能回复
        dify_response = await call_dify_agent(session_id, content)

        # 4. 将回复发送回抖店客服接口
        await send_reply_to_doudian(user_open_id, dify_response)

        return {"code": 0, "msg": "success"}

    except KeyError as e:
        logger.error(f"Missing key in payload: {e}")
        raise HTTPException(status_code=400, detail=f"Invalid payload structure: {e}")
    except Exception as e:
        logger.exception(f"Error processing message: {e}")
        # 可以考虑加入重试队列,这里简单返回错误
        raise HTTPException(status_code=500, detail="Internal server error")

async def call_dify_agent(session_id: str, query: str, retries: int = 3) -> str:
    """调用Dify智能体API,包含重试机制"""
    headers = {
        "Authorization": f"Bearer {dify_api_key}",
        "Content-Type": "application/json"
    }
    data = {
        "query": query,
        "response_mode": "blocking",  # 同步等待回复
        "conversation_id": session_id,  # 传入会话ID,Dify会维护上下文
        "user": session_id
    }
    async with httpx.AsyncClient(timeout=30.0) as client:
        for attempt in range(retries):
            try:
                resp = await client.post(dify_webhook_url, json=data, headers=headers)
                resp.raise_for_status()
                result = resp.json()
                # 提取Dify返回的答案文本
                answer = result.get("answer", "").strip() or result.get("message", "")
                return answer if answer else "您好,我暂时无法处理这个问题,请稍后再试或联系人工客服。"
            except (httpx.RequestError, httpx.HTTPStatusError) as e:
                logger.warning(f"Attempt {attempt + 1} failed to call Dify: {e}")
                if attempt == retries - 1:
                    raise
                await asyncio.sleep(2 ** attempt)  # 指数退避
    return "服务暂时不可用,请稍后重试。"

async def send_reply_to_doudian(user_open_id: str, reply_text: str):
    """调用抖店开放平台API,发送客服回复消息"""
    access_token = await auth_client.get_access_token()
    url = "https://openapi.douyin.com/api/message/send/"
    headers = {"access-token": access_token}
    # 注意:抖店消息内容需要做安全过滤和转义(下文会讲)
    filtered_text = sensitive_filter(reply_text)
    payload = {
        "to_user_id": user_open_id,
        "message_type": "text",
        "content": json.dumps({"text": filtered_text})  # 文本消息格式
    }
    async with httpx.AsyncClient() as client:
        resp = await client.post(url, json=payload, headers=headers)
        # 处理响应,记录日志或错误
        logger.debug(f"Reply sent to {user_open_id}, status: {resp.status_code}")

生产考量:确保稳定与安全

1. 并发限流策略

电商大促期间,消息量可能激增。为了防止我们的服务被压垮或过度调用Dify API(可能产生高额费用),必须实施限流。这里实现一个简单的令牌桶算法

import time
import asyncio
from threading import Lock

class TokenBucketLimiter:
    """简单的令牌桶限流器(线程安全)"""
    def __init__(self, capacity: int, fill_rate: float):
        """
        Args:
            capacity: 桶容量,即瞬时最大请求数
            fill_rate: 每秒填充的令牌数
        """
        self.capacity = float(capacity)
        self._tokens = float(capacity)
        self.fill_rate = fill_rate
        self._last_time = time.monotonic()
        self._lock = Lock()

    def allow_request(self, tokens: float = 1.0) -> bool:
        """检查是否允许请求,消耗tokens个令牌"""
        with self._lock:
            now = time.monotonic()
            # 计算自上次检查以来应填充的令牌数
            elapsed = now - self._last_time
            self._tokens = min(self.capacity, self._tokens + elapsed * self.fill_rate)
            self._last_time = now

            if self._tokens >= tokens:
                self._tokens -= tokens
                return True
            return False

在FastAPI中,我们可以使用中间件或在每个端点入口调用limiter.allow_request()

2. 敏感词过滤模块

自动回复的内容必须安全合规。我们需要一个过滤模块,在消息发送前进行处理。

import re
from typing import List, Set

class SensitiveFilter:
    def __init__(self, keyword_file_path: str = None):
        self._sensitive_words: Set[str] = set()
        if keyword_file_path:
            self.load_keywords(keyword_file_path)
        # 也可以加载一些内置的常见敏感词
        self._sensitive_words.update(["涉政词A", "违禁词B"])  # 示例,实际需从安全渠道获取

    def load_keywords(self, file_path: str):
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                word = line.strip()
                if word:
                    self._sensitive_words.add(word)

    def filter(self, text: str, replace_char: str = "*") -> str:
        """过滤文本中的敏感词"""
        if not self._sensitive_words:
            return text
        pattern = '|'.join(map(re.escape, self._sensitive_words))
        # 使用正则进行替换,忽略大小写
        filtered_text = re.sub(pattern, replace_char, text, flags=re.IGNORECASE)
        return filtered_text

# 全局过滤器实例
sensitive_filter = SensitiveFilter("path/to/sensitive_words.txt")

send_reply_to_doudian函数中调用filtered_text = sensitive_filter(reply_text)即可。

避坑指南:实战中遇到的“坑”

1. 抖店消息格式的特殊处理

  • 表情符号与特殊字符:抖店消息中的Emoji或颜文字,在JSON序列化和网络传输时可能出问题。建议在接收和发送时,使用json.dumps(..., ensure_ascii=False)json.loads()进行妥善处理。对于极端情况,可以考虑先进行Base64编码传输,接收端再解码。
  • 消息类型:抖店消息除了文本(text),还有图片(image)、商品卡片(product)等。我们的智能客服初期可以只处理文本类型,对于其他类型,可以回复固定话术,如“暂不支持查看图片,请用文字描述您的问题”。
  • 签名验证:务必严格按照抖店开放平台文档实现消息推送的签名验证,防止恶意请求。

2. 提升Dify意图识别准确率的3个技巧

  1. 精细化意图划分与负样本:不要将所有“其他”问题都归为一个fallback意图。可以细分为“咨询活动”、“产品规格”、“投诉建议”等。同时,为每个意图收集一些“负样本”(即不属于该意图的相似问题),在Dify的数据集标注中明确标记为“不属于此意图”,帮助模型更好地区分边界。
  2. 利用系统提示词进行约束:在Dify的提示词编排中,通过系统指令明确告诉AI它的角色和回答范围。例如:“你是一名抖店客服,只回答与订单、物流、售后、商品相关的问题。对于无法确认或与店铺无关的问题,应引导用户联系人工客服或告知无法处理。”
  3. 结合业务规则进行后处理:Dify返回的答案可能过于笼统。我们可以加入一个后处理层,对识别出的特定意图进行答案增强。例如,当识别到query_logistics意图时,除了返回Dify生成的通用话术,还可以通过用户ID(需与订单系统关联)尝试查询其最近订单的真实物流状态,将动态信息插入回复中,用户体验大幅提升。

技术架构示意图

延伸思考:从自动回复到智能销售

基础的问答客服上线后,已经能解决大部分重复问题。但我们可以更进一步,让这个智能体变得更有价值。

一个方向是结合订单数据库实现个性化推荐。当用户咨询某款商品时,智能体在回复参数、价格等信息的同时,可以查询该用户的购买历史,进行个性化推荐:“您之前购买过A商品,这款新上市的B商品是它的升级版,搭配购买可以享受优惠。”

实现思路是:

  1. 在Dify的“外部知识库”功能中,接入商品知识库(如商品标题、属性、卖点)。
  2. 在消息路由处理层,当识别出用户意图为“商品咨询”或“推荐”时,不仅调用Dify,还同时异步查询用户画像和订单数据库。
  3. 将查询到的个性化信息(如用户偏好品类、历史订单金额)作为“上下文”或“变量”,通过Dify的API传入,让AI在生成回复时参考这些信息,实现“千人千面”的推荐话术。

这相当于给智能客服装上了“记忆”和“洞察”能力,使其从成本中心向销售助手转变。

整个项目从搭建到上线稳定运行大约用了两周时间。目前这个智能客服已经能处理约70%的日常咨询,夜间订单转化率有了明显提升。最大的体会是,利用Dify这样的平台,确实大大降低了AI应用的门槛,让我们可以更专注于业务逻辑和体验优化,而不是陷在模型训练和部署的细节里。如果你也在为客服效率发愁,不妨试试这个方案。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐