AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


如何将RAG模型扩展应用于生产环境
发布日期:2024-03-30 08:36:53 浏览次数: 1941 来源:大数据架构与数据科学


Retrieval-augmented Generation (RAG) combines Large Language Models (LLMs) with external data to reduce the probability of machine hallucinations - AI-generated information that misrepresents underlying data or reality. When developing RAG systems, scalability is often an afterthought. This creates problems when moving from initial development to production. Having to manually adjust code while your application grows can get very costly and is prone to errors.

检索增强型生成(Retrieval-augmented Generation, RAG)结合了大型语言模型(Large Language Models, LLMs)与外部数据,以减少机器幻觉的可能性——即人工智能产生的、歪曲底层数据或现实的信息。在开发RAG系统时,可扩展性常常是事后考虑的问题。这在从初步开发迁移到生产环境时会造成问题。随着应用程序的增长,不得不手动调整代码可能会非常昂贵,并且容易出错。

Our tutorial provides an example of how you can develop a RAG pipeline with production workloads in mind from the start, using the right tools - ones that are designed to scale your application.

我们的教程提供了一个如何从一开始就考虑生产工作负载来开发RAG管道的示例,使用正确的工具——这些工具被设计用来扩展您的应用程序。

Development vs. production

开发VS生产

The goals and requirements of development and production are usually very different. This is particularly true for new technologies like Large Language Models (LLMs) and Retrieval-augmented Generation (RAG), where organizations prioritize rapid experimentation to test the waters before committing more resources. Once important stakeholders are convinced, the focus shifts from demonstrating an application's potential for creating value to actually creating value, via production. Until a system is productionized, its ROI is typically zero.

开发和生产的目标和要求通常大相径庭。对于像大型语言模型(Large Language Models, LLMs)和检索增强型生成(Retrieval-augmented Generation, RAG)这样的新技术尤其如此,在这些技术上,组织通常会优先考虑快速的实验,以便在投入更多资源之前先测试水温。一旦重要的利益相关者被说服,焦点就会从展示应用程序创造价值的潜力转移到通过生产实际创造价值。在系统实现生产化之前,其投资回报率通常为零。

Productionizing, in the context of RAG systems, involves transitioning from a prototype or test environment to a stable, operational state, in which the system is readily accessible and reliable for remote end users, such as via URL - i.e., independent of the end user machine state. Productionizing also involves scaling the system to handle varying levels of user demand and traffic, ensuring consistent performance and availability.

在RAG系统的背景下,生产化涉及从原型或测试环境过渡到稳定的运营状态,在该状态下,系统可以方便地被远程最终用户访问和依赖,例如通过URL——即独立于最终用户机器状态。生产化还包括扩展系统以处理不同水平的用户需求和流量,确保性能和可用性的一致性。

Even though there is no ROI without productionizing, organizations often underesimate the hurdles involved in getting to an end product. Productionizing is always a trade-off between performance and costs, and this is no different for Retrieval-augmented Generation (RAG) systems. The goal is to achieve a stable, operational, and scalable end product while keeping costs low.

尽管没有生产化就没有投资回报,组织往往低估了达到最终产品所涉及的障碍。生产化始终是性能与成本之间的权衡,对于检索增强型生成(RAG)系统来说也不例外。目标是在保持低成本的同时,实现一个稳定、运营和可扩展的最终产品。

Let's look more closely at the basic requirements of an RAG system, before going in to the specifics of what you'll need to productionize it in a cost-effective but scalable way.

让我们更仔细地看看RAG系统的基本要求,然后再具体讨论您将如何以经济有效但可扩展的方式生产化它。

The basics of RAG

RAG基础

The most basic RAG workflow looks like this:

  • Submit a text query to an embedding model, which converts it into a semantically meaningful vector embedding.
  • Send the resulting query vector embedding to your document embeddings storage location - typically a vector database.
  • Retrieve the most relevant document chunks - based on proximity of document chunk embeddings to the query vector embedding.
  • Add the retrieved document chunks as context to the query vector embedding and send it to the LLM.
  • The LLM generates a response utilizing the retrieved context.

最基础的RAG工作流程如下所示:

  • 将文本查询提交给嵌入模型,该模型将其转换为具有语义意义的向量嵌入。
  • 将得到的查询向量嵌入发送到您的文档嵌入存储位置——通常是一个向量数据库。
  • 检索最相关的文档片段——基于文档片段嵌入与查询向量嵌入之间的接近度。
  • 将检索到的文档片段作为上下文添加到查询向量嵌入中,然后发送给LLM。
  • LLM利用检索到的上下文生成响应。

While RAG workflows can become significantly more complex, incorporating methods like metadata filtering and retrieval reranking, all RAG systems must contain the components involved in the basic workflow: an embedding model, a store for document and vector embeddings, a retriever, and a LLM.

While RAG workflows can become significantly more complex, incorporating methods like metadata filtering and retrieval reranking, all RAG systems must contain the components involved in the basic workflow: an embedding model, a store for document and vector embeddings, a retriever, and a LLM.

虽然RAG工作流程可能会变得更加复杂,例如融入元数据过滤和检索结果重排等方法,但所有RAG系统都必须包含基本工作流程中涉及的组件:嵌入模型、用于存储文档和向量嵌入的存储系统、检索器以及大型语言模型(LLM)。

