AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


【Agent智能体指北】LlamaIndex 工作流:一种创建复杂 AI 应用程序的新方法
发布日期:2024-08-15 07:03:25 浏览次数: 1678



LlamaIndex 推出了一个的功能:工作流,这是一种在日益复杂的 AI 应用程序中协调动作的机制。

随着大型语言模型(LLMs)的出现,一种趋势已经变成了事实上的标准:AI 应用程序由不同组件实现的多个任务组成。市场上的开源框架致力于通过提供易于使用的基础组件抽象,如数据加载器、LLMs、向量数据库和重排器,甚至外部服务,来简化 AI 工程师的工作。同时,所有这些框架也在寻找最佳的抽象方式来协调这些组件,研究对于 AI 开发者来说,实现将复合 AI 系统集成在一起的逻辑,哪种方式最直观和高效。

两种潜在的协调模式是链(Chains)和管道(Pipelines),它们都是有向无环图(DAG)抽象的实现。我们在今年年初发布的查询管道中尝试了这种方法——它是一个声明式 API,允许您针对不同用例(如问答、结构化提取和代理自动化)对数据执行简单到高级的查询工作流。但是,当我们尝试在其基础上构建并尝试添加循环以更好地支持更复杂的工作流时,我们注意到了几个问题,这让我们反思为什么 DAG 可能不适合代理景观,以及我们可以在框架中引入哪些替代方案。

图形基础 UX 的局限性

DAG 的一个基本方面是 DAG 中的“A”,它们是无环的,也就是说没有循环。但在一个越来越代理化的世界中,AI 应用程序逻辑中无法执行循环是不可接受的。例如,如果一个组件提供了不良结果,AI 开发者应该有办法告诉系统自我纠正并重试。

即使没有向 DAG 添加循环和循环,查询管道也存在一些明显的问题:

  • 当出现问题时很难调试

  • 它们隐藏了组件和模块的执行方式

  • 我们的管道协调器变得越来越复杂,必须处理大量不同的边缘情况

  • 对于复杂的管道来说,它们很难阅读

一旦我们向查询管道添加了循环,这些围绕图形的开发人员 UX 问题就被放大了。我们在诸如以下领域亲身经历了开发人员的痛苦:

  • 许多核心协调逻辑,如 if-else 语句和 while 循环,被嵌入到图形的边缘中。定义这些边缘变得繁琐且冗长。

  • 处理可选和默认值的边缘情况变得困难。对于我们这样的框架来说,很难弄清楚一个参数是否会从上游节点传递过来。

  • 定义具有循环的图形并不总是对构建代理的开发人员感觉自然。在这里,图形 UX 强制“代理”节点明确定义了传入边缘和传出边缘,迫使用户与其它节点定义冗长的通信模式。

我们问:图形真的是我们可以用来协调复合 AI 系统中组件的唯一抽象吗?

从图形到 EDA:转向事件驱动

复合 AI 系统可以通过 LlamaIndex 工作流来实现。工作流通过称为 步骤 的 Python 函数集合来回分发事件。每个步骤可以看作是系统的一个组件:一个处理查询的,一个与 LLM 交谈的,一个从向量数据库加载数据等等。每个步骤接收一个或多个事件进行处理,并可以选择发送回事件,这些事件将根据需要被中继到其他组件。

转向事件驱动架构会导致设计上的根本转变。在许多图形实现中,图形遍历算法负责确定下一个应该运行的组件以及应该传递的数据。在事件驱动架构中,组件订阅某些类型的事件,最终负责根据收到的数据决定要做什么。

在事件驱动系统中,像输入的可选性和默认值这样的概念在组件级别上得到解决,大大简化了协调代码。

工作流入门

为了澄清这个想法,让我们看一个例子。一个最小的 LlamaIndex 工作流看起来像这样:

from llama_index.core.workflow import(StartEvent,StopEvent,Workflow,step,)
from llama_index.llms.openai import OpenAI
classOpenAIGenerator(Workflow):@step()asyncdefgenerate(self, ev: StartEvent) -> StopEvent:query = ev.get("query")llm = OpenAI()response =await llm.acomplete(query)return StopEvent(result=str(response))
w = OpenAIGenerator(timeout=10, verbose=False)result =await w.run(query="What's LlamaIndex?")print(result)

generate 函数使用 @step 装饰器标记为工作流步骤,并声明它想要接收和发送回的事件类型,使用方法签名和适当的类型注释。为了运行工作流,我们创建 OpenAIGenerator 类的实例,传递一些配置参数,如所需的超时时间,然后调用 run 方法。传递给 run 的任何关键字参数将被打包成一个特殊的 StartEvent 类型的事件,该事件将被中继到请求它的步骤(在这种情况下,只有 generate 步骤)。generate 步骤返回一个特殊的 StopEvent 类型的事件,这将向工作流发出信号,优雅地停止其执行。StopEvent 携带我们想要作为工作流结果返回给调用者的任何数据,在这种情况下是 LLM 响应。

工作流循环

在事件驱动架构中,循环与通信有关,而不是拓扑结构。任何步骤都可以通过创建和发送适当的事件多次调用另一个步骤。让我们看一个自我纠正循环的例子:

