手把手教你用Qwen2.5-VL构建智能客服问答系统

1. 引言

想象一下,你是一家电商公司的客服主管。每天,你的团队需要处理成千上万的用户咨询,其中不仅有文字问题,还有大量用户上传的商品图片、订单截图、物流单号照片。传统的文本客服机器人面对这些图片时束手无策,只能回复“请用文字描述您的问题”,用户体验大打折扣。

这就是我们今天要解决的问题。本文将带你从零开始,使用 Qwen2.5-VL多模态语义相关度评估引擎,构建一个真正能“看懂”图片的智能客服问答系统。这个系统不仅能理解用户的文字问题,还能分析用户上传的图片,智能判断客服知识库中哪个答案最相关,从而提供精准的回复。

通过本教程,你将学会:

  • 如何快速部署Qwen2.5-VL多模态评估引擎
  • 如何构建一个支持图文混合输入的智能客服系统
  • 如何将语义相关度评估集成到实际业务场景中
  • 如何优化系统性能,提升客服响应准确率

无论你是AI工程师、产品经理,还是对智能客服感兴趣的技术爱好者,这篇教程都将为你提供一套完整、可落地的解决方案。

2. 系统核心:Qwen2.5-VL多模态评估引擎

在开始动手之前,我们先来了解一下这个系统的“大脑”——Qwen2.5-VL多模态语义相关度评估引擎。

2.1 它能做什么?

简单来说,这个引擎就像一个智能裁判。当你给它一个用户的问题(可能是文字,也可能是文字+图片),再给它一个候选的客服答案,它就能判断这个答案是否真的回答了用户的问题,并给出一个0到1的分数。

举个例子:

  • 用户查询:文字“这件衣服有货吗?” + 图片(一件红色连衣裙的照片)
  • 候选答案1:“红色连衣裙目前有货,尺码齐全”
  • 候选答案2:“黑色西装外套需要预定”

引擎会分析图片中的红色连衣裙,然后判断:答案1直接提到了“红色连衣裙”,相关性很高,可能得0.9分;答案2说的是“黑色西装外套”,完全不相关,可能得0.1分。

2.2 为什么选择这个方案?

传统的客服系统有三大痛点:

  1. 看不懂图片:用户发个商品图问“这个有货吗?”,系统无法识别图片内容
  2. 语义理解浅:只能做关键词匹配,无法理解问题的真实意图
  3. 答案质量差:经常给出看似相关但实际没用的答案

Qwen2.5-VL引擎正好解决了这些问题:

  • 多模态理解:文字和图片都能处理
  • 深度语义分析:不是简单的关键词匹配,而是真正的理解
  • 概率化评分:给出0-1的置信度分数,让你知道答案有多可靠

2.3 技术架构概览

整个评估过程可以简化为以下流程:

用户输入(文字/图片)
        │
        ▼
多模态Prompt构造
        │
        ▼
Qwen2.5-VL多模态模型
        │
        ▼
语义推理与分析
        │
        ▼
相关度评分(0~1)

这个流程的核心是Qwen2.5-VL模型,它基于先进的视觉-语言大模型技术,能够同时理解图像和文本的语义信息。

3. 环境准备与快速部署

现在让我们开始动手搭建。首先需要准备好运行环境。

3.1 系统要求

确保你的环境满足以下要求:

  • 操作系统:Linux(Ubuntu 20.04+推荐)或 macOS
  • Python版本:3.8 或更高版本
  • 内存:至少16GB RAM
  • GPU:推荐使用NVIDIA GPU(显存8GB+),CPU也可运行但速度较慢
  • 磁盘空间:至少20GB可用空间

3.2 一键部署脚本

我们提供了一个完整的部署脚本,可以自动完成所有环境配置:

#!/bin/bash
# deploy_qwen_vl_customer_service.sh

echo "开始部署Qwen2.5-VL智能客服系统..."

# 1. 创建项目目录
mkdir -p qwen_vl_customer_service
cd qwen_vl_customer_service

# 2. 创建Python虚拟环境
echo "创建Python虚拟环境..."
python3 -m venv venv
source venv/bin/activate

# 3. 安装基础依赖
echo "安装Python依赖包..."
pip install --upgrade pip
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
pip install transformers>=4.35.0
pip install accelerate
pip install sentencepiece
pip install protobuf
pip install pillow
pip install requests
pip install streamlit  # 用于Web界面

# 4. 下载Qwen2.5-VL模型
echo "下载Qwen2.5-VL模型..."
# 这里使用Hugging Face的模型,确保你有访问权限
# 或者使用ModelScope(国内访问更快)
pip install modelscope
python -c "from modelscope import snapshot_download; snapshot_download('qwen/Qwen2.5-VL-7B-Instruct', cache_dir='./models')"

echo "部署完成!"
echo "请运行:source venv/bin/activate 激活环境"

将上述脚本保存为deploy.sh,然后执行:

chmod +x deploy.sh
./deploy.sh

3.3 验证安装

部署完成后,运行一个简单的测试脚本验证环境是否正常:

# test_environment.py
import torch
import transformers
from PIL import Image
import requests
from io import BytesIO

print("=== 环境验证测试 ===")

# 1. 检查PyTorch和CUDA
print(f"PyTorch版本: {torch.__version__}")
print(f"CUDA可用: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU设备: {torch.cuda.get_device_name(0)}")
    print(f"显存: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")

# 2. 检查transformers版本
print(f"Transformers版本: {transformers.__version__}")

# 3. 测试图片处理
try:
    # 下载测试图片
    url = "https://images.unsplash.com/photo-1490481651871-ab68de25d43d"
    response = requests.get(url)
    img = Image.open(BytesIO(response.content))
    print(f"图片处理测试: 成功加载图片,尺寸: {img.size}")
except Exception as e:
    print(f"图片处理测试失败: {e}")

print("=== 环境验证完成 ===")

运行测试:

python test_environment.py

如果一切正常,你会看到类似这样的输出:

