切分策略

  1. 按照固定字符数或 Token 数来切分,但可能会在不适当的位置切断句子。
  2. 递归使用多个分隔符切分,同时尽量保证字符数或 Token数不超出限制。能保证不切断完整的句子。
  3. 语义切分:根据文本的语义内容切分,旨在保持相关信息的集中和完整,适用于需要高度语义保持的场景。但处理速度较慢,且可能出现不同块之间长度极不均衡的情况。具体切分过程为:将相邻的几个句子拼成一个句组。对所有句组进行嵌入,并比较嵌入向量的距离,找到语义变化大的位置,根据阈值确定切分点(比如计算相邻句子嵌入向量的余弦距离,取距离分布的第
    N 百分位值作为阈值,高于此值则切分)。按照切分点切分出若干个语义段,并合并某些长度很短的语义段。

RecursiveCharacterTextSplitter递归字符文本切分器

RecursiveCharacterTextSplitte(递归字符文本切分器)是最常用的切分器,它由一个字符列表作为参数,默认列表为 ["\n\n", "\n", " ", ""],并且会尝试按顺序使用这些字符进行切分,直到块足够小。由此尽可能地将所有段落(然后是句子,最后是词)保持在一起,因为这些段落通常看起来是语义上最相关的文本片段。同时为了保证段之间语义完整,可以设置每个块之间有一部分重叠。
# pip install langchain-text-splitters
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import UnstructuredWordDocumentLoader

# 加载文档
docs = UnstructuredWordDocumentLoader(
    file_path="assets/sample.docx", mode="single"
).load()

# 切分为文本块
chunks = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", "。", "!", "?", "……", ",", ""],  # 分隔符列表
    chunk_size=400,  # 每个块的最大长度
    chunk_overlap=50,  # 每个块重叠的长度
    length_function=len,  # 可选:计算文本长度的函数,默认为字符串长度,可自定义函数来实现按 token 数切分
    add_start_index=True,  # 可选:块的元数据中添加此块起始索引
).split_documents(docs)

print(chunks)

文档嵌入

# pip install sentence-transformers langchain_huggingface
import os
from langchain_huggingface import HuggingFaceEmbeddings

# 加载嵌入模型
embed_model = HuggingFaceEmbeddings(
    model_name=os.path.expanduser("~/models/bge-base-zh-v1.5")
)

# 单文本嵌入
query = "你好,世界"
print(embed_model.embed_query(query))

# 多文本嵌入
docs = ["你好,世界", "你好,世界"]
print(embed_model.embed_documents(docs))

向量数据库Milvus

1.构建Collection
2.定义Collection的schema信息(数据结构,有哪些字段)
3.定义构建索引的逻辑,加速数据查询
4.将通过embedding model获取到的向量存储到collection中
5.在做语义检索时,通过milvus内置的检索算法,可以检索到相关的向量及其他内容

创建Collection(类似mysql中的创建表)

# pip install pymilvus[milvus_lite]
from pprint import pprint
from pymilvus import MilvusClient, DataType

# 实例化向量数据库客户端
client = MilvusClient(
    uri="./milvus_demo.db",  # 数据存储在本地当前目录下
)

# 创建 schema
def build_schema():
    return (
        MilvusClient.create_schema(
            # 自动分配主键
            auto_id=True,
            # 启用动态字段,未在 Schema 中声明的字段会以键值对的形式存储在这个动态字段
            enable_dynamic_field=True,
        )
        # 添加 id 字段,类型为整数,设置为主键
        .add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
        # 添加 vector 字段,类型为浮点数向量,维度为 768
        .add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=768)
        # 添加 text 字段,类型为字符串,最大长度为 1024
        .add_field(field_name="text", datatype=DataType.VARCHAR, max_length=1024)
        # 添加 metadata 字段,类型为 JSON
        .add_field(field_name="metadata", datatype=DataType.JSON)
    )