classExtractionDone(Event):output: strpassage: str
classValidationErrorEvent(Event):error: strwrong_output: strpassage: str
classReflectionWorkflow(Workflow):@step()asyncdefextract(self, ev: StartEvent | ValidationErrorEvent) -> StopEvent | ExtractionDone:ifisinstance(ev, StartEvent):passage = ev.get("passage")ifnot passage:return StopEvent(result="Please provide some text in input")reflection_prompt =""elifisinstance(ev, ValidationErrorEvent):passage = ev.passagereflection_prompt = REFLECTION_PROMPT.format(wrong_answer=ev.wrong_output, error=ev.error)
llm = Ollama(model="llama3", request_timeout=30)prompt = EXTRACTION_PROMPT.format(passage=passage, schema=CarCollection.schema_json())if reflection_prompt:prompt += reflection_prompt
output =await llm.acomplete(prompt)
return ExtractionDone(output=str(output), passage=passage)
@step()asyncdefvalidate(self, ev: ExtractionDone) -> StopEvent | ValidationErrorEvent:try:json.loads(ev.output)except Exception as e:print("Validation failed, retrying...")return ValidationErrorEvent(error=str(e), wrong_output=ev.output, passage=ev.passage)
return StopEvent(result=ev.output)
w = ReflectionWorkflow(timeout=60, verbose=True)result =await w.run(passage="There are two cars available: a Fiat Panda with 45Hp and a Honda Civic with 330Hp.")print(result)

在这个例子中,validate 步骤接收尝试的模式提取结果作为事件,并可以决定通过返回 ValidationErrorEvent 再次尝试,最终将被传递到 extract 步骤,该步骤将执行另一次尝试。请注意,在这个例子中,如果这个提取/验证循环长时间持续提供不良结果,工作流可能会超时,但另一种策略可能是在精确尝试次数后放弃,仅举一个例子。

工作流持久化

工作流在执行期间保持全局状态,并且这个状态可以根据请求共享并传播到其步骤。这个共享状态实现为 Context 对象,并且可以被步骤用来在迭代之间存储数据,也可以作为不同步骤之间的通信形式。让我们看一个更复杂的 RAG 例子的摘录,展示如何使用全局上下文:

classRAGWorkflow(Workflow):@step(pass_context=True)asyncdefingest(self, ctx: Context, ev: StartEvent) -> Optional[StopEvent]:dataset_name = ev.get("dataset")_, documents = download_llama_dataset(dsname, "./data")ctx.data["INDEX"] = VectorStoreIndex.from_documents(documents=documents)return StopEvent(result=f"Indexed {len(documents)} documents.")


在这种情况下,ingest 步骤创建了一个索引,并希望在工作流执行期间使其对可能需要它的任何其他步骤都可用。在 LlamaIndex 工作流中以符合规范的方式进行此操作的方法是声明步骤需要全局上下文的实例(@step(pass_context=True) 可以完成这个操作)并将索引存储在上下文中本身,使用其他步骤可能稍后访问的预定义键。

自定义工作流

除了工作流,我们还将发布一系列预定义的工作流,以便最常用的用例可以用一行代码实现。使用这些预定义流程,用户可能仍然想要稍微更改预定义的工作流,以引入一些自定义行为,而不必从头开始重写整个工作流。假设您想要

自定义 RAG 工作流并使用自定义重新排名步骤,您所需要做的就是子类化一个假设的内置 RAGWorkflow 类并覆盖 rerank 步骤,如下所示:

classMyWorkflow(RAGWorkflow):@step(pass_context=True)defrerank(self, ctx: Context, ev: Union[RetrieverEvent, StartEvent]) -> Optional[QueryResult]:# 我的自定义重新排名逻辑在这里
w = MyWorkflow(timeout=60, verbose=True)result =await w.run(query="Who is Paul Graham?")

调试工作流

您的工作流的复杂性将随着应用程序逻辑的复杂性而增长,有时仅通过查看 Python 代码可能很难理解事件在执行期间将如何流动。为了便于理解复杂的工作流并支持工作流执行的调试,LlamaIndex 提供了两个功能:

  • draw_all_possible_flows 生成一个图片,显示工作流中的所有步骤以及事件可能的流动方式

  • draw_most_recent_execution 生成一个类似的图片,只显示上次工作流执行期间实际发送的事件

除此之外,工作流可以手动执行,通过多次调用 run_step() 直到所有步骤都完成。每次 run_step 调用后,可以检查工作流,检查任何中间结果或调试日志。

开启工作流之旅

尽管处于开发的早期阶段,LlamaIndex 工作流已经代表了与查询管道相比的向前迈出的一步,扩展了它们的功能并增加了更多的灵活性。除此之外,工作流带有一套您通常会期望从更成熟的软件中获得的功能:

  • 完全异步,支持流式传输

  • 默认情况下进行了仪器化,提供一键可观察性与支持的集成

  • 逐步执行,便于调试

  • 事件驱动依赖的验证和可视化

  • 事件实现为 pydantic 模型,以简化自定义和进一步开发新功能


53AI,企业落地应用大模型首选服务商

产品:大模型应用平台+智能体定制开发+落地咨询服务

承诺:先做场景POC验证,看到效果再签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

与创始人交个朋友

回到顶部

 
扫码咨询