限时福利领取


背景痛点:JIMI 数据“看起来香,啃起来硬”

京东把 JIMI 智能客服的公开对话数据放出来后,很多团队第一时间下载,结果普遍卡在三个地方:

  1. 体量惊人:压缩包 8 GB,解压后 50 GB+ 的 JSON,单文件直接吃光 32 GB 内存。
  2. 结构松散:每行一条对话,字段层级深(session.messages[].content 嵌套数组),MySQL 需要拆成多表才能查。
  3. 分析无从下手:关键词搜“退货”能扫出 10 W 条,但找不到“情绪激烈且退货”的会话,传统 LIKE 性能瞬间爆炸。

一句话:数据金矿摆在面前,却没有趁手的铲子。

技术选型:为什么把 MySQL 换成 Elasticsearch

维度 MySQL 8.0 Elasticsearch 8.x
全文检索 需外挂 Sphinx,延迟 200 ms+ 内置倒排,10 ms 内
嵌套 JSON 拆表或 JSON 字段,查询复杂 原生 nested,一条 DSL 搞定
水平扩展 主从只读,分库分表改业务 加节点自动重平衡
聚合 group by 容易内存临时表 分布式 agg,毫秒级

结论:JIMI 数据是非结构化文本 + 深度嵌套,分析场景以“搜→聚→看”为主,Elasticsearch 在开发效率与查询性能上都更省心。

核心实现:从脏数据到语义洞察

1. 数据清洗流程(Python 3.10)

目标:把原始 .jsonl 转成“干净、扁平、带情绪标签”的文档。

# clean_jimi.py
import json, re, emoji, tqdm
from pathlib import Path

PUNCT = re.compile(r"[~!@#$%^&*()_+`\-={}|\[\]:\";'<>?,./]")

def clean_text(text: str) -> str:
    """去表情、去网址、去标点"""
    text = emoji.replace_emoji(text, replace='')
    text = re.sub(r'http\S+', '', text)
    text = PUNCT.sub(' ', text)
    return ' '.join(text.split())   # 合并多余空格

def iter_session(file_path):
    """原始文件一行一个 session"""
    with open(file_path, encoding='utf-8') as f:
        for line in f:
            yield json.loads(line)

def flatten(session):
    """把嵌套对话拍平,保留上下文"""
    sid = session['session_id']
    msgs = []
    for turn in session['messages']:
        role = turn['from']  # user / bot
        msgs.append({
            'role': role,
            'content': clean_text(turn['content']),
            'ts': turn['timestamp']
        })
    return {
        'session_id': sid,
        'msgs': msgs,
        'msg_cnt': len(msgs),
        'has_order': bool(session.get('order_id')),
        'create_time': session['create_time'][:10]  # 只取日期
    }

if __name__ == '__main__':
    src = Path('jimi_raw.jsonl')
    dst = Path('jimi_clean.jsonl')
    with dst.open('w', encoding='utf-8') as f_out:
        for ses in tqdm.tqdm(iter_session(src), total=6_800_000):
            f_out.write(json.dumps(flatten(ses), ensure_ascii=False) + '\n')

跑完得到 6 800 000 条扁平文档,体积从 50 GB 降到 21 GB,磁盘节省 58 %。

2. Elasticsearch 索引设计与优化

