LangChain+Milvus本地RAG实现:从文档切分到检索问答完整实践
·
目录
切分策略
- 按照固定字符数或 Token 数来切分,但可能会在不适当的位置切断句子。
- 递归使用多个分隔符切分,同时尽量保证字符数或 Token数不超出限制。能保证不切断完整的句子。
- 语义切分:根据文本的语义内容切分,旨在保持相关信息的集中和完整,适用于需要高度语义保持的场景。但处理速度较慢,且可能出现不同块之间长度极不均衡的情况。具体切分过程为:将相邻的几个句子拼成一个句组。对所有句组进行嵌入,并比较嵌入向量的距离,找到语义变化大的位置,根据阈值确定切分点(比如计算相邻句子嵌入向量的余弦距离,取距离分布的第
N 百分位值作为阈值,高于此值则切分)。按照切分点切分出若干个语义段,并合并某些长度很短的语义段。
RecursiveCharacterTextSplitter递归字符文本切分器
RecursiveCharacterTextSplitte(递归字符文本切分器)是最常用的切分器,它由一个字符列表作为参数,默认列表为 ["\n\n", "\n", " ", ""],并且会尝试按顺序使用这些字符进行切分,直到块足够小。由此尽可能地将所有段落(然后是句子,最后是词)保持在一起,因为这些段落通常看起来是语义上最相关的文本片段。同时为了保证段之间语义完整,可以设置每个块之间有一部分重叠。
# pip install langchain-text-splitters
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import UnstructuredWordDocumentLoader
# 加载文档
docs = UnstructuredWordDocumentLoader(
file_path="assets/sample.docx", mode="single"
).load()
# 切分为文本块
chunks = RecursiveCharacterTextSplitter(
separators=["\n\n", "\n", "。", "!", "?", "……", ",", ""], # 分隔符列表
chunk_size=400, # 每个块的最大长度
chunk_overlap=50, # 每个块重叠的长度
length_function=len, # 可选:计算文本长度的函数,默认为字符串长度,可自定义函数来实现按 token 数切分
add_start_index=True, # 可选:块的元数据中添加此块起始索引
).split_documents(docs)
print(chunks)
文档嵌入
# pip install sentence-transformers langchain_huggingface
import os
from langchain_huggingface import HuggingFaceEmbeddings
# 加载嵌入模型
embed_model = HuggingFaceEmbeddings(
model_name=os.path.expanduser("~/models/bge-base-zh-v1.5")
)
# 单文本嵌入
query = "你好,世界"
print(embed_model.embed_query(query))
# 多文本嵌入
docs = ["你好,世界", "你好,世界"]
print(embed_model.embed_documents(docs))
向量数据库Milvus
1.构建Collection
2.定义Collection的schema信息(数据结构,有哪些字段)
3.定义构建索引的逻辑,加速数据查询
4.将通过embedding model获取到的向量存储到collection中
5.在做语义检索时,通过milvus内置的检索算法,可以检索到相关的向量及其他内容
创建Collection(类似mysql中的创建表)
# pip install pymilvus[milvus_lite]
from pprint import pprint
from pymilvus import MilvusClient, DataType
# 实例化向量数据库客户端
client = MilvusClient(
uri="./milvus_demo.db", # 数据存储在本地当前目录下
)
# 创建 schema
def build_schema():
return (
MilvusClient.create_schema(
# 自动分配主键
auto_id=True,
# 启用动态字段,未在 Schema 中声明的字段会以键值对的形式存储在这个动态字段
enable_dynamic_field=True,
)
# 添加 id 字段,类型为整数,设置为主键
.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
# 添加 vector 字段,类型为浮点数向量,维度为 768
.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=768)
# 添加 text 字段,类型为字符串,最大长度为 1024
.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=1024)
# 添加 metadata 字段,类型为 JSON
.add_field(field_name="metadata", datatype=DataType.JSON)
)
# 创建 index
def build_index():
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
field_name="vector", # 建立索引的字段
index_type="AUTOINDEX", # 索引类型
metric_type="L2", # 向量相似度度量方式
)
return index_params
# 创建 collection
if client.has_collection(collection_name="demo_collection"):
# 删除 collection
# 在 Milvus 中删除数据后,存储空间不会立即释放。虽然删除数据会将实体标记为 "逻辑删除",但实际空间可能不会立即释放。
# Milvus 会在后台自动压缩数据。这个过程会将较小的数据段合并为较大的数据段,并删除"逻辑删除"的数据或已超过有效时间的数据。
# 一个名为 Garbage Collection (GC) 的独立进程会定期删除这些 "已删除 "的数据段,从而释放它们占用的存储空间。
client.drop_collection(collection_name="demo_collection")
client.create_collection(
collection_name="demo_collection", # collection 名称
schema=build_schema(), # collection 的 schema
index_params=build_index(), # collection 的 index
)
# 查看 collection
print(client.list_collections())
# 查看 collection 描述
pprint(client.describe_collection(collection_name="demo_collection"))
插入实体
import os
from pymilvus import MilvusClient
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import UnstructuredWordDocumentLoader
# 实例化向量数据库客户端
client = MilvusClient(
uri="./milvus_demo.db", # 数据存储在本地当前目录下
)
# 加载文档
docs = UnstructuredWordDocumentLoader(
file_path="assets/sample.docx",
mode="single",
).load()
# 文档切分
chunks = RecursiveCharacterTextSplitter(
separators=["\n\n", "\n", "。", "!", "?", "……", ",", ""],
chunk_size=400,
chunk_overlap=50,
).split_documents(docs)
# 加载嵌入模型
embed_model = HuggingFaceEmbeddings(
model_name=os.path.expanduser("~/models/bge-base-zh-v1.5")
)
# 计算嵌入向量
embeddings = embed_model.embed_documents([chunk.page_content for chunk in chunks])
# 转换数据格式
data = [
{
"vector": embedding,
"text": chunk.page_content,
"metadata": chunk.metadata,
}
for chunk, embedding in zip(chunks, embeddings)
]
# 插入实体
res = client.insert(collection_name="demo_collection", data=data)
print(res)
查询实体
from pymilvus import MilvusClient
# 实例化向量数据库客户端
client = MilvusClient(
uri="./milvus_demo.db", # 数据存储在本地当前目录下
)
# 通过主键查询实体
res = client.get(
collection_name="demo_collection",
ids=[461484610130804912, 461484610130804913],
output_fields=["text", "metadata"],
)
print(res)
# 通过过滤条件(https://milvus.io/docs/zh/boolean.md)查询实体
res = client.query(
collection_name="demo_collection",
filter='metadata["source"] == "assets/sample.docx"', # 使用 metadata["source"] 进行过滤
output_fields=["text", "metadata"],
limit=1,
)
print(res)
删除实体
from pymilvus import MilvusClient
# 实例化向量数据库客户端
client = MilvusClient(
uri="./milvus_demo.db", # 数据存储在本地当前目录下
)
# 通过主键删除实体
res = client.delete(
collection_name="demo_collection",
ids=[461484610130804912, 461484610130804913],
)
print(res)
# 通过过滤条件(https://milvus.io/docs/zh/boolean.md)删除实体
res = client.delete(
collection_name="demo_collection",
filter='text LIKE "第%"', # 使用 text 前缀过滤
)
print(res)
检索与生成
检索
import os
from pymilvus import MilvusClient, DataType
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import UnstructuredWordDocumentLoader
# 实例化向量数据库客户端
client = MilvusClient(
uri="./milvus_demo.db", # 数据存储在本地当前目录下
)
# ========== 构建索引 ==========
# 创建 schema
def build_schema():
return (
MilvusClient.create_schema(auto_id=True, enable_dynamic_field=True)
.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=768)
.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=1024)
.add_field(field_name="metadata", datatype=DataType.JSON)
)
# 创建 index
def build_index():
index_params = MilvusClient.prepare_index_params()
index_params.add_index(
field_name="vector", # 建立索引的字段
index_type="AUTOINDEX", # 索引类型
metric_type="L2", # 向量相似度度量方式
)
return index_params
# 创建 collection
if client.has_collection(collection_name="demo_collection"):
client.drop_collection(collection_name="demo_collection")
client.create_collection(
collection_name="demo_collection", # collection 名称
schema=build_schema(), # collection 的 schema
index_params=build_index(), # collection 的 index
)
# 加载文档
docs = UnstructuredWordDocumentLoader(
file_path="assets/sample.docx",
mode="single",
).load()
# 文档切分
chunks = RecursiveCharacterTextSplitter(
separators=["\n\n", "\n", "。", "!", "?", "……", ",", ""],
chunk_size=400,
chunk_overlap=50,
).split_documents(docs)
# 加载嵌入模型
embed_model = HuggingFaceEmbeddings(
model_name=os.path.expanduser("~/models/bge-base-zh-v1.5")
)
# 计算嵌入向量
embeddings = embed_model.embed_documents([chunk.page_content for chunk in chunks])
# 转换数据格式
data = [
{
"vector": embedding,
"text": chunk.page_content,
"metadata": chunk.metadata,
}
for chunk, embedding in zip(chunks, embeddings)
]
# 插入实体
res = client.insert(collection_name="demo_collection", data=data)
# ========== 检索 ==========
def retrieval(query, embed_model, client):
"""检索并返回上下文"""
query_embedding = embed_model.embed_query(query) # 查询嵌入
context = client.search(
collection_name="demo_collection", # collection 名称
data=[query_embedding], # 搜索的向量
anns_field="vector", # 进行向量搜索的字段
# 度量方式:L2 欧氏距离/IP 内积/COSINE 余弦相似度
search_params={"metric_type": "L2"},
output_fields=["text", "metadata"], # 输出字段
limit=3, # 搜索结果数量
)
return context
context = retrieval("不动产被占有了怎么办?", embed_model, client)
print(context)
生成
import os
from pymilvus import MilvusClient
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
# 实例化向量数据库客户端
client = MilvusClient(
uri="./milvus_demo.db", # 数据存储在本地当前目录下
)
# 加载嵌入模型
embed_model = HuggingFaceEmbeddings(
model_name=os.path.expanduser("~/models/bge-base-zh-v1.5")
)
# ========== 检索 ==========
def retrieval(query, embed_model, client):
"""检索并返回上下文"""
query_embedding = embed_model.embed_query(query) # 查询嵌入
context = client.search(
collection_name="demo_collection", # collection 名称
data=[query_embedding], # 搜索的向量
anns_field="vector", # 进行向量搜索的字段
# 度量方式:L2 欧氏距离/IP 内积/COSINE 余弦相似度
search_params={"metric_type": "L2"},
output_fields=["text", "metadata"], # 输出字段
limit=3, # 搜索结果数量
)
return context
# ========== 生成 ==========
llm = init_chat_model(
model="openai/gpt-oss-20b:free",
model_provider="openai",
base_url="https://openrouter.ai/api/v1",
api_key=os.getenv("OPENROUTER_API_KEY"),
)
template = ChatPromptTemplate.from_messages(
[
(
"system",
"# 任务\n\n根据上下文参考,回答用户的问题。\n\n# 上下文参考\n\n{context}",
),
("human", "{query}"),
]
)
rag_chain = (
{
"query": RunnablePassthrough(),
"context": lambda x: retrieval(query=x, embed_model=embed_model, client=client),
}
| RunnableLambda(lambda x: print(x) or x) # 打印中间结果
| template
| llm
| StrOutputParser()
)
res_chunks = rag_chain.stream(input="不动产被占有了怎么办?")
for chunk in res_chunks:
print(chunk, end="", flush=True)
更多推荐

所有评论(0)