=== 环境验证测试 ===
PyTorch版本: 2.1.0
CUDA可用: True
GPU设备: NVIDIA GeForce RTX 4090
显存: 24.00 GB
Transformers版本: 4.35.0
图片处理测试: 成功加载图片,尺寸: (600, 400)
=== 环境验证完成 ===

4. 构建智能客服问答系统

现在环境已经准备好了,让我们开始构建核心的智能客服系统。

4.1 系统架构设计

我们的智能客服系统包含以下核心组件:

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   用户界面层     │    │   业务逻辑层     │    │   数据存储层     │
│                 │    │                 │    │                 │
│  - Web界面      │◄──►│  - 查询处理     │◄──►│  - 知识库       │
│  - API接口      │    │  - 图片分析     │    │  - 对话历史     │
│  - 文件上传     │    │  - 语义匹配     │    │  - 用户数据     │
└─────────────────┘    │  - 答案排序     │    └─────────────────┘
                       │  - 回复生成     │
                       └─────────────────┘
                                │
                                ▼
                       ┌─────────────────┐
                       │   模型服务层     │
                       │                 │
                       │  - Qwen2.5-VL   │
                       │  - 评估引擎     │
                       └─────────────────┘

4.2 核心代码实现

让我们从最核心的部分开始——Qwen2.5-VL评估引擎的封装。

# qwen_vl_evaluator.py
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from PIL import Image
import numpy as np
from typing import Union, List, Dict, Optional
import logging

class QwenVLEvaluator:
    """Qwen2.5-VL多模态语义评估器"""
    
    def __init__(self, model_path: str = "Qwen/Qwen2.5-VL-7B-Instruct", device: str = None):
        """
        初始化评估器
        
        Args:
            model_path: 模型路径,可以是本地路径或HuggingFace模型ID
            device: 运行设备,'cuda'或'cpu',默认自动选择
        """
        self.logger = logging.getLogger(__name__)
        
        # 自动选择设备
        if device is None:
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
        else:
            self.device = device
            
        self.logger.info(f"使用设备: {self.device}")
        
        # 加载模型和tokenizer
        self.logger.info("正在加载Qwen2.5-VL模型...")
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_path, 
            trust_remote_code=True
        )
        
        self.model = AutoModelForCausalLM.from_pretrained(
            model_path,
            torch_dtype=torch.bfloat16 if self.device == "cuda" else torch.float32,
            device_map="auto" if self.device == "cuda" else None,
            trust_remote_code=True
        ).eval()
        
        self.logger.info("模型加载完成")
        
    def prepare_query_prompt(self, query_text: str, query_image: Optional[Image.Image] = None) -> str:
        """
        准备查询的prompt
        
        Args:
            query_text: 查询文本
            query_image: 查询图片(可选)
            
        Returns:
            格式化后的prompt
        """
        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "请分析以下用户问题,理解其核心意图。"},
                ]
            }
        ]
        
        # 添加文本
        messages[0]["content"].append({"type": "text", "text": f"用户问题: {query_text}"})
        
        # 如果有图片,添加图片
        if query_image is not None:
            messages[0]["content"].append({"type": "image", "image": query_image})
            
        # 添加指令
        messages[0]["content"].append({
            "type": "text", 
            "text": "请用一句话总结用户的核心需求。"
        })
        
        return messages
    
    def prepare_document_prompt(self, document_text: str, document_image: Optional[Image.Image] = None) -> str:
        """
        准备文档(候选答案)的prompt
        
        Args:
            document_text: 文档文本
            document_image: 文档图片(可选)
            
        Returns:
            格式化后的prompt
        """
        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "请分析以下客服回答内容。"},
                ]
            }
        ]
        
        # 添加文本
        messages[0]["content"].append({"type": "text", "text": f"客服回答: {document_text}"})
        
        # 如果有图片,添加图片
        if document_image is not None:
            messages[0]["content"].append({"type": "image", "image": document_image})
            
        # 添加指令
        messages[0]["content"].append({
            "type": "text", 
            "text": "请用一句话总结这个回答的核心内容。"
        })
        
        return messages
    
    def evaluate_relevance(self, 
                          query_text: str,
                          document_text: str,
                          query_image: Optional[Image.Image] = None,
                          document_image: Optional[Image.Image] = None) -> float:
        """
        评估查询与文档的相关性
        
        Args:
            query_text: 查询文本
            document_text: 文档文本
            query_image: 查询图片(可选)
            document_image: 文档图片(可选)
            
        Returns:
            相关度分数(0-1)
        """
        try:
            # 构造评估prompt
            messages = [
                {
                    "role": "system",
                    "content": "你是一个智能客服评估专家。请评估用户问题与客服回答的相关性。"
                },
                {
                    "role": "user",
                    "content": []
                }
            ]
            
            # 添加查询部分
            messages[1]["content"].append({
                "type": "text", 
                "text": f"用户问题: {query_text}"
            })
            if query_image is not None:
                messages[1]["content"].append({"type": "image", "image": query_image})
                
            # 添加文档部分
            messages[1]["content"].append({
                "type": "text", 
                "text": f"客服回答: {document_text}"
            })
            if document_image is not None:
                messages[1]["content"].append({"type": "image", "image": document_image})
                
            # 添加评估指令
            messages[1]["content"].append({
                "type": "text",
                "text": """请评估这个客服回答是否直接、准确地回答了用户问题。
                
                请按照以下格式回答:
                1. 相关性分析:[简要分析]
                2. 匹配程度:高/中/低
                3. 置信度分数:0.0-1.0之间的数字
                
                注意:分数1.0表示完全匹配,0.0表示完全不相关。"""
            })
            
            # 生成评估结果
            text = self.tokenizer.apply_chat_template(
                messages,
                tokenize=False,
                add_generation_prompt=True
            )
            
            model_inputs = self.tokenizer([text], return_tensors="pt").to(self.device)
            
            with torch.no_grad():
                generated_ids = self.model.generate(
                    **model_inputs,
                    max_new_tokens=512,
                    do_sample=False
                )
                
            generated_ids = [
                output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
            ]
            
            response = self.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
            
            # 从响应中提取分数
            score = self._extract_score_from_response(response)
            
            return score
            
        except Exception as e:
            self.logger.error(f"评估过程中出错: {e}")
            return 0.0
    
    def _extract_score_from_response(self, response: str) -> float:
        """
        从模型响应中提取分数
        
        Args:
            response: 模型生成的文本
            
        Returns:
            提取的分数(0-1)
        """
        import re
        
        # 尝试多种方式提取分数
        patterns = [
            r'置信度分数[::]\s*([0-9]*\.?[0-9]+)',  # 中文冒号
            r'分数[::]\s*([0-9]*\.?[0-9]+)',        # 简单分数
            r'([0-9]*\.?[0-9]+)\s*分',               # XX分
            r'score[::]\s*([0-9]*\.?[0-9]+)',       # 英文score
        ]
        
        for pattern in patterns:
            match = re.search(pattern, response, re.IGNORECASE)
            if match:
                try:
                    score = float(match.group(1))
                    # 确保分数在0-1范围内
                    if score > 1.0:
                        score = score / 100.0  # 假设是百分比
                    return max(0.0, min(1.0, score))
                except ValueError:
                    continue
        
        # 如果没有找到数字,尝试根据文本判断
        if any(word in response.lower() for word in ['高度相关', '完全匹配', '非常准确']):
            return 0.9
        elif any(word in response.lower() for word in ['中等相关', '部分匹配', '基本准确']):
            return 0.6
        elif any(word in response.lower() for word in ['低相关', '不匹配', '不准确']):
            return 0.3
        else:
            return 0.5  # 默认中等分数
    
    def batch_evaluate(self, 
                      queries: List[Dict],
                      documents: List[Dict]) -> List[float]:
        """
        批量评估多个查询-文档对
        
        Args:
            queries: 查询列表,每个元素是{'text': str, 'image': Optional[Image]}
            documents: 文档列表,每个元素是{'text': str, 'image': Optional[Image]}
            
        Returns:
            相关度分数列表
        """
        scores = []
        for query, document in zip(queries, documents):
            score = self.evaluate_relevance(
                query_text=query.get('text', ''),
                document_text=document.get('text', ''),
                query_image=query.get('image'),
                document_image=document.get('image')
            )
            scores.append(score)
            
        return scores

