微信扫码
与创始人交个朋友
我要投稿
最近,LlamaIndex 在其某个版本中引入了一个新功能,称为 Workflow,为 LLM 应用提供了事件驱动和逻辑解耦的能力。
在今天的文章中,我们将通过一个实际的迷你项目深入探讨这个功能,探索新内容和仍然不足之处。让我们开始吧。
越来越多的 LLM 应用程序正在转向智能代理架构,期望 LLM 通过调用不同的 API 或多次迭代调用来满足用户请求。
然而,这一转变带来了一个问题:随着代理应用程序进行更多的 API 调用,程序响应变得缓慢,代码逻辑变得更加复杂。
一个典型的例子是 ReActAgent,它涉及思考、行动、观察和最终答案等步骤,至少需要三次 LLM 调用和一次工具调用。如果需要循环,I/O 调用将会更多。
如上图所示,在传统编程模型中,所有 I/O 调用都是线性的;下一个任务必须等待上一个任务完成。
尽管主流 LLM 现在支持通过流输出生成结果,但在代理应用中,我们仍然需要等待 LLM 完成结果生成后才能返回或进入下一个阶段。
实际上,我们并不需要所有 I/O 调用按顺序进行;它们可以并发执行,如下图所示:
这个图看起来熟悉吗?是的,Python 的 asyncio
包提供了并发执行 I/O 绑定任务的能力,几乎所有基于 I/O 的 API,包括 LLM 客户端,都支持并发执行。
LlamaIndex 的工作流也利用了并发编程的原理。它更进一步,不仅封装了 asyncio
库的细节,还提供了一种事件机制,使我们能够解耦业务流程的不同部分。
现在我们了解了背景,让我们通过一个实际项目来逐步了解 LlamaIndex 工作流。
在主菜之前,让我们通过一个简单的代码示例来熟悉元素和基本原理,先来个开胃菜。
首先,我们需要导入必要的工具。Workflow 已经包含在最新版本的 LlamaIndex 中,无需单独安装。
from llama_index.core.workflow import (
Event,
StartEvent,
StopEvent,
Workflow,
Context,
step,
)
由于 Workflow 是一个事件驱动的框架,我们应该首先定义一些事件。
为了避免不一致,我们可以首先定义一个 BaseEvent
,确保所有事件使用关键字 payload
进行消息传递。
class BaseEvent(Event):
payload: str | dict | None
让我们定义今天的第一个事件:SecondStepEvent
class SecondStepEvent(BaseEvent):
...
接下来,让我们开始编写我们的第一个 Workflow 程序,它是 Workflow
的一个子类,包含两个方法:
class SimpleWorkflow(Workflow):
@step
async def start(self, ev: StartEvent) -> SecondStepEvent:
return SecondStepEvent(payload=ev.payload)
@step
async def second_step(self, ev: SecondStepEvent) -> StopEvent:
return StopEvent(result=ev.payload)
start
接受一个 StartEvent
,然后返回一个 SecondStepEvent
。second_step
接受一个 SecondStepEvent
,然后返回一个 StopEvent
。让我们运行代码,看看它是如何工作的。
s_wf = SimpleWorkflow(timeout=10, verbose=True)
result = await s_wf.run(payload="hello world")
print(result)
我们开启了 verbose
选项,以便详细查看代码的执行过程。
LlamaIndex 还慷慨地提供了一个小工具,让我们能够查看整个工作流程,非常直观。
from llama_index.utils.workflow import draw_all_possible_flows
draw_all_possible_flows(SimpleWorkflow, filename="simple_workflow.html")
快速查看源代码可以发现,Workflow 在内部维护一个 Context
,它不仅保持一个事件队列,还维护一个包含每个步骤的字典。
当 Workflow 被初始化时,step
装饰器分析每个方法的签名,以确定它将接收和返回哪些事件,开始监听事件队列,然后将该方法存储在 step
字典中。
当 Workflow 的 run
方法被启动时,它开始一个 runflow
循环,最初将一个 StartEvent
放入事件队列。如果有一个方法接受这个 StartEvent
,它开始执行并返回相应的事件,再将其放回事件队列。
step
方法还可以直接调用 Context 的 send_event
方法将事件放入队列。
如果 runflow 循环在队列中检测到 StopEvent
,它将退出流程并返回最终结果。
在对元素和实现原理有了基本了解后,我们现在可以通过一个实践项目来探索 Workflow 的优缺点。
在今天的实践项目中,我们将帮助超市的采购经理创建一个基于客户反馈的SKU库存管理系统,展示Workflow的分支和循环控制、流式事件以及并发执行功能。
在反馈监控的第一个版本中,我们将持续监控某个 SKU 的最新反馈,分析输入中隐含的反馈,然后采取相应的行动。
整个代码逻辑如下面的图所示:
首先,我们将定义一个 InventoryManager 类,使用 async
来实现 place_order 和 clear_out 方法。
class InventoryManager:
async def place_order(self, sku: str) -> None:
await asyncio.sleep(0.5)
print(f"Will place an order for {sku}")
async def clear_out(self, sku: str) -> None:
await asyncio.sleep(0.5)
print(f"Will clear out {sku}")
我们还需要实现四个事件:LoopEvent
、GetFeedbackEvent
、OrderEvent
和 ClearEvent
,它们都是 BaseEvent
的子类,确保它们遵循统一的消息传递接口。
class LoopEvent(BaseEvent):
...
class GetFeedbackEvent(BaseEvent):
...
class OrderEvent(BaseEvent):
...
class ClearEvent(BaseEvent):
...
接下来,我们开始实现 FeedbackMonitorWorkflow
类,其中包含核心业务逻辑。
class FeedbackMonitorWorkflow(Workflow):
def__init__(self, total_cycle: int = 1, *args, **kwargs) -> None:
self.total_cycle = total_cycle
self.counter = 0
self.manager = InventoryManager()
super().__init__(*args, **kwargs)
@step
asyncdefbegin(self, ev: StartEvent | LoopEvent) \
-> GetFeedbackEvent | StopEvent:
print("We now return to the begin step")
ifisinstance(ev, StartEvent):
self.sku = ev.payload
ifself.counter < self.total_cycle:
await asyncio.sleep(3)
self.counter += 1
return GetFeedbackEvent(payload=self.sku)
else:
return StopEvent(result="We're done for the day.")
@step
asyncdefget_feedback(self, ev: GetFeedbackEvent) -> OrderEvent | ClearEvent:
print(f"Wil get the latest feedback for {ev.payload}")
if random.random() < 0.3:
return ClearEvent(payload='Bad')
else:
return OrderEvent(payload='Good')
@step
asyncdeforder(self, ev: OrderEvent) -> LoopEvent:
print(f"We now buy some sku with feedback {ev.payload}.")
awaitself.manager.place_order(self.sku)
return LoopEvent(payload="Start a new cycle.")
@step
asyncdefclear(self, ev: ClearEvent) -> LoopEvent:
print(f"We now sell some sku with feedback {ev.payload}")
awaitself.manager.clear_out(self.sku)
return LoopEvent(payload="Start a new cycle.")
begin
方法是我们的入口点,接受 StartEvent
和 LoopEvent
。StartEvent
是启动代码的默认事件,我们通过此事件传递 SKU。GetFeedbackEvent
触发 get_feedback
方法以获取反馈信息。为了简单起见,我们使用 random
方法生成两个反馈,“Good”和“Bad”,然后根据反馈返回相应的 OrderEvent
或 ClearEvent
。LoopEvent
重新启动 begin
方法以进行新一轮循环。为了简化代码,我们仅设置一个循环。begin
方法返回一个 GetFeedbackEvent
以触发获取最新 SKU 反馈。如果所有循环完成,则返回 StopEvent
。OrderEvent
或 ClearEvent
时,相应的 step
方法根据消息体中的情感标志执行交易,并返回 LoopEvent
以启动新循环。如您所见,通过使用事件,我们可以解耦复杂的循环和分支过程,使相应的事件能够触发新的循环。
让我们使用 draw_all_possible_flows
工具来查看流程图是否与我们设计的业务逻辑图相匹配。
draw_all_possible_flows(FeedbackMonitorWorkflow, filename="feedback_monitor_workflow.html")
这就全部了吗?如果只是解耦循环和分支控制,我难道不能通过一些编程技巧来实现吗?
是的,但流控制只是最表面的层次。接下来,让我们体验将 asyncio
与 Workflow 结合所释放的强大潜力。
在构建代理链时,最让人头疼的问题之一就是如何在执行过程中向用户反馈消息,帮助他们理解代码执行的进展。
在上面的代码中,我们使用 print
方法在控制台实时打印进度,但这种方法对于网页应用程序来说并不可行。
一个解决方案是启动一个单独的管道以实时推送消息给用户,但当多个步骤并发执行时,如何处理这个管道就成了一个挑战。
幸运的是,Workflow 的上下文直接提供了一个消息流管道,我们可以方便地将消息写入这个管道,并通过 async for
循环在调用端统一处理它们。
让我们修改之前的交易程序:
class ProgressEvent(BaseEvent):
...
classFeedbackMonitorWorkflowV2(Workflow):
def__init__(self, total_cycle: int = 1, *args, **kwargs) -> None:
self.total_cycle = total_cycle
self.counter = 0
self.manager = InventoryManager()
super().__init__(*args, **kwargs)
@step
asyncdefbegin(self, ctx: Context,
ev: StartEvent | LoopEvent) \
-> GetFeedbackEvent | StopEvent:
ctx.write_event_to_stream(
ProgressEvent(payload="我们现在回到开始步骤")
)
...
@step
asyncdefget_feedback(self, ctx: Context,
ev: GetFeedbackEvent) -> OrderEvent | ClearEvent:
ctx.write_event_to_stream(
ProgressEvent(payload=f"将获取 {ev.payload} 的最新反馈")
)
...
@step
asyncdeforder(self, ctx: Context,
ev: OrderEvent) -> LoopEvent:
ctx.write_event_to_stream(
ProgressEvent(payload=f"我们现在购买一些 SKU,反馈为 {ev.payload}。")
)
...
@step
asyncdefclear(self, ctx: Context,
ev: ClearEvent) -> LoopEvent:
ctx.write_event_to_stream(
ProgressEvent(payload=f"我们现在出售一些 SKU,反馈为 {ev.payload}")
)
...
在第一步中,我们在 step
方法的签名中传递了一个 Context
类型的参数。这让 Workflow 知道将当前执行上下文传递给 step
方法。
然后,我们用 ctx.write_event_to_stream
方法替换了 print
方法,以实时将消息写入管道。
最后,在等待最终结果之前,我们使用 stream_events
方法遍历消息管道中的最新消息。
from datetime import datetime
def streaming_log(message: str) -> None:
current_time = datetime.now().strftime("%H:%M:%S")
print(f"{current_time} {message}")
feedback_monitor_v2 = FeedbackMonitorWorkflowV2(timeout=10, verbose=False)
handler = feedback_monitor_v2.run(payload="Apple")
async for event in handler.stream_events():
if isinstance(event , ProgressEvent):
streaming_log(event.payload)
final_result = await handler
print("最终结果: ", final_result)
正如文章开头提到的,对于 I/O 绑定的任务,我们可以使用 asyncio
包来实现代码的并发执行,从而大大提高运行效率。Workflow 为我们实现了这一机制,封装了 asyncio
执行代码,让我们专注于代码逻辑。
我们以 FeedbackMonitor
项目为例进行说明。
这次,我们将升级项目,使得 FeedbackMonitor
不仅通过一个来源来判断是好还是坏,而是同时通过在线、离线和机器学习趋势预测器来判断。
首先,我们添加六个事件:OnlineEvent
、OnlineFeedbackEvent
、OfflineEvent
、OfflineFeedbackEvent
、TrendingPredictionEvent
和 PredictionResultEvent
。
from collections import Counter
classOnlineEvent(BaseEvent):
...
classOnlineFeedbackEvent(BaseEvent):
...
classOfflineEvent(BaseEvent):
...
classOfflineFeedbackEvent(BaseEvent):
...
classTrendingPredictionEvent(BaseEvent):
...
classPredictionResultEvent(BaseEvent):
...
classTradeEvent(BaseEvent):
...
然后,我们编写一个 ComplexFeedbackMonitor
类作为新的 Workflow。
class ComplexFeedbackMonitor(Workflow):
def__init__(self, *args, **kwargs):
self.manager = InventoryManager()
super().__init__(*args, **kwargs)
@step
asyncdefstart(self, ctx: Context, ev: StartEvent) \
-> OnlineEvent | OfflineEvent | TrendingPredictionEvent:
self.sku = ev.payload
ctx.send_event(OnlineEvent(payload=ev.payload))
ctx.send_event(OfflineEvent(payload=ev.payload))
ctx.send_event(TrendingPredictionEvent(payload=ev.payload))
@step
asyncdefonline_feedback(self, ev: OnlineEvent) -> OnlineFeedbackEvent:
await asyncio.sleep(random.randint(1, 3))
if random.random() < 0.3:
return OnlineFeedbackEvent(payload='Bad')
else:
return OnlineFeedbackEvent(payload='Good')
@step
asyncdefoffline_feedback(self, ev: OfflineEvent) -> OfflineFeedbackEvent:
await asyncio.sleep(random.randint(1, 3))
if random.random() < 0.3:
return OfflineFeedbackEvent(payload='Bad')
else:
return OfflineFeedbackEvent(payload='Good')
@step
asyncdeftrending_predict(self, ev: TrendingPredictionEvent) -> PredictionResultEvent:
await asyncio.sleep(random.randint(1, 3))
if random.random() < 0.3:
return PredictionResultEvent(payload='Bad')
else:
return PredictionResultEvent(payload='Good')
@step
asyncdeftrading_decision(self, ctx: Context,
ev: OnlineFeedbackEvent | OfflineFeedbackEvent | PredictionResultEvent)\
-> TradeEvent:
results = ctx.collect_events(ev,
[OnlineFeedbackEvent, OfflineFeedbackEvent, PredictionResultEvent])
if results isnotNone:
voting = dict(Counter([ev.payload for ev in results]))
print(voting)
feedback = max(voting, key=voting.get)
return TradeEvent(payload=feedback)
@step
asyncdeftrade(self, ev: TradeEvent) -> StopEvent:
feedback = ev.payload
match feedback:
case'Goode':
awaitself.manager.place_order(self.sku)
case'Bad':
awaitself.manager.clear_out(self.sku)
case _:
print("Do nothing")
return StopEvent(result='We are done for the day.')
在 start
方法中,我们使用 ctx.send_event
同时发送 OnlineEvent
、OfflineEvent
和 TrendingPredictionEvent
。由于 Workflow 根据 step
方法的类型注解来确定抛出的消息,因此我们仍然需要标记返回消息的类型。
接下来,我们实现 online_feedback
、offline_feedback
和 trending_predict
方法,以获取交易信号并返回相应的事件。
我们仍然使用 random
方法来模拟客户反馈分析。
由于来自不同来源的内容需要不同的解析时间,我们希望在所有消息返回之前等待。在这一点上,我们可以在 trading_decision
方法中使用 ctx.collect_events
方法。
每当新的反馈事件返回时,trading_events
方法就会执行一次。
但是,ctx.collect_events
方法将我们需要等待的所有事件作为参数传递,并且其返回值在所有反馈事件返回之前保持为空。此时,返回值是三个反馈事件的列表。
我们可以使用 Counter
方法来计算“Good”和“Bad”出现的次数,然后取最多票的标记来做出交易决策。
最后,让我们使用 draw_all_possible_flows
工具来看看我们新设计的工作流有多酷:
draw_all_possible_flows(ComplexFeedbackMonitor, filename='complex_feedback_monitor.html')
接下来,让我们执行这个工作流并查看结果。
feedback_monitor = ComplexFeedbackMonitor(timeout=20, verbose=True)
result = await feedback_monitor.run(payload='Apple')
print(result)
我们可以观察到,从不同来源获取反馈的三个方法是同时触发的,但返回时间不同。
前两个返回的事件可以触发 trading_decision
方法,但不能继续触发 TradeEvent
。只有在所有三个事件返回并计算出最终交易决策后,才会触发 TradeEvent
。
正如你所看到的,借助 Workflow 的力量,我们确实可以使我们的代码架构既清晰又高效。
但不要过于乐观,因为在实践中经过一段时间后,我认为仍然存在一些不足之处。
如果你查看我们之前的代码,你会发现我们所有的代码逻辑都写在同一个工作流中,这对于简单的应用程序来说没问题,但对于复杂的现实应用程序来说却是灾难。
理想情况下,我们应该将不同的逻辑拆分成多个工作流,以保持“单一责任”原则的纯粹性。满足这一要求的官方解决方案是 嵌套工作流:
假设我们想将 FeedbackMonitor
中的交易订单逻辑拆分为一个独立的工作流。当我们需要下订单时,我们应该如何调用它?
官方解决方案是使用嵌套工作流,即在工作流 A 的 step
方法中将另一个工作流 B 作为参数传递。然后,在工作流 A 实例化后,添加工作流 B 的实例。如下所示的代码:
class OrderStation(Workflow):
def__init__(self, *args, **kwargs):
self.manager = InventoryManager()
super().__init__(*args, **kwargs)
@step
asyncdeftrade(self, ev: StartEvent) -> StopEvent:
print("We are now in a new workflow named OrderStation")
feedback = ev.feedback
match feedback:
case'Good':
awaitself.manager.place_order(ev.sku)
case'Bad':
awaitself.manager.clear_out(ev.sku)
return StopEvent(result="Done!")
classComplexFeedbackMonitorV2(ComplexFeedbackMonitor):
@step
asyncdeftrade(self, ev: TradeEvent, order_station: OrderStation) -> StopEvent:
feedback = ev.payload
await order_station.run(feedback=feedback, sku=self.sku)
return StopEvent(result='We are done for the day.')
feedback_monitor_v2 = ComplexFeedbackMonitorV2(timeout=20, verbose=False)
feedback_monitor_v2.add_workflows(
order_station=OrderStation(timeout=10, verbose=True)
)
result = await feedback_monitor_v2.run(payload='Apple')
print(result)
等等,如果你有 Java 开发经验,看到这段代码会不会感到惊讶:这不是依赖注入吗?
这确实与依赖注入相似,但不同之处在于,我们仍然需要在实例初始化后显式添加具体的工作流实例,因此仍然存在耦合,这是第一个问题。
我在编码过程中发现的另一个问题是,对于嵌套工作流,我只能通过 run
方法调用它们,而不能从外部工作流调用嵌套工作流中的相应 step
方法。
因此,这并不是工作流之间通信的好解决方案。
那么,是否有办法真正实现工作流之间的通信?我搜索了API文档,没有找到官方解决方案,我注意到这个问题也没有得到回答。因此,我决定自己尝试一下,看看能否解决它。
在再次审查源代码后,我认为ctx.send_event
方法有一些潜力,所以我首先想到的是,是否可以通过在两个工作流之间共享相同的上下文来解决这个问题?
我注意到实例化Context
需要传入一个workflow
实例,并且设置工作流自己的上下文可以通过在run
方法中传入来完成。
因此,代码如下,保持两个工作流不变,只有OrderStation
中的step
方法不再接受StartEvent
,而是接受特定的TradeEventV2
。
class TradeEventV2(Event):
feedback: str
sku: str
classOrderStation(Workflow):
def__init__(self, *args, **kwargs):
self.manager = InventoryManager()
super().__init__(*args, **kwargs)
@step
asyncdeftrade(self, ev: TradeEventV2) -> StopEvent:
print("We are now in a new workflow named OrderStation")
feedback = ev.feedback
match feedback:
case'Good':
awaitself.manager.place_order(ev.sku)
case'Bad':
awaitself.manager.clear_out(ev.sku)
return StopEvent(result="Done!")
classComplexFeedbackMonitorV3(ComplexFeedbackMonitor):
@step
asyncdeftrade(self, ctx: Context, ev: TradeEvent) -> StopEvent | TradeEventV2:
feedback = ev.payload
ctx.send_event(
TradeEventV2(feedback=feedback, sku=self.sku)
)
return StopEvent(result='We are done for the day.')
然后我使用OrderStation
创建一个上下文实例,并在运行方法执行期间将其传递给FeedbackMonitor
实例,果然,它抛出了一个错误:
feedback_monitor_v3 = ComplexFeedbackMonitorV3(timeout=20, verbose=False)
result = await feedback_monitor_v3.run(payload='Apple')
print(result)
似乎方法签名验证存在问题,让我们尝试关闭验证:
feedback_monitor_v3 = ComplexFeedbackMonitorV3(timeout=20, verbose=False, disable_validation=True)
order_station = OrderStation(timeout=10, verbose=True)
result = await feedback_monitor_v3.run(ctx=Context(workflow=order_station),
payload='Apple')
print(result)
仍然没有成功,看来这种方式行不通。
然后,我注意到文档提到了一种 Unbound 语法,似乎能够将每个步骤的逻辑与工作流解耦。示例代码如下:
class TestWorkflow(Workflow):
...
@step(workflow=TestWorkflow)
def some_step(ev: StartEvent) -> StopEvent:
return StopEvent()
虽然我们仍然只能在一个工作流内运行,但这让我感受到模块之间通信的可行性。
由于文章较长,我在这里不使用代码进行解释,让我给你展示一个如何使用 Unbound 语法进行模块通信的图示:
如图所示:首先,我们可以定义一个 Application
类作为工作流管道,同时定义所需的事件。
然后,每个项目团队可以编写自己的业务逻辑代码,并使用不同的 step
方法来监听和发送外部消息。
最后,我们可以在 fastapi API 中调用 Application
的 run
方法,调动各个模块完成任务。
通过这种方式,业务逻辑可以拆分为不同的模块进行开发,然后可以使用事件调用不同的 step
方法。
这确实达到了逻辑解耦的目的。然而,由于这种方法仅通过 add_step
方法在 step
装饰器中注册每个步骤到工作流中,它仍然没有实现工作流之间的真正通信。
LlamaIndex 的 Workflow 新功能使 RAG、LLM 生成和 I/O 调用的并行执行变得非常简单,而事件驱动架构也使程序能够与复杂的逻辑控制解耦。
在今天的文章中,我通过一个 FeedbackMonitor 项目展示了 Workflow 的几个特性。
在项目实践中,我们也发现 Workflow 在模块之间的通信方面仍然存在不足,并讨论了包括嵌套工作流和无绑定语法在内的不同解决方案。
最后,随着像 Langchain 和 AutoGen 这样的代理框架开始提出自己的事件驱动架构,我相信 Workflow 正在走上正确的道路,并将看到长期的发展。让我们保持关注。
53AI,企业落地应用大模型首选服务商
产品:大模型应用平台+智能体定制开发+落地咨询服务
承诺:先做场景POC验证,看到效果再签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2024-12-23
LlamaIndex工作流详解:提升数据处理效率的关键
2024-12-17
llamaindex实战-ChatEngine-Context(上下文)模式
2024-12-01
LlamaIndex,让AI唤醒你的数据
2024-11-29
llamaindex实战-Agent-自定义工具函数
2024-11-22
llamaindex实战-Agent-让Agent调用多个工具函数(本地部署)
2024-11-19
llamaindex实战-Workflow:工作流入门(本地部署断网运行)
2024-11-15
llamaindex实战-Agent-在Agent中使用RAG查询(本地部署)
2024-11-07
深度解析 REAcT Agent 的实现:利用 LlamaIndex 和 Gemini 提升智能代理工作流
2024-07-09
2024-04-20
2024-06-05
2024-04-25
2024-04-28
2024-05-09
2024-07-20
2024-04-26
2024-06-19
2024-04-08