前言

本章内容,我们将在已经构建的agent框架基础上,优化检索器,为检索器搭建ElasticSearch服务,实现问答系统的检索增强。

检索问题

通过测试天池大赛数据集的前100个问题,我们发现有很多问题RAG检索不到,例如:

  • {“id”: 34, “question”: “根据武汉兴图新科电子股份有限公司招股意向书,电子信息行业的上游涉及哪些企业?”}

通过查看日志,检索器没有检索到相关信息:

优化方案

分析上述case原因,检索器太过简单所致

  
class SimpleRetrieverWrapper():  
"""自定义检索器实现"""  
  
def__init__(self, store, llm, **kwargs):  
        self.store = store  
        self.llm = llm  
        logger.info(f'检索器所使用的Chat模型:{self.llm}')  
  
defcreate_retriever(self):  
        logger.info(f'初始化自定义的Retriever')  
  
        chromadb_retriever = self.store.as_retriever()  
return chromadb_retriever

基于以上问题,我们计划使用集成检索器,方案如下:

说明:

  • 将检索器改为使用 EnsembleRetriever

  • 集成检索器其中之一使用 ElasticSearch 检索器,这个检索器通过连接 ElasticSearch 服务,通过关键字查询相关信息。

  • 集成检索器另外一个使用 MultiQueryRetriever 检索器,这个检索器通过连接Chroma向量库查询信息。

关于MultiQueryRetriever和ElasticSearch,之前有文章做过基本内容的总结,详情请查看课程总结】day29:大模型之深入了解Retrievers解析器。

优化步骤

1、搭建ES服务

第一步:安装Docker,该内容不再赘述,具体请见10分钟学会Docker的安装和使用

第二步:创建网络

docker network create es-net

第三步:拉取镜像

docker pull elasticsearch:8.6.0

第四步:创建挂载点目录

smart-finance-bot \  
    |- app \                  
    |- docker \  
        |- elasticsearch \       # 创建elasticsearch挂载目录  
            |- data \            # 创建数据目录  
            |- config \          # 创建配置目录  
            |- plugins  \        # 创建插件目录

第五步:命令行中输入命令启动Docker容器

docker run -d \  
--restart=always \  
--name es \  
--network es-net \  
-p 9200:9200 \  
-p 9300:9300 \  
--privileged \  
-v /Users/deadwalk/Code/smart-finance-bot/docker/elasticsearch/data:/usr/share/elasticsearch/data \  
-v /Users/deadwalk/Code/smart-finance-bot/docker/elasticsearch/plugins:/usr/share/elasticsearch/plugins \  
-e "discovery.type=single-node" \  
-e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \  
elasticsearch:8.6.0

注意:

  • 上述的/Users/deadwalk/Code/smart-finance-bot 请根据本地路径修改;

  • 运行完毕后请使用docker ps确认容器已经启动。

第六步:进入es容器

docker exec -it es /bin/bash

第七步:命令行输入重置密码命令(此处我们重置密码为123abc)

bin/elasticsearch-reset-password -i -u elastic

第八步:使用浏览器访问http://localhost:9200/,验证服务可以使用

2、添加数据到ES服务

2.1、测试ES的连接

编写ES连接测试代码,验证ES服务连接。

def test_es_connect():  
from elasticsearch importElasticsearch  
  
    ELASTIC_PASSWORD ="123abc"  
    host ="localhost"  
    port =9200  
    schema ="https"  
    url =f"{schema}://elastic:{ELASTIC_PASSWORD}@{host}:{port}"  
  
    client =Elasticsearch(  
        url,  
        verify_certs=False,  
)  
  
print(client.info())  

运行结果:

2.2、实现ElasticSearch连接代码

代码文件:app/rag/elasticsearch_db.py

# 引入  
from langchain_core.retrievers importBaseRetriever  
from langchain_core.documents importDocument  
# ES需要导入的库  
from typing importList  
import re  
import jieba  
import nltk  
from nltk.corpus import stopwords  
import time  
from elasticsearch importElasticsearch  
from elasticsearch.exceptions importConnectionError,AuthenticationException  
from elasticsearch import helpers  
import settings  
from utils.logger_config importLoggerManager  
from utils.util_nltk importUtilNltk  
import os  
import warnings  
  
  
warnings.simplefilter("ignore")# 屏蔽 ES 的一些Warnings  
utilnltk =UtilNltk()  
  
  
  
