微信扫码
添加专属顾问
我要投稿
深入解析RAG技术在实际应用中的优化策略,提升智能问答系统的准确性和召回率。 核心内容: 1. RAG技术在智能问答中的应用挑战与优化需求 2. RAG关键流程源码解读:知识加工与检索细节 3. 如何提升RAG系统的召回准确率和专业性
快速搭建一个RAG智能问答应用很简单,但是在实际业务场景落地还需要做大量的准备工作。
RAG关键流程源码解读
主要分为知识加工和RAG部分关键流程:
知识加载 -> 知识切片 -> 信息抽取 -> 知识加工(embedding/graph/keywords) -> 知识存储
知识加载
# 知识工厂进行实例化KnowledgeFactory -> create() -> load() -> Document- knowledge- markdown- pdf- docx- txt- html- pptx- url- ...
如何扩展:
class Knowledge(ABC):
def load(self) -> List[Document]:
"""Load knowledge from data loader."""
@classmethod
def document_type(cls) -> Any:
"""Get document type."""
def support_chunk_strategy(cls) -> List[ChunkStrategy]:
"""Return supported chunk strategy."""
return [
ChunkStrategy.CHUNK_BY_SIZE,
ChunkStrategy.CHUNK_BY_PAGE,
ChunkStrategy.CHUNK_BY_PARAGRAPH,
ChunkStrategy.CHUNK_BY_MARKDOWN_HEADER,
ChunkStrategy.CHUNK_BY_SEPARATOR,
]
@classmethod
def default_chunk_strategy(cls) -> ChunkStrategy:
"""Return default chunk strategy.
Returns:
ChunkStrategy: default chunk strategy
"""
return ChunkStrategy.CHUNK_BY_SIZE
class ChunkManager:
"""Manager for chunks."""
def __init__(
self,
knowledge: Knowledge,
chunk_parameter: Optional[ChunkParameters] = None,
extractor: Optional[Extractor] = None,
):
"""Create a new ChunkManager with the given knowledge.
Args:
knowledge: (Knowledge) Knowledge datasource.
chunk_parameter: (Optional[ChunkParameter]) Chunk parameter.
extractor: (Optional[Extractor]) Extractor to use for summarization.
"""
self._knowledge = knowledge
self._extractor = extractor
self._chunk_parameters = chunk_parameter or ChunkParameters()
self._chunk_strategy = (
chunk_parameter.chunk_strategy
if chunk_parameter and chunk_parameter.chunk_strategy
else self._knowledge.default_chunk_strategy().name
)
self._text_splitter = self._chunk_parameters.text_splitter
self._splitter_type = self._chunk_parameters.splitter_type
class ChunkStrategy(Enum):
"""Chunk Strategy Enum."""
CHUNK_BY_SIZE: _STRATEGY_ENUM_TYPE = (
RecursiveCharacterTextSplitter,
[
{
"param_name": "chunk_size",
"param_type": "int",
"default_value": 512,
"description": "The size of the data chunks used in processing.",
},
{
"param_name": "chunk_overlap",
"param_type": "int",
"default_value": 50,
"description": "The amount of overlap between adjacent data chunks.",
},
],
"chunk size",
"split document by chunk size",
)
CHUNK_BY_PAGE: _STRATEGY_ENUM_TYPE = (
PageTextSplitter,
[],
"page",
"split document by page",
)
CHUNK_BY_PARAGRAPH: _STRATEGY_ENUM_TYPE = (
ParagraphTextSplitter,
[
{
"param_name": "separator",
"param_type": "string",
"default_value": "\\n",
"description": "paragraph separator",
}
],
"paragraph",
"split document by paragraph",
)
CHUNK_BY_SEPARATOR: _STRATEGY_ENUM_TYPE = (
SeparatorTextSplitter,
[
{
"param_name": "separator",
"param_type": "string",
"default_value": "\\n",
"description": "chunk separator",
},
{
"param_name": "enable_merge",
"param_type": "boolean",
"default_value": False,
"description": "Whether to merge according to the chunk_size after "
"splitting by the separator.",
},
],
"separator",
"split document by separator",
)
CHUNK_BY_MARKDOWN_HEADER: _STRATEGY_ENUM_TYPE = (
MarkdownHeaderTextSplitter,
[],
"markdown header",
"split document by markdown header",
)
Embeddings
接口
@abstractmethod
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Embed search docs."""
@abstractmethod
def embed_query(self, text: str) -> List[float]:
"""Embed query text."""
async def aembed_documents(self, texts: List[str]) -> List[List[float]]:
"""Asynchronous Embed search docs."""
return await asyncio.get_running_loop().run_in_executor(
None, self.embed_documents, texts
)
async def aembed_query(self, text: str) -> List[float]:
"""Asynchronous Embed query text."""
return await asyncio.get_running_loop().run_in_executor(
None, self.embed_query, text
)
# EMBEDDING_MODEL=proxy_openai
# proxy_openai_proxy_server_url=https://api.openai.com/v1
# proxy_openai_proxy_api_key={your-openai-sk}
# proxy_openai_proxy_backend=text-embedding-ada-002
## qwen embedding model, See dbgpt/model/parameter.py
# EMBEDDING_MODEL=proxy_tongyi
# proxy_tongyi_proxy_backend=text-embedding-v1
# proxy_tongyi_proxy_api_key={your-api-key}
## qianfan embedding model, See dbgpt/model/parameter.py
#EMBEDDING_MODEL=proxy_qianfan
#proxy_qianfan_proxy_backend=bge-large-zh
#proxy_qianfan_proxy_api_key={your-api-key}
#proxy_qianfan_proxy_api_secret={your-secret-key}
class TripletExtractor(LLMExtractor):
"""TripletExtractor class."""
def __init__(self, llm_client: LLMClient, model_name: str):
"""Initialize the TripletExtractor."""
super().__init__(llm_client, model_name, TRIPLET_EXTRACT_PT)
TRIPLET_EXTRACT_PT = (
"Some text is provided below. Given the text, "
"extract up to knowledge triplets as more as possible "
"in the form of (subject, predicate, object).\n"
"Avoid stopwords. The subject, predicate, object can not be none.\n"
"---------------------\n"
"Example:\n"
"Text: Alice is Bob's mother.\n"
"Triplets:\n(Alice, is mother of, Bob)\n"
"Text: Alice has 2 apples.\n"
"Triplets:\n(Alice, has 2, apple)\n"
"Text: Alice was given 1 apple by Bob.\n"
"Triplets:(Bob, gives 1 apple, Bob)\n"
"Text: Alice was pushed by Bob.\n"
"Triplets:(Bob, pushes, Alice)\n"
"Text: Bob's mother Alice has 2 apples.\n"
"Triplets:\n(Alice, is mother of, Bob)\n(Alice, has 2, apple)\n"
"Text: A Big monkey climbed up the tall fruit tree and picked 3 peaches.\n"
"Triplets:\n(monkey, climbed up, fruit tree)\n(monkey, picked 3, peach)\n"
"Text: Alice has 2 apples, she gives 1 to Bob.\n"
"Triplets:\n"
"(Alice, has 2, apple)\n(Alice, gives 1 apple, Bob)\n"
"Text: Philz is a coffee shop founded in Berkeley in 1982.\n"
"Triplets:\n"
"(Philz, is, coffee shop)\n(Philz, founded in, Berkeley)\n"
"(Philz, founded in, 1982)\n"
"---------------------\n"
"Text: {text}\n"
"Triplets:\n"
)
IndexStoreBase
接口,目前提供了向量数据库、图数据库、全文索引三类实现- VectorStoreBase
- ChromaStore
- MilvusStore
- OceanbaseStore
- ElasticsearchStore
- PGVectorStore
class VectorStoreBase(IndexStoreBase, ABC):
"""Vector store base class."""
@abstractmethod
def load_document(self, chunks: List[Chunk]) -> List[str]:
"""Load document in index database."""
@abstractmethod
async def aload_document(self, chunks: List[Chunk]) -> List[str]:
"""Load document in index database."""
@abstractmethod
def similar_search_with_scores(
self,
text,
topk,
score_threshold: float,
filters: Optional[MetadataFilters] = None,
) -> List[Chunk]:
"""Similar search with scores in index database."""
def similar_search(
self, text: str, topk: int, filters: Optional[MetadataFilters] = None
) -> List[Chunk]:
return self.similar_search_with_scores(text, topk, 1.0, filters)
- GraphStoreBase
- TuGraphStore
- Neo4jStore
def insert_triplet(self, subj: str, rel: str, obj: str) -> None:
"""Add triplet."""
...TL;DR...
subj_query = f"MERGE (n1:{self._node_label} {{id:'{subj}'}})"
obj_query = f"MERGE (n1:{self._node_label} {{id:'{obj}'}})"
rel_query = (
f"MERGE (n1:{self._node_label} {{id:'{subj}'}})"
f"-[r:{self._edge_label} {{id:'{rel}'}}]->"
f"(n2:{self._node_label} {{id:'{obj}'}})"
)
self.conn.run(query=subj_query)
self.conn.run(query=obj_query)
self.conn.run(query=rel_query)
{ "analysis": {"analyzer": {"default": {"type": "standard"}}}, "similarity": { "custom_bm25": { "type": "BM25", "k1": self._k1, "b": self._b, } }, } self._es_mappings = { "properties": { "content": { "type": "text", "similarity": "custom_bm25", }, "metadata": { "type": "keyword", }, } } - FullTextStoreBase - ElasticDocumentStore - OpenSearchStore
question -> rewrite -> similarity_search -> rerank -> context_candidates
接下来是知识检索,目前社区的检索逻辑主要分为这几步,如果你设置了查询改写参数,目前会通过大模型给你进行一轮问题改写,然后会根据你的知识加工方式路由到对应的检索器,如果你是通过向量进行加工的,那就会通过EmbeddingRetriever进行检索,如果你构建方式是通过知识图谱构建的,就会按照知识图谱方式进行检索,如果你设置了rerank模型,会给粗筛后的候选值进行精筛,让候选值和用户问题更有关联。
EmbeddingRetriever
class EmbeddingRetriever(BaseRetriever):
"""Embedding retriever."""
def __init__(
self,
index_store: IndexStoreBase,
top_k: int = 4,
query_rewrite: Optional[QueryRewrite] = None,
rerank: Optional[Ranker] = None,
retrieve_strategy: Optional[RetrieverStrategy] = RetrieverStrategy.EMBEDDING,
):
async def _aretrieve_with_score(
self,
query: str,
score_threshold: float,
filters: Optional[MetadataFilters] = None,
) -> List[Chunk]:
"""Retrieve knowledge chunks with score.
Args:
query (str): query text
score_threshold (float): score threshold
filters: metadata filters.
Return:
List[Chunk]: list of chunks with score
"""
queries = [query]
new_queries = await self._query_rewrite.rewrite(
origin_query=query, context=context, nums=1
)
queries.extend(new_queries)
candidates_with_score = [
self._similarity_search_with_score(
query, score_threshold, filters, root_tracer.get_current_span_id()
)
for query in queries
]
...
new_candidates_with_score = await self._rerank.arank(
new_candidates_with_score, query
)
return new_candidates_with_score
index_store: 具体的向量数据库
top_k: 返回的具体候选chunk个数
query_rewrite:查询改写函数
rerank:重排序函数
query:原始查询
score_threshold:得分,我们默认会把相似度得分小于阈值的上下文信息给过滤掉
filters:Optional[MetadataFilters]
, 元数据信息过滤器,主要是可以用来前置通过属性信息筛掉一些不匹配的候选信息。
class FilterCondition(str, Enum): """Vector Store Meta data filter conditions.""" AND ="and" OR ="or"class MetadataFilter(BaseModel): """Meta data filter.""" key: str = Field( ..., description="The key of metadata to filter.", ) operator: FilterOperator = Field( default=FilterOperator.EQ, description="The operator of metadata filter.", ) value: Union[str, int,float, List[str], List[int], List[float]] = Field( ..., description="The value of metadata to filter.", )
Graph RAG
首先通过模型进行关键词抽取,这里可以通过传统的nlp技术进行分词,也可以通过大模型进行分词,然后进行关键词按照同义词做扩充,找到关键词的候选列表,最好根据关键词候选列表调用explore方法召回局部子图。
KEYWORD_EXTRACT_PT = (
"A question is provided below. Given the question, extract up to "
"keywords from the text. Focus on extracting the keywords that we can use "
"to best lookup answers to the question.\n"
"Generate as more as possible synonyms or alias of the keywords "
"considering possible cases of capitalization, pluralization, "
"common expressions, etc.\n"
"Avoid stopwords.\n"
"Provide the keywords and synonyms in comma-separated format."
"Formatted keywords and synonyms text should be separated by a semicolon.\n"
"---------------------\n"
"Example:\n"
"Text: Alice is Bob's mother.\n"
"Keywords:\nAlice,mother,Bob;mummy\n"
"Text: Philz is a coffee shop founded in Berkeley in 1982.\n"
"Keywords:\nPhilz,coffee shop,Berkeley,1982;coffee bar,coffee house\n"
"---------------------\n"
"Text: {text}\n"
"Keywords:\n"
)
def explore(
self,
subs: List[str],
direct: Direction = Direction.BOTH,
depth: Optional[int] = None,
fan: Optional[int] = None,
limit: Optional[int] = None,
) -> Graph:
"""Explore on graph."""
DBSchemaRetriever
这部分是ChatData场景的schema-linking检索
主要是通过schema-linking方式通过二阶段相似度检索,首先先找到最相关的表,然后再最相关的字段信息。
优点:这种二阶段检索也是为了解决社区反馈的大宽表体验的问题。
def _similarity_search(
self, query, filters: Optional[MetadataFilters] = None
) -> List[Chunk]:
"""Similar search."""
table_chunks = self._table_vector_store_connector.similar_search_with_scores(
query, self._top_k, 0, filters
)
not_sep_chunks = [
chunk for chunk in table_chunks if not chunk.metadata.get("separated")
]
separated_chunks = [
chunk for chunk in table_chunks if chunk.metadata.get("separated")
]
if not separated_chunks:
return not_sep_chunks
# Create tasks list
tasks = [
lambda c=chunk: self._retrieve_field(c, query) for chunk in separated_chunks
]
# Run tasks concurrently
separated_result = run_tasks(tasks, concurrency_limit=3)
# Combine and return results
return not_sep_chunks + separated_result
table_vector_store_connector: 负责检索最相关的表。
field_vector_store_connector: 负责检索最相关的字段。
知识加工,知识检索优化思路
目前RAG智能问答应用几个痛点:
知识库文档越来越多以后,检索噪音大,召回准确率不高
召回不全,完整性不够
召回和用户问题意图相关性不大
只能回答静态数据,无法动态获取知识,导致答疑应用比较呆,比较笨。
非结构化/半结构化/结构化数据的处理,准备决定着RAG应用的上限,因此首先需要在知识处理,索引阶段做大量的细粒度的ETL工作,主要优化的思路方向:
非结构化 -> 结构化:有条理地组织知识信息。
提取更加丰富的, 多元化的语义信息。
目的:需要对文档进行精确的解析,更多元化的识别到不同类型的数据。
优化建议:
建议将docx、txt或者其他文本事先处理为pdf或者markdown格式,这样可以利用一些识别工具更好地提取文本中的各项内容。
提取文本中的表格信息。
保留markdown和pdf的标题层级信息,为接下来的层级关系树等索引方式准备。
保留图片链接,公式等信息,也统一处理成markdown的格式。
目的:保存上下文完整性和相关性,这直接关乎回复准确率。
保持在大模型的上下文限制内,分块保证输入到LLMs的文本不会超过其token限制。
优化建议:
图片 + 表格 单独抽取成Chunk,将表格和图片标题保留到metadata元数据里
文档内容尽量按照标题层级或者Markdown Header进行拆分,尽可能保留chunk的完整性。
如果有自定义分隔符可以按照自定义分割符切分
除了对文档进行Embedding向量抽取外,其他多元化的信息抽取能够对文档进行数据增强,显著提升RAG召回效果。
知识图谱
优点:1. 解决NativeRAG的完整性缺失,依然存在幻觉问题,知识的准确性,包括知识边界的完整性、知识结构和语义的清晰性,是对相似度检索的能力的一种语义补充。
适用场景:适用于严谨的专业领域(医疗,运维等),知识的准备需要受到约束的并且知识之间能够明显建立层级关系的。
如何实现:
1.依赖大模型提取(实体,关系,实体)三元组关系。
2. 依赖前期高质量,结构化的知识准备,清洗,抽取,通过业务规则通过手动或者自定义SOP流程构建知识图谱。
Doc Tree
适用场景:解决了上下文完整性不足的问题,也能匹配时完全依据语义和关键词,能够减少噪音
如何实现:以标题层级构建chunk的树形节点,形成一个多叉树结构,每一层级节点只需要存储文档标题,叶子节点存储具体的文本内容。这样利用树的遍历算法,如果用户问题命中相关非叶子标题节点,就可以将相关的子节点数据进行召回。这样就不会存在chunk完整性缺失的问题。
这部分的Feature我们也会在明年年初放到社区里面。
这篇文章讲了个啥
,总结一下
等全局问题场景。目的:澄清用户语义,将用户的原始问题从模糊的,意图不清晰的查询优化为含义更丰富的一个可检索的Query
LLMExtractor
)LLMExtractor
)async def aretrieve(
self, query: str, filters: Optional[MetadataFilters] = None
) -> List[Chunk]:
"""Retrieve knowledge chunks.
Args:
query (str): async query text.
filters: (Optional[MetadataFilters]) metadata filters.
Returns:
List[Chunk]: list of chunks
"""
return await self._aretrieve(query, filters)
按照优先级召回,分别为不同的检索器定义优先级,检索到内容后立即返回
定义不同检索,比如qa_retriever, doc_tree_retriever写入到队列里面, 通过队列的先进先出的特性实现优先级召回。
class RetrieverChain(BaseRetriever):
"""Retriever chain class."""
def __init__(
self,
retrievers: Optional[List[BaseRetriever]] = None,
executor: Optional[Executor] = None,
):
"""Create retriever chain instance."""
self._retrievers = retrievers or []
self._executor = executor or ThreadPoolExecutor()
for retriever in self._retrievers:
candidates_with_scores = await retriever.aretrieve_with_scores(
query=query, score_threshold=score_threshold, filters=filters
)
if candidates_with_scores:
return candidates_with_scores
多知识索引/空间并行召回
通过知识的不同索引形式,通过并行召回方式获取候选列表,保证召回完整性
经过粗筛候选列表后,怎么通过精筛过滤噪音呢
无关的候选分片剔除
时效性剔除
业务属性不满足剔除
topk去重
重排序 仅仅靠粗筛的召回还不够,这时候我们需要有一些策略来对检索的结果做重排序,比如把组合相关度、匹配度等因素做一些重新调整,得到更符合我们业务场景的排序。因为在这一步之后,我们就会把结果送给LLM进行最终处理了,所以这一部分的结果很重要。
## Rerank model#RERANK_MODEL=bce-reranker-base#### If you not set RERANK_MODEL_PATH, DB-GPT will read the model path from EMBEDDING_MODEL_CONFIG based on the RERANK_MODEL.#RERANK_MODEL_PATH=/Users/chenketing/Desktop/project/DB-GPT-NEW/DB-GPT/models/bce-reranker-base_v1#### The number of rerank results to return#RERANK_TOP_K=5
score = 0.0
for q in queries:
if d in result(q):
score += 1.0 / ( k + rank( result(q), d ) )
return score
# where
# k is a ranking constant
# q is a query in the set of queries
# d is a document in the result set of q
# result(q) is the result set of q
# rank( result(q), d ) is d's rank within the result(q) starting from 1
让模型使用markdown的格式进行输出
基于以下给出的已知信息, 准守规范约束,专业、简要回答用户的问题.规范约束: 1.如果已知信息包含的图片、链接、表格、代码块等特殊markdown标签格式的信息,确保在答案中包含原文这些图片、链接、表格和代码标签,不要丢弃不要修改,如:图片格式:, 链接格式:[xxx](xxx), 表格格式:|xxx|xxx|xxx|, 代码格式:```xxx```. 2.如果无法从提供的内容中获取答案, 请说: "知识库中提供的内容不足以回答此问题" 禁止胡乱编造. 3.回答的时候最好按照1.2.3.点进行总结, 并以markdwon格式显示.
构建企业领域工具资产库,将散落到各个平台的工具API,工具脚本进行整合,进而提供智能体端到端的使用能力。比如,除了静态知识库以外,我们可以通过导入工具库的方式进行工具的处理。
工具召回沿用静态知识的RAG召回的思路,再通过完整的工具执行生命周期来获取工具执行结果。
槽位提取:通过传统nlp获取LLM将用户问题进行解析,包括常用的业务类型,环境标,领域模型参数等等
工具选择:沿用静态RAG的思路召回,主要有两层,工具名召回和工具参数召回。
工具参数召回,和TableRAG思路类似,先召回表名,再召回字段名。
参数填充:需要根据召回的工具参数定义,和槽位提取出来的参数进行match
可以代码进行填充,也可以让模型进行填充。
优化思路:由于各个平台工具的同样的参数的参数名没有统一,也不方便去治理,建议可以先进行一轮领域模型数据扩充,拿到整个领域模型后,需要的参数都会存在。
参数校验
完整性校验:进行参数个数完整性校验
参数规则校验:进行参数名类型,参数值,枚举等等规则校验。
参数纠正/对齐,这部分主要是为了减少和用户的交互次数,自动化完成用户参数错误纠正,包括大小写规则,枚举规则等等。eg:
评价指标:
EvaluationMetric├── LLMEvaluationMetric│ ├── AnswerRelevancyMetric├── RetrieverEvaluationMetric│ ├── RetrieverSimilarityMetric│ ├── RetrieverMRRMetric│ └── RetrieverHitRateMetric
RAG
召回指标(RetrieverEvaluationMetric):
RetrieverHitRateMetric
:命中率衡量的是RAGretriever
召回出现在检索结果前top-k个文档中的比例。
RetrieverMRRMetric
:Mean Reciprocal Rank
通过分析最相关文档在检索结果里的排名来计算每个查询的准确性。更具体地说,它是所有查询的相关文档排名倒数的平均值。例如,若最相关的文档排在第一位,其倒数排名为 1;排在第二位时,为 1/2;以此类推。
RetrieverSimilarityMetric
: 相似度指标计算,计算召回内容与预测内容的相似度。
模型生成
答案指标:
AnswerRelevancyMetric
:智能体答案相关性指标,通过智能体答案与用户提问的匹配程度。高相关性的答案不仅要求模型能够理解用户的问题,还要求其能够生成与问题密切相关的答案。这直接影响到用户的满意度和模型的实用性。
通用的智能体:传统的RAG对知识的严谨和专业性要求没那么高,适用于客服,旅游,平台答疑机器人这样的一些业务场景。
数据基础设施智能体:RAG流程是严谨和专业的,需要专属的RAG工作流程,上下文包括(告警->定位->止血->恢复),并且需要对专家沉淀的问答和应急经验,进行结构化的抽取,建立层次关系。因此我们选择知识图谱来作为数据承载。
最后通过我们通过规范化应急事件规则,一步一步地建立了多源的知识 -> 知识结构化抽取 ->应急关系抽取 -> 专家审核 -> 知识存储的一套标准化的知识加工体系。
最后通过社区AWEL+AGENT技术,通过AGENT编排的范式,打造了从意图专家-> 应急诊断专家 -> 诊断根因分析专家。
每个Agent的职能都是不一样的,意图专家负责识别解析用户的意图和识别告警信息诊断专家需要通过GraphRAG 定位到需要分析的根因节点,以及获取具体的根因信息。分析专家需要结合各个根因节点的数据 + 历史分析复盘报告生成诊断分析报告
最新实践!如何基于 DB-GPT 搭建财报分析助手?
可以围绕各自领域构建属于自己的领域资产库包括,知识资产,工具资产以及知识图谱资产
领域资产:领域资产包括了知识库,API,工具脚本。
资产处理,整个资产数据链路涉及了领域资产加工,领域资产检索和领域资产评估。
非结构化 -> 结构化:有条理地归类,正确地组织知识信息。
提取更加丰富的语义信息。
资产检索:
希望是有层级,优先级的检索而并非单一的检索
后置过滤很重要,最好能通过业务语义一些规则进行过滤。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2024-10-27
2024-09-04
2024-07-18
2024-05-05
2024-06-20
2024-06-13
2024-07-09
2024-07-09
2024-05-19
2024-07-07
2025-04-04
2025-04-03
2025-04-02
2025-04-01
2025-04-01
2025-03-30
2025-03-28
2025-03-27