# 创建 index
def build_index():
    index_params = MilvusClient.prepare_index_params()
    index_params.add_index(
        field_name="vector",  # 建立索引的字段
        index_type="AUTOINDEX",  # 索引类型
        metric_type="L2",  # 向量相似度度量方式
    )
    return index_params

# 创建 collection
if client.has_collection(collection_name="demo_collection"):
    # 删除 collection
    # 在 Milvus 中删除数据后,存储空间不会立即释放。虽然删除数据会将实体标记为 "逻辑删除",但实际空间可能不会立即释放。
    # Milvus 会在后台自动压缩数据。这个过程会将较小的数据段合并为较大的数据段,并删除"逻辑删除"的数据或已超过有效时间的数据。
    # 一个名为 Garbage Collection (GC) 的独立进程会定期删除这些 "已删除 "的数据段,从而释放它们占用的存储空间。
    client.drop_collection(collection_name="demo_collection")
client.create_collection(
    collection_name="demo_collection",  # collection 名称
    schema=build_schema(),  # collection 的 schema
    index_params=build_index(),  # collection 的 index
)

# 查看 collection
print(client.list_collections())

# 查看 collection 描述
pprint(client.describe_collection(collection_name="demo_collection"))

插入实体

import os
from pymilvus import MilvusClient
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import UnstructuredWordDocumentLoader

# 实例化向量数据库客户端
client = MilvusClient(
    uri="./milvus_demo.db",  # 数据存储在本地当前目录下
)

# 加载文档
docs = UnstructuredWordDocumentLoader(
    file_path="assets/sample.docx",
    mode="single",
).load()

# 文档切分
chunks = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", "。", "!", "?", "……", ",", ""],
    chunk_size=400,
    chunk_overlap=50,
).split_documents(docs)

# 加载嵌入模型
embed_model = HuggingFaceEmbeddings(
    model_name=os.path.expanduser("~/models/bge-base-zh-v1.5")
)

# 计算嵌入向量
embeddings = embed_model.embed_documents([chunk.page_content for chunk in chunks])

# 转换数据格式
data = [
    {
        "vector": embedding,
        "text": chunk.page_content,
        "metadata": chunk.metadata,
    }
    for chunk, embedding in zip(chunks, embeddings)
]

# 插入实体
res = client.insert(collection_name="demo_collection", data=data)
print(res)

查询实体

from pymilvus import MilvusClient

# 实例化向量数据库客户端
client = MilvusClient(
    uri="./milvus_demo.db",  # 数据存储在本地当前目录下
)

# 通过主键查询实体
res = client.get(
    collection_name="demo_collection",
    ids=[461484610130804912, 461484610130804913],
    output_fields=["text", "metadata"],
)
print(res)

# 通过过滤条件(https://milvus.io/docs/zh/boolean.md)查询实体
res = client.query(
    collection_name="demo_collection",
    filter='metadata["source"] == "assets/sample.docx"',  # 使用 metadata["source"] 进行过滤
    output_fields=["text", "metadata"],
    limit=1,
)
print(res)

删除实体

from pymilvus import MilvusClient

# 实例化向量数据库客户端
client = MilvusClient(
    uri="./milvus_demo.db",  # 数据存储在本地当前目录下
)

# 通过主键删除实体
res = client.delete(
    collection_name="demo_collection",
    ids=[461484610130804912, 461484610130804913],
)
print(res)

# 通过过滤条件(https://milvus.io/docs/zh/boolean.md)删除实体
res = client.delete(
    collection_name="demo_collection",
    filter='text LIKE "第%"',  # 使用 text 前缀过滤
)
print(res)

检索与生成

检索

import os
from pymilvus import MilvusClient, DataType
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import UnstructuredWordDocumentLoader

# 实例化向量数据库客户端
client = MilvusClient(
    uri="./milvus_demo.db",  # 数据存储在本地当前目录下
)

# ========== 构建索引 ==========
# 创建 schema
def build_schema():
    return (
        MilvusClient.create_schema(auto_id=True, enable_dynamic_field=True)
        .add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
        .add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=768)
        .add_field(field_name="text", datatype=DataType.VARCHAR, max_length=1024)
        .add_field(field_name="metadata", datatype=DataType.JSON)
    )