But smart development, with productionization in mind, requires more than just setting up your components in a functional way. You must also develop with cost-effective scalability in mind. For this you'll need not just these basic components, but more specifically the tools appropriate to configuring a scalable RAG system.

然而,以生产化为目的的智能开发不仅仅需要以功能性的方式搭建您的组件。您还必须考虑到成本效益高的可扩展性。为此,您需要的不仅是这些基本组件,更具体地是,您需要适合配置可扩展RAG系统的合适工具。

Developing for scalability: the right tools

为可扩展性而开发:选择合适的工具

LLM library: LangChain

大型语言模型库:LangChain

As of this writing, LangChain, while it has also been the subject of much criticism, is arguably the most prominent LLM library. A lot of developers turn to Langchain to build Proof-of-Concepts (PoCs) and Minimum Viable Products (MVPs), or simply to experiment with new ideas. Whether one chooses LangChain or one of the other major LLM and RAG libraries - for example, LlamaIndex or Haystack, to name our alternate personal favorites - they can all be used to productionize an RAG system. That is, all three have integrations for third-party libraries and providers that will handle production requirements. Which one you choose to interface with your other components depends on the details of your existing tech stack and use case.

截至目前,尽管LangChain也受到了不少批评,但可以说是最著名的大型语言模型(LLM)库。许多开发者转向LangChain来构建概念验证(Proof-of-Concepts,PoCs)和最小可行产品(Minimum Viable Products,MVPs),或者只是为了尝试新想法。无论是选择LangChain还是其他主要的LLM和RAG库——例如,我们其他个人喜欢的LlamaIndex或Haystack——它们都可以用来将RAG系统商用化。也就是说,所有这三个库都有集成第三方库和提供商的接口,这些接口能够处理生产环境的要求。您选择哪一个来与系统的其他组件交互,取决于您现有技术栈和用例的具体细节。

For the purpose of this tutorial, we'll use part of the Langchain documentation, along with Ray.

为了本教程的目的,我们将使用部分Langchain文档以及Ray。

Langchain是一个强大的大型语言模型库,Ray则是一个开源框架,它用于在分布式环境中构建和运行大规模的应用程序。结合使用Langchain和Ray,可以有效地搭建和扩展RAG系统,使其能够处理生产级的负载和需求。

使用Langchain,开发者可以通过其提供的高级API和构建模块轻松地集成和部署LLM。而通过Ray,可以管理资源和并行处理,以及自动缩放以适应不断变化的工作负载。这种组合提供了构建高度可扩展和可靠RAG应用程序的强大基础。

在本教程中,我们将展示如何利用Langchain和Ray的特性来开发一个可生产化的RAG系统,并介绍它们如何协同工作来支持系统的扩展和稳定运行。

Scaling with Ray

利用Ray进行扩展

Ray是一个开源的分布式计算框架,旨在简化并行和异步编程。

它非常适合用于扩展机器学习模型和复杂数据处理流程,使其能够处理大量的数据和请求。Ray提供了一个简单的API来定义任务和管理状态,同时提供了高性能的跨节点通信。

在利用Ray进行扩展时,您可以考虑以下几个方面:

  1. 分布式任务执行:Ray允许您将工作分配给集群中的不同节点,从而并行处理大量任务。这对于处理RAG系统中高并发的检索和生成任务至关重要。
  2. 动态资源管理:Ray可以动态调整资源分配,以适应不同的工作负载。例如,如果检索任务的数量激增,Ray可以自动启动更多的工作器节点来处理这些任务。
  3. 故障恢复:Ray自带故障恢复机制,如果某个节点出现问题,Ray可以在其他节点上重新执行失败的任务,从而确保系统的稳定性。
  4. 内存和对象管理:Ray优化了内存使用,减少了数据在节点之间传输的需要,这提高了处理速度并减少了延迟。
  5. 集成第三方库:Ray支持与多种流行的数据处理和机器学习库集成,如Pandas、Scikit-learn和PyTorch。

在您的RAG系统中整合Ray,意味着您可以构建一个可以横向扩展以满足增长需求的系统。通过将Langchain与Ray结合,可以实现一个强大的、可扩展的RAG系统,无论是处理增加的用户请求,还是对更大数据集进行检索和语言模型生成时,都可以保持高性能。

Because our goal is to build a 1) simple, 2) scalable, and 3) economically feasible option, not reliant on proprietary solutions, we have chosen to use Ray, a Python framework for productionizing and scaling machine learning (ML) workloads. Ray is designed with a range of auto-scaling features that seamlessly scale ML systems. It's also adaptable to both local environments and Kubernetes, efficiently managing all workload requirements.

因为我们的目标是构建一个1) 简单的,2) 可扩展的,以及3) 经济可行的选项,而且不依赖于专有解决方案,我们选择使用Ray,一个用于生产化和扩展机器学习(ML)工作负载的Python框架。Ray设计了一系列自动扩展功能,无缝扩展ML系统。它还能够适应本地环境和Kubernetes,有效管理所有工作负载的需求。

Ray permits us to keep our tutorial system simple, non-proprietary, and on our own network, rather than the cloud. While LangChain, LlamaIndex, and Haystack libraries support cloud deployment for AWS, Azure, and GCP, the details of cloud deployment heavily depend on - and are therefore very particular to - the specific cloud provider you choose. These libraries also all contain Ray integrations to enable scaling. But using Ray directly will provide us with more universally applicable insights, given that the Ray integrations within LangChain, LlamaIndex, and Haystack are built upon the same underlying framework.

