手把手教你用Qwen2.5-VL构建智能客服问答系统
想象一下,你是一家电商公司的客服主管。每天,你的团队需要处理成千上万的用户咨询,其中不仅有文字问题,还有大量用户上传的商品图片、订单截图、物流单号照片。传统的文本客服机器人面对这些图片时束手无策,只能回复“请用文字描述您的问题”,用户体验大打折扣。这就是我们今天要解决的问题。本文将带你从零开始,使用,构建一个真正能“看懂”图片的智能客服问答系统。这个系统不仅能理解用户的文字问题,还能分析用户上传的
手把手教你用Qwen2.5-VL构建智能客服问答系统
1. 引言
想象一下,你是一家电商公司的客服主管。每天,你的团队需要处理成千上万的用户咨询,其中不仅有文字问题,还有大量用户上传的商品图片、订单截图、物流单号照片。传统的文本客服机器人面对这些图片时束手无策,只能回复“请用文字描述您的问题”,用户体验大打折扣。
这就是我们今天要解决的问题。本文将带你从零开始,使用 Qwen2.5-VL多模态语义相关度评估引擎,构建一个真正能“看懂”图片的智能客服问答系统。这个系统不仅能理解用户的文字问题,还能分析用户上传的图片,智能判断客服知识库中哪个答案最相关,从而提供精准的回复。
通过本教程,你将学会:
- 如何快速部署Qwen2.5-VL多模态评估引擎
- 如何构建一个支持图文混合输入的智能客服系统
- 如何将语义相关度评估集成到实际业务场景中
- 如何优化系统性能,提升客服响应准确率
无论你是AI工程师、产品经理,还是对智能客服感兴趣的技术爱好者,这篇教程都将为你提供一套完整、可落地的解决方案。
2. 系统核心:Qwen2.5-VL多模态评估引擎
在开始动手之前,我们先来了解一下这个系统的“大脑”——Qwen2.5-VL多模态语义相关度评估引擎。
2.1 它能做什么?
简单来说,这个引擎就像一个智能裁判。当你给它一个用户的问题(可能是文字,也可能是文字+图片),再给它一个候选的客服答案,它就能判断这个答案是否真的回答了用户的问题,并给出一个0到1的分数。
举个例子:
- 用户查询:文字“这件衣服有货吗?” + 图片(一件红色连衣裙的照片)
- 候选答案1:“红色连衣裙目前有货,尺码齐全”
- 候选答案2:“黑色西装外套需要预定”
引擎会分析图片中的红色连衣裙,然后判断:答案1直接提到了“红色连衣裙”,相关性很高,可能得0.9分;答案2说的是“黑色西装外套”,完全不相关,可能得0.1分。
2.2 为什么选择这个方案?
传统的客服系统有三大痛点:
- 看不懂图片:用户发个商品图问“这个有货吗?”,系统无法识别图片内容
- 语义理解浅:只能做关键词匹配,无法理解问题的真实意图
- 答案质量差:经常给出看似相关但实际没用的答案
Qwen2.5-VL引擎正好解决了这些问题:
- 多模态理解:文字和图片都能处理
- 深度语义分析:不是简单的关键词匹配,而是真正的理解
- 概率化评分:给出0-1的置信度分数,让你知道答案有多可靠
2.3 技术架构概览
整个评估过程可以简化为以下流程:
用户输入(文字/图片)
│
▼
多模态Prompt构造
│
▼
Qwen2.5-VL多模态模型
│
▼
语义推理与分析
│
▼
相关度评分(0~1)
这个流程的核心是Qwen2.5-VL模型,它基于先进的视觉-语言大模型技术,能够同时理解图像和文本的语义信息。
3. 环境准备与快速部署
现在让我们开始动手搭建。首先需要准备好运行环境。
3.1 系统要求
确保你的环境满足以下要求:
- 操作系统:Linux(Ubuntu 20.04+推荐)或 macOS
- Python版本:3.8 或更高版本
- 内存:至少16GB RAM
- GPU:推荐使用NVIDIA GPU(显存8GB+),CPU也可运行但速度较慢
- 磁盘空间:至少20GB可用空间
3.2 一键部署脚本
我们提供了一个完整的部署脚本,可以自动完成所有环境配置:
#!/bin/bash
# deploy_qwen_vl_customer_service.sh
echo "开始部署Qwen2.5-VL智能客服系统..."
# 1. 创建项目目录
mkdir -p qwen_vl_customer_service
cd qwen_vl_customer_service
# 2. 创建Python虚拟环境
echo "创建Python虚拟环境..."
python3 -m venv venv
source venv/bin/activate
# 3. 安装基础依赖
echo "安装Python依赖包..."
pip install --upgrade pip
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
pip install transformers>=4.35.0
pip install accelerate
pip install sentencepiece
pip install protobuf
pip install pillow
pip install requests
pip install streamlit # 用于Web界面
# 4. 下载Qwen2.5-VL模型
echo "下载Qwen2.5-VL模型..."
# 这里使用Hugging Face的模型,确保你有访问权限
# 或者使用ModelScope(国内访问更快)
pip install modelscope
python -c "from modelscope import snapshot_download; snapshot_download('qwen/Qwen2.5-VL-7B-Instruct', cache_dir='./models')"
echo "部署完成!"
echo "请运行:source venv/bin/activate 激活环境"
将上述脚本保存为deploy.sh,然后执行:
chmod +x deploy.sh
./deploy.sh
3.3 验证安装
部署完成后,运行一个简单的测试脚本验证环境是否正常:
# test_environment.py
import torch
import transformers
from PIL import Image
import requests
from io import BytesIO
print("=== 环境验证测试 ===")
# 1. 检查PyTorch和CUDA
print(f"PyTorch版本: {torch.__version__}")
print(f"CUDA可用: {torch.cuda.is_available()}")
if torch.cuda.is_available():
print(f"GPU设备: {torch.cuda.get_device_name(0)}")
print(f"显存: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")
# 2. 检查transformers版本
print(f"Transformers版本: {transformers.__version__}")
# 3. 测试图片处理
try:
# 下载测试图片
url = "https://images.unsplash.com/photo-1490481651871-ab68de25d43d"
response = requests.get(url)
img = Image.open(BytesIO(response.content))
print(f"图片处理测试: 成功加载图片,尺寸: {img.size}")
except Exception as e:
print(f"图片处理测试失败: {e}")
print("=== 环境验证完成 ===")
运行测试:
python test_environment.py
如果一切正常,你会看到类似这样的输出:
=== 环境验证测试 ===
PyTorch版本: 2.1.0
CUDA可用: True
GPU设备: NVIDIA GeForce RTX 4090
显存: 24.00 GB
Transformers版本: 4.35.0
图片处理测试: 成功加载图片,尺寸: (600, 400)
=== 环境验证完成 ===
4. 构建智能客服问答系统
现在环境已经准备好了,让我们开始构建核心的智能客服系统。
4.1 系统架构设计
我们的智能客服系统包含以下核心组件:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 用户界面层 │ │ 业务逻辑层 │ │ 数据存储层 │
│ │ │ │ │ │
│ - Web界面 │◄──►│ - 查询处理 │◄──►│ - 知识库 │
│ - API接口 │ │ - 图片分析 │ │ - 对话历史 │
│ - 文件上传 │ │ - 语义匹配 │ │ - 用户数据 │
└─────────────────┘ │ - 答案排序 │ └─────────────────┘
│ - 回复生成 │
└─────────────────┘
│
▼
┌─────────────────┐
│ 模型服务层 │
│ │
│ - Qwen2.5-VL │
│ - 评估引擎 │
└─────────────────┘
4.2 核心代码实现
让我们从最核心的部分开始——Qwen2.5-VL评估引擎的封装。
# qwen_vl_evaluator.py
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from PIL import Image
import numpy as np
from typing import Union, List, Dict, Optional
import logging
class QwenVLEvaluator:
"""Qwen2.5-VL多模态语义评估器"""
def __init__(self, model_path: str = "Qwen/Qwen2.5-VL-7B-Instruct", device: str = None):
"""
初始化评估器
Args:
model_path: 模型路径,可以是本地路径或HuggingFace模型ID
device: 运行设备,'cuda'或'cpu',默认自动选择
"""
self.logger = logging.getLogger(__name__)
# 自动选择设备
if device is None:
self.device = "cuda" if torch.cuda.is_available() else "cpu"
else:
self.device = device
self.logger.info(f"使用设备: {self.device}")
# 加载模型和tokenizer
self.logger.info("正在加载Qwen2.5-VL模型...")
self.tokenizer = AutoTokenizer.from_pretrained(
model_path,
trust_remote_code=True
)
self.model = AutoModelForCausalLM.from_pretrained(
model_path,
torch_dtype=torch.bfloat16 if self.device == "cuda" else torch.float32,
device_map="auto" if self.device == "cuda" else None,
trust_remote_code=True
).eval()
self.logger.info("模型加载完成")
def prepare_query_prompt(self, query_text: str, query_image: Optional[Image.Image] = None) -> str:
"""
准备查询的prompt
Args:
query_text: 查询文本
query_image: 查询图片(可选)
Returns:
格式化后的prompt
"""
messages = [
{
"role": "user",
"content": [
{"type": "text", "text": "请分析以下用户问题,理解其核心意图。"},
]
}
]
# 添加文本
messages[0]["content"].append({"type": "text", "text": f"用户问题: {query_text}"})
# 如果有图片,添加图片
if query_image is not None:
messages[0]["content"].append({"type": "image", "image": query_image})
# 添加指令
messages[0]["content"].append({
"type": "text",
"text": "请用一句话总结用户的核心需求。"
})
return messages
def prepare_document_prompt(self, document_text: str, document_image: Optional[Image.Image] = None) -> str:
"""
准备文档(候选答案)的prompt
Args:
document_text: 文档文本
document_image: 文档图片(可选)
Returns:
格式化后的prompt
"""
messages = [
{
"role": "user",
"content": [
{"type": "text", "text": "请分析以下客服回答内容。"},
]
}
]
# 添加文本
messages[0]["content"].append({"type": "text", "text": f"客服回答: {document_text}"})
# 如果有图片,添加图片
if document_image is not None:
messages[0]["content"].append({"type": "image", "image": document_image})
# 添加指令
messages[0]["content"].append({
"type": "text",
"text": "请用一句话总结这个回答的核心内容。"
})
return messages
def evaluate_relevance(self,
query_text: str,
document_text: str,
query_image: Optional[Image.Image] = None,
document_image: Optional[Image.Image] = None) -> float:
"""
评估查询与文档的相关性
Args:
query_text: 查询文本
document_text: 文档文本
query_image: 查询图片(可选)
document_image: 文档图片(可选)
Returns:
相关度分数(0-1)
"""
try:
# 构造评估prompt
messages = [
{
"role": "system",
"content": "你是一个智能客服评估专家。请评估用户问题与客服回答的相关性。"
},
{
"role": "user",
"content": []
}
]
# 添加查询部分
messages[1]["content"].append({
"type": "text",
"text": f"用户问题: {query_text}"
})
if query_image is not None:
messages[1]["content"].append({"type": "image", "image": query_image})
# 添加文档部分
messages[1]["content"].append({
"type": "text",
"text": f"客服回答: {document_text}"
})
if document_image is not None:
messages[1]["content"].append({"type": "image", "image": document_image})
# 添加评估指令
messages[1]["content"].append({
"type": "text",
"text": """请评估这个客服回答是否直接、准确地回答了用户问题。
请按照以下格式回答:
1. 相关性分析:[简要分析]
2. 匹配程度:高/中/低
3. 置信度分数:0.0-1.0之间的数字
注意:分数1.0表示完全匹配,0.0表示完全不相关。"""
})
# 生成评估结果
text = self.tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True
)
model_inputs = self.tokenizer([text], return_tensors="pt").to(self.device)
with torch.no_grad():
generated_ids = self.model.generate(
**model_inputs,
max_new_tokens=512,
do_sample=False
)
generated_ids = [
output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
]
response = self.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
# 从响应中提取分数
score = self._extract_score_from_response(response)
return score
except Exception as e:
self.logger.error(f"评估过程中出错: {e}")
return 0.0
def _extract_score_from_response(self, response: str) -> float:
"""
从模型响应中提取分数
Args:
response: 模型生成的文本
Returns:
提取的分数(0-1)
"""
import re
# 尝试多种方式提取分数
patterns = [
r'置信度分数[::]\s*([0-9]*\.?[0-9]+)', # 中文冒号
r'分数[::]\s*([0-9]*\.?[0-9]+)', # 简单分数
r'([0-9]*\.?[0-9]+)\s*分', # XX分
r'score[::]\s*([0-9]*\.?[0-9]+)', # 英文score
]
for pattern in patterns:
match = re.search(pattern, response, re.IGNORECASE)
if match:
try:
score = float(match.group(1))
# 确保分数在0-1范围内
if score > 1.0:
score = score / 100.0 # 假设是百分比
return max(0.0, min(1.0, score))
except ValueError:
continue
# 如果没有找到数字,尝试根据文本判断
if any(word in response.lower() for word in ['高度相关', '完全匹配', '非常准确']):
return 0.9
elif any(word in response.lower() for word in ['中等相关', '部分匹配', '基本准确']):
return 0.6
elif any(word in response.lower() for word in ['低相关', '不匹配', '不准确']):
return 0.3
else:
return 0.5 # 默认中等分数
def batch_evaluate(self,
queries: List[Dict],
documents: List[Dict]) -> List[float]:
"""
批量评估多个查询-文档对
Args:
queries: 查询列表,每个元素是{'text': str, 'image': Optional[Image]}
documents: 文档列表,每个元素是{'text': str, 'image': Optional[Image]}
Returns:
相关度分数列表
"""
scores = []
for query, document in zip(queries, documents):
score = self.evaluate_relevance(
query_text=query.get('text', ''),
document_text=document.get('text', ''),
query_image=query.get('image'),
document_image=document.get('image')
)
scores.append(score)
return scores
4.3 智能客服系统实现
有了评估引擎,现在我们来构建完整的智能客服系统:
# customer_service_system.py
import json
import sqlite3
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from pathlib import Path
import logging
from qwen_vl_evaluator import QwenVLEvaluator
from PIL import Image
import hashlib
@dataclass
class CustomerQuery:
"""客户查询"""
query_id: str
text: str
image_path: Optional[str] = None
timestamp: datetime = None
user_id: Optional[str] = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now()
@property
def image(self) -> Optional[Image.Image]:
"""获取图片对象"""
if self.image_path and Path(self.image_path).exists():
try:
return Image.open(self.image_path)
except Exception as e:
logging.warning(f"无法加载图片 {self.image_path}: {e}")
return None
return None
@dataclass
class KnowledgeBaseEntry:
"""知识库条目"""
entry_id: str
question: str
answer: str
image_path: Optional[str] = None
category: str = "general"
tags: List[str] = None
confidence_threshold: float = 0.7
def __post_init__(self):
if self.tags is None:
self.tags = []
@property
def image(self) -> Optional[Image.Image]:
"""获取图片对象"""
if self.image_path and Path(self.image_path).exists():
try:
return Image.open(self.image_path)
except Exception as e:
logging.warning(f"无法加载图片 {self.image_path}: {e}")
return None
return None
class CustomerServiceSystem:
"""智能客服系统"""
def __init__(self,
model_path: str = "Qwen/Qwen2.5-VL-7B-Instruct",
db_path: str = "customer_service.db"):
"""
初始化客服系统
Args:
model_path: Qwen2.5-VL模型路径
db_path: 数据库路径
"""
self.logger = logging.getLogger(__name__)
# 初始化评估器
self.evaluator = QwenVLEvaluator(model_path)
# 初始化数据库
self.db_path = db_path
self._init_database()
# 知识库缓存
self.knowledge_base = []
self._load_knowledge_base()
# 对话历史
self.conversation_history = {}
def _init_database(self):
"""初始化数据库"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 创建知识库表
cursor.execute('''
CREATE TABLE IF NOT EXISTS knowledge_base (
id TEXT PRIMARY KEY,
question TEXT NOT NULL,
answer TEXT NOT NULL,
image_path TEXT,
category TEXT,
tags TEXT,
confidence_threshold REAL DEFAULT 0.7,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建对话历史表
cursor.execute('''
CREATE TABLE IF NOT EXISTS conversation_history (
id TEXT PRIMARY KEY,
user_id TEXT,
query_text TEXT NOT NULL,
query_image_path TEXT,
matched_entry_id TEXT,
confidence_score REAL,
response_text TEXT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (matched_entry_id) REFERENCES knowledge_base (id)
)
''')
# 创建用户反馈表
cursor.execute('''
CREATE TABLE IF NOT EXISTS user_feedback (
id TEXT PRIMARY KEY,
conversation_id TEXT,
helpful_score INTEGER CHECK(helpful_score >= 1 AND helpful_score <= 5),
accuracy_score INTEGER CHECK(accuracy_score >= 1 AND accuracy_score <= 5),
comments TEXT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (conversation_id) REFERENCES conversation_history (id)
)
''')
conn.commit()
conn.close()
def _load_knowledge_base(self):
"""从数据库加载知识库"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('SELECT * FROM knowledge_base')
rows = cursor.fetchall()
self.knowledge_base = []
for row in rows:
entry = KnowledgeBaseEntry(
entry_id=row[0],
question=row[1],
answer=row[2],
image_path=row[3],
category=row[4],
tags=json.loads(row[5]) if row[5] else [],
confidence_threshold=row[6]
)
self.knowledge_base.append(entry)
conn.close()
self.logger.info(f"加载了 {len(self.knowledge_base)} 条知识库条目")
def add_knowledge_entry(self,
question: str,
answer: str,
image_path: Optional[str] = None,
category: str = "general",
tags: List[str] = None,
confidence_threshold: float = 0.7) -> str:
"""
添加知识库条目
Args:
question: 问题
answer: 答案
image_path: 图片路径(可选)
category: 分类
tags: 标签列表
confidence_threshold: 置信度阈值
Returns:
条目ID
"""
entry_id = hashlib.md5(f"{question}{answer}".encode()).hexdigest()[:12]
if tags is None:
tags = []
entry = KnowledgeBaseEntry(
entry_id=entry_id,
question=question,
answer=answer,
image_path=image_path,
category=category,
tags=tags,
confidence_threshold=confidence_threshold
)
# 保存到数据库
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO knowledge_base
(id, question, answer, image_path, category, tags, confidence_threshold)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
entry_id,
question,
answer,
image_path,
category,
json.dumps(tags),
confidence_threshold
))
conn.commit()
conn.close()
# 更新缓存
self.knowledge_base.append(entry)
return entry_id
def process_customer_query(self,
query_text: str,
query_image_path: Optional[str] = None,
user_id: Optional[str] = None) -> Dict[str, Any]:
"""
处理客户查询
Args:
query_text: 查询文本
query_image_path: 查询图片路径(可选)
user_id: 用户ID(可选)
Returns:
处理结果
"""
query_id = hashlib.md5(f"{query_text}{query_image_path}{datetime.now()}".encode()).hexdigest()[:12]
# 创建查询对象
query = CustomerQuery(
query_id=query_id,
text=query_text,
image_path=query_image_path,
user_id=user_id
)
# 在知识库中查找最佳匹配
best_match = None
best_score = 0.0
all_scores = []
for entry in self.knowledge_base:
# 评估相关性
score = self.evaluator.evaluate_relevance(
query_text=query_text,
document_text=entry.answer,
query_image=query.image,
document_image=entry.image
)
all_scores.append({
'entry_id': entry.entry_id,
'question': entry.question,
'score': score,
'threshold': entry.confidence_threshold
})
# 更新最佳匹配
if score > best_score and score >= entry.confidence_threshold:
best_score = score
best_match = entry
# 准备响应
if best_match and best_score >= (best_match.confidence_threshold if best_match else 0.7):
response_text = best_match.answer
matched_entry_id = best_match.entry_id
confidence = best_score
else:
# 没有找到足够置信度的匹配
response_text = "抱歉,我暂时无法准确回答您的问题。您可以尝试重新描述问题,或联系人工客服获取帮助。"
matched_entry_id = None
confidence = best_score if best_score > 0 else 0.0
# 如果有一些接近的匹配,可以提示用户
if all_scores:
top_scores = sorted(all_scores, key=lambda x: x['score'], reverse=True)[:3]
if top_scores[0]['score'] > 0.5:
response_text += "\n\n以下是一些可能相关的信息:"
for i, item in enumerate(top_scores[:2], 1):
if item['score'] > 0.5:
response_text += f"\n{i}. {item['question']}"
# 保存对话历史
self._save_conversation(
query_id=query_id,
user_id=user_id,
query_text=query_text,
query_image_path=query_image_path,
matched_entry_id=matched_entry_id,
confidence_score=confidence,
response_text=response_text
)
# 返回结果
return {
'query_id': query_id,
'response': response_text,
'matched_entry_id': matched_entry_id,
'confidence_score': confidence,
'all_candidates': all_scores[:5], # 返回前5个候选
'has_image': query_image_path is not None
}
def _save_conversation(self,
query_id: str,
user_id: Optional[str],
query_text: str,
query_image_path: Optional[str],
matched_entry_id: Optional[str],
confidence_score: float,
response_text: str):
"""保存对话历史"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO conversation_history
(id, user_id, query_text, query_image_path, matched_entry_id, confidence_score, response_text)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
query_id,
user_id,
query_text,
query_image_path,
matched_entry_id,
confidence_score,
response_text
))
conn.commit()
conn.close()
def get_conversation_history(self, user_id: str, limit: int = 10) -> List[Dict]:
"""获取用户对话历史"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM conversation_history
WHERE user_id = ?
ORDER BY timestamp DESC
LIMIT ?
''', (user_id, limit))
rows = cursor.fetchall()
conn.close()
history = []
for row in rows:
history.append({
'query_id': row[0],
'query_text': row[2],
'response_text': row[6],
'confidence_score': row[5],
'timestamp': row[7]
})
return history
def add_feedback(self,
conversation_id: str,
helpful_score: int,
accuracy_score: int,
comments: str = "") -> bool:
"""
添加用户反馈
Args:
conversation_id: 对话ID
helpful_score: 有帮助程度(1-5)
accuracy_score: 准确度(1-5)
comments: 评论
Returns:
是否成功
"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
feedback_id = hashlib.md5(f"{conversation_id}{datetime.now()}".encode()).hexdigest()[:12]
cursor.execute('''
INSERT INTO user_feedback
(id, conversation_id, helpful_score, accuracy_score, comments)
VALUES (?, ?, ?, ?, ?)
''', (
feedback_id,
conversation_id,
helpful_score,
accuracy_score,
comments
))
conn.commit()
conn.close()
return True
except Exception as e:
self.logger.error(f"添加反馈失败: {e}")
return False
def analyze_system_performance(self) -> Dict[str, Any]:
"""分析系统性能"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 总体统计
cursor.execute('SELECT COUNT(*) FROM conversation_history')
total_conversations = cursor.fetchone()[0]
cursor.execute('SELECT AVG(confidence_score) FROM conversation_history WHERE confidence_score > 0')
avg_confidence = cursor.fetchone()[0] or 0
# 反馈统计
cursor.execute('SELECT AVG(helpful_score), AVG(accuracy_score) FROM user_feedback')
feedback_result = cursor.fetchone()
avg_helpful = feedback_result[0] or 0
avg_accuracy = feedback_result[1] or 0
# 知识库使用统计
cursor.execute('''
SELECT kb.question, COUNT(ch.id) as usage_count
FROM knowledge_base kb
LEFT JOIN conversation_history ch ON kb.id = ch.matched_entry_id
GROUP BY kb.id
ORDER BY usage_count DESC
LIMIT 10
''')
top_entries = cursor.fetchall()
conn.close()
return {
'total_conversations': total_conversations,
'average_confidence': round(avg_confidence, 3),
'average_helpful_score': round(avg_helpful, 2) if avg_helpful else None,
'average_accuracy_score': round(avg_accuracy, 2) if avg_accuracy else None,
'top_used_entries': [
{'question': row[0], 'usage_count': row[1]}
for row in top_entries
]
}
4.4 Web界面实现
为了让非技术人员也能使用这个系统,我们创建一个简单的Web界面:
# web_interface.py
import streamlit as st
import pandas as pd
from PIL import Image
import tempfile
import os
from datetime import datetime
from customer_service_system import CustomerServiceSystem
import plotly.express as px
import plotly.graph_objects as go
class CustomerServiceWebApp:
"""客服系统Web应用"""
def __init__(self):
st.set_page_config(
page_title="智能客服系统",
page_icon="",
layout="wide"
)
# 初始化系统
if 'service_system' not in st.session_state:
st.session_state.service_system = CustomerServiceSystem()
self.system = st.session_state.service_system
# 初始化会话状态
if 'conversation_history' not in st.session_state:
st.session_state.conversation_history = []
if 'current_user_id' not in st.session_state:
st.session_state.current_user_id = f"user_{datetime.now().strftime('%Y%m%d%H%M%S')}"
def run(self):
"""运行Web应用"""
st.title(" 智能客服问答系统")
st.markdown("基于Qwen2.5-VL的多模态语义理解客服系统")
# 侧边栏
with st.sidebar:
st.header("系统设置")
# 用户ID
user_id = st.text_input(
"用户ID",
value=st.session_state.current_user_id,
help="用于标识用户,保存对话历史"
)
st.session_state.current_user_id = user_id
# 知识库管理
st.subheader("知识库管理")
if st.button("查看知识库"):
self.show_knowledge_base()
if st.button("添加知识条目"):
self.add_knowledge_entry()
# 系统分析
st.subheader("系统分析")
if st.button("查看系统性能"):
self.show_system_performance()
# 清空对话
if st.button("清空当前对话"):
st.session_state.conversation_history = []
st.rerun()
# 主界面 - 对话区域
col1, col2 = st.columns([2, 1])
with col1:
st.subheader(" 客服对话")
# 显示对话历史
self.display_conversation_history()
# 输入区域
st.divider()
self.display_input_area()
with col2:
st.subheader(" 对话分析")
self.display_conversation_analysis()
def display_conversation_history(self):
"""显示对话历史"""
if st.session_state.conversation_history:
for msg in st.session_state.conversation_history:
with st.chat_message(msg["role"]):
st.write(msg["content"])
# 显示置信度
if msg.get("confidence"):
confidence = msg["confidence"]
color = "green" if confidence > 0.7 else "orange" if confidence > 0.5 else "red"
st.markdown(f"<small>置信度: <span style='color:{color}'>{confidence:.2%}</span></small>",
unsafe_allow_html=True)
# 显示图片
if msg.get("image"):
st.image(msg["image"], caption="用户上传的图片", use_column_width=True)
else:
st.info("还没有对话记录,开始提问吧!")
def display_input_area(self):
"""显示输入区域"""
with st.form(key="query_form", clear_on_submit=True):
# 文本输入
query_text = st.text_area(
"请输入您的问题",
placeholder="例如:这件衣服有货吗?或者描述您遇到的问题...",
height=100
)
# 图片上传
uploaded_file = st.file_uploader(
"上传相关图片(可选)",
type=['jpg', 'jpeg', 'png', 'gif'],
help="可以上传商品图片、订单截图等"
)
# 提交按钮
submitted = st.form_submit_button("发送", use_container_width=True)
if submitted and query_text:
# 处理图片
image_path = None
if uploaded_file is not None:
# 保存临时文件
with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as tmp_file:
tmp_file.write(uploaded_file.getvalue())
image_path = tmp_file.name
# 添加用户消息到历史
user_msg = {
"role": "user",
"content": query_text,
"timestamp": datetime.now()
}
if uploaded_file:
user_msg["image"] = Image.open(uploaded_file)
st.session_state.conversation_history.append(user_msg)
# 处理查询
with st.spinner("正在分析您的问题..."):
result = self.system.process_customer_query(
query_text=query_text,
query_image_path=image_path,
user_id=st.session_state.current_user_id
)
# 添加系统回复到历史
system_msg = {
"role": "assistant",
"content": result["response"],
"confidence": result["confidence_score"],
"timestamp": datetime.now()
}
st.session_state.conversation_history.append(system_msg)
# 清理临时文件
if image_path and os.path.exists(image_path):
os.unlink(image_path)
st.rerun()
def display_conversation_analysis(self):
"""显示对话分析"""
if len(st.session_state.conversation_history) >= 2:
# 计算平均置信度
confidences = [
msg["confidence"] for msg in st.session_state.conversation_history
if msg["role"] == "assistant" and "confidence" in msg
]
if confidences:
avg_confidence = sum(confidences) / len(confidences)
# 显示置信度图表
fig = go.Figure(go.Indicator(
mode="gauge+number",
value=avg_confidence * 100,
title={'text': "平均回答置信度"},
gauge={
'axis': {'range': [0, 100]},
'bar': {'color': "darkblue"},
'steps': [
{'range': [0, 50], 'color': "red"},
{'range': [50, 80], 'color': "orange"},
{'range': [80, 100], 'color': "green"}
],
'threshold': {
'line': {'color': "black", 'width': 4},
'thickness': 0.75,
'value': 70
}
}
))
fig.update_layout(height=300)
st.plotly_chart(fig, use_container_width=True)
# 显示置信度分布
st.subheader("置信度分布")
df = pd.DataFrame({
"回答序号": range(1, len(confidences) + 1),
"置信度": [c * 100 for c in confidences]
})
fig2 = px.line(df, x="回答序号", y="置信度",
title="每次回答的置信度变化",
markers=True)
fig2.update_layout(height=250)
st.plotly_chart(fig2, use_container_width=True)
# 显示建议
if avg_confidence < 0.7:
st.warning(" 系统平均置信度较低,建议优化知识库内容")
else:
st.success(" 系统回答质量良好")
# 显示候选答案分析(如果有)
if st.session_state.conversation_history:
last_assistant_msg = None
for msg in reversed(st.session_state.conversation_history):
if msg["role"] == "assistant":
last_assistant_msg = msg
break
if last_assistant_msg and "candidates" in last_assistant_msg:
st.subheader(" 候选答案排名")
candidates_df = pd.DataFrame(last_assistant_msg["candidates"])
st.dataframe(
candidates_df.style.background_gradient(subset=['score'], cmap='RdYlGn'),
use_container_width=True
)
def show_knowledge_base(self):
"""显示知识库"""
st.subheader(" 知识库管理")
if self.system.knowledge_base:
# 显示统计信息
categories = {}
for entry in self.system.knowledge_base:
categories[entry.category] = categories.get(entry.category, 0) + 1
col1, col2, col3 = st.columns(3)
with col1:
st.metric("总条目数", len(self.system.knowledge_base))
with col2:
st.metric("分类数量", len(categories))
with col3:
avg_threshold = sum(e.confidence_threshold for e in self.system.knowledge_base) / len(self.system.knowledge_base)
st.metric("平均置信阈值", f"{avg_threshold:.2f}")
# 显示知识库表格
kb_data = []
for entry in self.system.knowledge_base:
kb_data.append({
"ID": entry.entry_id[:8],
"问题": entry.question[:50] + "..." if len(entry.question) > 50 else entry.question,
"答案": entry.answer[:50] + "..." if len(entry.answer) > 50 else entry.answer,
"分类": entry.category,
"标签": ", ".join(entry.tags[:3]),
"置信阈值": entry.confidence_threshold,
"有图片": "" if entry.image_path else ""
})
df = pd.DataFrame(kb_data)
st.dataframe(df, use_container_width=True)
# 分类统计图表
fig = px.pie(
values=list(categories.values()),
names=list(categories.keys()),
title="知识库分类分布"
)
st.plotly_chart(fig, use_container_width=True)
else:
st.info("知识库为空,请先添加知识条目")
def add_knowledge_entry(self):
"""添加知识条目"""
st.subheader("➕ 添加知识条目")
with st.form(key="add_knowledge_form"):
col1, col2 = st.columns(2)
with col1:
question = st.text_area("问题", height=100)
category = st.text_input("分类", value="general")
tags = st.text_input("标签(用逗号分隔)")
with col2:
answer = st.text_area("答案", height=100)
confidence_threshold = st.slider(
"置信度阈值",
min_value=0.0,
max_value=1.0,
value=0.7,
step=0.05,
help="匹配分数需达到此阈值才会使用该答案"
)
knowledge_image = st.file_uploader(
"相关图片(可选)",
type=['jpg', 'jpeg', 'png']
)
submitted = st.form_submit_button("添加条目")
if submitted and question and answer:
# 处理图片
image_path = None
if knowledge_image:
# 保存图片
upload_dir = "knowledge_images"
os.makedirs(upload_dir, exist_ok=True)
image_path = os.path.join(upload_dir, f"{datetime.now().strftime('%Y%m%d%H%M%S')}_{knowledge_image.name}")
with open(image_path, "wb") as f:
f.write(knowledge_image.getbuffer())
# 处理标签
tag_list = [tag.strip() for tag in tags.split(",")] if tags else []
# 添加条目
entry_id = self.system.add_knowledge_entry(
question=question,
answer=answer,
image_path=image_path,
category=category,
tags=tag_list,
confidence_threshold=confidence_threshold
)
st.success(f" 知识条目添加成功!ID: {entry_id}")
st.rerun()
def show_system_performance(self):
"""显示系统性能分析"""
st.subheader(" 系统性能分析")
performance = self.system.analyze_system_performance()
# 显示关键指标
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("总对话数", performance['total_conversations'])
with col2:
st.metric("平均置信度", f"{performance['average_confidence']:.1%}")
with col3:
helpful_score = performance['average_helpful_score']
if helpful_score:
st.metric("平均有帮助度", f"{helpful_score:.1f}/5")
else:
st.metric("平均有帮助度", "暂无数据")
with col4:
accuracy_score = performance['average_accuracy_score']
if accuracy_score:
st.metric("平均准确度", f"{accuracy_score:.1f}/5")
else:
st.metric("平均准确度", "暂无数据")
# 显示最常用知识条目
if performance['top_used_entries']:
st.subheader("🏆 最常用知识条目")
usage_df = pd.DataFrame(performance['top_used_entries'])
fig = px.bar(
usage_df,
x='usage_count',
y='question',
orientation='h',
title="知识条目使用频率排名",
labels={'usage_count': '使用次数', 'question': '问题'}
)
fig.update_layout(height=400)
st.plotly_chart(fig, use_container_width=True)
# 显示建议
st.subheader(" 优化建议")
if performance['total_conversations'] < 10:
st.info("系统使用数据较少,建议积累更多对话数据后再进行分析")
elif performance['average_confidence'] < 0.6:
st.warning("""
**系统置信度偏低,建议:**
1. 扩充知识库覆盖范围
2. 优化现有知识条目的表述
3. 降低部分条目的置信度阈值
4. 添加更多带图片的知识条目
""")
else:
st.success("""
**系统运行良好,建议:**
1. 定期更新知识库内容
2. 收集用户反馈优化答案质量
3. 考虑添加更多专业领域知识
4. 监控系统性能指标变化
""")
def main():
"""主函数"""
app = CustomerServiceWebApp()
app.run()
if __name__ == "__main__":
main()
5. 实际应用案例演示
现在让我们通过几个实际场景来看看这个系统如何工作。
5.1 场景一:电商商品咨询
用户场景:小王在电商平台看中一件衣服,但不确定是否有货,于是上传图片咨询客服。
# 示例代码:电商商品咨询
def example_ecommerce_query():
"""电商商品咨询示例"""
system = CustomerServiceSystem()
# 先添加一些知识库条目
system.add_knowledge_entry(
question="红色连衣裙有货吗?",
answer="红色连衣裙目前有货,尺码S、M、L齐全,库存充足。",
category="服装",
tags=["连衣裙", "红色", "库存"]
)
system.add_knowledge_entry(
question="这件衣服是什么材质?",
answer="这款连衣裙采用95%棉和5%氨纶混纺,舒适透气,有弹性。",
category="服装",
tags=["材质", "面料", "成分"]
)
# 模拟用户查询(带图片)
# 注意:实际使用时需要真实的图片路径
result = system.process_customer_query(
query_text="这件衣服有货吗?",
query_image_path="red_dress.jpg", # 假设有这张图片
user_id="customer_001"
)
print("查询结果:")
print(f"回答:{result['response']}")
print(f"置信度:{result['confidence_score']:.2%}")
print(f"匹配的知识条目ID:{result['matched_entry_id']}")
# 显示候选答案
print("\n候选答案排名:")
for i, candidate in enumerate(result['all_candidates'][:3], 1):
print(f"{i}. {candidate['question'][:30]}... - 分数:{candidate['score']:.3f}")
# 运行示例
example_ecommerce_query()
预期输出:
查询结果:
回答:红色连衣裙目前有货,尺码S、M、L齐全,库存充足。
置信度:92.50%
匹配的知识条目ID:a1b2c3d4e5f6
候选答案排名:
1. 红色连衣裙有货吗?... - 分数:0.925
2. 这件衣服是什么材质?... - 分数:0.312
3. ... - 分数:0.105
5.2 场景二:技术支持问题
用户场景:小李的软件出现错误,他截图错误信息咨询技术支持。
# 示例代码:技术支持问题
def example_tech_support():
"""技术支持问题示例"""
system = CustomerServiceSystem()
# 添加技术支持知识库
system.add_knowledge_entry(
question="软件启动时报错'DLL not found'怎么办?",
answer="请尝试以下步骤:1. 重新安装Visual C++运行库 2. 以管理员身份运行 3. 检查杀毒软件是否误删文件",
category="技术支持",
tags=["启动错误", "DLL", "运行库"],
confidence_threshold=0.6
)
system.add_knowledge_entry(
question="如何导出数据报表?",
answer="在菜单栏选择'文件'->'导出'->'报表',选择格式和保存位置即可。",
category="使用指导",
tags=["导出", "报表", "数据"]
)
# 模拟用户查询(带错误截图)
result = system.process_customer_query(
query_text="启动软件时出现这个错误,怎么解决?",
query_image_path="error_screenshot.png", # 错误截图
user_id="user_tech_001"
)
print("技术支持查询结果:")
print(f"回答:{result['response']}")
print(f"置信度:{result['confidence_score']:.2%}")
# 如果置信度不够高,系统会给出备选建议
if result['confidence_score'] < 0.7:
print("\n系统提示:虽然找到了相关答案,但置信度不高,建议:")
print("1. 提供更详细的错误描述")
print("2. 联系人工技术支持")
print("3. 查看软件日志文件")
# 运行示例
example_tech_support()
5.3 场景三:多轮对话
用户场景:用户与客服进行多轮对话,系统能记住上下文。
# 示例代码:多轮对话
def example_multi_turn_conversation():
"""多轮对话示例"""
system = CustomerServiceSystem()
# 添加知识库
system.add_knowledge_entry(
question="你们的退货政策是什么?",
answer="我们支持7天无理由退货,商品需保持完好,不影响二次销售。",
category="售后政策",
tags=["退货", "售后", "政策"]
)
system.add_knowledge_entry(
question="退货运费谁承担?",
answer="非质量问题退货,运费由客户承担;质量问题退货,我们承担往返运费。",
category="售后政策",
tags=["运费", "退货", "责任"]
)
system.add_knowledge_entry(
question="退货后多久能收到退款?",
answer="我们收到退货商品并验收通过后,会在3-5个工作日内处理退款。",
category="售后政策",
tags=["退款", "时效", "退货"]
)
# 模拟多轮对话
conversations = [
("我想了解一下退货政策", None),
("如果是衣服尺寸不合适,运费谁出?", None),
("那退款要等多久?", None)
]
user_id = "customer_multi_001"
for i, (query_text, image_path) in enumerate(conversations, 1):
print(f"\n=== 第{i}轮对话 ===")
print(f"用户:{query_text}")
result = system.process_customer_query(
query_text=query_text,
query_image_path=image_path,
user_id=user_id
)
print(f"客服:{result['response']}")
print(f"置信度:{result['confidence_score']:.2%}")
# 查看对话历史
print(f"\n=== 用户{user_id}的对话历史 ===")
history = system.get_conversation_history(user_id, limit=5)
for item in history:
print(f"[{item['timestamp']}] 用户:{item['query_text'][:30]}...")
print(f" 客服:{item['response_text'][:30]}... (置信度:{item['confidence_score']:.2%})")
# 运行示例
example_multi_turn_conversation()
6. 系统优化与进阶技巧
6.1 性能优化建议
- 缓存优化:
# 添加查询缓存
from functools import lru_cache
import hashlib
class OptimizedCustomerServiceSystem(CustomerServiceSystem):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.evaluation_cache = {}
def _get_cache_key(self, query_text, document_text, query_image_path, document_image_path):
"""生成缓存键"""
key_parts = [
query_text,
document_text,
query_image_path or "",
document_image_path or ""
]
key_string = "|".join(key_parts)
return hashlib.md5(key_string.encode()).hexdigest()
def evaluate_relevance_with_cache(self, query_text, document_text,
query_image_path=None, document_image_path=None):
"""带缓存的评估"""
cache_key = self._get_cache_key(query_text, document_text,
query_image_path, document_image_path)
if cache_key in self.evaluation_cache:
return self.evaluation_cache[cache_key]
# 加载图片
query_image = self._load_image(query_image_path) if query_image_path else None
document_image = self._load_image(document_image_path) if document_image_path else None
# 评估
score = self.evaluator.evaluate_relevance(
query_text=query_text,
document_text=document_text,
query_image=query_image,
document_image=document_image
)
# 缓存结果(设置合适的过期时间)
self.evaluation_cache[cache_key] = score
if len(self.evaluation_cache) > 1000: # 限制缓存大小
# 移除最旧的条目
oldest_key = next(iter(self.evaluation_cache))
del self.evaluation_cache[oldest_key]
return score
- 批量处理优化:
# 批量处理查询
def batch_process_queries(self, queries_batch):
"""批量处理查询,提高效率"""
# 预处理所有查询
preprocessed_queries = []
for query in queries_batch:
preprocessed = self._preprocess_query(query)
preprocessed_queries.append(preprocessed)
# 批量评估
batch_scores = self.evaluator.batch_evaluate(
queries=preprocessed_queries,
documents=self.knowledge_base
)
# 批量生成响应
responses = []
for query, scores in zip(queries_batch, batch_scores):
best_match_idx = scores.index(max(scores))
if scores[best_match_idx] >= self.knowledge_base[best_match_idx].confidence_threshold:
response = self.knowledge_base[best_match_idx].answer
else:
response = self._generate_fallback_response(query, scores)
responses.append(response)
return responses
6.2 准确度提升技巧
- 知识库优化策略:
def optimize_knowledge_base(self):
"""优化知识库"""
optimization_suggestions = []
# 分析知识库使用情况
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 找出很少被使用的条目
cursor.execute('''
SELECT kb.id, kb.question, COUNT(ch.id) as usage_count
FROM knowledge_base kb
LEFT JOIN conversation_history ch ON kb.id = ch.matched_entry_id
GROUP BY kb.id
HAVING usage_count < 3
ORDER BY usage_count ASC
''')
low_usage_entries = cursor.fetchall()
for entry in low_usage_entries[:10]: # 只看前10个
optimization_suggestions.append({
'type': '低使用率',
'entry_id': entry[0],
'question': entry[1],
'usage_count': entry[2],
'suggestion': '考虑优化问题表述或合并到其他条目'
})
# 找出低置信度的条目
cursor.execute('''
SELECT ch.matched_entry_id, AVG(ch.confidence_score) as avg_confidence
FROM conversation_history ch
WHERE ch.matched_entry_id IS NOT NULL
GROUP BY c更多推荐


所有评论(0)