# 创建 index
def build_index():
    index_params = MilvusClient.prepare_index_params()
    index_params.add_index(
        field_name="vector",  # 建立索引的字段
        index_type="AUTOINDEX",  # 索引类型
        metric_type="L2",  # 向量相似度度量方式
    )
    return index_params

# 创建 collection
if client.has_collection(collection_name="demo_collection"):
    client.drop_collection(collection_name="demo_collection")
client.create_collection(
    collection_name="demo_collection",  # collection 名称
    schema=build_schema(),  # collection 的 schema
    index_params=build_index(),  # collection 的 index
)

# 加载文档
docs = UnstructuredWordDocumentLoader(
    file_path="assets/sample.docx",
    mode="single",
).load()

# 文档切分
chunks = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", "。", "!", "?", "……", ",", ""],
    chunk_size=400,
    chunk_overlap=50,
).split_documents(docs)

# 加载嵌入模型
embed_model = HuggingFaceEmbeddings(
    model_name=os.path.expanduser("~/models/bge-base-zh-v1.5")
)

# 计算嵌入向量
embeddings = embed_model.embed_documents([chunk.page_content for chunk in chunks])

# 转换数据格式
data = [
    {
        "vector": embedding,
        "text": chunk.page_content,
        "metadata": chunk.metadata,
    }
    for chunk, embedding in zip(chunks, embeddings)
]

# 插入实体
res = client.insert(collection_name="demo_collection", data=data)

# ========== 检索 ==========
def retrieval(query, embed_model, client):
    """检索并返回上下文"""
    query_embedding = embed_model.embed_query(query)  # 查询嵌入
    context = client.search(
        collection_name="demo_collection",  # collection 名称
        data=[query_embedding],  # 搜索的向量
        anns_field="vector",  # 进行向量搜索的字段
        # 度量方式:L2 欧氏距离/IP 内积/COSINE 余弦相似度
        search_params={"metric_type": "L2"},
        output_fields=["text", "metadata"],  # 输出字段
        limit=3,  # 搜索结果数量
    )
    return context

context = retrieval("不动产被占有了怎么办?", embed_model, client)
print(context)

生成

import os
from pymilvus import MilvusClient
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda

# 实例化向量数据库客户端
client = MilvusClient(
    uri="./milvus_demo.db",  # 数据存储在本地当前目录下
)

# 加载嵌入模型
embed_model = HuggingFaceEmbeddings(
    model_name=os.path.expanduser("~/models/bge-base-zh-v1.5")
)

# ========== 检索 ==========
def retrieval(query, embed_model, client):
    """检索并返回上下文"""
    query_embedding = embed_model.embed_query(query)  # 查询嵌入
    context = client.search(
        collection_name="demo_collection",  # collection 名称
        data=[query_embedding],  # 搜索的向量
        anns_field="vector",  # 进行向量搜索的字段
        # 度量方式:L2 欧氏距离/IP 内积/COSINE 余弦相似度
        search_params={"metric_type": "L2"},
        output_fields=["text", "metadata"],  # 输出字段
        limit=3,  # 搜索结果数量
    )
    return context

# ========== 生成 ==========
llm = init_chat_model(
    model="openai/gpt-oss-20b:free",
    model_provider="openai",
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

template = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "# 任务\n\n根据上下文参考,回答用户的问题。\n\n# 上下文参考\n\n{context}",
        ),
        ("human", "{query}"),
    ]
)

rag_chain = (
    {
        "query": RunnablePassthrough(),
        "context": lambda x: retrieval(query=x, embed_model=embed_model, client=client),
    }
    | RunnableLambda(lambda x: print(x) or x)  # 打印中间结果
    | template
    | llm
    | StrOutputParser()
)
res_chunks = rag_chain.stream(input="不动产被占有了怎么办?")
for chunk in res_chunks:
    print(chunk, end="", flush=True)
Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