4.3 智能客服系统实现

有了评估引擎,现在我们来构建完整的智能客服系统:

# customer_service_system.py
import json
import sqlite3
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from pathlib import Path
import logging
from qwen_vl_evaluator import QwenVLEvaluator
from PIL import Image
import hashlib

@dataclass
class CustomerQuery:
    """客户查询"""
    query_id: str
    text: str
    image_path: Optional[str] = None
    timestamp: datetime = None
    user_id: Optional[str] = None
    
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now()
    
    @property
    def image(self) -> Optional[Image.Image]:
        """获取图片对象"""
        if self.image_path and Path(self.image_path).exists():
            try:
                return Image.open(self.image_path)
            except Exception as e:
                logging.warning(f"无法加载图片 {self.image_path}: {e}")
                return None
        return None

@dataclass
class KnowledgeBaseEntry:
    """知识库条目"""
    entry_id: str
    question: str
    answer: str
    image_path: Optional[str] = None
    category: str = "general"
    tags: List[str] = None
    confidence_threshold: float = 0.7
    
    def __post_init__(self):
        if self.tags is None:
            self.tags = []
    
    @property
    def image(self) -> Optional[Image.Image]:
        """获取图片对象"""
        if self.image_path and Path(self.image_path).exists():
            try:
                return Image.open(self.image_path)
            except Exception as e:
                logging.warning(f"无法加载图片 {self.image_path}: {e}")
                return None
        return None