Ray使我们能够保持教程系统简单、非专有,并在我们自己的网络上运行,而不是云端。虽然LangChain、LlamaIndex和Haystack库都支持AWS、Azure和GCP的云部署,但云部署的细节非常依赖于您选择的特定云提供商,因此特点也很明显。这些库也都包含了Ray集成以实现扩展。但直接使用Ray将提供给我们更普遍适用的见解,因为LangChain、LlamaIndex和Haystack内部的Ray集成都是建立在相同的底层框架之上的。

Now that we have our LLM library sorted, let's turn to data gathering and processing.

现在我们已经确定了LLM库,接下来让我们转向数据收集和处理。

Data gathering and processing

数据收集和处理

在构建任何基于检索增强型生成(RAG)的系统时,数据收集和处理是至关重要的步骤。系统的性能很大程度上依赖于输入数据的质量和格式。以下是实施有效的数据收集和处理策略的关键步骤:

  1. 数据源识别:确定最相关的数据来源。这可能包括公开的数据集、通过API访问的内容、爬虫收集的网络数据或其他专有数据源。
  2. 数据抓取和存储:从识别的数据源收集数据,并将数据存储在适合后续处理的格式和系统中,例如数据库、数据湖或文件系统。
  3. 数据清洗:处理原始数据以去除无关内容、纠正错误和填充缺失值。这包括文本规范化、去除停用词和标点、语法修正等。
  4. 数据标注:如果需要,对数据进行标注以提供监督学习的标签。这可能包括实体识别、情感分析或其他相关任务。
  5. 特征提取:转换文本数据为机器学习模型可以理解的格式。这通常涉及到生成文本嵌入或其他形式的向量化表示。
  6. 数据分割:将数据划分为训练集、验证集和测试集,以评估模型性能并防止过拟合。
  7. 扩展预处理管道:使用Ray等工具,您可以构建一个可扩展的预处理管道,它可以并行处理大量数据,并确保数据在整个RAG系统中流动的效率。

数据收集和处理不仅为RAG系统的训练提供了基础,而且对于系统的长期维护和更新也是必不可少的。确保数据的质量和可用性是生产化和扩展RAG系统的一个重要方面。通过自动化和优化这些步骤,您可以加快开发周期,提高系统的准确性和稳定性。

Gathering the data

Every ML journey starts with data, and that data needs to be gathered and stored somewhere. For this tutorial, we gather data from part of the LangChain documentation. We first download the html files and then create a Ray dataset of them.

每个机器学习项目都始于数据收集,这些数据需要被搜集并存储在某处。在本教程中,我们将从LangChain文档的一部分中收集数据。我们首先下载HTML文件,然后创建一个包含这些文件的Ray数据集。

We start by installing all the dependencies that we'll use:

我们首先安装我们将要使用的所有依赖项:

pip install ray langchain sentence-transformers qdrant-client einops openai tiktoken fastapi "ray[serve]"

We use the OpenAI API in this tutorial, so we'll need an API key. We export our API key as an environmental variable, and then initialize our Ray environment like this:

在本教程中,我们使用了OpenAI的API,因此我们需要一个API密钥。我们将API密钥导出为环境变量,然后像这样初始化Ray环境:

import os
import ray

working_dir = "downloaded_docs"

if not os.path.exists(working_dir):
    os.makedirs(working_dir)

# Setting up our Ray environment
ray.init(runtime_env={
    "env_vars": {
        "OPENAI_API_KEY": os.environ["OPENAI_API_KEY"],
    },
    "working_dir": str(working_dir)
})

To work with the LangChain documentation, we need to download the html files and process them. Scraping html files can get very tricky and the details depend heavily on the structure of the website you’re trying to scrape. The functions below are only meant to be used in the context of this tutorial.

为了处理LangChain文档,我们需要下载HTML文件并对其进行处理。抓取HTML文件可能会非常棘手,具体细节很大程度上取决于您尝试抓取的网站的结构。下面的函数仅在本教程的上下文中使用。

import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed
import re

def sanitize_filename(filename):
    filename = re.sub(r'[\\/*?:"<>|]''', filename)  # Remove problematic characters
    filename = re.sub(r'[^\x00-\x7F]+''_', filename)  # Replace non-ASCII characters
    return filename

def is_valid(url, base_domain):
    parsed = urlparse(url)
    valid = bool(parsed.netloc) and parsed.path.startswith("/docs/expression_language/")
    return valid