2.1 映射模板
PUT /jimi
{
  "settings": {
    "number_of_shards": 6,
    "number_of_replicas": 1,
    "refresh_interval": "30s",
    "analysis": {
      "analyzer": {
        "cjk_smart": {
          "tokenizer": "jieba_index",
          "filter": ["lowercase", "stop"]
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "session_id": {"type": "keyword"},
      "create_time": {"type": "date", "format": "yyyy-MM-dd"},
      "has_order": {"type": "boolean"},
      "msg_cnt": {"type": "short"},
      "msgs": {
        "type": "nested",
        "properties": {
          "role": {"type": "keyword"},
          "content": {
            "type": "text",
            "analyzer": "cjk_smart",
            "fields": {"raw": {"type": "keyword"}}
          },
          "ts": {"type": "date", "format": "epoch_second"}
        }
      }
    }
  }
}

说明:

  • 6 分片对应 3 节点集群,每节点 2 片,保证 CPU 用满。
  • nested 结构让“用户→客服→用户”时序不丢失,也能独立查询任一角色。
  • refresh 30 s 兼顾写入吞吐与实时可见性。
2.2 批量灌库(Python)
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(['http://es1:9200'], request_timeout=120)

def gendocs():
    with open('jimi_clean.jsonl', encoding='utf-8') as f:
        for line in f:
            doc = json.loads(line)
            doc['_index'] = 'jimi'
            yield doc

helpers.bulk(es, gendocs(), chunk_size=3000, max_retries=3)

单节点 2 万 doc/s,三节点合计 6 万 doc/s,20 min 完成全量。

3. 基于 NLP 的语义分析实现

需求:找出“情绪激烈且要求退货”的会话,传统关键词召回率 62 %,需要语义升级。

步骤:

  1. 用开源 RoBERTa-wwm-ext 训练二分类模型(激动 / 平和),标注 1 万条,F1=0.89。
  2. 离线跑完 680 万条,把预测结果写回新字段 msgs.anger_score
  3. 查询时加一条 nested filter,秒级返回。

DSL 示例:

GET /jimi/_search
{
  "query": {
    "bool": {
      "must": [
        {"nested": {
          "path": "msgs",
          "query": {"match": { "msgs.content": "退货" }}
        }},
        {"nested": {
          "path": "msgs",
          "query": {"range": {"msgs.anger_score": {"gte": 0.8}}}
        }}
      ]
    }
  },
  "aggs": {
    "daily": {
      "date_histogram": {"field": "create_time", "calendar_interval": "day"}
    }
  }
}

返回 1 200 条会话,比纯关键词少 85 % 噪声,运营同学直接可用。

性能考量:别让查询拖垮集群

测试环境:3 节点 16 核 64 GB SSD,JVM 31 GB。

场景 结果集 耗时 优化前
关键词“退货” 10 W 120 ms 800 ms
关键词+anger_score≥0.8 1.2 K 45 ms
聚合近 30 天情绪趋势 30 桶 350 ms 2 100 ms

优化手段:

  1. anger_score 从 float 降到 half_float,节省 50 % 磁盘。
  2. 聚合查询加 "execution_hint": "map",对时间直方图效果显著。
  3. 热温分层:近 7 天热数据 SSD,>7 天自动迁到机械盘,查询慢 15 %,成本降 60 %。

避坑指南:生产踩过的 4 个坑

  1. 分片数随意定 → 后期扩容痛苦
    建议:数据量 <100 GB 用 3~6 片,>500 GB 直接 12 片,避免 split 操作。

  2. nested 嵌套层数过深 → 内存爆炸
    真实案例:把“词级别”再嵌一层,聚合直接 OOM。保持 1 层 nested 足够。

  3. 用默认 dynamic mapping,字符串被猜成 text+keyword 双字段 → 索引膨胀 1.8 倍
    提前手动建 mapping,关闭 "dynamic": "strict",字段缺失就报错,早发现早修正。

  4. 中文分词用 standard 分词 → 查“客服”把“客”和“服”拆开,召回离谱
    一定装 jieba 或 ik,再自定义业务词库(“京东自营”“价保”等),准确率提升 30 %。

总结与展望:把对话数据玩出更多花样

整套流程跑下来,我们把 50 GB 原始日志变成可秒搜、可语义过滤、可聚合的业务资产,核心经验只有两句话:

  • 存储选对,后面少踩 80 % 的坑;
  • 清洗+语义一步到位,别让“垃圾进,垃圾出”。

下一步可继续扩展:

  • 实时情绪预警:把 Flink 消费在线对话,调用同一模型写 Elasticsearch,实现 5 min 级情绪大盘。
  • 多轮意图提取:用 seq2seq 标注每轮意图,再聚合看“咨询→投诉→退货”漏斗,反向优化客服剧本。
  • 知识蒸馏:把高 anger_score 且最终解决的会话挑出来,自动生成 FAQ,反哺机器人训练。

如果你已经用类似思路落地,欢迎留言交流踩坑细节;如果正准备动手,希望这篇笔记能让你少走一点弯路。

限时福利领取


Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