微信扫码
添加专属顾问
我要投稿
掌握RAG优化策略,提升智能问答系统性能。 核心内容: 1. RAG技术在实际应用中的挑战与优化方向 2. DB-GPT框架下RAG关键流程源码解读 3. 知识加工与检索细节优化,提升召回准确率
本文主要围绕DB-GPT应用开发框架如何在实际落地场景做RAG优化。
背景
在过去两年中,检索增强生成(RAG,Retrieval-Augmented Generation)技术逐渐成为提升智能体的核心组成部分。通过结合检索与生成的双重能力,RAG能够引入外部知识,从而为大模型在复杂场景中的应用提供更多可能性。但是在实际落地场景中,往往会存在检索准确率低,噪音干扰多,召回完整性,专业性不够,导致LLM幻觉严重的问题。本次分享会聚焦RAG在实际落地场景中的知识加工和检索细节,如何去优化RAG Pineline链路,最终提升召回准确率。
快速搭建一个RAG智能问答应用很简单,但是需要在实际业务场景落地还需要做大量的工作。
本文将主要介绍围绕DB-GPT应用开发框架(https://github.com/eosphoros-ai/DB-GPT),如何在实际落地场景做RAG优化。
一、RAG关键流程源码解读
主要讲述在DB-GPT中,知识加工和RAG流程关键源码实现。
1.1 知识加工
知识加载 -> 知识切片 -> 信息抽取 -> 知识加工(embedding/graph/keywords) -> 知识存储
知识加载:通过知识工厂类将不同格式的非结构化文档进行实例化。
# 知识工厂进行实例化KnowledgeFactory -> create() -> load() -> Document- knowledge - markdown - pdf - docx - txt - html - pptx - url - ...
如何扩展:通过继承Knowledge接口,实现load(),support_chunk_strategy(),default_chunk_strategy()等方法
class Knowledge(ABC):
def load(self) -> List[Document]:
"""Load knowledge from data loader."""
@classmethod
def document_type(cls) -> Any:
"""Get document type."""
def support_chunk_strategy(cls) -> List[ChunkStrategy]:
"""Return supported chunk strategy."""
return [
ChunkStrategy.CHUNK_BY_SIZE,
ChunkStrategy.CHUNK_BY_PAGE,
ChunkStrategy.CHUNK_BY_PARAGRAPH,
ChunkStrategy.CHUNK_BY_MARKDOWN_HEADER,
ChunkStrategy.CHUNK_BY_SEPARATOR,
]
@classmethod
def default_chunk_strategy(cls) -> ChunkStrategy:
"""Return default chunk strategy.
Returns:
ChunkStrategy: default chunk strategy
"""
return ChunkStrategy.CHUNK_BY_SIZE
知识切片
ChunkManager: 通过加载后的知识数据,根据用户指定的分片策略和分片参数路由到对应的分片处理器进行分配。
class ChunkManager:
"""Manager for chunks."""
def __init__(
self,
knowledge: Knowledge,
chunk_parameter: Optional[ChunkParameters] = None,
extractor: Optional[Extractor] = None,
):
"""Create a new ChunkManager with the given knowledge.
Args:
knowledge: (Knowledge) Knowledge datasource.
chunk_parameter: (Optional[ChunkParameter]) Chunk parameter.
extractor: (Optional[Extractor]) Extractor to use for summarization.
"""
self._knowledge = knowledge
self._extractor = extractor
self._chunk_parameters = chunk_parameter or ChunkParameters()
self._chunk_strategy = (
chunk_parameter.chunk_strategy
if chunk_parameter and chunk_parameter.chunk_strategy
else self._knowledge.default_chunk_strategy().name
)
self._text_splitter = self._chunk_parameters.text_splitter
self._splitter_type = self._chunk_parameters.splitter_type
如何扩展:如果你想在界面上自定义一个新的分片策略
新增切片策略ChunkStrategy
新增Splitter实现逻辑
class ChunkStrategy(Enum):
"""Chunk Strategy Enum."""
CHUNK_BY_SIZE: _STRATEGY_ENUM_TYPE = (
RecursiveCharacterTextSplitter,
[
{
"param_name": "chunk_size",
"param_type": "int",
"default_value": 512,
"description": "The size of the data chunks used in processing.",
},
{
"param_name": "chunk_overlap",
"param_type": "int",
"default_value": 50,
"description": "The amount of overlap between adjacent data chunks.",
},
],
"chunk size",
"split document by chunk size",
)
CHUNK_BY_PAGE: _STRATEGY_ENUM_TYPE = (
PageTextSplitter,
[],
"page",
"split document by page",
)
CHUNK_BY_PARAGRAPH: _STRATEGY_ENUM_TYPE = (
ParagraphTextSplitter,
[
{
"param_name": "separator",
"param_type": "string",
"default_value": "\\n",
"description": "paragraph separator",
}
],
"paragraph",
"split document by paragraph",
)
CHUNK_BY_SEPARATOR: _STRATEGY_ENUM_TYPE = (
SeparatorTextSplitter,
[
{
"param_name": "separator",
"param_type": "string",
"default_value": "\\n",
"description": "chunk separator",
},
{
"param_name": "enable_merge",
"param_type": "boolean",
"default_value": False,
"description": "Whether to merge according to the chunk_size after "
"splitting by the separator.",
},
],
"separator",
"split document by separator",
)
CHUNK_BY_MARKDOWN_HEADER: _STRATEGY_ENUM_TYPE = (
MarkdownHeaderTextSplitter,
[],
"markdown header",
"split document by markdown header",
)
知识抽取,目前支持向量抽取,知识图谱抽取,关键词抽取。
向量抽取 -> embedding, 实现Embeddings接口
@abstractmethod
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Embed search docs."""
@abstractmethod
def embed_query(self, text: str) -> List[float]:
"""Embed query text."""
async def aembed_documents(self, texts: List[str]) -> List[List[float]]:
"""Asynchronous Embed search docs."""
return await asyncio.get_running_loop().run_in_executor(
None, self.embed_documents, texts
)
async def aembed_query(self, text: str) -> List[float]:
"""Asynchronous Embed query text."""
return await asyncio.get_running_loop().run_in_executor(
None, self.embed_query, text
)
# EMBEDDING_MODEL=proxy_openai
# proxy_openai_proxy_server_url=https://api.openai.com/v1
# proxy_openai_proxy_api_key={your-openai-sk}
# proxy_openai_proxy_backend=text-embedding-ada-002
#
# EMBEDDING_MODEL=proxy_tongyi
# proxy_tongyi_proxy_backend=text-embedding-v1
# proxy_tongyi_proxy_api_key={your-api-key}
#
#EMBEDDING_MODEL=proxy_qianfan
#proxy_qianfan_proxy_backend=bge-large-zh
#proxy_qianfan_proxy_api_key={your-api-key}
#proxy_qianfan_proxy_api_secret={your-secret-key}
知识图谱抽取 -> knowledge graph, 通过利用大模型提取(实体,关系,实体)三元组结构。
class TripletExtractor(LLMExtractor):
"""TripletExtractor class."""
def __init__(self, llm_client: LLMClient, model_name: str):
"""Initialize the TripletExtractor."""
super().__init__(llm_client, model_name, TRIPLET_EXTRACT_PT)
TRIPLET_EXTRACT_PT = (
"Some text is provided below. Given the text, "
"extract up to knowledge triplets as more as possible "
"in the form of (subject, predicate, object).\n"
"Avoid stopwords. The subject, predicate, object can not be none.\n"
"---------------------\n"
"Example:\n"
"Text: Alice is Bob's mother.\n"
"Triplets:\n(Alice, is mother of, Bob)\n"
"Text: Alice has 2 apples.\n"
"Triplets:\n(Alice, has 2, apple)\n"
"Text: Alice was given 1 apple by Bob.\n"
"Triplets:(Bob, gives 1 apple, Bob)\n"
"Text: Alice was pushed by Bob.\n"
"Triplets:(Bob, pushes, Alice)\n"
"Text: Bob's mother Alice has 2 apples.\n"
"Triplets:\n(Alice, is mother of, Bob)\n(Alice, has 2, apple)\n"
"Text: A Big monkey climbed up the tall fruit tree and picked 3 peaches.\n"
"Triplets:\n(monkey, climbed up, fruit tree)\n(monkey, picked 3, peach)\n"
"Text: Alice has 2 apples, she gives 1 to Bob.\n"
"Triplets:\n"
"(Alice, has 2, apple)\n(Alice, gives 1 apple, Bob)\n"
"Text: Philz is a coffee shop founded in Berkeley in 1982.\n"
"Triplets:\n"
"(Philz, is, coffee shop)\n(Philz, founded in, Berkeley)\n"
"(Philz, founded in, 1982)\n"
"---------------------\n"
"Text: {text}\n"
"Triplets:\n"
)
倒排索引抽取 -> keywords分词
可以用es默认的分词库,也可以使用es的插件模式自定义分词
知识存储
整个知识持久化统一实现了IndexStoreBase接口,目前提供了向量数据库、图数据库、全文索引三类实现。
VectorStore,向量数据库主要逻辑都在load_document(),包括索引schema创建,向量数据分批写入等等。
- VectorStoreBase
- ChromaStore
- MilvusStore
- OceanbaseStore
- ElasticsearchStore
- PGVectorStore
class VectorStoreBase(IndexStoreBase, ABC):
"""Vector store base class."""
@abstractmethod
def load_document(self, chunks: List[Chunk]) -> List[str]:
"""Load document in index database."""
@abstractmethod
async def aload_document(self, chunks: List[Chunk]) -> List[str]:
"""Load document in index database."""
@abstractmethod
def similar_search_with_scores(
self,
text,
topk,
score_threshold: float,
filters: Optional[MetadataFilters] = None,
) -> List[Chunk]:
"""Similar search with scores in index database."""
def similar_search(
self, text: str, topk: int, filters: Optional[MetadataFilters] = None
) -> List[Chunk]:
return self.similar_search_with_scores(text, topk, 1.0, filters)
GraphStore ,具体的图存储提供了三元组写入的实现,一般会调用具体的图数据库的查询语言来完成。例如TuGraphStore会根据三元组生成具体的Cypher语句并执行。
图存储接口GraphStoreBase提供统一的图存储抽象,目前内置了MemoryGraphStore和TuGraphStore的实现,我们也提供Neo4j接口给开发者进行接入。
- GraphStoreBase
- TuGraphStore
- Neo4jStore
def insert_triplet(self, subj: str, rel: str, obj: str) -> None:
"""Add triplet."""
...TL;DR...
subj_query = f"MERGE (n1:{self._node_label} {{id:'{subj}'}})"
obj_query = f"MERGE (n1:{self._node_label} {{id:'{obj}'}})"
rel_query = (
f"MERGE (n1:{self._node_label} {{id:'{subj}'}})"
f"-[r:{self._edge_label} {{id:'{rel}'}}]->"
f"(n2:{self._node_label} {{id:'{obj}'}})"
)
self.conn.run(query=subj_query)
self.conn.run(query=obj_query)
self.conn.run(query=rel_query)
FullTextStore: 通过构建es索引,通过es内置分词算法进行分词,然后由es构建keyword->doc_id的倒排索引。
{ "analysis": {"analyzer": {"default": {"type": "standard"}}}, "similarity": { "custom_bm25": { "type": "BM25", "k1": self._k1, "b": self._b, } }, } self._es_mappings = { "properties": { "content": { "type": "text", "similarity": "custom_bm25", }, "metadata": { "type": "keyword", }, } }
目前提供的全文索引接口支持Elasticsearch,同时也定义了OpenSearch的接口
- FullTextStoreBase - ElasticDocumentStore - OpenSearchStore
1.2 知识检索
question -> rewrite -> similarity_search -> rerank -> context_candidates
接下来是知识检索,目前社区的检索逻辑主要分为这几步,如果你设置了查询改写参数,目前会通过大模型给你进行一轮问题改写,然后会根据你的知识加工方式路由到对应的检索器,如果你是通过向量进行加工的,那就会通过EmbeddingRetriever进行检索,如果你构建方式是通过知识图谱构建的,就会按照知识图谱方式进行检索,如果你设置了rerank模型,会给粗筛后的候选值进行精筛,让候选值和用户问题更有关联。
EmbeddingRetriever
class EmbeddingRetriever(BaseRetriever):
"""Embedding retriever."""
def __init__(
self,
index_store: IndexStoreBase,
top_k: int = 4,
query_rewrite: Optional[QueryRewrite] = None,
rerank: Optional[Ranker] = None,
retrieve_strategy: Optional[RetrieverStrategy] = RetrieverStrategy.EMBEDDING,
):
async def _aretrieve_with_score(
self,
query: str,
score_threshold: float,
filters: Optional[MetadataFilters] = None,
) -> List[Chunk]:
"""Retrieve knowledge chunks with score.
Args:
query (str): query text
score_threshold (float): score threshold
filters: metadata filters.
Return:
List[Chunk]: list of chunks with score
"""
queries = [query]
new_queries = await self._query_rewrite.rewrite(
origin_query=query, context=context, nums=1
)
queries.extend(new_queries)
candidates_with_score = [
self._similarity_search_with_score(
query, score_threshold, filters, root_tracer.get_current_span_id()
)
for query in queries
]
...
new_candidates_with_score = await self._rerank.arank(
new_candidates_with_score, query
)
return new_candidates_with_score
filters:Optional[MetadataFilters], 元数据信息过滤器,主要是可以用来前置通过属性信息筛掉一些不匹配的候选信息。
class FilterCondition(str, Enum):
"""Vector Store Meta data filter conditions."""
AND = "and"
OR = "or"
class MetadataFilter(BaseModel):
"""Meta data filter."""
key: str = Field(
...,
description="The key of metadata to filter.",
)
operator: FilterOperator = Field(
default=FilterOperator.EQ,
description="The operator of metadata filter.",
)
value: Union[str, int, float, List[str], List[int], List[float]] = Field(
...,
description="The value of metadata to filter.",
)
Graph RAG
首先通过模型进行关键词抽取,这里可以通过传统的nlp技术进行分词,也可以通过大模型进行分词,然后进行关键词按照同义词做扩充,找到关键词的候选列表,最好根据关键词候选列表调用explore方法召回局部子图。
KEYWORD_EXTRACT_PT = (
"A question is provided below. Given the question, extract up to "
"keywords from the text. Focus on extracting the keywords that we can use "
"to best lookup answers to the question.\n"
"Generate as more as possible synonyms or alias of the keywords "
"considering possible cases of capitalization, pluralization, "
"common expressions, etc.\n"
"Avoid stopwords.\n"
"Provide the keywords and synonyms in comma-separated format."
"Formatted keywords and synonyms text should be separated by a semicolon.\n"
"---------------------\n"
"Example:\n"
"Text: Alice is Bob's mother.\n"
"Keywords:\nAlice,mother,Bob;mummy\n"
"Text: Philz is a coffee shop founded in Berkeley in 1982.\n"
"Keywords:\nPhilz,coffee shop,Berkeley,1982;coffee bar,coffee house\n"
"---------------------\n"
"Text: {text}\n"
"Keywords:\n"
)
def explore(
self,
subs: List[str],
direct: Direction = Direction.BOTH,
depth: Optional[int] = None,
fan: Optional[int] = None,
limit: Optional[int] = None,
) -> Graph:
"""Explore on graph."""
DBSchemaRetriever 这部分是ChatData场景的schema-linking检索
主要是通过schema-linking方式通过二阶段相似度检索,首先先找到最相关的表,然后再最相关的字段信息。
优点:这种二阶段检索也是为了解决社区反馈的大宽表体验的问题。
def _similarity_search(
self, query, filters: Optional[MetadataFilters] = None
) -> List[Chunk]:
"""Similar search."""
table_chunks = self._table_vector_store_connector.similar_search_with_scores(
query, self._top_k, 0, filters
)
not_sep_chunks = [
chunk for chunk in table_chunks if not chunk.metadata.get("separated")
]
separated_chunks = [
chunk for chunk in table_chunks if chunk.metadata.get("separated")
]
if not separated_chunks:
return not_sep_chunks
# Create tasks list
tasks = [
lambda c=chunk: self._retrieve_field(c, query) for chunk in separated_chunks
]
# Run tasks concurrently
separated_result = run_tasks(tasks, concurrency_limit=3)
# Combine and return results
return not_sep_chunks + separated_result
table_vector_store_connector: 负责检索最相关的表。
field_vector_store_connector: 负责检索最相关的字段。
二、知识加工,知识检索优化思路
目前RAG智能问答应用几个痛点:
2.1知识处理优化
非结构化/半结构化/结构化数据的处理,准备决定着RAG应用的上限,因此首先需要在知识处理,索引阶段做大量的细粒度的ETL工作,主要优化的思路方向:
提取更加丰富的, 多元化的语义信息。
目的:需要对文档进行精确的解析,更多元化的识别到不同类型的数据。
优化建议:
保留图片链接,公式等信息,也统一处理成markdown的格式。
目的:保存上下文完整性和相关性,这直接关乎回复准确率。
保持在大模型的上下文限制内,分块保证输入到LLMs的文本不会超过其token限制。
优化建议:
图片 + 表格 单独抽取成Chunk,将表格和图片标题保留到metadata元数据里。
文档内容尽量按照标题层级或者Markdown Header进行拆分,尽可能保留chunk的完整性。
如果有自定义分隔符可以按照自定义分割符切分。
除了对文档进行Embedding向量抽取外,其他多元化的信息抽取能够对文档进行数据增强,显著提升RAG召回效果。
知识图谱
优点:1. 解决NativeRAG的完整性缺失,依然存在幻觉问题,知识的准确性,包括知识边界的完整性、知识结构和语义的清晰性,是对相似度检索的能力的一种语义补充。
适用场景:适用于严谨的专业领域(医疗,运维等),知识的准备需要受到约束的并且知识之间能够明显建立层级关系的。
如何实现:
1.依赖大模型提取(实体,关系,实体)三元组关系。
2. 依赖前期高质量,结构化的知识准备,清洗,抽取,通过业务规则通过手动或者自定义SOP流程构建知识图谱。
Doc Tree
适用场景:解决了上下文完整性不足的问题,也能匹配时完全依据语义和关键词,能够减少噪音
如何实现:以标题层级构建chunk的树形节点,形成一个多叉树结构,每一层级节点只需要存储文档标题,叶子节点存储具体的文本内容。这样利用树的遍历算法,如果用户问题命中相关非叶子标题节点,就可以将相关的子节点数据进行召回。这样就不会存在chunk完整性缺失的问题。
这部分的Feature我们也会在明年年初放到社区里面。
提取QA对,需要前置通过预定义或者模型抽取的方式提取QA对信息
适用场景:
能够在检索中命中问题并直接进行召回,直接检索到用户想要的答案,适用于一些FAQ场景,召回完整性不够的场景。
如何实现:
预定义:预先为每个chunk添加一些问题
模型抽取:通过给定一下上下文,让模型进行QA对抽取
元数据抽取
如何实现:根据自身业务数据特点,提取数据的特征进行保留,比如标签,类别,时间,版本等元数据属性。
适用场景:检索时候能够预先根据元数据属性进行过滤掉大部分噪音。
总结提取
适用场景:解决这篇文章讲了个啥,总结一下等全局问题场景。
如何实现:通过mapreduce等方式分段抽取,通过模型为每段chunk提取摘要信息。
目前DB-GPT知识库提供了文档上传 -> 解析 -> 切片 -> Embedding -> 知识图谱三元组抽取 -> 向量数据库存储 -> 图数据库存储等知识加工的能力,但是不具备对文档进行复杂的个性化的信息抽取能力,因此希望通过构建知识加工工作流模版来完成复杂的,可视化的,用户可自定义的知识抽取,转换,加工流程。
知识加工工作流:https://www.yuque.com/eosphoros/dbgpt-docs/vg2gsfyf3x9fuglf
2.2 RAG流程优化
RAG流程的优化我们又分为了静态文档的RAG和动态数据获取的RAG,目前大部分涉及到的RAG只覆盖了非结构化的文档静态资产,但是实际业务很多场景的问答是通过工具获取动态数据 + 静态知识数据共同回答的场景,不仅需要检索到静态的知识,同时需要RAG检索到工具资产库里面工具信息并执行获取动态数据。
目的:澄清用户语义,将用户的原始问题从模糊的,意图不清晰的查询优化为含义更丰富的一个可检索的Query
原始问题分类,通过问题分类可以
LLM分类(LLMExtractor)
构建embedding+逻辑回归实现双塔模型,text2nlu DB-GPT-Hub/src/dbgpt-hub-nlu/README.zh.md at main · eosphoros-ai/DB-GPT-Hub
tip:需要高质量的Embedding模型,推荐bge-v1.5-large
反问用户,如果语义不清晰将问题再抛给用户进行问题澄清,通过多轮交互
通过热搜词库根据语义相关性给用户推荐他想要的问题候选列表
槽位提取,目的是获取用户问题中的关键slot信息,比如意图,业务属性等等
LLM提取(LLMExtractor)
问题改写
热搜词库进行改写
多轮交互
当我们把索引分成许多chunks并且都存储在相同的知识空间里面,检索效率会成为问题。比如用户问"浙江我武科技公司"相关信息时,并不想召回其他公司的信息。因此,如果可以通过公司名称元数据属性先进行过滤,就会大大提升效率和相关度。
async def aretrieve(
self, query: str, filters: Optional[MetadataFilters] = None
) -> List[Chunk]:
"""Retrieve knowledge chunks.
Args:
query (str): async query text.
filters: (Optional[MetadataFilters]) metadata filters.
Returns:
List[Chunk]: list of chunks
"""
return await self._aretrieve(query, filters)
按照优先级召回,分别为不同的检索器定义优先级,检索到内容后立即返回
定义不同检索,比如qa_retriever, doc_tree_retriever写入到队列里面, 通过队列的先进先出的特性实现优先级召回。
class RetrieverChain(BaseRetriever):
"""Retriever chain class."""
def __init__(
self,
retrievers: Optional[List[BaseRetriever]] = None,
executor: Optional[Executor] = None,
):
"""Create retriever chain instance."""
self._retrievers = retrievers or []
self._executor = executor or ThreadPoolExecutor()
for retriever in self._retrievers:
candidates_with_scores = await retriever.aretrieve_with_scores(
query=query, score_threshold=score_threshold, filters=filters
)
if candidates_with_scores:
return candidates_with_scores
多知识索引/空间并行召回
通过知识的不同索引形式,通过并行召回方式获取候选列表,保证召回完整性。
经过粗筛候选列表后,怎么通过精筛过滤噪音呢
无关的候选分片剔除
业务属性不满足剔除
topk去重
重排序 仅仅靠粗筛的召回还不够,这时候我们需要有一些策略来对检索的结果做重排序,比如把组合相关度、匹配度等因素做一些重新调整,得到更符合我们业务场景的排序。因为在这一步之后,我们就会把结果送给LLM进行最终处理了,所以这一部分的结果很重要。
使用相关重排序模型进行精筛,可以使用开源的模型,也可以使用带业务语义微调的模型。
## Rerank model#RERANK_MODEL=bce-reranker-base#### If you not set RERANK_MODEL_PATH, DB-GPT will read the model path from EMBEDDING_MODEL_CONFIG based on the RERANK_MODEL.#RERANK_MODEL_PATH=/Users/chenketing/Desktop/project/DB-GPT-NEW/DB-GPT/models/bce-reranker-base_v1#### The number of rerank results to return#RERANK_TOP_K=5
根据不同索引召回的内容进行业务RRF加权综合打分剔除
score = 0.0
for q in queries:
if d in result(q):
score += 1.0 / ( k + rank( result(q), d ) )
return score
# where
# k is a ranking constant
# q is a query in the set of queries
# d is a document in the result set of q
# result(q) is the result set of q
# rank( result(q), d ) is d's rank within the result(q) starting from 1
让模型使用markdown的格式进行输出
基于以下给出的已知信息, 准守规范约束,专业、简要回答用户的问题.规范约束: 1.如果已知信息包含的图片、链接、表格、代码块等特殊markdown标签格式的信息,确保在答案中包含原文这些图片、链接、表格和代码标签,不要丢弃不要修改,如:图片格式:, 链接格式:[xxx](xxx), 表格格式:|xxx|xxx|xxx|, 代码格式:```xxx```. 2.如果无法从提供的内容中获取答案, 请说: "知识库中提供的内容不足以回答此问题" 禁止胡乱编造. 3.回答的时候最好按照1.2.3.点进行总结, 并以markdwon格式显示.
文档类知识是相对静态的,无法回答个性化以及动态的信息, 需要依赖一些第三方平台工具才可以回答,基于这种情况,我们需要一些动态RAG的方法,通过工具资产定义 -> 工具选择 -> 工具校验 -> 工具执行获取动态数据
构建企业领域工具资产库,将散落到各个平台的工具API,工具脚本进行整合,进而提供智能体端到端的使用能力。比如,除了静态知识库以外,我们可以通过导入工具库的方式进行工具的处理。
工具召回沿用静态知识的RAG召回的思路,再通过完整的工具执行生命周期来获取工具执行结果。
槽位提取:通过传统nlp获取LLM将用户问题进行解析,包括常用的业务类型,标签,领域模型参数等等。
工具选择:沿用静态RAG的思路召回,主要有两层,工具名召回和工具参数召回。
工具参数召回,和TableRAG思路类似,先召回表名,再召回字段名。
参数填充:需要根据召回的工具参数定义,和槽位提取出来的参数进行match
可以代码进行填充,也可以让模型进行填充。
优化思路:由于各个平台工具的同样的参数的参数名没有统一,也不方便去治理,建议可以先进行一轮领域模型数据扩充,拿到整个领域模型后,需要的参数都会存在。
参数校验
完整性校验:进行参数个数完整性校验
参数规则校验:进行参数名类型,参数值,枚举等等规则校验。
参数纠正/对齐,这部分主要是为了减少和用户的交互次数,自动化完成用户参数错误纠正,包括大小写规则,枚举规则等等。eg:
在评估智能问答流程时,需要单独对召回相关性准确率以及模型问答的相关性进行评估,然后再综合考虑,以判断RAG流程在哪些方面仍需改进。
评价指标:
EvaluationMetric├── LLMEvaluationMetric│ ├── AnswerRelevancyMetric├── RetrieverEvaluationMetric│ ├── RetrieverSimilarityMetric│ ├── RetrieverMRRMetric│ └── RetrieverHitRateMetric
RAG召回指标(RetrieverEvaluationMetric):
RetrieverHitRateMetric:命中率衡量的是RAG retriever召回出现在检索结果前top-k个文档中的比例。
RetrieverMRRMetric: Mean Reciprocal Rank通过分析最相关文档在检索结果里的排名来计算每个查询的准确性。更具体地说,它是所有查询的相关文档排名倒数的平均值。例如,若最相关的文档排在第一位,其倒数排名为 1;排在第二位时,为 1/2;以此类推。
RetrieverSimilarityMetric: 相似度指标计算,计算召回内容与预测内容的相似度。
模型生成答案指标:
AnswerRelevancyMetric:智能体答案相关性指标,通过智能体答案与用户提问的匹配程度。高相关性的答案不仅要求模型能够理解用户的问题,还要求其能够生成与问题密切相关的答案。这直接影响到用户的满意度和模型的实用性。
RAG评测教程参考:
评估(Evaluation)https://www.yuque.com/eosphoros/dbgpt-docs/czgl7bsfclc1xsmh
三、RAG落地案例分享
3.1数据基础设施领域的RAG
在数据基础设施领域,有很多运维SRE,每天会接收到大量的告警,因此很多时间来需要响应应急事件,进而进行故障诊断,然后故障复盘,进而进行经验沉淀。另外一部分时间又需要响应用户咨询,需要他们用他们的知识以及三方平台工具使用经验进行答疑。
因此我们希望通过打造一个数据基础设施的通用智能体来解决告警诊断,答疑的这些问题。
传统的 RAG + Agent 技术可以解决通用的,确定性没那么高的,单步任务场景。但是面对数据基础设施领域的专业场景,整个检索过程必须是确定,专业和真实的,并且是需要一步一步推理的。
右边是一个通过NativeRAG的一个泛泛而谈的总结,可能对于一个普通的用户,对专业的领域知识没那么了解时,可能是有用的信息,但是这部分对于数据基础设施领域的专业人士,就没有什么意义了。因此我们比较了通用的智能体和数据基础设施智能体在RAG上面的区别:
通用的智能体:传统的RAG对知识的严谨和专业性要求没那么高,适用于客服,旅游,平台答疑机器人这样的一些业务场景。
数据基础设施智能体:RAG流程是严谨和专业的,需要专属的RAG工作流程,上下文包括(DB告警->根因定位->应急止血->故障恢复),并且需要对专家沉淀的问答和应急经验,进行结构化的抽取,建立层次关系。因此我们选择知识图谱来作为数据承载。
基于数据基础设施的确定性和特殊性,我们选择通过结合知识图谱来作为诊断应急经验的知识承载。我们通过SRE沉淀下来的应急排查事件知识经验 结合应急复盘流程,建立了DB应急事件驱动的知识图谱,我们以DB抖动为例,影响DB抖动的几个事件,包括慢SQL问题,容量问题,我们在各个应急事件间建立了层级关系。
最后通过我们通过规范化应急事件规则,一步一步地建立了多源的知识 -> 知识结构化抽取 ->应急关系抽取 -> 专家审核 -> 知识存储的一套标准化的知识加工体系。
在智能体检索阶段,我们使用GraphRAG作为静态知识检索的承载,因此识别到DB抖动异常后,找到了与DB抖动异常节点相关的节点作为我们分析依据,由于在知识抽取阶段每一个节点还保留了每个事件的一些元数据信息,包括事件名,事件描述,相关工具,工具参数等等,
因此我们可以通过执行工具的执行生命周期链路来获取返回结果拿到动态数据来作为应急诊断的排查依据。通过这种动静结合的混合召回的方式比纯朴素的RAG召回,保障了数据基础设施智能体执行的确定性,专业性和严谨性。
最后通过社区AWEL+AGENT技术,通过AGENT编排的范式,打造了从意图专家-> 应急诊断专家 -> 诊断根因分析专家。
每个Agent的职能都是不一样的,意图专家负责识别解析用户的意图和识别告警信息诊断专家需要通过GraphRAG 定位到需要分析的根因节点,以及获取具体的根因信息。分析专家需要结合各个根因节点的数据 + 历史分析复盘报告生成诊断分析报告。
3.2金融财报分析领域的RAG
基于DB-GPT的财报分析助手:https://www.yuque.com/eosphoros/dbgpt-docs/cmogrzbtmqf057oe
四、总结
建议围绕各自领域构建属于自己的领域资产库包括,知识资产,工具资产以及知识图谱资产
领域资产:领域资产包括了领域知识库,领域API,工具脚本,领域知识图谱。
资产处理,整个资产数据链路涉及了领域资产加工,领域资产检索和领域资产评估。
提取更加丰富的语义信息。
资产检索:
后置过滤很重要,最好能通过业务语义一些规则进行过滤。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2024-09-04
2024-10-27
2024-07-18
2024-05-05
2024-06-20
2024-06-13
2024-07-09
2024-07-09
2024-05-19
2024-07-07
2025-03-11
2025-03-05
2025-03-03
2025-03-02
2025-02-28
2025-02-24
2025-02-23
2025-02-15