def save_html(url, folder):
    try:
        headers = {'User-Agent''Mozilla/5.0'}
        response = requests.get(url, headers=headers)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, 'html.parser')
        title = soup.title.string if soup.title else os.path.basename(urlparse(url).path)
        sanitized_title = sanitize_filename(title)
        filename = os.path.join(folder, sanitized_title.replace(" ""_") + ".html")

        if not os.path.exists(filename):
            with open(filename, 'w', encoding='utf-8'as file:
                file.write(str(soup))
            print(f"Saved: {filename}")

            links = [urljoin(url, link.get('href')) for link in soup.find_all('a'if link.get('href'and is_valid(urljoin(url, link.get('href')), base_domain)]
            return links
        else:
            return []
    except Exception as e:
        print(f"Error processing {url}{e}")
        return []

def download_all(start_url, folder, max_workers=5):
    visited = set()
    to_visit = {start_url}

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        while to_visit:
            future_to_url = {executor.submit(save_html, url, folder): url for url in to_visit}
            visited.update(to_visit)
            to_visit.clear()

            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    new_links = future.result()
                    for link in new_links:
                        if link not in visited:
                            to_visit.add(link)
                except Exception as e:
                    print(f"Error with future for {url}{e}")
                    

Because the LangChain documentation is very large, we'll download only a subset of it: LangChain's Expression Language (LCEL), which consists of 28 html pages.

由于LangChain文档非常庞大,我们只下载其中的一部分:LangChain的表达式语言(LCEL),它由28个HTML页面组成。

base_domain = "python.langchain.com"
start_url = "https://python.langchain.com/docs/expression_language/"
folder = working_dir

download_all(start_url, folder, max_workers=10)

Now that we've downloaded the files, we can use them to create our Ray dataset: 现在我们已经下载了文件,我们可以使用它们来创建我们的Ray数据集:

from pathlib import Path

# Ray dataset
document_dir = Path(folder)
ds = ray.data.from_items([{"path": path.absolute()} for path in document_dir.rglob("*.html"if not path.is_dir()])
print(f"{ds.count()} documents")

Great! But there's one more step left before we can move on to the next phase of our workflow. We need to extract the relevant text from our html files and clean up all the html syntax. For this, we import BeautifulSoup to parse the files and find relevant html tags.

很好!但在我们进入工作流程的下一个阶段之前,还有一个步骤需要完成。我们需要从HTML文件中提取相关文本,并清理所有的HTML语法。为此,我们需要导入BeautifulSoup来解析文件并查找相关的HTML标签。

from bs4 import BeautifulSoup, NavigableString

def extract_text_from_element(element):
    texts = []
    for elem in element.descendants:
        if isinstance(elem, NavigableString):
            text = elem.strip()
            if text:
                texts.append(text)
    return "\n".join(texts)

def extract_main_content(record):
    with open(record["path"], "r", encoding="utf-8"as html_file:
        soup = BeautifulSoup(html_file, "html.parser")

    main_content = soup.find(['main''article'])  # Add any other tags or class_="some-class-name" here
    if main_content:
        text = extract_text_from_element(main_content)
    else:
        text = "No main content found."

    path = record["path"]
    return {"path": path, "text": text}

We can now use Ray's map() function to run this extraction process. Ray lets us run multiple processes in parallel.

我们现在可以使用Ray的map()函数来运行这个提取过程。Ray允许我们并行运行多个进程。

# Extract content
content_ds = ds.map(extract_main_content)
content_ds.count()

Awesome! The results of the above extraction are our dataset. Because Ray datasets are optimized for scaled performance in production, they don't require us to make costly and error-prone adjustments to our code when our application grows.

太棒了!上述提取的结果就是我们的数据集。因为Ray数据集针对生产中的扩展性能进行了优化,当我们的应用程序增长时,它们不需要我们对代码进行代价高昂且易出错的调整。

Processing the data

To process our dataset, our next three steps are chunking, embedding, and indexing. 为了处理我们的数据集,接下来的三个步骤是分块(chunking)、嵌入(embedding)、和索引(indexing)。

  • Chunking: 分块是将文档划分为更小的部分,以便于处理和检索。例如,可以按段落、句子或固定的字符数来分块。分块有助于提高检索效率,并且可以帮助嵌入模型更好地处理和理解文本内容。

  • Embedding: 嵌入是指使用语言模型(例如BERT、GPT-3等)将文本转换成固定长度的向量。这些向量捕捉了文本的语义信息,并可以用于计算文档间的相似度或者进行机器学习任务。

  • Indexing: 索引是创建一个能够快速检索分块和嵌入的系统。这通常涉及到建立数据结构(如反向索引、KD树或者球树)以加快检索速度。当查询进来时,可以使用索引快速找到与查询最相关的文本块。

在Ray或类似的分布式处理系统中执行这些步骤时,可以通过并行处理和在多个节点上分布工作负载来实现高性能和可扩展性。这意味着即使数据集非常大,这些步骤也可以在合理的时间内完成。在此过程中,通常需要考虑内存管理、负载平衡和故障恢复等问题。

使用Ray来进行这些操作,你可以利用其map、reduce等方法,并利用Ray的分布式执行框架来在集群中执行这些操作。每个步骤都可以被设计为一个任务或一个actor,Ray会处理任务调度和资源分配。

Chunking the data

分块处理数据

Chunking - splitting your documents into multiple smaller parts - is necessary to make your data meet the LLM’s context length limits, and helps keep contexts specific enough to remain relevant. Chunks also need to not be too small. When chunks are too small, the information retrieved may become too narrow to provide adequate query responses. The optimal chunk size will depend on your data, the models you use, and your use case. We will use a common chunking value here, one that has been used in a lot of applications.

分块处理 - 将文档拆分成多个较小的部分 - 是必要的,以使数据满足大型语言模型(LLM)的上下文长度限制,并帮助保持上下文具体到足以保持相关性。同时,块也不应该太小。当块太小时,检索到的信息可能会变得过于狭窄,无法提供足够的查询响应。最佳的块大小将取决于您的数据、您使用的模型和您的用例。我们将在这里使用一个常见的分块值,这是在许多应用程序中使用过的值。

Let’s define our text splitting logic first, using a standard text splitter from LangChain:

让我们首先定义我们的文本分割逻辑,使用LangChain的标准文本分割器:

from functools import partial
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Defining our text splitting function
def chunking(document, chunk_size, chunk_overlap):
    text_splitter = RecursiveCharacterTextSplitter(
        separators=["\n\n""\n"],
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len)

    chunks = text_splitter.create_documents(
        texts=[document["text"]], 
        metadatas=[{"path": document["path"]}])
    return [{"text": chunk.page_content, "path": chunk.metadata["path"]} for chunk in chunks]

Again, we utilize Ray's map() function to ensure scalability:

再次,我们利用Ray的map()函数来确保可伸缩性:

chunks_ds = content_ds.flat_map(partial(
    chunking, 
    chunk_size=512
    chunk_overlap=50))
print(f"{chunks_ds.count()} chunks")

Now that we've gathered and chunked our data scalably, we need to embed and index it, so that we can efficiently retrieve relevant answers to our queries.

现在我们已经可扩展地收集并分块处理了我们的数据,接下来我们需要嵌入和索引它,这样我们就能高效地检索到与我们查询相关的答案。

Embedding the data

嵌入数据

We use a pretrained model to create vector embeddings for both our data chunks and the query itself. By measuring the distance between the chunk embeddings and the query embedding, we can identify the most relevant, or "top-k," chunks. Of the various pretrained models, we'll use the popular 'bge-base-en-v1.5' model, which, at the time of writing this tutorial, ranks as the highest-performing model of its size on the MTEB Leaderboard. For convenience, we continue using LangChain:

我们使用预训练的模型为我们的数据块和查询本身创建向量嵌入。通过测量数据块嵌入和查询嵌入之间的距离,我们可以确定最相关的,或者说“最佳k个”数据块。在各种预训练模型中,我们将使用流行的'bge-base-en-v1.5'模型,截至编写本教程时,它在MTEB排行榜上以其大小的模型性能排名最高。为方便起见,我们继续使用LangChain:

from langchain.embeddings import OpenAIEmbeddings
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
import numpy as np
from ray.data import ActorPoolStrategy

def get_embedding_model(embedding_model_name, model_kwargs, encode_kwargs):
    embedding_model = HuggingFaceEmbeddings(
            model_name=embedding_model_name,
            model_kwargs=model_kwargs,
            encode_kwargs=encode_kwargs)
    return embedding_model

This time, instead of map(), we want to use map_batches(), which requires defining a class object to perform a call on.

这一次,我们不使用map(),而是要使用map_batches(),这要求定义一个类对象来执行调用。

class EmbedChunks:
    def __init__(self, model_name):
        self.embedding_model = get_embedding_model(
            embedding_model_name=model_name,
            model_kwargs={"device""cuda"},
            encode_kwargs={"device""cuda""batch_size"100})
    def __call__(self, batch):
        embeddings = self.embedding_model.embed_documents(batch["text"])
        return {"text": batch["text"], "path": batch["path"], "embeddings": embeddings}

# Embedding our chunks
embedding_model_name = "BAAI/bge-base-en-v1.5"
embedded_chunks = chunks_ds.map_batches(
    EmbedChunks,
    fn_constructor_kwargs={"model_name": embedding_model_name},
    batch_size=100
    num_gpus=1,
    concurrency=1)

Indexing the data

索引数据

Now that our chunks are embedded, we need to store them somewhere. For the sake of this tutorial, we'll utilize Qdrant’s new in-memory feature, which lets us experiment with our code rapidly without needing to set up a fully-fledged instance. However, for deployment in a production environment, you should rely on more robust and scalable solutions — hosted either within your own network or by a third-party provider. For example, to fully productionize, we would need to point to our Qdrant (or your preferred hosted vendor) instance instead of using it in-memory. Detailed guidance on self-hosted solutions, such as setting up a Kubernetes cluster, are beyond the scope of this tutorial.

现在我们的数据块已经嵌入了,我们需要将它们存储在某个地方。为了本教程,我们将利用Qdrant的新内存功能,它允许我们在不需要设置一个完全成熟的实例的情况下迅速地试验我们的代码。然而,在生产环境中部署时,你应该依赖于更加健壮和可扩展的解决方案——无论是托管在你自己的网络内还是由第三方提供商托管。例如,要完全投入生产,我们需要指向我们的Qdrant(或你选择的托管供应商)实例,而不是在内存中使用它。关于自托管解决方案的详细指导,例如设置Kubernetes集群,超出了本教程的范围。

from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams

# Initalizing a local client in-memory
client = QdrantClient(":memory:")

client.recreate_collection(
   collection_name="documents",
   vectors_config=VectorParams(size=embedding_size, distance=Distance.COSINE),
)

To perform the next processing step, storage, using Ray would require more than 2 CPU scores, making this tutorial incompatible with the free tier of Google Colab. Instead, then, we'll use pandas. Fortunately, Ray allows us to convert our dataset into a pandas DataFrame with a single line of code:

为了执行下一个处理步骤——存储——使用Ray将需要超过2个CPU分数,这会使得本教程与Google Colab的免费层不兼容。因此,我们将改用pandas。幸运的是,Ray允许我们用一行代码将我们的数据集转换为一个pandas DataFrame:

emb_chunks_df = embedded_chunks.to_pandas()

Now that our dataset is converted to pandas, we define and execute our data storage function:

一旦将数据集转换成pandas DataFrame,就可以定义并执行数据存储函数了。存储函数的具体实现将取决于你选择的存储目标和格式。以下是一个示例性的数据存储函数,它将DataFrame数据导出到一个CSV文件中:

from qdrant_client.models import PointStruct

def store_results(df, collection_name="documents", client=client):
   # Defining our data structure
    points = [
        # PointStruct is the data classs used in Qdrant
        PointStruct(
            id=hash(path),  # Unique ID for each point
            vector=embedding,
            payload={
                "text": text,
                "source": path
            }
        )
        for text, path, embedding in zip(df["text"], df["path"], df["embeddings"])
    ]
  
  # Adding our data points to the collection
    client.upsert(
        collection_name=collection_name,
        points=points
    )

store_results(emb_chunks_df)

This wraps up the data processing part! Our data is now stored in our vector database and ready to be retrieved.

这标志着数据处理部分的结束!现在我们的数据已经存储在向量数据库中,可以随时检索了。

Data retrieval

数据检索 数据检索是指通过一个或多个查询条件从存储系统中获取相关信息的过程。在我们的案例中,数据检索通常涉及到使用由预训练模型创建的向量嵌入来搜索最相关的数据块。

When you retrieve data from vector storage, it's important to use the same embedding model for your query that you used for your source data. Otherwise, vector comparison to surface relevant content may result in mismatched or non-nuanced results (due to semantic drift, loss of context, or inconsistent distance metrics).

在从向量存储中检索数据时,使用与源数据相同的嵌入模型对查询进行处理至关重要。否则,向量比较以提取相关内容可能会导致不匹配或缺乏细微差别的结果(由于语义漂移、上下文丢失或距离度量不一致)。

import numpy as np 

# Embed query
embedding_model = HuggingFaceEmbeddings(model_name=embedding_model_name)
query = "How to run agents?"
query_embedding = np.array(embedding_model.embed_query(query))
len(query_embedding)

Recall from above that we measure the distance between the query embedding and chunk embeddings to identify the most relevant, or 'top-k' chunks. In Qdrant’s search, the 'limit' parameter is equivalent to 'k'. By default, the search uses cosine similarity as the metric, and retrieves from our database the 5 chunks closest to our query embedding:

回想前文所述,我们通过测量查询嵌入与数据块嵌入之间的距离来识别最相关的或“top-k”数据块。在Qdrant的搜索中,“limit”参数等同于“k”。默认情况下,搜索使用余弦相似度作为度量标准,并从数据库中检索与我们的查询嵌入最接近的5个数据块:

hits = client.search(
    collection_name="documents",
    query_vector=query_embedding,
    limit=5  # Return 5 closest points
)

context_list = [hit.payload["text"for hit in hits]
context = "\n".join(context_list)

We rewrite this as a function for later use: 我们将这个过程重写为一个函数,以便后续使用:

def semantic_search(query, embedding_model, k):
    query_embedding = np.array(embedding_model.embed_query(query))
    hits = client.search(
      collection_name="documents",
      query_vector=query_embedding,
      limit=5  # Return 5 closest points
    )

    context_list = [{"id": hit.id, "source": str(hit.payload["source"]), "text": hit.payload["text"]} for hit in hits]
    return context_list

Generation

We're now very close to being able to field queries and retrieve answers! We've set up everything we need to query our LLM at scale. But before querying the model for a response, we want to first inform the query with our data, by retrieving relevant context from our vector database and then adding it to the query.

我们现在已经非常接近能够接收查询并检索答案的阶段了!我们已经设置了一切所需的设施来大规模地查询我们的大型语言模型(LLM)。但是,在向模型查询响应之前,我们首先想通过从我们的向量数据库中检索相关上下文信息并将其添加到查询中,以此来增强查询的信息量。

To do this, we use a simplified version of the generate.py script provided in Ray's LLM repository. This version is adapted to our code and - to simplify and keep our focus on how to scale a basic RAG system - leaves out a bunch of advanced retrieval techniques, such as reranking and hybrid search. For our LLM, we use gpt-3.5-turbo, and query it via the OpenAI API.

为了实现这一点,我们使用了 Ray's LLM 仓库中提供的 generate.py 脚本的简化版本。这个版本经过修改以适应我们的代码,并且为了简化和保持我们对如何扩展基本 RAG(检索增强生成)系统的关注,省略了一些高级检索技术,比如重排和混合搜索。对于我们的大型语言模型(LLM),我们使用了 gpt-3.5-turbo,并通过 OpenAI API 进行调用。

from openai import OpenAI

def get_client(llm):
  api_key = os.environ["OPENAI_API_KEY"]
  client = OpenAI(api_key=api_key)
  return client

def generate_response(
    llm,
    max_tokens=None,
    temperature=0.0,
    stream=False,
    system_content="",
    assistant_content="",
    user_content="",
    max_retries=1,
    retry_interval=60,
)
:

    """Generate response from an LLM."""
    retry_count = 0
    client = get_client(llm=llm)
    messages = [
        {"role": role, "content": content}
        for role, content in [
            ("system", system_content),
            ("assistant", assistant_content),
            ("user", user_content),
        ]
        if content
    ]
    while retry_count <= max_retries:
        try:
            chat_completion = client.chat.completions.create(
                model=llm,
                max_tokens=max_tokens,
                temperature=temperature,
                stream=stream,
                messages=messages,
            )
            return prepare_response(chat_completion, stream=stream)

        except Exception as e:
            print(f"Exception: {e}")
            time.sleep(retry_interval)  # default is per-minute rate limits
            retry_count += 1
    return ""

def response_stream(chat_completion):
    for chunk in chat_completion:
        content = chunk.choices[0].delta.content
        if content is not None:
            yield content

def prepare_response(chat_completion, stream):
    if stream:
        return response_stream(chat_completion)
    else:
        return chat_completion.choices[0].message.content

Finally, we generate a response:

最终,我们生成一个响应:

# Generating our response
query = "How to run agents?"
response = generate_response(
    llm="gpt-3.5-turbo",
    temperature=0.0,
    stream=True,
    system_content="Answer the query using the context provided. Be succinct.",
    user_content=f"query: {query}, context: {context_list}")
# Stream response
for content in response:
    print(content, end='', flush=True)

To make using our application even more convenient, we can simply adapt Ray's official documentation to implement our workflow within a single QueryAgent class, which bundles together and takes care of all of the steps we implemented above - retrieving embeddings, embedding the search query, performing vector search, processing the results, and querying the LLM to generate a response. Using this single class approach, we no longer need to sequentially call all of these functions, and can also include utility functions. (Specifically, Get_num_tokens encodes our text and gets the number of tokens, to calculate the length of the input. To maintain our standard 50:50 ratio to allocate space to each of input and generation, we use (text, max_context_length) to trim input text if it's too long.)

为了使我们的应用程序使用起来更加方便,我们可以简单地根据 Ray 的官方文档来实现我们的工作流程,将其整合到单个 QueryAgent 类中,这个类将我们上面实现的所有步骤——检索嵌入,对搜索查询进行嵌入,执行向量搜索,处理结果,以及查询大型语言模型(LLM)以生成响应——打包在一起并进行处理。采用这种单个类的方法,我们不再需要顺序调用所有这些函数,同时还可以包括实用功能函数。(具体来说,Get_num_tokens 函数对我们的文本进行编码并获取令牌的数量,以计算输入的长度。为了保持我们标准的 50:50 比例来分配空间给输入和生成,我们使用 (text, max_context_length) 来修剪过长的输入文本。)

import tiktoken

def get_num_tokens(text):
    enc = tiktoken.get_encoding("cl100k_base")
    return len(enc.encode(text))


def trim(text, max_context_length):
    enc = tiktoken.get_encoding("cl100k_base")
    return enc.decode(enc.encode(text)[:max_context_length])

class QueryAgent:
    def __init__(
        self,
        embedding_model_name="BAAI/bge-base-en-v1.5",
        llm="gpt-3.5-turbo",
        temperature=0.0,
        max_context_length=4096,
        system_content="",
        assistant_content="",
    )
:

        # Embedding model
        self.embedding_model = get_embedding_model(
            embedding_model_name=embedding_model_name,
            model_kwargs={"device""cuda"},
            encode_kwargs={"device""cuda""batch_size"100},
        )

        # LLM
        self.llm = llm
        self.temperature = temperature
        self.context_length = int(
            0.5 * max_context_length
        ) - get_num_tokens(  # 50% of total context reserved for input
            system_content + assistant_content
        )
        self.max_tokens = int(
            0.5 * max_context_length
        )  # max sampled output (the other 50% of total context)
        self.system_content = system_content
        self.assistant_content = assistant_content

    def __call__(
        self,
        query,
        num_chunks=5,
        stream=True,
    )
:

        # Get top_k context
        context_results = semantic_search(
            query=query, embedding_model=self.embedding_model, k=num_chunks
        )

        # Generate response
        document_ids = [item["id"for item in context_results]
        context = [item["text"for item in context_results]
        sources = [item["source"for item in context_results]
        user_content = f"query: {query}, context: {context}"
        answer = generate_response(
            llm=self.llm,
            max_tokens=self.max_tokens,
            temperature=self.temperature,
            stream=stream,
            system_content=self.system_content,
            assistant_content=self.assistant_content,
            user_content=trim(user_content, self.context_length),
        )

        # Result
        result = {
            "question": query,
            "sources": sources,
            "document_ids": document_ids,
            "answer": answer,
            "llm": self.llm,
        }
        return result

To embed our query and retrieve relevant vectors, and then generate a response, we run our QueryAgent as follows:

为了嵌入我们的查询并检索相关向量,然后生成响应,我们如下运行我们的 QueryAgent:

import json 

query = "How to run an agent?"
system_content = "Answer the query using the context provided. Be succinct."
agent = QueryAgent(
    embedding_model_name="BAAI/bge-base-en-v1.5",
    llm="gpt-3.5-turbo",
    max_context_length=4096,
    system_content=system_content)
result = agent(query=query, stream=False)
print(json.dumps(result, indent=2))

Serving our application

部署我们的应用程序

Our application is now running! Our last productionizing step is to serve it. Ray's Ray Serve module makes this step very straightforward. We combine Ray Serve with FastAPI and pydantic. The @serve.deployment decorator lets us define how many replicas and compute resources we want to use, and Ray’s autoscaling will handle the rest. Two Ray Serve decorators are all we need to modify our FastAPI application for production.

我们的应用程序现在已经在运行了!我们最后一个生产化的步骤是对外提供服务。Ray 的 Ray Serve 模块使这个步骤变得非常简单。我们将 Ray Serve 与 FastAPI 和 pydantic 结合使用。@serve.deployment 装饰器允许我们定义想要使用的副本数量和计算资源,Ray 的自动扩展将处理其余的部分。为了将我们的 FastAPI 应用程序转变为生产环境,我们只需要两个 Ray Serve 装饰器。

import pickle
import requests
from typing import List

from fastapi import FastAPI
from pydantic import BaseModel
from ray import serve

# Initialize application
app = FastAPI()

class Query(BaseModel):
    query: str

class Response(BaseModel):
    llm: str
    question: str
    sources: List[str]
    response: str

@serve.deployment(num_replicas=1, ray_actor_options={"num_cpus": 2, "num_gpus": 1})
@serve.ingress(app)
class RayAssistantDeployment:
    def __init__(self, embedding_model_name, embedding_dim, llm):
        
        # Query agent
        system_content = "Answer the query using the context provided. Be succinct. " \
            "Contexts are organized in a list of dictionaries [{'text': <context>}, {'text': <context>}, ...]. " \
            "Feel free to ignore any contexts in the list that don't seem relevant to the query. "
        self.gpt_agent = QueryAgent(
            embedding_model_name=embedding_model_name,
            llm="gpt-3.5-turbo",
            max_context_length=4096,
            system_content=system_content)

    @app.post("/query")
    def query(self, query: Query) -> Response:
        result = self.gpt_agent(
            query=query.query, 
            stream=False
            )
        return Response.parse_obj(result)

Now, we're ready to deploy our application:

现在,我们准备部署我们的应用程序:

# Deploying our application with Ray Serve
deployment = RayAssistantDeployment.bind(
    embedding_model_name="BAAI/bge-base-en-v1.5",
    embedding_dim=768,
    llm="gpt-3.5.-turbo")

serve.run(deployment, route_prefix="/")

Our FastAPI endpoint is capable of being queried like any other API, while Ray take care of the workload automatically:

我们的 FastAPI 端点可以像任何其他 API 那样被查询,同时 Ray 会自动处理工作负载:

# Performing inference
data = {"query""How to run an agent?"}
response = requests.post(
"https://127.0.0.1:8000/query", json=data
)

try:
  print(response.json())
except:
  print(response.text)

Wow! We've been on quite a journey. We gathered our data using Ray and some LangChain documentation, processed it by chunking, embedding, and indexing it, set up our retrieval and generation, and, finally, served our application using Ray Serve. Our tutorial has so far covered an example of how to develop scalably and economically - how to productionize from the very start of development.

确实,我们已经经历了一段不小的旅程。我们使用 Ray 和一些 LangChain 文档收集了我们的数据,通过分块、嵌入和索引处理数据,设置了我们的检索和生成系统,最后,使用 Ray Serve 对我们的应用程序进行了部署。到目前为止,我们的教程已经涵盖了如何可扩展且经济地开发的示例——即如何从开发的一开始就实现生产化。

Still, there is one last crucial step.

然而,还有一个至关重要的最后一步。

Production is only the start: maintenance

应用程序投入生产环境只是一个起点:维护

To fully productionize any application, you also need to maintain it. And maintaining your application is a continuous task.

要完全实现任何应用程序的生产化,您还需要对其进行维护。维护您的应用程序是一项持续的任务。

Maintenance involves regular assessment and improvement of your application. You may need to routinely update your dataset if your application relies on being current with real-world changes. And, of course, you should monitor application performance to prevent  degradation. For smoother operations, we recommend integrating your workflows with CI/CD pipelines.

维护包括对您的应用程序进行定期评估和改进。如果您的应用程序依赖于与现实世界变化保持同步,您可能需要定期更新您的数据集。当然,您应该监控应用程序性能以防止性能下降。为了更顺畅的操作,我们建议将您的工作流程与 CI/CD 管道集成。

Limitations and future discussion

局限性和未来讨论

Other critical aspects of scalably productionizing fall outside of the scope of this article, but will be explored in future articles, including:

  • Advanced Development Pre-training, finetuning, prompt engineering and other in-depth development techniques
  • Evaluation Randomness and qualitative metrics, and complex multi-part structure of RAG can make LLM evaluation difficult
  • Compliance Adhering to data privacy laws and regulations, especially when handling personal or sensitive information

本文未涵盖可扩展生产化的其他关键方面,但将在未来的文章中探讨,包括:

  • 高级开发预训练、微调、提示工程以及其他深入的开发技巧
  • 评估随机性和定性指标以及 RAG 复杂的多部分结构可能会使大型语言模型(LLM)的评估变得困难
  • 合规性遵守数据隐私法律和规定,尤其是在处理个人或敏感信息时

原文地址:https://hub.superlinked.com/scaling-rag-for-production#Erspl



53AI,企业落地应用大模型首选服务商

产品:大模型应用平台+智能体定制开发+落地咨询服务

承诺:先做场景POC验证,看到效果再签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

与创始人交个朋友

回到顶部

 
扫码咨询