class CustomerServiceSystem:
    """智能客服系统"""
    
    def __init__(self, 
                 model_path: str = "Qwen/Qwen2.5-VL-7B-Instruct",
                 db_path: str = "customer_service.db"):
        """
        初始化客服系统
        
        Args:
            model_path: Qwen2.5-VL模型路径
            db_path: 数据库路径
        """
        self.logger = logging.getLogger(__name__)
        
        # 初始化评估器
        self.evaluator = QwenVLEvaluator(model_path)
        
        # 初始化数据库
        self.db_path = db_path
        self._init_database()
        
        # 知识库缓存
        self.knowledge_base = []
        self._load_knowledge_base()
        
        # 对话历史
        self.conversation_history = {}
        
    def _init_database(self):
        """初始化数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 创建知识库表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS knowledge_base (
                id TEXT PRIMARY KEY,
                question TEXT NOT NULL,
                answer TEXT NOT NULL,
                image_path TEXT,
                category TEXT,
                tags TEXT,
                confidence_threshold REAL DEFAULT 0.7,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        # 创建对话历史表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS conversation_history (
                id TEXT PRIMARY KEY,
                user_id TEXT,
                query_text TEXT NOT NULL,
                query_image_path TEXT,
                matched_entry_id TEXT,
                confidence_score REAL,
                response_text TEXT,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (matched_entry_id) REFERENCES knowledge_base (id)
            )
        ''')
        
        # 创建用户反馈表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS user_feedback (
                id TEXT PRIMARY KEY,
                conversation_id TEXT,
                helpful_score INTEGER CHECK(helpful_score >= 1 AND helpful_score <= 5),
                accuracy_score INTEGER CHECK(accuracy_score >= 1 AND accuracy_score <= 5),
                comments TEXT,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (conversation_id) REFERENCES conversation_history (id)
            )
        ''')
        
        conn.commit()
        conn.close()
        
    def _load_knowledge_base(self):
        """从数据库加载知识库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('SELECT * FROM knowledge_base')
        rows = cursor.fetchall()
        
        self.knowledge_base = []
        for row in rows:
            entry = KnowledgeBaseEntry(
                entry_id=row[0],
                question=row[1],
                answer=row[2],
                image_path=row[3],
                category=row[4],
                tags=json.loads(row[5]) if row[5] else [],
                confidence_threshold=row[6]
            )
            self.knowledge_base.append(entry)
            
        conn.close()
        self.logger.info(f"加载了 {len(self.knowledge_base)} 条知识库条目")
        
    def add_knowledge_entry(self, 
                           question: str, 
                           answer: str, 
                           image_path: Optional[str] = None,
                           category: str = "general",
                           tags: List[str] = None,
                           confidence_threshold: float = 0.7) -> str:
        """
        添加知识库条目
        
        Args:
            question: 问题
            answer: 答案
            image_path: 图片路径(可选)
            category: 分类
            tags: 标签列表
            confidence_threshold: 置信度阈值
            
        Returns:
            条目ID
        """
        entry_id = hashlib.md5(f"{question}{answer}".encode()).hexdigest()[:12]
        
        if tags is None:
            tags = []
            
        entry = KnowledgeBaseEntry(
            entry_id=entry_id,
            question=question,
            answer=answer,
            image_path=image_path,
            category=category,
            tags=tags,
            confidence_threshold=confidence_threshold
        )
        
        # 保存到数据库
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            INSERT OR REPLACE INTO knowledge_base 
            (id, question, answer, image_path, category, tags, confidence_threshold)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (
            entry_id,
            question,
            answer,
            image_path,
            category,
            json.dumps(tags),
            confidence_threshold
        ))
        
        conn.commit()
        conn.close()
        
        # 更新缓存
        self.knowledge_base.append(entry)
        
        return entry_id
    
    def process_customer_query(self, 
                              query_text: str,
                              query_image_path: Optional[str] = None,
                              user_id: Optional[str] = None) -> Dict[str, Any]:
        """
        处理客户查询
        
        Args:
            query_text: 查询文本
            query_image_path: 查询图片路径(可选)
            user_id: 用户ID(可选)
            
        Returns:
            处理结果
        """
        query_id = hashlib.md5(f"{query_text}{query_image_path}{datetime.now()}".encode()).hexdigest()[:12]
        
        # 创建查询对象
        query = CustomerQuery(
            query_id=query_id,
            text=query_text,
            image_path=query_image_path,
            user_id=user_id
        )
        
        # 在知识库中查找最佳匹配
        best_match = None
        best_score = 0.0
        all_scores = []
        
        for entry in self.knowledge_base:
            # 评估相关性
            score = self.evaluator.evaluate_relevance(
                query_text=query_text,
                document_text=entry.answer,
                query_image=query.image,
                document_image=entry.image
            )
            
            all_scores.append({
                'entry_id': entry.entry_id,
                'question': entry.question,
                'score': score,
                'threshold': entry.confidence_threshold
            })
            
            # 更新最佳匹配
            if score > best_score and score >= entry.confidence_threshold:
                best_score = score
                best_match = entry
        
        # 准备响应
        if best_match and best_score >= (best_match.confidence_threshold if best_match else 0.7):
            response_text = best_match.answer
            matched_entry_id = best_match.entry_id
            confidence = best_score
        else:
            # 没有找到足够置信度的匹配
            response_text = "抱歉,我暂时无法准确回答您的问题。您可以尝试重新描述问题,或联系人工客服获取帮助。"
            matched_entry_id = None
            confidence = best_score if best_score > 0 else 0.0
            
            # 如果有一些接近的匹配,可以提示用户
            if all_scores:
                top_scores = sorted(all_scores, key=lambda x: x['score'], reverse=True)[:3]
                if top_scores[0]['score'] > 0.5:
                    response_text += "\n\n以下是一些可能相关的信息:"
                    for i, item in enumerate(top_scores[:2], 1):
                        if item['score'] > 0.5:
                            response_text += f"\n{i}. {item['question']}"
        
        # 保存对话历史
        self._save_conversation(
            query_id=query_id,
            user_id=user_id,
            query_text=query_text,
            query_image_path=query_image_path,
            matched_entry_id=matched_entry_id,
            confidence_score=confidence,
            response_text=response_text
        )
        
        # 返回结果
        return {
            'query_id': query_id,
            'response': response_text,
            'matched_entry_id': matched_entry_id,
            'confidence_score': confidence,
            'all_candidates': all_scores[:5],  # 返回前5个候选
            'has_image': query_image_path is not None
        }
    
    def _save_conversation(self,
                          query_id: str,
                          user_id: Optional[str],
                          query_text: str,
                          query_image_path: Optional[str],
                          matched_entry_id: Optional[str],
                          confidence_score: float,
                          response_text: str):
        """保存对话历史"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            INSERT INTO conversation_history 
            (id, user_id, query_text, query_image_path, matched_entry_id, confidence_score, response_text)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (
            query_id,
            user_id,
            query_text,
            query_image_path,
            matched_entry_id,
            confidence_score,
            response_text
        ))
        
        conn.commit()
        conn.close()
    
    def get_conversation_history(self, user_id: str, limit: int = 10) -> List[Dict]:
        """获取用户对话历史"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            SELECT * FROM conversation_history 
            WHERE user_id = ? 
            ORDER BY timestamp DESC 
            LIMIT ?
        ''', (user_id, limit))
        
        rows = cursor.fetchall()
        conn.close()
        
        history = []
        for row in rows:
            history.append({
                'query_id': row[0],
                'query_text': row[2],
                'response_text': row[6],
                'confidence_score': row[5],
                'timestamp': row[7]
            })
            
        return history
    
    def add_feedback(self, 
                    conversation_id: str,
                    helpful_score: int,
                    accuracy_score: int,
                    comments: str = "") -> bool:
        """
        添加用户反馈
        
        Args:
            conversation_id: 对话ID
            helpful_score: 有帮助程度(1-5)
            accuracy_score: 准确度(1-5)
            comments: 评论
            
        Returns:
            是否成功
        """
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            feedback_id = hashlib.md5(f"{conversation_id}{datetime.now()}".encode()).hexdigest()[:12]
            
            cursor.execute('''
                INSERT INTO user_feedback 
                (id, conversation_id, helpful_score, accuracy_score, comments)
                VALUES (?, ?, ?, ?, ?)
            ''', (
                feedback_id,
                conversation_id,
                helpful_score,
                accuracy_score,
                comments
            ))
            
            conn.commit()
            conn.close()
            return True
            
        except Exception as e:
            self.logger.error(f"添加反馈失败: {e}")
            return False
    
    def analyze_system_performance(self) -> Dict[str, Any]:
        """分析系统性能"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 总体统计
        cursor.execute('SELECT COUNT(*) FROM conversation_history')
        total_conversations = cursor.fetchone()[0]
        
        cursor.execute('SELECT AVG(confidence_score) FROM conversation_history WHERE confidence_score > 0')
        avg_confidence = cursor.fetchone()[0] or 0
        
        # 反馈统计
        cursor.execute('SELECT AVG(helpful_score), AVG(accuracy_score) FROM user_feedback')
        feedback_result = cursor.fetchone()
        avg_helpful = feedback_result[0] or 0
        avg_accuracy = feedback_result[1] or 0
        
        # 知识库使用统计
        cursor.execute('''
            SELECT kb.question, COUNT(ch.id) as usage_count
            FROM knowledge_base kb
            LEFT JOIN conversation_history ch ON kb.id = ch.matched_entry_id
            GROUP BY kb.id
            ORDER BY usage_count DESC
            LIMIT 10
        ''')
        top_entries = cursor.fetchall()
        
        conn.close()
        
        return {
            'total_conversations': total_conversations,
            'average_confidence': round(avg_confidence, 3),
            'average_helpful_score': round(avg_helpful, 2) if avg_helpful else None,
            'average_accuracy_score': round(avg_accuracy, 2) if avg_accuracy else None,
            'top_used_entries': [
                {'question': row[0], 'usage_count': row[1]}
                for row in top_entries
            ]
        }

4.4 Web界面实现

为了让非技术人员也能使用这个系统,我们创建一个简单的Web界面:

# web_interface.py
import streamlit as st
import pandas as pd
from PIL import Image
import tempfile
import os
from datetime import datetime
from customer_service_system import CustomerServiceSystem
import plotly.express as px
import plotly.graph_objects as go

class CustomerServiceWebApp:
    """客服系统Web应用"""
    
    def __init__(self):
        st.set_page_config(
            page_title="智能客服系统",
            page_icon="",
            layout="wide"
        )
        
        # 初始化系统
        if 'service_system' not in st.session_state:
            st.session_state.service_system = CustomerServiceSystem()
            
        self.system = st.session_state.service_system
        
        # 初始化会话状态
        if 'conversation_history' not in st.session_state:
            st.session_state.conversation_history = []
        if 'current_user_id' not in st.session_state:
            st.session_state.current_user_id = f"user_{datetime.now().strftime('%Y%m%d%H%M%S')}"
            
    def run(self):
        """运行Web应用"""
        st.title(" 智能客服问答系统")
        st.markdown("基于Qwen2.5-VL的多模态语义理解客服系统")
        
        # 侧边栏
        with st.sidebar:
            st.header("系统设置")
            
            # 用户ID
            user_id = st.text_input(
                "用户ID",
                value=st.session_state.current_user_id,
                help="用于标识用户,保存对话历史"
            )
            st.session_state.current_user_id = user_id
            
            # 知识库管理
            st.subheader("知识库管理")
            if st.button("查看知识库"):
                self.show_knowledge_base()
                
            if st.button("添加知识条目"):
                self.add_knowledge_entry()
                
            # 系统分析
            st.subheader("系统分析")
            if st.button("查看系统性能"):
                self.show_system_performance()
                
            # 清空对话
            if st.button("清空当前对话"):
                st.session_state.conversation_history = []
                st.rerun()
        
        # 主界面 - 对话区域
        col1, col2 = st.columns([2, 1])
        
        with col1:
            st.subheader(" 客服对话")
            
            # 显示对话历史
            self.display_conversation_history()
            
            # 输入区域
            st.divider()
            self.display_input_area()
            
        with col2:
            st.subheader(" 对话分析")
            self.display_conversation_analysis()
    
    def display_conversation_history(self):
        """显示对话历史"""
        if st.session_state.conversation_history:
            for msg in st.session_state.conversation_history:
                with st.chat_message(msg["role"]):
                    st.write(msg["content"])
                    
                    # 显示置信度
                    if msg.get("confidence"):
                        confidence = msg["confidence"]
                        color = "green" if confidence > 0.7 else "orange" if confidence > 0.5 else "red"
                        st.markdown(f"<small>置信度: <span style='color:{color}'>{confidence:.2%}</span></small>", 
                                  unsafe_allow_html=True)
                    
                    # 显示图片
                    if msg.get("image"):
                        st.image(msg["image"], caption="用户上传的图片", use_column_width=True)
        else:
            st.info("还没有对话记录,开始提问吧!")
    
    def display_input_area(self):
        """显示输入区域"""
        with st.form(key="query_form", clear_on_submit=True):
            # 文本输入
            query_text = st.text_area(
                "请输入您的问题",
                placeholder="例如:这件衣服有货吗?或者描述您遇到的问题...",
                height=100
            )
            
            # 图片上传
            uploaded_file = st.file_uploader(
                "上传相关图片(可选)",
                type=['jpg', 'jpeg', 'png', 'gif'],
                help="可以上传商品图片、订单截图等"
            )
            
            # 提交按钮
            submitted = st.form_submit_button("发送", use_container_width=True)
            
            if submitted and query_text:
                # 处理图片
                image_path = None
                if uploaded_file is not None:
                    # 保存临时文件
                    with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as tmp_file:
                        tmp_file.write(uploaded_file.getvalue())
                        image_path = tmp_file.name
                
                # 添加用户消息到历史
                user_msg = {
                    "role": "user",
                    "content": query_text,
                    "timestamp": datetime.now()
                }
                if uploaded_file:
                    user_msg["image"] = Image.open(uploaded_file)
                st.session_state.conversation_history.append(user_msg)
                
                # 处理查询
                with st.spinner("正在分析您的问题..."):
                    result = self.system.process_customer_query(
                        query_text=query_text,
                        query_image_path=image_path,
                        user_id=st.session_state.current_user_id
                    )
                
                # 添加系统回复到历史
                system_msg = {
                    "role": "assistant",
                    "content": result["response"],
                    "confidence": result["confidence_score"],
                    "timestamp": datetime.now()
                }
                st.session_state.conversation_history.append(system_msg)
                
                # 清理临时文件
                if image_path and os.path.exists(image_path):
                    os.unlink(image_path)
                
                st.rerun()
    
    def display_conversation_analysis(self):
        """显示对话分析"""
        if len(st.session_state.conversation_history) >= 2:
            # 计算平均置信度
            confidences = [
                msg["confidence"] for msg in st.session_state.conversation_history 
                if msg["role"] == "assistant" and "confidence" in msg
            ]
            
            if confidences:
                avg_confidence = sum(confidences) / len(confidences)
                
                # 显示置信度图表
                fig = go.Figure(go.Indicator(
                    mode="gauge+number",
                    value=avg_confidence * 100,
                    title={'text': "平均回答置信度"},
                    gauge={
                        'axis': {'range': [0, 100]},
                        'bar': {'color': "darkblue"},
                        'steps': [
                            {'range': [0, 50], 'color': "red"},
                            {'range': [50, 80], 'color': "orange"},
                            {'range': [80, 100], 'color': "green"}
                        ],
                        'threshold': {
                            'line': {'color': "black", 'width': 4},
                            'thickness': 0.75,
                            'value': 70
                        }
                    }
                ))
                
                fig.update_layout(height=300)
                st.plotly_chart(fig, use_container_width=True)
                
                # 显示置信度分布
                st.subheader("置信度分布")
                df = pd.DataFrame({
                    "回答序号": range(1, len(confidences) + 1),
                    "置信度": [c * 100 for c in confidences]
                })
                
                fig2 = px.line(df, x="回答序号", y="置信度", 
                              title="每次回答的置信度变化",
                              markers=True)
                fig2.update_layout(height=250)
                st.plotly_chart(fig2, use_container_width=True)
                
                # 显示建议
                if avg_confidence < 0.7:
                    st.warning(" 系统平均置信度较低,建议优化知识库内容")
                else:
                    st.success(" 系统回答质量良好")
        
        # 显示候选答案分析(如果有)
        if st.session_state.conversation_history:
            last_assistant_msg = None
            for msg in reversed(st.session_state.conversation_history):
                if msg["role"] == "assistant":
                    last_assistant_msg = msg
                    break
            
            if last_assistant_msg and "candidates" in last_assistant_msg:
                st.subheader(" 候选答案排名")
                candidates_df = pd.DataFrame(last_assistant_msg["candidates"])
                st.dataframe(
                    candidates_df.style.background_gradient(subset=['score'], cmap='RdYlGn'),
                    use_container_width=True
                )
    
    def show_knowledge_base(self):
        """显示知识库"""
        st.subheader(" 知识库管理")
        
        if self.system.knowledge_base:
            # 显示统计信息
            categories = {}
            for entry in self.system.knowledge_base:
                categories[entry.category] = categories.get(entry.category, 0) + 1
            
            col1, col2, col3 = st.columns(3)
            with col1:
                st.metric("总条目数", len(self.system.knowledge_base))
            with col2:
                st.metric("分类数量", len(categories))
            with col3:
                avg_threshold = sum(e.confidence_threshold for e in self.system.knowledge_base) / len(self.system.knowledge_base)
                st.metric("平均置信阈值", f"{avg_threshold:.2f}")
            
            # 显示知识库表格
            kb_data = []
            for entry in self.system.knowledge_base:
                kb_data.append({
                    "ID": entry.entry_id[:8],
                    "问题": entry.question[:50] + "..." if len(entry.question) > 50 else entry.question,
                    "答案": entry.answer[:50] + "..." if len(entry.answer) > 50 else entry.answer,
                    "分类": entry.category,
                    "标签": ", ".join(entry.tags[:3]),
                    "置信阈值": entry.confidence_threshold,
                    "有图片": "" if entry.image_path else ""
                })
            
            df = pd.DataFrame(kb_data)
            st.dataframe(df, use_container_width=True)
            
            # 分类统计图表
            fig = px.pie(
                values=list(categories.values()),
                names=list(categories.keys()),
                title="知识库分类分布"
            )
            st.plotly_chart(fig, use_container_width=True)
        else:
            st.info("知识库为空,请先添加知识条目")
    
    def add_knowledge_entry(self):
        """添加知识条目"""
        st.subheader("➕ 添加知识条目")
        
        with st.form(key="add_knowledge_form"):
            col1, col2 = st.columns(2)
            
            with col1:
                question = st.text_area("问题", height=100)
                category = st.text_input("分类", value="general")
                tags = st.text_input("标签(用逗号分隔)")
                
            with col2:
                answer = st.text_area("答案", height=100)
                confidence_threshold = st.slider(
                    "置信度阈值",
                    min_value=0.0,
                    max_value=1.0,
                    value=0.7,
                    step=0.05,
                    help="匹配分数需达到此阈值才会使用该答案"
                )
                knowledge_image = st.file_uploader(
                    "相关图片(可选)",
                    type=['jpg', 'jpeg', 'png']
                )
            
            submitted = st.form_submit_button("添加条目")
            
            if submitted and question and answer:
                # 处理图片
                image_path = None
                if knowledge_image:
                    # 保存图片
                    upload_dir = "knowledge_images"
                    os.makedirs(upload_dir, exist_ok=True)
                    image_path = os.path.join(upload_dir, f"{datetime.now().strftime('%Y%m%d%H%M%S')}_{knowledge_image.name}")
                    
                    with open(image_path, "wb") as f:
                        f.write(knowledge_image.getbuffer())
                
                # 处理标签
                tag_list = [tag.strip() for tag in tags.split(",")] if tags else []
                
                # 添加条目
                entry_id = self.system.add_knowledge_entry(
                    question=question,
                    answer=answer,
                    image_path=image_path,
                    category=category,
                    tags=tag_list,
                    confidence_threshold=confidence_threshold
                )
                
                st.success(f" 知识条目添加成功!ID: {entry_id}")
                st.rerun()
    
    def show_system_performance(self):
        """显示系统性能分析"""
        st.subheader(" 系统性能分析")
        
        performance = self.system.analyze_system_performance()
        
        # 显示关键指标
        col1, col2, col3, col4 = st.columns(4)
        
        with col1:
            st.metric("总对话数", performance['total_conversations'])
        
        with col2:
            st.metric("平均置信度", f"{performance['average_confidence']:.1%}")
        
        with col3:
            helpful_score = performance['average_helpful_score']
            if helpful_score:
                st.metric("平均有帮助度", f"{helpful_score:.1f}/5")
            else:
                st.metric("平均有帮助度", "暂无数据")
        
        with col4:
            accuracy_score = performance['average_accuracy_score']
            if accuracy_score:
                st.metric("平均准确度", f"{accuracy_score:.1f}/5")
            else:
                st.metric("平均准确度", "暂无数据")
        
        # 显示最常用知识条目
        if performance['top_used_entries']:
            st.subheader("🏆 最常用知识条目")
            
            usage_df = pd.DataFrame(performance['top_used_entries'])
            fig = px.bar(
                usage_df,
                x='usage_count',
                y='question',
                orientation='h',
                title="知识条目使用频率排名",
                labels={'usage_count': '使用次数', 'question': '问题'}
            )
            fig.update_layout(height=400)
            st.plotly_chart(fig, use_container_width=True)
        
        # 显示建议
        st.subheader(" 优化建议")
        
        if performance['total_conversations'] < 10:
            st.info("系统使用数据较少,建议积累更多对话数据后再进行分析")
        elif performance['average_confidence'] < 0.6:
            st.warning("""
            **系统置信度偏低,建议:**
            1. 扩充知识库覆盖范围
            2. 优化现有知识条目的表述
            3. 降低部分条目的置信度阈值
            4. 添加更多带图片的知识条目
            """)
        else:
            st.success("""
            **系统运行良好,建议:**
            1. 定期更新知识库内容
            2. 收集用户反馈优化答案质量
            3. 考虑添加更多专业领域知识
            4. 监控系统性能指标变化
            """)

def main():
    """主函数"""
    app = CustomerServiceWebApp()
    app.run()

if __name__ == "__main__":
    main()

5. 实际应用案例演示

现在让我们通过几个实际场景来看看这个系统如何工作。

5.1 场景一:电商商品咨询

用户场景:小王在电商平台看中一件衣服,但不确定是否有货,于是上传图片咨询客服。

# 示例代码:电商商品咨询
def example_ecommerce_query():
    """电商商品咨询示例"""
    system = CustomerServiceSystem()
    
    # 先添加一些知识库条目
    system.add_knowledge_entry(
        question="红色连衣裙有货吗?",
        answer="红色连衣裙目前有货,尺码S、M、L齐全,库存充足。",
        category="服装",
        tags=["连衣裙", "红色", "库存"]
    )
    
    system.add_knowledge_entry(
        question="这件衣服是什么材质?",
        answer="这款连衣裙采用95%棉和5%氨纶混纺,舒适透气,有弹性。",
        category="服装",
        tags=["材质", "面料", "成分"]
    )
    
    # 模拟用户查询(带图片)
    # 注意:实际使用时需要真实的图片路径
    result = system.process_customer_query(
        query_text="这件衣服有货吗?",
        query_image_path="red_dress.jpg",  # 假设有这张图片
        user_id="customer_001"
    )
    
    print("查询结果:")
    print(f"回答:{result['response']}")
    print(f"置信度:{result['confidence_score']:.2%}")
    print(f"匹配的知识条目ID:{result['matched_entry_id']}")
    
    # 显示候选答案
    print("\n候选答案排名:")
    for i, candidate in enumerate(result['all_candidates'][:3], 1):
        print(f"{i}. {candidate['question'][:30]}... - 分数:{candidate['score']:.3f}")

# 运行示例
example_ecommerce_query()

预期输出

查询结果:
回答:红色连衣裙目前有货,尺码S、M、L齐全,库存充足。
置信度:92.50%
匹配的知识条目ID:a1b2c3d4e5f6

候选答案排名:
1. 红色连衣裙有货吗?... - 分数:0.925
2. 这件衣服是什么材质?... - 分数:0.312
3. ... - 分数:0.105

5.2 场景二:技术支持问题

用户场景:小李的软件出现错误,他截图错误信息咨询技术支持。

# 示例代码:技术支持问题
def example_tech_support():
    """技术支持问题示例"""
    system = CustomerServiceSystem()
    
    # 添加技术支持知识库
    system.add_knowledge_entry(
        question="软件启动时报错'DLL not found'怎么办?",
        answer="请尝试以下步骤:1. 重新安装Visual C++运行库 2. 以管理员身份运行 3. 检查杀毒软件是否误删文件",
        category="技术支持",
        tags=["启动错误", "DLL", "运行库"],
        confidence_threshold=0.6
    )
    
    system.add_knowledge_entry(
        question="如何导出数据报表?",
        answer="在菜单栏选择'文件'->'导出'->'报表',选择格式和保存位置即可。",
        category="使用指导",
        tags=["导出", "报表", "数据"]
    )
    
    # 模拟用户查询(带错误截图)
    result = system.process_customer_query(
        query_text="启动软件时出现这个错误,怎么解决?",
        query_image_path="error_screenshot.png",  # 错误截图
        user_id="user_tech_001"
    )
    
    print("技术支持查询结果:")
    print(f"回答:{result['response']}")
    print(f"置信度:{result['confidence_score']:.2%}")
    
    # 如果置信度不够高,系统会给出备选建议
    if result['confidence_score'] < 0.7:
        print("\n系统提示:虽然找到了相关答案,但置信度不高,建议:")
        print("1. 提供更详细的错误描述")
        print("2. 联系人工技术支持")
        print("3. 查看软件日志文件")

# 运行示例
example_tech_support()

5.3 场景三:多轮对话

用户场景:用户与客服进行多轮对话,系统能记住上下文。

# 示例代码:多轮对话
def example_multi_turn_conversation():
    """多轮对话示例"""
    system = CustomerServiceSystem()
    
    # 添加知识库
    system.add_knowledge_entry(
        question="你们的退货政策是什么?",
        answer="我们支持7天无理由退货,商品需保持完好,不影响二次销售。",
        category="售后政策",
        tags=["退货", "售后", "政策"]
    )
    
    system.add_knowledge_entry(
        question="退货运费谁承担?",
        answer="非质量问题退货,运费由客户承担;质量问题退货,我们承担往返运费。",
        category="售后政策",
        tags=["运费", "退货", "责任"]
    )
    
    system.add_knowledge_entry(
        question="退货后多久能收到退款?",
        answer="我们收到退货商品并验收通过后,会在3-5个工作日内处理退款。",
        category="售后政策",
        tags=["退款", "时效", "退货"]
    )
    
    # 模拟多轮对话
    conversations = [
        ("我想了解一下退货政策", None),
        ("如果是衣服尺寸不合适,运费谁出?", None),
        ("那退款要等多久?", None)
    ]
    
    user_id = "customer_multi_001"
    
    for i, (query_text, image_path) in enumerate(conversations, 1):
        print(f"\n=== 第{i}轮对话 ===")
        print(f"用户:{query_text}")
        
        result = system.process_customer_query(
            query_text=query_text,
            query_image_path=image_path,
            user_id=user_id
        )
        
        print(f"客服:{result['response']}")
        print(f"置信度:{result['confidence_score']:.2%}")
    
    # 查看对话历史
    print(f"\n=== 用户{user_id}的对话历史 ===")
    history = system.get_conversation_history(user_id, limit=5)
    for item in history:
        print(f"[{item['timestamp']}] 用户:{item['query_text'][:30]}...")
        print(f"     客服:{item['response_text'][:30]}... (置信度:{item['confidence_score']:.2%})")

# 运行示例
example_multi_turn_conversation()

6. 系统优化与进阶技巧

6.1 性能优化建议

  1. 缓存优化
# 添加查询缓存
from functools import lru_cache
import hashlib

class OptimizedCustomerServiceSystem(CustomerServiceSystem):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.evaluation_cache = {}
    
    def _get_cache_key(self, query_text, document_text, query_image_path, document_image_path):
        """生成缓存键"""
        key_parts = [
            query_text,
            document_text,
            query_image_path or "",
            document_image_path or ""
        ]
        key_string = "|".join(key_parts)
        return hashlib.md5(key_string.encode()).hexdigest()
    
    def evaluate_relevance_with_cache(self, query_text, document_text, 
                                     query_image_path=None, document_image_path=None):
        """带缓存的评估"""
        cache_key = self._get_cache_key(query_text, document_text, 
                                       query_image_path, document_image_path)
        
        if cache_key in self.evaluation_cache:
            return self.evaluation_cache[cache_key]
        
        # 加载图片
        query_image = self._load_image(query_image_path) if query_image_path else None
        document_image = self._load_image(document_image_path) if document_image_path else None
        
        # 评估
        score = self.evaluator.evaluate_relevance(
            query_text=query_text,
            document_text=document_text,
            query_image=query_image,
            document_image=document_image
        )
        
        # 缓存结果(设置合适的过期时间)
        self.evaluation_cache[cache_key] = score
        if len(self.evaluation_cache) > 1000:  # 限制缓存大小
            # 移除最旧的条目
            oldest_key = next(iter(self.evaluation_cache))
            del self.evaluation_cache[oldest_key]
        
        return score
  1. 批量处理优化
# 批量处理查询
def batch_process_queries(self, queries_batch):
    """批量处理查询,提高效率"""
    # 预处理所有查询
    preprocessed_queries = []
    for query in queries_batch:
        preprocessed = self._preprocess_query(query)
        preprocessed_queries.append(preprocessed)
    
    # 批量评估
    batch_scores = self.evaluator.batch_evaluate(
        queries=preprocessed_queries,
        documents=self.knowledge_base
    )
    
    # 批量生成响应
    responses = []
    for query, scores in zip(queries_batch, batch_scores):
        best_match_idx = scores.index(max(scores))
        if scores[best_match_idx] >= self.knowledge_base[best_match_idx].confidence_threshold:
            response = self.knowledge_base[best_match_idx].answer
        else:
            response = self._generate_fallback_response(query, scores)
        responses.append(response)
    
    return responses

6.2 准确度提升技巧

  1. 知识库优化策略
def optimize_knowledge_base(self):
    """优化知识库"""
    optimization_suggestions = []
    
    # 分析知识库使用情况
    conn = sqlite3.connect(self.db_path)
    cursor = conn.cursor()
    
    # 找出很少被使用的条目
    cursor.execute('''
        SELECT kb.id, kb.question, COUNT(ch.id) as usage_count
        FROM knowledge_base kb
        LEFT JOIN conversation_history ch ON kb.id = ch.matched_entry_id
        GROUP BY kb.id
        HAVING usage_count < 3
        ORDER BY usage_count ASC
    ''')
    
    low_usage_entries = cursor.fetchall()
    
    for entry in low_usage_entries[:10]:  # 只看前10个
        optimization_suggestions.append({
            'type': '低使用率',
            'entry_id': entry[0],
            'question': entry[1],
            'usage_count': entry[2],
            'suggestion': '考虑优化问题表述或合并到其他条目'
        })
    
    # 找出低置信度的条目
    cursor.execute('''
        SELECT ch.matched_entry_id, AVG(ch.confidence_score) as avg_confidence
        FROM conversation_history ch
        WHERE ch.matched_entry_id IS NOT NULL
        GROUP BY c
Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