


Published: 2024-03-30 08:36:53    Source: 大数据架构与数据科学 (Big Data Architecture and Data Science)

Retrieval-augmented Generation (RAG) combines Large Language Models (LLMs) with external data to reduce the probability of machine hallucinations - AI-generated information that misrepresents underlying data or reality. When developing RAG systems, scalability is often an afterthought. This creates problems when moving from initial development to production. Having to manually adjust code while your application grows can get very costly and is prone to errors.


Our tutorial provides an example of how you can develop a RAG pipeline with production workloads in mind from the start, using the right tools - ones that are designed to scale your application.


Development vs. production


The goals and requirements of development and production are usually very different. This is particularly true for new technologies like Large Language Models (LLMs) and Retrieval-augmented Generation (RAG), where organizations prioritize rapid experimentation to test the waters before committing more resources. Once important stakeholders are convinced, the focus shifts from demonstrating an application's potential for creating value to actually creating value, via production. Until a system is productionized, its ROI is typically zero.


Productionizing, in the context of RAG systems, involves transitioning from a prototype or test environment to a stable, operational state, in which the system is readily accessible and reliable for remote end users, such as via URL - i.e., independent of the end user machine state. Productionizing also involves scaling the system to handle varying levels of user demand and traffic, ensuring consistent performance and availability.


Even though there is no ROI without productionizing, organizations often underestimate the hurdles involved in getting to an end product. Productionizing is always a trade-off between performance and costs, and this is no different for Retrieval-augmented Generation (RAG) systems. The goal is to achieve a stable, operational, and scalable end product while keeping costs low.


Let's look more closely at the basic requirements of an RAG system, before going into the specifics of what you'll need to productionize it in a cost-effective but scalable way.


The basics of RAG


The most basic RAG workflow looks like this:

  • Submit a text query to an embedding model, which converts it into a semantically meaningful vector embedding.
  • Send the resulting query vector embedding to your document embeddings storage location - typically a vector database.
  • Retrieve the most relevant document chunks - based on proximity of document chunk embeddings to the query vector embedding.
  • Add the retrieved document chunks as context to the original text query (not to its vector embedding) and send the combined prompt to the LLM.
  • The LLM generates a response utilizing the retrieved context.
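The five steps above can be sketched end to end in plain Python. This is a toy illustration, not the tutorial's pipeline: a bag-of-words counter (an assumption made here) stands in for a real embedding model, a Python list stands in for the vector database, and the final LLM call is left out. It also makes step 4 concrete: the retrieved chunks are added to the text of the query, which is what the LLM actually receives.

```python
import math
import re
from collections import Counter

# Toy "embedding": a bag-of-words count vector. A stand-in for a real embedding
# model such as bge-base-en-v1.5; it captures word overlap, not meaning.
def embed(text):
    return Counter(re.findall(r"[a-z]+", text.lower()))

def cosine(a, b):
    dot = sum(a[t] * b[t] for t in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0

# Steps 1-3: embed the query, compare it with every stored chunk, keep the k closest.
def retrieve(query, chunks, k=2):
    q = embed(query)
    return sorted(chunks, key=lambda c: cosine(q, embed(c)), reverse=True)[:k]

# Step 4: retrieved chunks are added to the query *text* (the prompt);
# the LLM never sees the embeddings themselves.
def build_prompt(query, context_chunks):
    context = "\n".join(f"- {c}" for c in context_chunks)
    return f"Answer the query using the context provided.\nContext:\n{context}\nQuery: {query}"

chunks = [
    "Agents use an LLM to decide which tools to call.",
    "Runnables can be composed with the pipe operator.",
    "Vector stores hold document embeddings for retrieval.",
]
top = retrieve("How do agents call tools?", chunks, k=1)
prompt = build_prompt("How do agents call tools?", top)
# Step 5 would send `prompt` to an LLM; that call is omitted in this sketch.
print(prompt)
```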



While RAG workflows can become significantly more complex, incorporating methods like metadata filtering and retrieval reranking, all RAG systems must contain the components involved in the basic workflow: an embedding model, a store for document and vector embeddings, a retriever, and an LLM.



But smart development, with productionization in mind, requires more than just setting up your components in a functional way. You must also develop with cost-effective scalability in mind. For this you'll need not just these basic components, but more specifically the tools appropriate to configuring a scalable RAG system.


Developing for scalability: the right tools


LLM library: LangChain


As of this writing, LangChain, while it has also been the subject of much criticism, is arguably the most prominent LLM library. A lot of developers turn to LangChain to build Proof-of-Concepts (PoCs) and Minimum Viable Products (MVPs), or simply to experiment with new ideas. Whether one chooses LangChain or one of the other major LLM and RAG libraries (for example, LlamaIndex or Haystack, to name our own favorite alternatives), they can all be used to productionize an RAG system. That is, all three have integrations for third-party libraries and providers that will handle production requirements. Which one you choose to interface with your other components depends on the details of your existing tech stack and use case.


For the purpose of this tutorial, we'll use part of the LangChain documentation as our data, along with Ray.





Scaling with Ray





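  1. Distributed task execution: Ray lets you distribute work across the nodes of a cluster so that many tasks run in parallel. This is essential for handling the highly concurrent retrieval and generation tasks in a RAG system.
  2. Dynamic resource management: Ray can adjust resource allocation dynamically to suit different workloads. For example, if the number of retrieval tasks surges, Ray can automatically start more worker nodes to handle them (when the cluster autoscaler is enabled).
  3. Fault recovery: Ray has built-in fault-tolerance mechanisms. If a node fails, Ray can re-execute the failed tasks on other nodes, keeping the system stable.
  4. Memory and object management: Ray optimizes memory use and reduces the need to move data between nodes, which speeds up processing and lowers latency.
  5. Third-party integrations: Ray integrates with many popular data processing and machine learning libraries, such as Pandas, Scikit-learn, and PyTorch.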


Because our goal is to build a 1) simple, 2) scalable, and 3) economically feasible option, not reliant on proprietary solutions, we have chosen to use Ray, a Python framework for productionizing and scaling machine learning (ML) workloads. Ray is designed with a range of auto-scaling features that seamlessly scale ML systems. It's also adaptable to both local environments and Kubernetes, efficiently managing all workload requirements.


Ray permits us to keep our tutorial system simple, non-proprietary, and on our own network, rather than the cloud. While LangChain, LlamaIndex, and Haystack libraries support cloud deployment for AWS, Azure, and GCP, the details of cloud deployment heavily depend on - and are therefore very particular to - the specific cloud provider you choose. These libraries also all contain Ray integrations to enable scaling. But using Ray directly will provide us with more universally applicable insights, given that the Ray integrations within LangChain, LlamaIndex, and Haystack are built upon the same underlying framework.


Now that we have our LLM library sorted, let's turn to data gathering and processing.


Data gathering and processing



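  1. Data source identification: determine the most relevant data sources. These may include public datasets, content accessed through APIs, web data collected by crawlers, or other proprietary sources.
  2. Data scraping and storage: collect data from the identified sources and store it in a format and system suited to downstream processing, such as a database, a data lake, or a file system.
  3. Data cleaning: process the raw data to remove irrelevant content, correct errors, and fill in missing values. This includes text normalization, removal of stop words and punctuation, grammar correction, and so on.
  4. Data labeling: if needed, annotate the data to provide labels for supervised learning. This may include entity recognition, sentiment analysis, or other related tasks.
  5. Feature extraction: convert text into a format that machine learning models can understand. This usually involves generating text embeddings or other vectorized representations.
  6. Data splitting: divide the data into training, validation, and test sets to evaluate model performance and prevent overfitting.
  7. Scaling the preprocessing pipeline: with tools such as Ray, you can build a scalable preprocessing pipeline that processes large volumes of data in parallel and keeps data flowing efficiently through the whole RAG system.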
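Step 3 can look like this in practice. The function below is a minimal sketch of one possible cleaning pass using only the standard library: Unicode normalization, whitespace collapsing, and removal of empty or repeated lines. The name clean_text is hypothetical. It deliberately leaves stop words and punctuation in place, since embedding models generally work on natural, unstripped text; how aggressively to clean is a judgment call for your data.

```python
import re
import unicodedata

def clean_text(raw):
    """Normalize Unicode, collapse whitespace, and drop empty or repeated lines."""
    # NFKC folds compatibility characters, e.g. non-breaking spaces and ligatures
    text = unicodedata.normalize("NFKC", raw)
    seen = set()
    lines = []
    for line in text.splitlines():
        line = re.sub(r"\s+", " ", line).strip()  # collapse runs of whitespace
        if line and line not in seen:             # skip blank and duplicate lines
            seen.add(line)
            lines.append(line)
    return "\n".join(lines)
```

For example, a non-breaking space collapses into a normal one, and a repeated line is kept only once.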


Gathering the data

Every ML journey starts with data, and that data needs to be gathered and stored somewhere. For this tutorial, we gather data from part of the LangChain documentation. We first download the html files and then create a Ray dataset of them.


We start by installing all the dependencies that we'll use:


pip install "ray[data,serve]" langchain sentence-transformers qdrant-client einops openai tiktoken fastapi beautifulsoup4 requests

We use the OpenAI API in this tutorial, so we'll need an API key. We export our API key as an environment variable and then initialize our Ray environment like this:


import os
import ray

working_dir = "downloaded_docs"

if not os.path.exists(working_dir):
    os.makedirs(working_dir)

# Setting up our Ray environment
ray.init(runtime_env={
    "env_vars": {
        "OPENAI_API_KEY": os.environ["OPENAI_API_KEY"],
    },
    "working_dir": str(working_dir)
})

To work with the LangChain documentation, we need to download the html files and process them. Scraping html files can get very tricky and the details depend heavily on the structure of the website you’re trying to scrape. The functions below are only meant to be used in the context of this tutorial.


import os
import re
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed

def sanitize_filename(filename):
    filename = re.sub(r'[\\/*?:"<>|]', '', filename)  # Remove problematic characters
    filename = re.sub(r'[^\x00-\x7F]+', '_', filename)  # Replace non-ASCII characters
    return filename

def is_valid(url, base_domain):
    # Only follow links into the LCEL section of the docs
    parsed = urlparse(url)
    valid = bool(parsed.netloc) and parsed.path.startswith("/docs/expression_language/")
    return valid

def save_html(url, folder):
    try:
        headers = {'User-Agent': 'Mozilla/5.0'}
        response = requests.get(url, headers=headers)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, 'html.parser')
        title = soup.title.string if soup.title else os.path.basename(urlparse(url).path)
        sanitized_title = sanitize_filename(title)
        filename = os.path.join(folder, sanitized_title.replace(" ", "_") + ".html")

        if not os.path.exists(filename):
            with open(filename, 'w', encoding='utf-8') as file:
                file.write(str(soup))
            print(f"Saved: {filename}")

            # Collect in-scope links to crawl next
            links = [urljoin(url, link.get('href')) for link in soup.find_all('a') if link.get('href') and is_valid(urljoin(url, link.get('href')), base_domain)]
            return links
        else:
            return []
    except Exception as e:
        print(f"Error processing {url}: {e}")
        return []

def download_all(start_url, folder, max_workers=5):
    visited = set()
    to_visit = {start_url}

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        while to_visit:
            # Fetch the current wave of URLs in parallel
            future_to_url = {executor.submit(save_html, url, folder): url for url in to_visit}
            visited.update(to_visit)
            to_visit = set()

            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    new_links = future.result()
                    for link in new_links:
                        if link not in visited:
                            to_visit.add(link)
                except Exception as e:
                    print(f"Error with future for {url}: {e}")
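download_all keeps a visited set and a to_visit frontier and processes links one wave at a time, which amounts to a breadth-first crawl. The sketch below shows the same traversal idea without threads or HTTP, using a hypothetical in-memory link graph in place of real pages; crawl, get_links, and site are illustrative names, not part of the tutorial. Marking a URL as visited when it is queued, rather than when it is fetched, guarantees that each page is scheduled at most once.

```python
from collections import deque

def crawl(start, get_links, is_valid):
    """Breadth-first traversal in which each URL is visited at most once."""
    visited = {start}
    order = []
    queue = deque([start])
    while queue:
        url = queue.popleft()
        order.append(url)  # a real crawler would fetch and save the page here
        for link in get_links(url):
            if is_valid(link) and link not in visited:
                visited.add(link)  # mark when queued, so it is never queued twice
                queue.append(link)
    return order

# Hypothetical link graph standing in for real HTTP requests.
site = {
    "/docs/expression_language/": ["/docs/expression_language/why", "/blog/post"],
    "/docs/expression_language/why": ["/docs/expression_language/", "/docs/expression_language/how_to"],
    "/docs/expression_language/how_to": [],
}
pages = crawl(
    "/docs/expression_language/",
    get_links=lambda u: site.get(u, []),
    is_valid=lambda u: u.startswith("/docs/expression_language/"),
)
```

The out-of-scope /blog/post link is skipped, and the back-link to the start page is not followed again.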

Because the LangChain documentation is very large, we'll download only a subset of it: LangChain's Expression Language (LCEL), which consists of 28 html pages.


base_domain = ""  # set to the documentation site's domain
start_url = ""  # set to the first LCEL page to crawl
folder = working_dir

download_all(start_url, folder, max_workers=10)

Now that we've downloaded the files, we can use them to create our Ray dataset:

from pathlib import Path
import ray

# Ray dataset: one record per downloaded HTML file
document_dir = Path(folder)
ds = ray.data.from_items([{"path": str(path.absolute())} for path in document_dir.rglob("*.html") if not path.is_dir()])
print(f"{ds.count()} documents")

Great! But there's one more step left before we can move on to the next phase of our workflow. We need to extract the relevant text from our html files and clean up all the html syntax. For this, we import BeautifulSoup to parse the files and find relevant html tags.


from bs4 import BeautifulSoup, NavigableString

def extract_text_from_element(element):
    # Collect every non-empty text node under the element, one per line
    texts = []
    for elem in element.descendants:
        if isinstance(elem, NavigableString):
            text = elem.strip()
            if text:
                texts.append(text)
    return "\n".join(texts)

def extract_main_content(record):
    with open(record["path"], "r", encoding="utf-8") as html_file:
        soup = BeautifulSoup(html_file, "html.parser")

    main_content = soup.find(['main', 'article'])  # Add any other tags or class_="some-class-name" here
    if main_content:
        text = extract_text_from_element(main_content)
    else:
        text = "No main content found."

    path = record["path"]
    return {"path": path, "text": text}

We can now use Ray's map() function to run this extraction process. Ray lets us run multiple processes in parallel.


# Extract content: apply extract_main_content to every record in parallel
content_ds = ds.map(extract_main_content)

Awesome! The results of the above extraction are our dataset. Because Ray datasets are optimized for scaled performance in production, they don't require us to make costly and error-prone adjustments to our code when our application grows.


Processing the data

To process our dataset, our next three steps are chunking, embedding, and indexing.

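  • Chunking: splitting documents into smaller pieces that are easier to process and retrieve, for example by paragraph, by sentence, or by a fixed number of characters. Chunking helps improve retrieval efficiency and helps the embedding model better process and understand the text.

  • Embedding: using a language model (such as BERT or GPT-3) to convert text into fixed-length vectors. These vectors capture the semantic information of the text and can be used to compute similarity between documents or for machine learning tasks.

  • Indexing: building a system that can quickly retrieve the chunks and their embeddings. This usually involves building data structures (such as inverted indexes, KD-trees, or ball trees) that speed up retrieval. When a query comes in, the index quickly finds the text chunks most relevant to it.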
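The indexing item mentions inverted indexes. As a minimal sketch of that data structure, the code below maps each term to the set of chunks containing it and answers keyword queries by intersecting those sets. The function names are hypothetical, and this is keyword indexing for illustration only; vector databases such as Qdrant, used later in this tutorial, instead index embeddings for approximate nearest-neighbor search (Qdrant uses an HNSW graph).

```python
import re
from collections import defaultdict

def build_inverted_index(chunks):
    """Map each lowercase term to the set of chunk ids that contain it."""
    index = defaultdict(set)
    for chunk_id, text in enumerate(chunks):
        for term in re.findall(r"[a-z0-9]+", text.lower()):
            index[term].add(chunk_id)
    return index

def lookup(index, query):
    """Return ids of chunks containing every query term (boolean AND)."""
    terms = re.findall(r"[a-z0-9]+", query.lower())
    if not terms:
        return set()
    result = set(index.get(terms[0], set()))
    for term in terms[1:]:
        result &= index.get(term, set())  # .get does not add keys to the defaultdict
    return result

chunks = ["Ray Serve deploys models", "Ray Data maps batches", "Qdrant stores vectors"]
idx = build_inverted_index(chunks)
```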



Chunking the data


Chunking, which splits your documents into multiple smaller parts, is necessary to make your data fit within the LLM's context length limits, and it helps keep contexts specific enough to remain relevant. At the same time, chunks should not be too small: when they are, the information retrieved may be too narrow to answer the query adequately. The optimal chunk size depends on your data, the models you use, and your use case. Here we use a common chunk size, one that has been used in many applications.
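To make the size/overlap trade-off concrete, here is a minimal fixed-window character chunker. It is a simplification, not LangChain's algorithm: RecursiveCharacterTextSplitter tries to break on separators such as paragraph and line breaks first, while this sketch cuts at exact character offsets. Each chunk repeats the last chunk_overlap characters of the previous one, so a short span that straddles a boundary still appears whole in at least one chunk.

```python
def chunk_text(text, chunk_size, chunk_overlap):
    """Split text into fixed-size character windows overlapping by chunk_overlap."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far each window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):  # this window already reaches the end
            break
    return chunks
```

For example, chunk_text("abcdefghij", 4, 1) yields ["abcd", "defg", "ghij"]: consecutive chunks share one character.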


Let’s define our text splitting logic first, using a standard text splitter from LangChain:


from functools import partial
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Defining our text splitting function
def chunking(document, chunk_size, chunk_overlap):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len)

    # Keep the source path on every chunk so answers can cite it later
    chunks = text_splitter.create_documents(
        texts=[document["text"]],
        metadatas=[{"path": document["path"]}])
    return [{"text": chunk.page_content, "path": chunk.metadata["path"]} for chunk in chunks]

Again, we use Ray to ensure scalability, this time with flat_map(), because each input document produces several chunks:


chunks_ds = content_ds.flat_map(partial(
    chunking,
    chunk_size=300,   # assumed values; tune them for your data and models
    chunk_overlap=50))
print(f"{chunks_ds.count()} chunks")

Now that we've gathered and chunked our data scalably, we need to embed and index it, so that we can efficiently retrieve relevant answers to our queries.


Embedding the data


We use a pretrained model to create vector embeddings for both our data chunks and the query itself. By measuring the distance between the chunk embeddings and the query embedding, we can identify the most relevant, or "top-k," chunks. Of the various pretrained models, we'll use the popular 'bge-base-en-v1.5' model, which, at the time of writing this tutorial, ranks as the highest-performing model of its size on the MTEB Leaderboard. For convenience, we continue using LangChain:
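The "top-k" idea can be written out directly. This brute-force sketch scores every chunk embedding against the query embedding with cosine similarity and keeps the k best; a vector database does the same job approximately and at scale. The function names and two-dimensional vectors are made up for illustration (bge-base-en-v1.5 produces 768-dimensional vectors).

```python
import heapq
import math

def cosine_similarity(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def top_k(query_vec, chunk_vecs, k):
    """Return (index, score) pairs for the k chunks most similar to the query."""
    scored = ((i, cosine_similarity(query_vec, v)) for i, v in enumerate(chunk_vecs))
    return heapq.nlargest(k, scored, key=lambda pair: pair[1])

query = [1.0, 0.0]
chunks = [[0.0, 1.0], [1.0, 0.1], [0.7, 0.7]]
print(top_k(query, chunks, k=2))
```

Chunk 1 points almost the same way as the query, chunk 2 is at 45 degrees, and chunk 0 is orthogonal, so the two best matches are chunks 1 and 2.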


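from langchain.embeddings import OpenAIEmbeddings
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
import numpy as np
from ray.data import ActorPoolStrategy

def get_embedding_model(embedding_model_name, model_kwargs, encode_kwargs):
    # Wraps a sentence-transformers model behind LangChain's embeddings interface
    embedding_model = HuggingFaceEmbeddings(
        model_name=embedding_model_name,
        model_kwargs=model_kwargs,
        encode_kwargs=encode_kwargs)
    return embedding_model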

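This time, instead of map(), we use map_batches(), which processes rows in batches. Because embedding requires loading a model, we pass a callable class rather than a function: Ray instantiates the class once per worker and then calls it on each batch.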
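The plain-Python sketch below imitates that pattern: the expensive setup runs once in __init__, and __call__ is then applied to many column-oriented batches. FakeEmbedder, iter_batches, and the length-based "embedding" are hypothetical stand-ins, not Ray APIs; with map_batches and an ActorPoolStrategy, Ray plays the role of iter_batches and constructs the class once per actor.

```python
class FakeEmbedder:
    """Stands in for an embedding model whose construction is expensive."""
    loads = 0

    def __init__(self, model_name):
        FakeEmbedder.loads += 1  # count how many times the "model" is loaded
        self.model_name = model_name

    def __call__(self, batch):
        # One call handles a whole batch; the "embedding" here is just text length.
        return {"text": batch["text"], "embeddings": [[float(len(t))] for t in batch["text"]]}

def iter_batches(rows, batch_size):
    """Yield column-oriented batches ({"text": [...]}) of at most batch_size rows."""
    for start in range(0, len(rows), batch_size):
        yield {"text": [r["text"] for r in rows[start:start + batch_size]]}

rows = [{"text": t} for t in ["a", "bb", "ccc", "dddd", "eeeee"]]
worker = FakeEmbedder("hypothetical-model")  # constructed once per worker
outputs = [worker(b) for b in iter_batches(rows, batch_size=2)]
```

Five rows in batches of two give three calls, but the "model" is loaded only once.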


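class EmbedChunks:
    def __init__(self, model_name):
        # Loaded once per worker, then reused for every batch
        self.embedding_model = get_embedding_model(
            embedding_model_name=model_name,
            model_kwargs={"device": "cpu"},  # use "cuda" (and num_gpus=1 below) if a GPU is available
            encode_kwargs={"batch_size": 100})
    def __call__(self, batch):
        embeddings = self.embedding_model.embed_documents(list(batch["text"]))
        return {"text": batch["text"], "path": batch["path"], "embeddings": embeddings}

# Embedding our chunks
embedding_model_name = "BAAI/bge-base-en-v1.5"
embedded_chunks = chunks_ds.map_batches(
    EmbedChunks,
    fn_constructor_kwargs={"model_name": embedding_model_name},
    batch_size=100,
    compute=ActorPoolStrategy(size=1))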

Indexing the data


Now that our chunks are embedded, we need to store them somewhere. For the sake of this tutorial, we'll use Qdrant's new in-memory feature, which lets us experiment with our code rapidly without needing to set up a fully-fledged instance. However, for deployment in a production environment, you should rely on more robust and scalable solutions, hosted either within your own network or by a third-party provider. For example, to fully productionize, we would need to point to our Qdrant (or your preferred hosted vendor) instance instead of using it in-memory. Detailed guidance on self-hosted solutions, such as setting up a Kubernetes cluster, is beyond the scope of this tutorial.
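One way to keep the same code for experimentation and production is to pick the Qdrant connection from configuration. The helper below is a sketch under stated assumptions: QDRANT_URL and QDRANT_API_KEY are hypothetical environment-variable names, and the returned dictionary is meant to be unpacked into qdrant_client.QdrantClient, whose location, url, and api_key parameters accept these values.

```python
import os

def qdrant_client_kwargs(env=None):
    """In-memory Qdrant for experiments; a hosted instance when QDRANT_URL is set.

    QDRANT_URL and QDRANT_API_KEY are hypothetical variable names for this sketch.
    """
    env = os.environ if env is None else env
    url = env.get("QDRANT_URL")
    if not url:
        return {"location": ":memory:"}
    kwargs = {"url": url}
    if env.get("QDRANT_API_KEY"):
        kwargs["api_key"] = env["QDRANT_API_KEY"]
    return kwargs

# Usage (requires qdrant-client): client = QdrantClient(**qdrant_client_kwargs())
```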


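from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams

# Initializing a local client in-memory
client = QdrantClient(":memory:")

embedding_size = 768  # bge-base-en-v1.5 outputs 768-dimensional vectors
client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(size=embedding_size, distance=Distance.COSINE),
)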

Performing the next processing step, storage, with Ray would require more than 2 CPU cores, which would make this tutorial incompatible with the free tier of Google Colab. Instead, we'll use pandas. Fortunately, Ray lets us convert our dataset into a pandas DataFrame with a single line of code:


emb_chunks_df = embedded_chunks.to_pandas()

Now that our dataset is converted to pandas, we define and execute our data storage function:


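from qdrant_client.models import PointStruct

def store_results(df, collection_name="documents", client=client):
    # Defining our data structure
    points = [
        # PointStruct is the data class used in Qdrant
        PointStruct(
            id=idx,  # Unique, non-negative ID per chunk (hash(path) would repeat across a document's chunks)
            vector=[float(x) for x in embedding],
            payload={
                "text": text,
                "source": path
            }
        )
        for idx, (text, path, embedding) in enumerate(zip(df["text"], df["path"], df["embeddings"]))
    ]

    # Adding our data points to the collection
    client.upsert(
        collection_name=collection_name,
        points=points
    )

store_results(emb_chunks_df)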
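A note on point IDs: Qdrant accepts unsigned 64-bit integers or UUIDs as point identifiers. Deriving an ID from the path alone, for example with Python's hash(path), causes two problems: every chunk of the same document gets the same ID, so later upserts overwrite earlier chunks, and hash() of a string can be negative and changes between Python processes. One alternative, sketched here under the assumption that you track each chunk's position within its document (the chunking code above does not record it), is a deterministic UUID built from the path and chunk index. Re-ingesting the same data then overwrites the same points instead of duplicating them. CHUNK_NAMESPACE and chunk_point_id are hypothetical names.

```python
import uuid

# Hypothetical namespace: the same (path, chunk index) always maps to the same UUID.
CHUNK_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "rag-tutorial-chunks")

def chunk_point_id(path, chunk_index):
    """Deterministic, collision-resistant Qdrant point ID for one chunk."""
    return str(uuid.uuid5(CHUNK_NAMESPACE, f"{path}#{chunk_index}"))

ids = [chunk_point_id("docs/why.html", i) for i in range(3)]
```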


This wraps up the data processing part! Our data is now stored in our vector database and ready to be retrieved.


Data retrieval

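Data retrieval is the process of fetching relevant information from a storage system using one or more query conditions. In our case, it means using vector embeddings created by a pretrained model to search for the most relevant data chunks.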

When you retrieve data from vector storage, use the same embedding model for your query that you used for your source data. Different models map text into different vector spaces, often with different dimensions, so distances between a query vector from one model and document vectors from another do not reflect semantic similarity: the search returns mismatched results, or fails outright if the dimensions differ.
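A cheap safeguard is to store the embedding model's name and vector size alongside the index and check every query vector before searching. The function and exception names below are hypothetical; 768 is the output dimension of bge-base-en-v1.5.

```python
class EmbeddingSpaceMismatch(ValueError):
    """Raised when a query vector cannot be compared with the indexed vectors."""

def check_query_vector(query_vector, query_model, index_model, index_dim):
    """Refuse a query vector produced by a different model or with the wrong size."""
    if query_model != index_model:
        raise EmbeddingSpaceMismatch(
            f"query embedded with {query_model!r}, index built with {index_model!r}")
    if len(query_vector) != index_dim:
        raise EmbeddingSpaceMismatch(
            f"query has {len(query_vector)} dimensions, index expects {index_dim}")
    return query_vector

# Metadata recorded when the index was built
INDEX_MODEL, INDEX_DIM = "BAAI/bge-base-en-v1.5", 768
```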


import numpy as np 

# Embed query
embedding_model = HuggingFaceEmbeddings(model_name=embedding_model_name)
query = "How to run agents?"
query_embedding = np.array(embedding_model.embed_query(query))

Recall from above that we measure the distance between the query embedding and the chunk embeddings to identify the most relevant, or "top-k", chunks. In Qdrant's search, the limit parameter plays the role of k. Because we created the collection with Distance.COSINE, results are ranked by cosine similarity; the call below retrieves the 5 chunks closest to our query embedding:
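A detail behind the cosine metric: Qdrant's documentation describes its Cosine distance as a dot product over normalized vectors, with vectors normalized when they are uploaded. The sketch below checks, on made-up 2-D vectors, that ranking by cosine similarity on raw vectors and ranking by dot product on unit vectors give the same order; taking the first limit entries of that order is what the limit parameter does.

```python
import math

def normalize(v):
    n = math.sqrt(sum(x * x for x in v))
    return [x / n for x in v]

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def cosine(a, b):
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))

stored = [[3.0, 4.0], [1.0, 0.0], [1.0, 3.0]]
query = [1.0, 1.0]

# Rank by cosine on the raw vectors...
by_cosine = sorted(range(len(stored)), key=lambda i: cosine(query, stored[i]), reverse=True)
# ...and by plain dot product on unit-length vectors.
unit = [normalize(v) for v in stored]
q_unit = normalize(query)
by_dot = sorted(range(len(stored)), key=lambda i: dot(q_unit, unit[i]), reverse=True)
```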


hits = client.search(
    collection_name="documents",
    query_vector=query_embedding,
    limit=5  # Return 5 closest points
)

context_list = [hit.payload["text"] for hit in hits]
context = "\n".join(context_list)

We rewrite this as a function for later use:

def semantic_search(query, embedding_model, k):
    query_embedding = np.array(embedding_model.embed_query(query))
    hits = client.search(
        collection_name="documents",
        query_vector=query_embedding,
        limit=k  # Return the k closest points
    )

    context_list = [{"id": hit.id, "source": str(hit.payload["source"]), "text": hit.payload["text"]} for hit in hits]
    return context_list


We're now very close to being able to field queries and retrieve answers! We've set up everything we need to query our LLM at scale. But before querying the model for a response, we want to first inform the query with our data, by retrieving relevant context from our vector database and then adding it to the query.


To do this, we use a simplified version of the script provided in Ray's LLM repository. This version is adapted to our code and - to simplify and keep our focus on how to scale a basic RAG system - leaves out a bunch of advanced retrieval techniques, such as reranking and hybrid search. For our LLM, we use gpt-3.5-turbo, and query it via the OpenAI API.


import os
import time
from openai import OpenAI

def get_client(llm):
    # Only OpenAI models are used here, so `llm` does not change the client
    api_key = os.environ["OPENAI_API_KEY"]
    client = OpenAI(api_key=api_key)
    return client

def generate_response(
    llm,
    max_tokens=None,
    temperature=0.0,
    stream=False,
    system_content="",
    assistant_content="",
    user_content="",
    max_retries=1,
    retry_interval=60,
):
    """Generate response from an LLM."""
    retry_count = 0
    client = get_client(llm=llm)
    messages = [
        {"role": role, "content": content}
        for role, content in [
            ("system", system_content),
            ("assistant", assistant_content),
            ("user", user_content),
        ]
        if content  # skip empty messages
    ]
    while retry_count <= max_retries:
        try:
            chat_completion = client.chat.completions.create(
                model=llm,
                max_tokens=max_tokens,
                temperature=temperature,
                stream=stream,
                messages=messages,
            )
            return prepare_response(chat_completion, stream=stream)

        except Exception as e:
            print(f"Exception: {e}")
            time.sleep(retry_interval)  # default is per-minute rate limits
            retry_count += 1
    return ""

def response_stream(chat_completion):
    # Yield text pieces as they arrive from a streamed completion
    for chunk in chat_completion:
        content = chunk.choices[0].delta.content
        if content is not None:
            yield content

def prepare_response(chat_completion, stream):
    if stream:
        return response_stream(chat_completion)
    else:
        return chat_completion.choices[0].message.content
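generate_response retries any failure after a fixed retry_interval and returns an empty string once retries run out. A common alternative for rate-limited APIs, swapped in here rather than taken from the tutorial, is exponential backoff with jitter: each retry waits roughly twice as long as the previous one. The sketch below is generic (call_with_backoff is a hypothetical name) and re-raises the last error instead of returning an empty string; the sleep parameter is injectable so the delays can be inspected without actually waiting.

```python
import random
import time

def call_with_backoff(fn, max_retries=3, base_delay=1.0, max_delay=60.0, sleep=time.sleep):
    """Call fn(); on failure wait base_delay * 2**attempt (capped, jittered) and retry."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(delay * random.uniform(0.5, 1.0))  # jitter spreads out concurrent retries

# Hypothetical flaky call that fails twice, then succeeds.
attempts = {"n": 0}
def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("rate limited")
    return "ok"

delays = []
result = call_with_backoff(flaky, sleep=delays.append)
```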

Finally, we generate a response:


# Generating our response
query = "How to run agents?"
response = generate_response(
    llm="gpt-3.5-turbo",
    temperature=0.0,
    stream=True,
    system_content="Answer the query using the context provided. Be succinct.",
    user_content=f"query: {query}, context: {context_list}")
# Stream response
for content in response:
    print(content, end='', flush=True)

To make our application even more convenient to use, we can adapt Ray's official documentation to implement our workflow within a single QueryAgent class, which bundles together all of the steps we implemented above: loading the embedding model, embedding the search query, performing vector search, processing the results, and querying the LLM to generate a response. With this single-class approach, we no longer need to call each of these functions in sequence, and we can also include utility functions. Specifically, get_num_tokens encodes our text and counts its tokens, which gives us the length of the input. To keep our standard 50:50 split of the context window between input and generation, we use trim(text, max_context_length) to cut input text that is too long.
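The 50:50 split is simple arithmetic. With an assumed max_context_length of 4096 tokens, half (2048) is reserved for the completion through max_tokens, and the input gets the other half minus whatever the system and assistant prompts already use. The sketch below reproduces that arithmetic with hypothetical helper names; a whitespace split stands in for tiktoken's cl100k_base encoding, so its token counts are only approximate.

```python
def token_budgets(max_context_length, prompt_overhead_tokens):
    """Split the context window 50:50 between input and output, as QueryAgent does."""
    half = int(0.5 * max_context_length)
    input_budget = half - prompt_overhead_tokens  # room left for query + retrieved context
    output_budget = half                          # passed as max_tokens for the completion
    return input_budget, output_budget

def trim_tokens(tokens, max_len):
    """Keep only the first max_len tokens (what trim() does after tiktoken encoding)."""
    return tokens[:max_len]

# Whitespace split stands in for tiktoken's cl100k_base encoding in this sketch.
system_prompt = "Answer the query using the context provided. Be succinct."
overhead = len(system_prompt.split())
input_budget, output_budget = token_budgets(4096, overhead)
```

Here the system prompt counts as 9 "tokens", leaving 2048 - 9 = 2039 tokens for the query and retrieved context.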


import tiktoken

def get_num_tokens(text):
    enc = tiktoken.get_encoding("cl100k_base")
    return len(enc.encode(text))

def trim(text, max_context_length):
    enc = tiktoken.get_encoding("cl100k_base")
    return enc.decode(enc.encode(text)[:max_context_length])

class QueryAgent:
    def __init__(
        self,
        embedding_model_name="BAAI/bge-base-en-v1.5",
        llm="gpt-3.5-turbo",
        temperature=0.0,
        max_context_length=4096,
        system_content="",
        assistant_content="",
    ):
        # Embedding model
        self.embedding_model = get_embedding_model(
            embedding_model_name=embedding_model_name,
            model_kwargs={"device": "cpu"},  # use "cuda" if a GPU is available
            encode_kwargs={"batch_size": 100},
        )

        # LLM
        self.llm = llm
        self.temperature = temperature
        self.context_length = int(
            0.5 * max_context_length
        ) - get_num_tokens(  # 50% of total context reserved for input
            system_content + assistant_content
        )
        self.max_tokens = int(
            0.5 * max_context_length
        )  # max sampled output (the other 50% of total context)
        self.system_content = system_content
        self.assistant_content = assistant_content

    def __call__(
        self,
        query,
        num_chunks=5,
        stream=True,
    ):
        # Get top_k context
        context_results = semantic_search(
            query=query, embedding_model=self.embedding_model, k=num_chunks
        )

        # Generate response
        document_ids = [item["id"] for item in context_results]
        context = [item["text"] for item in context_results]
        sources = [item["source"] for item in context_results]
        user_content = f"query: {query}, context: {context}"
        answer = generate_response(
            llm=self.llm,
            max_tokens=self.max_tokens,
            temperature=self.temperature,
            stream=stream,
            system_content=self.system_content,
            assistant_content=self.assistant_content,
            user_content=trim(user_content, self.context_length),
        )

        # Result
        result = {
            "question": query,
            "sources": sources,
            "document_ids": document_ids,
            "answer": answer,
            "llm": self.llm,
        }
        return result

To embed our query and retrieve relevant vectors, and then generate a response, we run our QueryAgent as follows:


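import json

query = "How to run an agent?"
system_content = "Answer the query using the context provided. Be succinct."
agent = QueryAgent(
    embedding_model_name="BAAI/bge-base-en-v1.5",
    llm="gpt-3.5-turbo",
    max_context_length=4096,
    system_content=system_content)
result = agent(query=query, stream=False)
print(json.dumps(result, indent=2))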

Serving our application


Our application is now running! Our last productionizing step is to serve it. Ray's Ray Serve module makes this step very straightforward. We combine Ray Serve with FastAPI and pydantic. The @serve.deployment decorator lets us define how many replicas and compute resources we want to use, and Ray Serve places those replicas on the cluster; replacing num_replicas with an autoscaling_config lets Serve add or remove replicas as load changes. Two Ray Serve decorators, @serve.deployment and @serve.ingress, are all we need to turn our FastAPI application into a production deployment.


import pickle
import requests
from typing import List

from fastapi import FastAPI
from pydantic import BaseModel
from ray import serve

# Initialize application
app = FastAPI()

class Query(BaseModel):
    query: str

class Response(BaseModel):
    llm: str
    question: str
    sources: List[str]
    answer: str  # matches the "answer" key returned by QueryAgent

@serve.deployment(num_replicas=1, ray_actor_options={"num_cpus": 2, "num_gpus": 1})
@serve.ingress(app)
class RayAssistantDeployment:
    def __init__(self, embedding_model_name, embedding_dim, llm):
        # Query agent
        system_content = "Answer the query using the context provided. Be succinct. " \
            "Contexts are organized in a list of dictionaries [{'text': <context>}, {'text': <context>}, ...]. " \
            "Feel free to ignore any contexts in the list that don't seem relevant to the query. "
        self.gpt_agent = QueryAgent(
            embedding_model_name=embedding_model_name,
            llm=llm,
            max_context_length=4096,
            system_content=system_content)

    @app.post("/query")
    def query(self, query: Query) -> Response:
        result = self.gpt_agent(
            query=query.query,
            stream=False)
        return Response.parse_obj(result)

Now, we're ready to deploy our application:


# Deploying our application with Ray Serve
deployment = RayAssistantDeployment.bind(
    embedding_model_name="BAAI/bge-base-en-v1.5",
    embedding_dim=768,
    llm="gpt-3.5-turbo")
serve.run(deployment, route_prefix="/")

Our FastAPI endpoint can be queried like any other API, while Ray takes care of the workload automatically:


# Performing inference
data = {"query": "How to run an agent?"}
response = requests.post(
    "", json=data)
print(response.json())


Wow! We've been on quite a journey. We gathered our data using Ray and some LangChain documentation, processed it by chunking, embedding, and indexing it, set up our retrieval and generation, and, finally, served our application using Ray Serve. Our tutorial has so far covered an example of how to develop scalably and economically - how to productionize from the very start of development.


Still, there is one last crucial step.


Production is only the start: maintenance


To fully productionize any application, you also need to maintain it. And maintaining your application is a continuous task.


Maintenance involves regular assessment and improvement of your application. You may need to routinely update your dataset if your application relies on being current with real-world changes. And, of course, you should monitor application performance to prevent degradation. For smoother operations, we recommend integrating your workflows with CI/CD pipelines.
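Monitoring can start small. As a sketch of one signal worth tracking, the class below keeps the latencies of the most recent requests and flags degradation when their 95th percentile (nearest-rank method) exceeds a threshold. The class name, window size, and threshold are hypothetical choices for illustration.

```python
import math
from collections import deque

class LatencyMonitor:
    """Keep recent request latencies and flag when their p95 exceeds a threshold."""

    def __init__(self, window=100, p95_threshold_s=2.0):
        self.samples = deque(maxlen=window)  # oldest samples drop out automatically
        self.p95_threshold_s = p95_threshold_s

    def record(self, latency_s):
        self.samples.append(latency_s)

    def p95(self):
        """95th percentile by the nearest-rank method (0.0 when there are no samples)."""
        if not self.samples:
            return 0.0
        ordered = sorted(self.samples)
        rank = max(1, math.ceil(0.95 * len(ordered)))
        return ordered[rank - 1]

    def degraded(self):
        return self.p95() > self.p95_threshold_s
```

In use, you would time each request (for example with time.perf_counter()), call record() with the elapsed seconds, and alert when degraded() returns True.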


Limitations and future discussion


Other critical aspects of scalably productionizing fall outside of the scope of this article, but will be explored in future articles, including:

  • Advanced development: pre-training, fine-tuning, prompt engineering, and other in-depth development techniques
  • Evaluation: randomness, qualitative metrics, and the complex multi-part structure of RAG can make LLM evaluation difficult
  • Compliance: adhering to data privacy laws and regulations, especially when handling personal or sensitive information











