京东JIMI智能客服公开数据资料实战:构建高效对话分析系统
存储选对,后面少踩 80 % 的坑;清洗+语义一步到位,别让“垃圾进,垃圾出”。实时情绪预警:把 Flink 消费在线对话,调用同一模型写 Elasticsearch,实现 5 min 级情绪大盘。多轮意图提取:用 seq2seq 标注每轮意图,再聚合看“咨询→投诉→退货”漏斗,反向优化客服剧本。知识蒸馏:把高 anger_score 且最终解决的会话挑出来,自动生成 FAQ,反哺机器人训练。如果
背景痛点:JIMI 数据“看起来香,啃起来硬”
京东把 JIMI 智能客服的公开对话数据放出来后,很多团队第一时间下载,结果普遍卡在三个地方:
- 体量惊人:压缩包 8 GB,解压后 50 GB+ 的 JSON,单文件直接吃光 32 GB 内存。
- 结构松散:每行一条对话,字段层级深(
session.messages[].content嵌套数组),MySQL 需要拆成多表才能查。 - 分析无从下手:关键词搜“退货”能扫出 10 W 条,但找不到“情绪激烈且退货”的会话,传统 LIKE 性能瞬间爆炸。
一句话:数据金矿摆在面前,却没有趁手的铲子。
技术选型:为什么把 MySQL 换成 Elasticsearch
| 维度 | MySQL 8.0 | Elasticsearch 8.x |
|---|---|---|
| 全文检索 | 需外挂 Sphinx,延迟 200 ms+ | 内置倒排,10 ms 内 |
| 嵌套 JSON | 拆表或 JSON 字段,查询复杂 | 原生 nested,一条 DSL 搞定 |
| 水平扩展 | 主从只读,分库分表改业务 | 加节点自动重平衡 |
| 聚合 | group by 容易内存临时表 | 分布式 agg,毫秒级 |
结论:JIMI 数据是非结构化文本 + 深度嵌套,分析场景以“搜→聚→看”为主,Elasticsearch 在开发效率与查询性能上都更省心。
核心实现:从脏数据到语义洞察
1. 数据清洗流程(Python 3.10)
目标:把原始 .jsonl 转成“干净、扁平、带情绪标签”的文档。
# clean_jimi.py
import json, re, emoji, tqdm
from pathlib import Path
PUNCT = re.compile(r"[~!@#$%^&*()_+`\-={}|\[\]:\";'<>?,./]")
def clean_text(text: str) -> str:
"""去表情、去网址、去标点"""
text = emoji.replace_emoji(text, replace='')
text = re.sub(r'http\S+', '', text)
text = PUNCT.sub(' ', text)
return ' '.join(text.split()) # 合并多余空格
def iter_session(file_path):
"""原始文件一行一个 session"""
with open(file_path, encoding='utf-8') as f:
for line in f:
yield json.loads(line)
def flatten(session):
"""把嵌套对话拍平,保留上下文"""
sid = session['session_id']
msgs = []
for turn in session['messages']:
role = turn['from'] # user / bot
msgs.append({
'role': role,
'content': clean_text(turn['content']),
'ts': turn['timestamp']
})
return {
'session_id': sid,
'msgs': msgs,
'msg_cnt': len(msgs),
'has_order': bool(session.get('order_id')),
'create_time': session['create_time'][:10] # 只取日期
}
if __name__ == '__main__':
src = Path('jimi_raw.jsonl')
dst = Path('jimi_clean.jsonl')
with dst.open('w', encoding='utf-8') as f_out:
for ses in tqdm.tqdm(iter_session(src), total=6_800_000):
f_out.write(json.dumps(flatten(ses), ensure_ascii=False) + '\n')
跑完得到 6 800 000 条扁平文档,体积从 50 GB 降到 21 GB,磁盘节省 58 %。
2. Elasticsearch 索引设计与优化
2.1 映射模板
PUT /jimi
{
"settings": {
"number_of_shards": 6,
"number_of_replicas": 1,
"refresh_interval": "30s",
"analysis": {
"analyzer": {
"cjk_smart": {
"tokenizer": "jieba_index",
"filter": ["lowercase", "stop"]
}
}
}
},
"mappings": {
"properties": {
"session_id": {"type": "keyword"},
"create_time": {"type": "date", "format": "yyyy-MM-dd"},
"has_order": {"type": "boolean"},
"msg_cnt": {"type": "short"},
"msgs": {
"type": "nested",
"properties": {
"role": {"type": "keyword"},
"content": {
"type": "text",
"analyzer": "cjk_smart",
"fields": {"raw": {"type": "keyword"}}
},
"ts": {"type": "date", "format": "epoch_second"}
}
}
}
}
}
说明:
- 6 分片对应 3 节点集群,每节点 2 片,保证 CPU 用满。
- nested 结构让“用户→客服→用户”时序不丢失,也能独立查询任一角色。
- refresh 30 s 兼顾写入吞吐与实时可见性。
2.2 批量灌库(Python)
from elasticsearch import Elasticsearch, helpers
es = Elasticsearch(['http://es1:9200'], request_timeout=120)
def gendocs():
with open('jimi_clean.jsonl', encoding='utf-8') as f:
for line in f:
doc = json.loads(line)
doc['_index'] = 'jimi'
yield doc
helpers.bulk(es, gendocs(), chunk_size=3000, max_retries=3)
单节点 2 万 doc/s,三节点合计 6 万 doc/s,20 min 完成全量。
3. 基于 NLP 的语义分析实现
需求:找出“情绪激烈且要求退货”的会话,传统关键词召回率 62 %,需要语义升级。
步骤:
- 用开源 RoBERTa-wwm-ext 训练二分类模型(激动 / 平和),标注 1 万条,F1=0.89。
- 离线跑完 680 万条,把预测结果写回新字段
msgs.anger_score。 - 查询时加一条 nested filter,秒级返回。
DSL 示例:
GET /jimi/_search
{
"query": {
"bool": {
"must": [
{"nested": {
"path": "msgs",
"query": {"match": { "msgs.content": "退货" }}
}},
{"nested": {
"path": "msgs",
"query": {"range": {"msgs.anger_score": {"gte": 0.8}}}
}}
]
}
},
"aggs": {
"daily": {
"date_histogram": {"field": "create_time", "calendar_interval": "day"}
}
}
}
返回 1 200 条会话,比纯关键词少 85 % 噪声,运营同学直接可用。
性能考量:别让查询拖垮集群
测试环境:3 节点 16 核 64 GB SSD,JVM 31 GB。
| 场景 | 结果集 | 耗时 | 优化前 |
|---|---|---|---|
| 关键词“退货” | 10 W | 120 ms | 800 ms |
| 关键词+anger_score≥0.8 | 1.2 K | 45 ms | — |
| 聚合近 30 天情绪趋势 | 30 桶 | 350 ms | 2 100 ms |
优化手段:
- 把
anger_score从 float 降到half_float,节省 50 % 磁盘。 - 聚合查询加
"execution_hint": "map",对时间直方图效果显著。 - 热温分层:近 7 天热数据 SSD,>7 天自动迁到机械盘,查询慢 15 %,成本降 60 %。
避坑指南:生产踩过的 4 个坑
-
分片数随意定 → 后期扩容痛苦
建议:数据量 <100 GB 用 3~6 片,>500 GB 直接 12 片,避免 split 操作。 -
nested 嵌套层数过深 → 内存爆炸
真实案例:把“词级别”再嵌一层,聚合直接 OOM。保持 1 层 nested 足够。 -
用默认 dynamic mapping,字符串被猜成 text+keyword 双字段 → 索引膨胀 1.8 倍
提前手动建 mapping,关闭"dynamic": "strict",字段缺失就报错,早发现早修正。 -
中文分词用 standard 分词 → 查“客服”把“客”和“服”拆开,召回离谱
一定装 jieba 或 ik,再自定义业务词库(“京东自营”“价保”等),准确率提升 30 %。
总结与展望:把对话数据玩出更多花样
整套流程跑下来,我们把 50 GB 原始日志变成可秒搜、可语义过滤、可聚合的业务资产,核心经验只有两句话:
- 存储选对,后面少踩 80 % 的坑;
- 清洗+语义一步到位,别让“垃圾进,垃圾出”。
下一步可继续扩展:
- 实时情绪预警:把 Flink 消费在线对话,调用同一模型写 Elasticsearch,实现 5 min 级情绪大盘。
- 多轮意图提取:用 seq2seq 标注每轮意图,再聚合看“咨询→投诉→退货”漏斗,反向优化客服剧本。
- 知识蒸馏:把高 anger_score 且最终解决的会话挑出来,自动生成 FAQ,反哺机器人训练。
如果你已经用类似思路落地,欢迎留言交流踩坑细节;如果正准备动手,希望这篇笔记能让你少走一点弯路。

更多推荐


所有评论(0)