logger =LoggerManager().logger  
  
  
classTraditionDB:  
defadd_documents(self, docs):  
"""  
        将文档添加到数据库  
        """  
raiseNotImplementedError("Subclasses should implement this method!")  
  
defget_store(self):  
"""  
        获得向量数据库的对象实例  
        """  
raiseNotImplementedError("Subclasses should implement this method!")  
  
  
classElasticsearchDB(TraditionDB):  
def__init__(self,  
                 schema=settings.ELASTIC_SCHEMA,  
                 host=settings.ELASTIC_HOST,  
                 port=settings.ELASTIC_PORT,  
                 index_name=settings.ELASTIC_INDEX_NAME,  
                 k=3  
                 #  docs=docs                 ):  
# 定义索引名称  
        self.index_name = index_name  
        self.k = k  
  
try:  
            url =f"{schema}://elastic:{settings.ELASTIC_PASSWORD}@{host}:{port}"  
            logger.info(f'初始化ES服务连接:{url}')  
  
            self.es =Elasticsearch(  
                url,  
                verify_certs=False,  
# ca_certs="./docker/elasticsearch/certs/ca/ca.crt",  
# basic_auth=("elastic", settings.ELASTIC_PASSWORD)  
)  
  
            response = self.es.info()# 尝试获取信息  
            logger.info(f'ES服务响应: {response}')  
except(ConnectionError,AuthenticationException)as e:  
            logger.error(f'连接 Elasticsearch 失败: {e}')  
raise  
exceptExceptionas e:  
            logger.error(f'发生其他错误: {e}')  
            logger.error(f'异常类型: {type(e).__name__}')# 记录异常类型  
raise  
  
defto_keywords(self, input_string):  
"""将句子转成检索关键词序列"""  
# 按搜索引擎模式分词  
        word_tokens = jieba.cut_for_search(input_string)  
# 加载停用词表  
        stop_words =set(stopwords.words('chinese'))  
# 去除停用词  
        filtered_sentence =[w for w in word_tokens ifnot w in stop_words]  
return' '.join(filtered_sentence)  
  
defsent_tokenize(self, input_string):  
"""按标点断句,没有用到"""  
# 按标点切分  
        sentences = re.split(r'(?<=[。!?;?!])', input_string)  
# 去掉空字符串  
return[sentence for sentence in sentences if sentence.strip()]  
  
defcreate_index(self):  
"""如果索引不存在,则创建索引"""  
ifnot self.es.indices.exists(index=self.index_name):  
# 创建索引  
            self.es.indices.create(index=self.index_name, ignore=400)  
  
defbluk_data(self, paragraphs):  
"""批量进行数据灌库"""  
# 灌库指令  
        actions =[  
{  
"_index": self.index_name,  
"_source":{  
"keywords": self.to_keywords(para.page_content),  
"text": para.page_content  
}  
}  
for para in paragraphs  
]  
# 文本灌库  
        helpers.bulk(self.es, actions)  
# # 灌库是异步的  
# time.sleep(2)  
  
defflush(self):  
# 刷新数据,数据入库完成以后刷新数据  
        self.es.indices.flush()  
  
defsearch(self, query_string):  
"""关键词检索"""  
# ES 的查询语言  
        search_query ={  
"match":{  
"keywords": self.to_keywords(query_string)  
}  
}  
        res = self.es.search(index=self.index_name, query=search_query, size=self.k)  
return[hit["_source"]["text"]for hit in res["hits"]["hits"]]  
  
defdelete(self):  
"""如果索引存在,则删除索引"""  
if self.es.indices.exists(index=self.index_name):  
# 创建索引  
            self.es.indices.delete(index=self.index_name, ignore=400)  
  
defadd_documents(self, docs):  
        self.bluk_data(docs)  
        self.flush()

说明:

  • elasticsearch后续的插入操作中,使用到了nltk分词,其代码已经封装在UtilNltk类中,具体代码请查看Github仓库代码,本文不再赘述。

  • LoggerManager是代码重构时,封装的一个日志管理类,具体代码请查看Github仓库代码,本文不再赘述。

2.3、修改PDF文件导入代码

在settings.py中添加elasticsearch配置信息:

  
"""  
ES数据库相关的配置  
"""  
# ES服务开关:True表示开启ES服务,False表示关闭ES服务  
ELASTIC_ENABLE_ES = True  
ELASTIC_PASSWORD = os.getenv("ELASTIC_PASSWORD", "123abc")  
ELASTIC_HOST = os.getenv("ELASTIC_HOST", "localhost")  
ELASTIC_PORT = os.getenv("ELASTIC_PORT", 9200)  
ELASTIC_SCHEMA = "https"  
ELASTIC_INDEX_NAME = "smart_test_index"  

确认PDFProcessor.py中已经添加了对于Elasticsearch的插入操作支持,具体代码在【项目实战】基于Agent的金融问答系统:代码重构已做介绍,所以本文不再赘述。

2.4、测试PDF文件导入代码

在test_framework.py中添加如下代码

  
def test_import_elasticsearch():  
# from rag.elasticsearch_db import TraditionDB  
from rag.elasticsearch_db importElasticsearchDB  
from rag.pdf_processor importPDFProcessor  
  
    llm, chat, embed = settings.LLM, settings.CHAT, settings.EMBED  
  
# 导入文件的文件目录  
    directory ="./dataset/pdf"  
  
# 创建 Elasticsearch 数据库实例  
    es_db =ElasticsearchDB()  
  
# 创建 PDFProcessor 实例  
    pdf_processor =PDFProcessor(directory=directory,  
                                 db_type="es",  
                                 es_client=es_db,  
                                 embed=embed)  
  
# 处理 PDF 文件  
    pdf_processor.process_pdfs()

运行结果:

3、修改检索器增加Elasticsearch检索

代码文件:app/rag/retrievers.py

from langchain_core.callbacks importCallbackManagerForRetrieverRun  
from utils.logger_config importLoggerManager  
from langchain_core.retrievers importBaseRetriever  
from langchain_core.documents importDocument  
from langchain.retrievers importEnsembleRetriever  
from langchain.retrievers.multi_query importMultiQueryRetriever  
from rag.elasticsearch_db importElasticsearchDB  
# ES需要导入的库  
from typing importList  
import logging  
import settings  
  
  
logger =LoggerManager().logger  
  
  
classSimpleRetrieverWrapper():  
"""自定义检索器实现"""  
  
def__init__(self, store, llm, **kwargs):  
        self.store = store  
        self.llm = llm  
        logger.info(f'检索器所使用的Chat模型:{self.llm}')  
  
defcreate_retriever(self):  
        logger.info(f'初始化自定义的Retriever')  
  
# 初始化一个空的检索器列表  
        retrievers =[]  
        weights =[]  
  
# Step1:创建一个 多路召回检索器 MultiQueryRetriever  
        chromadb_retriever = self.store.as_retriever()  
        mq_retriever =MultiQueryRetriever.from_llm(retriever=chromadb_retriever, llm=self.llm)  
  
        retrievers.append(mq_retriever)  
        weights.append(0.5)  
        logger.info(f'已启用 MultiQueryRetriever')  
  
# Step2:创建一个 ES 检索器  
if settings.ELASTIC_ENABLE_ES isTrue:  
            es_retriever =ElasticsearchRetriever()  
            retrievers.append(es_retriever)  
            weights.append(0.5)  
            logger.info(f'已启用 ElasticsearchRetriever')  
  
# 使用集成检索器,将所有启用的检索器集合在一起  
        ensemble_retriever =EnsembleRetriever(retrievers=retrievers, weights=weights)  
return ensemble_retriever  
  
classElasticsearchRetriever(BaseRetriever):  
def_get_relevant_documents(self, query: str, )->List[Document]:  
"""Return the first k documents from the list of documents"""  
        es_connector =ElasticsearchDB()  
        query_result = es_connector.search(query)  
  
        logger.info(f"ElasticSearch检索到资料文件个数:{len(query_result)}")  
  
if query_result:  
return[Document(page_content=doc)for doc in query_result]  
return[]  
  
asyncdef_aget_relevant_documents(self, query: str)->List[Document]:  
"""(Optional) async native implementation."""  
        es_connector =ElasticsearchDB()  
        query_result = es_connector.search(query)  
if query_result:  
return[Document(page_content=doc)for doc in query_result]  
return []

4、测试验证

在test_framework.py中运行test_financebot_ex()函数,测试检索功能。

def test_financebot_ex():  
    from finance_bot_ex import FinanceBotEx  
    # 使用Chroma 的向量库  
    financebot = FinanceBotEx()  
  
    example_query = "根据武汉兴图新科电子股份有限公司招股意向书,电子信息行业的上游涉及哪些企业?"  
  
    financebot.handle_query(example_query)

运行结果: 连接ES后检索到3个资料文件

使用多路召回,生成3个检索问题

最终通过集成检索器检索到答案

优化效果

通过对天池大赛前100个问题的对比测试,我们最终得到如下对比验证结果:

如何学习大模型 AI ?

由于新岗位的生产效率,要优于被取代岗位的生产效率,所以实际上整个社会的生产效率是提升的。

但是具体到个人,只能说是:

“最先掌握AI的人,将会比较晚掌握AI的人有竞争优势”。

这句话,放在计算机、互联网、移动互联网的开局时期,都是一样的道理。

我在一线互联网企业工作十余年里,指导过不少同行后辈。帮助很多人得到了学习和成长。

我意识到有很多经验和知识值得分享给大家,也可以通过我们的能力和经验解答大家在人工智能学习中的很多困惑,所以在工作繁忙的情况下还是坚持各种整理和分享。但苦于知识传播途径有限,很多互联网行业朋友无法获得正确的资料得到学习提升,故此将并将重要的AI大模型资料包括AI大模型入门学习思维导图、精品AI大模型学习书籍手册、视频教程、实战学习等录播视频免费分享出来。

在这里插入图片描述

第一阶段(10天):初阶应用

该阶段让大家对大模型 AI有一个最前沿的认识,对大模型 AI 的理解超过 95% 的人,可以在相关讨论时发表高级、不跟风、又接地气的见解,别人只会和 AI 聊天,而你能调教 AI,并能用代码将大模型和业务衔接。

  • 大模型 AI 能干什么?
  • 大模型是怎样获得「智能」的?
  • 用好 AI 的核心心法
  • 大模型应用业务架构
  • 大模型应用技术架构
  • 代码示例:向 GPT-3.5 灌入新知识
  • 提示工程的意义和核心思想
  • Prompt 典型构成
  • 指令调优方法论
  • 思维链和思维树
  • Prompt 攻击和防范

第二阶段(30天):高阶应用

该阶段我们正式进入大模型 AI 进阶实战学习,学会构造私有知识库,扩展 AI 的能力。快速开发一个完整的基于 agent 对话机器人。掌握功能最强的大模型开发框架,抓住最新的技术进展,适合 Python 和 JavaScript 程序员。

  • 为什么要做 RAG
  • 搭建一个简单的 ChatPDF
  • 检索的基础概念
  • 什么是向量表示(Embeddings)
  • 向量数据库与向量检索
  • 基于向量检索的 RAG
  • 搭建 RAG 系统的扩展知识
  • 混合检索与 RAG-Fusion 简介
  • 向量模型本地部署

第三阶段(30天):模型训练

恭喜你,如果学到这里,你基本可以找到一份大模型 AI相关的工作,自己也能训练 GPT 了!通过微调,训练自己的垂直大模型,能独立训练开源多模态大模型,掌握更多技术方案。

到此为止,大概2个月的时间。你已经成为了一名“AI小子”。那么你还想往下探索吗?

  • 为什么要做 RAG
  • 什么是模型
  • 什么是模型训练
  • 求解器 & 损失函数简介
  • 小实验2:手写一个简单的神经网络并训练它
  • 什么是训练/预训练/微调/轻量化微调
  • Transformer结构简介
  • 轻量化微调
  • 实验数据集的构建

第四阶段(20天):商业闭环

对全球大模型从性能、吞吐量、成本等方面有一定的认知,可以在云端和本地等多种环境下部署大模型,找到适合自己的项目/创业方向,做一名被 AI 武装的产品经理。

  • 硬件选型
  • 带你了解全球大模型
  • 使用国产大模型服务
  • 搭建 OpenAI 代理
  • 热身:基于阿里云 PAI 部署 Stable Diffusion
  • 在本地计算机运行大模型
  • 大模型的私有化部署
  • 基于 vLLM 部署大模型
  • 案例:如何优雅地在阿里云私有部署开源大模型
  • 部署一套开源 LLM 项目
  • 内容安全
  • 互联网信息服务算法备案

学习是一个过程,只要学习就会有挑战。天道酬勤,你越努力,就会成为越优秀的自己。

如果你能在15天内完成所有的任务,那你堪称天才。然而,如果你能完成 60-70% 的内容,你就已经开始具备成为一名大模型 AI 的正确特征了。

这份完整版的大模型 AI 学习资料已经上传CSDN,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费

在这里插入图片描述

Logo

Agent 垂直技术社区,欢迎活跃、内容共建,欢迎商务合作。wx: diudiu5555

更多推荐