微信扫码
与创始人交个朋友
我要投稿
我们在上一章《【项目实战】基于Agent的金融问答系统之项目简介》中简单介绍了项目背景以及数据集情况,本章将介绍RAG模块的实现。
参考之前所学内容《大模型之初识RAG》,我们需要实现如下功能:
• 向量库的基础功能
• 连接向量库
• 数据入库
• 文件导入
• PDF文件的读取
• PDF文件的切分
• 调用向量库接口入库
• 文件检索
• 连接向量库
• 检索器检索文件
项目开始之后,我们如果能够抑制住直接撸代码的冲动,改为提前做好规划,这会是一个好的习惯。 为此,我提前做了如下规划:
• 代码使用Git进行管理,这样后期多人协作时方便代码更新、代码Merge、冲突解决等。
• 由于国内访问Github优势会ban,所以我们将代码仓库放在Gitee上。
• 为代码仓库起了一个响亮的名称后,仓库地址定为https://gitee.com/deadwalk/smart-finance-bot
考虑这个项目会涉及到前端、后端、数据集、模型等,所以项目目录规划如下:
smart-finance-bot \
|- dataset \ # 该目录用于保存PDF以及SQLite数据
|- doc \ # 该目录用于保存文档类文件,例如:需求文档、说明文档、数据文档
|- app \ # 该目录用于服务端代码
|- agent \ # 该目录用于保存agent相关代码
|- rag \ # 该目录用于保存rag相关代码
|-test \ # 该目录用于保存测试类驱动相关代码
|- conf \ # 该目录用于保存配置文件
|-.qwen # 该文件保存QWen的配置文件(请自行添加对应的API KEY)
|-.ernie # 该文件保存百度千帆的配置文件(请自行添加对应的API KEY)
|- chatweb \ # 该目录用于保存前端页面代码
|- scripts \ # 该目录用于保存脚本文件,例如:启动脚本、导入向量数据库脚本等
|- test_result \ # 该目录用于保存测试结果
|- docker \
|- backend \ # 该目录对应后端python服务的Dockerfile
|- frontend \ # 该目录对应前端python服务的Dockerfile
上述目录中,dataset
是直接使用git的submodul命令,直接将天池大赛提供的数据集引入到本项目中,方便后续使用。
引入方法:
git submodule add https://www.modelscope.cn/datasets/BJQW14B/bs_challenge_financial_14b_dataset.git dataset
项目如果能够约束统一的命名规范,这对于后续代码的可读性、可维护性会提供需要便利,在此我沿用了约定俗成的代码命名规范:
• 类名:使用大驼峰命名法,例如:MyClass
• 函数名:使用小驼峰命名法,例如:my_function
• 变量名:使用小驼峰命名法,例如:my_variable
• 文件夹:使用小驼峰命名法。
整体命名时,要尽量见文知意。
代码文件及目录:app/utils/util.py
from dotenv import load_dotenv
import os
# 获取当前文件的目录
current_dir = os.path.dirname(__file__)
# 构建到 conf/.qwen 的相对路径
conf_file_path_qwen = os.path.join(current_dir,'..','conf','.qwen')
# 加载千问环境变量
load_dotenv(dotenv_path=conf_file_path_qwen)
defget_qwen_models():
"""
加载千问系列大模型
"""
# llm 大模型
from langchain_community.llms.tongyi importTongyi
llm =Tongyi(model="qwen-max", temperature=0.1, top_p=0.7, max_tokens=1024)
# chat 大模型
from langchain_community.chat_models importChatTongyi
chat =ChatTongyi(model="qwen-max", temperature=0.01, top_p=0.2, max_tokens=1024)
# embedding 大模型
from langchain_community.embeddings importDashScopeEmbeddings
embed =DashScopeEmbeddings(model="text-embedding-v3")
return llm, chat, embed
在app/conf/.qwen中,添加对应的API KEY,例如:
DASHSCOPE_API_KEY = sk-xxxxxx
向量库文件考虑使用Chroma实现,所以我们先实现一个向量库的类,用于完成基本的向量库连接、数据入库操作。
代码文件及目录:app/rag/chroma_conn.py
import chromadb
from chromadb importSettings
from langchain_chroma importChroma
classChromaDB:
def__init__(self,
chroma_server_type="local", # 服务器类型:http是http方式连接方式,local是本地读取文件方式
host="localhost", port=8000, # 服务器地址,http方式必须指定
persist_path="chroma_db", # 数据库的路径:如果是本地连接,需要指定数据库的路径
collection_name="langchain", # 数据库的集合名称
embed=None # 数据库的向量化函数
):
self.host = host
self.port = port
self.path = persist_path
self.embed = embed
self.store =None
# 如果是http协议方式连接数据库
if chroma_server_type =="http":
client = chromadb.HttpClient(host=host, port=port)
self.store =Chroma(collection_name=collection_name,
embedding_function=embed,
client=client)
if chroma_server_type =="local":
self.store =Chroma(collection_name=collection_name,
embedding_function=embed,
persist_directory=persist_path)
if self.store isNone:
raiseValueError("Chroma store init failed!")
defadd_with_langchain(self, docs):
"""
将文档添加到数据库
"""
self.store.add_documents(documents=docs)
defget_store(self):
"""
获得向量数据库的对象实例
"""
return self.store
在实际项目实践过程中,我们发现导入Chroma数据时使用本地化连接方式更快一些,所以对连接方式做了两个参数的扩展,local 代表本地连接,http 代表远程连接。
本着先跑通流程,再优化交互过程的思路,对于PDF文件入向量库的过程,我们先通过一段脚本实现(暂不做前端UI的交互)。
代码文件及目录:app/rag/pdf_processor.py
import os
import logging
import time
from tqdm import tqdm
from langchain_community.document_loaders importPyMuPDFLoader
from langchain_text_splitters importRecursiveCharacterTextSplitter
from rag.chroma_conn importChromaDB
classPDFProcessor:
def__init__(self,
directory, # PDF文件所在目录
chroma_server_type, # ChromaDB服务器类型
persist_path, # ChromaDB持久化路径
embed):# 向量化函数
self.directory = directory
self.file_group_num =80# 每组处理的文件数
self.batch_num =6# 每次插入的批次数量
self.chunksize =500# 切分文本的大小
self.overlap =100# 切分文本的重叠大小
self.chroma_db =ChromaDB(chroma_server_type=chroma_server_type,
persist_path=persist_path,
embed=embed)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
defload_pdf_files(self):
"""
加载目录下的所有PDF文件
"""
pdf_files =[]
for file in os.listdir(self.directory):
if file.lower().endswith('.pdf'):
pdf_files.append(os.path.join(self.directory, file))
logging.info(f"Found {len(pdf_files)} PDF files.")
return pdf_files
defload_pdf_content(self, pdf_path):
"""
读取PDF文件内容
"""
pdf_loader =PyMuPDFLoader(file_path=pdf_path)
docs = pdf_loader.load()
logging.info(f"Loading content from {pdf_path}.")
return docs
defsplit_text(self, documents):
"""
将文本切分成小段
"""
# 切分文档
text_splitter =RecursiveCharacterTextSplitter(
chunk_size=self.chunksize,
chunk_overlap=self.overlap,
length_function=len,
add_start_index=True,
)
docs = text_splitter.split_documents(documents)
logging.info("Split text into smaller chunks with RecursiveCharacterTextSplitter.")
return docs
definsert_docs_chromadb(self, docs, batch_size=6):
"""
将文档插入到ChromaDB
"""
# 分批入库
logging.info(f"Inserting {len(docs)} documents into ChromaDB.")
# 记录开始时间
start_time = time.time()
total_docs_inserted =0
# 计算总批次
total_batches =(len(docs)+ batch_size -1)// batch_size
with tqdm(total=total_batches, desc="Inserting batches", unit="batch")as pbar:
for i inrange(0,len(docs), batch_size):
# 获取当前批次的样本
batch = docs[i:i + batch_size]
# 将样本入库
self.chroma_db.add_with_langchain(batch)
# self.chroma_db.async_add_with_langchain(batch)
# 更新已插入的文档数量
total_docs_inserted +=len(batch)
# 计算并显示当前的TPM
elapsed_time = time.time()- start_time # 计算已用时间(秒)
if elapsed_time >0:# 防止除以零
tpm =(total_docs_inserted / elapsed_time)*60# 转换为每分钟插入的文档数
pbar.set_postfix({"TPM":f"{tpm:.2f}"})# 更新进度条的后缀信息
# 更新进度条
pbar.update(1)
defprocess_pdfs_group(self, pdf_files_group):
# 读取PDF文件内容
pdf_contents =[]
for pdf_path in pdf_files_group:
# 读取PDF文件内容
documents = self.load_pdf_content(pdf_path)
# 将documents 逐一添加到pdf_contents
pdf_contents.extend(documents)
# 将文本切分成小段
docs = self.split_text(pdf_contents)
# 将文档插入到ChromaDB
self.insert_docs_chromadb(docs, self.batch_num)
defprocess_pdfs(self):
# 获取目录下所有的PDF文件
pdf_files = self.load_pdf_files()
group_num = self.file_group_num
# group_num 个PDF文件为一组,分批处理
for i inrange(0,len(pdf_files), group_num):
pdf_files_group = pdf_files[i:i + group_num]
self.process_pdfs_group(pdf_files_group)
print("PDFs processed successfully!")
因为Python的导入库的原因(一般都是从工作目录查找),所以我们在项目根目录下创建test_framework.py,方便后续统一测试工作。
smart-finance-bot \
|- app \ # 该目录用于服务端代码
|- rag \ # 该目录用于保存rag相关代码
|- pdf_processor.py
|- chroma_conn.py
|- test_framework.py
代码文件: app/test_framework.py
# 测试导入PDF到向量库主流程
deftest_import():
from rag.pdf_processor importPDFProcessor
from utils.util import get_qwen_models
llm , chat , embed = get_qwen_models()
# embed = get_huggingface_embeddings()
directory ="./app/dataset/pdf"
persist_path ="chroma_db"
server_type ="local"
# 创建 PDFProcessor 实例
pdf_processor =PDFProcessor(directory=directory,
chroma_server_type=server_type,
persist_path=persist_path,
embed=embed)
# 处理 PDF 文件
pdf_processor.process_pdfs()
if __name__ =="__main__":
test_import()
1、通过命令行启动ChromaDB服务端:
chroma run --path chroma_db --port 8000 --host localhost
2、运行test_framework.py
运行效果:
备注:一般测试框架会使用Pytest并且编写相应的单元测试函数,本次项目中由于项目较小且函数返回结果不固定,所以就没有写UnitTest。如果想了解Pytest的使用示例,可以参考我的其他代码仓库,例如:UnitTest的使用
代码文件: app/rag/rag.py
import logging
from langchain_core.prompts importChatPromptTemplate
from langchain_core.runnables importRunnablePassthrough
from langchain_core.runnables.base importRunnableLambda
from langchain_core.output_parsers importStrOutputParser
from.chroma_conn importChromaDB
# 配置日志记录
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s')
classRagManager:
def__init__(self,
chroma_server_type="http",
host="localhost", port=8000,
persist_path="chroma_db",
llm=None, embed=None):
self.llm = llm
self.embed = embed
chrom_db =ChromaDB(chroma_server_type=chroma_server_type,
host=host, port=port,
persist_path=persist_path,
embed=embed)
self.store = chrom_db.get_store()
defget_chain(self, retriever):
"""获取RAG查询链"""
# RAG系统经典的 Prompt (A 增强的过程)
prompt =ChatPromptTemplate.from_messages([
("human","""You are an assistant for question-answering tasks. Use the following pieces
of retrieved context to answer the question.
If you don't know the answer, just say that you don't know.
Use three sentences maximum and keep the answer concise.
Question: {question}
Context: {context}
Answer:""")
])
# 将 format_docs 方法包装为 Runnable
format_docs_runnable =RunnableLambda(self.format_docs)
# RAG 链
rag_chain =(
{"context": retriever | format_docs_runnable,
"question":RunnablePassthrough()}
| prompt
| self.llm
|StrOutputParser()
)
return rag_chain
defformat_docs(self, docs):
# 返回检索到的资料文件名称
logging.info(f"检索到资料文件个数:{len(docs)}")
retrieved_files ="\n".join([doc.metadata["source"]for doc in docs])
logging.info(f"资料文件分别是:\n{retrieved_files}")
retrieved_content ="\n\n".join(doc.page_content for doc in docs)
logging.info(f"检索到的资料为:\n{retrieved_content}")
return retrieved_content
defget_retriever(self, k=4, mutuality=0.3):
retriever = self.store.as_retriever(search_type="similarity_score_threshold",
search_kwargs={"k": k,"score_threshold": mutuality})
return retriever
defget_result(self, question, k=4, mutuality=0.3):
"""获取RAG查询结果"""
retriever = self.get_retriever(k, mutuality)
rag_chain = self.get_chain(retriever)
return rag_chain.invoke(input=question)
以上是实现了一个使用基本检索器的RAG,其中:
• 代码中通过chroma_conn.py模块连接到ChromaDB数据库,并使用ChromaDB的as_retriever方法创建一个检索器。
在test_framework.py中添加RAG的测试调用函数。
代码文件:app/test_framework.py
# 测试RAG主流程
deftest_rag():
from rag.rag importRagManager
from utils.util import get_qwen_models
llm, chat, embed = get_qwen_models()
rag =RagManager(host="localhost", port=8000, llm=llm, embed=embed)
result = rag.get_result("湖南长远锂科股份有限公司变更设立时作为发起人的法人有哪些?")
print(result)
if __name__ =="__main__":
test_rag()# RAG测试函数
# test_import() # 批量导入PDF测试函数
注释掉批量导入函数,开启test_rag()函数,运行效果:
至此,我们完成了RAG模块的基本功能,它包括PDF文件的批量导入以及检索功能。
• 首先,我们创建了一个ChromaDB的类,封装了基础的Chroma连接、插入、检索。
• 其次,我们实现了PDFProcessor类,该类中会调用ChromaDB类的插入函数,将批量读取的PDF文件进行切分后保存至向量库。
• 然后,我们实现了RagManager类,该类中封装了RAG的检索链,并定义了检索的参数。
• 最后,我们实现了一个测试函数,用于测试RAG的检索功能。
• 除此之外,有两个注意事项:
• 在项目初期,进行合理的项目文件目录规划,可以有效减少项目维护的难度。
• 在项目行进中,通过搭建测试框架,可以方便函数验证以及后续重构的回归测试。
53AI,企业落地应用大模型首选服务商
产品:大模型应用平台+智能体定制开发+落地咨询服务
承诺:先做场景POC验证,看到效果再签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2024-07-18
2024-05-05
2024-06-20
2024-09-04
2024-05-19
2024-07-09
2024-07-09
2024-07-07
2024-06-13
2024-07-07