AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


深入解析LlamaIndex Workflows【下篇】:实现ReAct模式AI智能体的新方法
发布日期:2024-10-11 21:22:25 浏览次数: 1787 来源:AI大模型应用实践


在本篇中,我们将继续学习如何基于Workflows来构建一个ReAct模式的AI智能体。尽管在LlamaIndex框架中已经提供了开箱即用的ReActAgent组件,但通过Workflows来从零构建ReAct智能体,可以更深入的了解ReAct智能体的内部原理,在未来帮助实现更底层、更灵活的控制能力。

01

ReAct Agent再回顾

很多人都对ReAct智能体有所了解,在LlamaIndex与LangChain框架中也都有现成的ReActAgent封装组件,可以开箱即用的构建ReAct模式的AI智能体

ReAct模式的AI智能体采用迭代式的推理(Reasoning)到行动(Acting)的工作流程,旨在应对更复杂的人工任务和问题。它通过将推理步骤与实际行动相结合,使得智体可以逐步理解任务、采取行动,并观察行动获得的新信息以推理后续步骤。过程大致如下:

  1. 推理:智能体会分析任务与环境、推理步骤、决定下一步行动

  2. 行动:调用外部工具,如搜索、执行计算、与外部API交互等

  3. 观察并循环:观察行动结果,推理后续步骤,调整策略,直至任务完成


ReAct模式具备很好的动态性,使得AI能够应对复杂和未知情况,适用于更开放性的问题和探索性的任务,展现出更高的自主决策智能。

ReAct Agent基本构成

02

设计ReAct Agent工作流

根据ReAct智能体的基本思想,其工作流中最核心的步骤(step)应该包括:

  1. 将输入问题(或任务)、已有对话、工具信息、已经获得的信息(即已调用工具的返回内容)等输入LLM,让LLM推理下一步动作

  2. 如果此时LLM可以回答,则直接给出答案,结束流程

  3. 如果此时LLM无法回答,则给出使用工具的信息(工具名、输入参数等)

  4. 如果需要使用工具,则根据第3步给出的信息进行工具调用,并获得返回

  5. 循环到第一步,进行迭代,直到在第2步能够完成任务;

基于LlamaIndex Workflows开发ReAct Agent的工作流程图如下:

现在可以参考这个工作流来实现ReAct智能体,这里基于官方的样例进行讲解。

03

基于Workflows实现ReAct Agent

【定义Event】

参考上面的工作流图,定义几个需要的Event类型:

from llama_index.core.llms import ChatMessage
from llama_index.core.tools import ToolSelection, ToolOutput
from llama_index.core.workflow import Event
import os

#通知事件
class PrepEvent(Event):
    pass

#LLM输入事件:包含输入LLM的历史消息
class InputEvent(Event):
    input: list[ChatMessage]

#工具调用事件:包含工具调用信息
class ToolCallEvent(Event):
    tool_calls: list[ToolSelection]

#工具输出事件:包含工具输出信息
class FunctionOutputEvent(Event):
    output: ToolOutput

【ReAct Agent初始化】

工作流初始化,主要是为了给智能体准备必备的“工具”,最重要的就是智能体需要的几大件:LLM大模型、Memory记忆、以及可以使用的Tools工具

from typing import Any, List
from llama_index.core.agent.react import ReActChatFormatter, ReActOutputParser
from llama_index.core.agent.react.types import ActionReasoningStep,ObservationReasoningStep
from llama_index.core.llms.llm import LLM
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.tools.types import BaseTool
from llama_index.core.workflow import Context,Workflow,StartEvent,StopEvent,step
from llama_index.llms.openai import OpenAI

class ReActAgent(Workflow):
    def __init__(
        self,
        *args: Any,
        llm: LLM | None = None,
        tools: list[BaseTool] | None = None,
        extra_context: str | None = None,
        **kwargs: Any,
    )
 -> None:

        super().__init__(*args, **kwargs)

        #可用的工具tools
        self.tools = tools or []

        #使用的LLM(大模型)
        self.llm = llm or OpenAI()

        #持久记忆
        self.memory = ChatMemoryBuffer.from_defaults(llm=llm)

        #用来把历史对话、已有的推理历史格式化成下一次LLM的输入消息历史
        self.formatter = ReActChatFormatter(context=extra_context or "")

        #解析LLM的输出(直接回答、使用工具、使用工具后回答)
        self.output_parser = ReActOutputParser()

        #保存工具调用输出
        self.sources = []

这里有两个辅助工具:

  • formatter:用来把保存在memory中的对话历史以及推理历史,格式化成LLM输入的消息格式(通常是一个包含role与content属性的对象列表);还要附加上引导LLM进行思考的系统指令。

  • out_parser:解析LLM输出的解析器。在ReAct模式下,LLM的输出可能是类似Thought...Action...Action Input...这样的推理结果,需要对这样的输出进行解析,以决定下一步是使用工具还是输出答案。


【用户输入消息处理:new_user_msg】

这是一次性的步骤,简单的把输入问题/任务放入memory即可:

    @step
    async def new_user_msg(self, ctx: Context, ev: StartEvent) -> PrepEvent:

        """
        流程入口: 接受用户输入, 并放置到Memory中; 并触发下一步
        """


        self.sources = []

        user_input = ev.input
        user_msg = ChatMessage(role="user", content=user_input)
        self.memory.put(user_msg)

        await ctx.set("current_reasoning", [])
        return PrepEvent()

【LLM输入准备:prepare_chat_history

在这个步骤中,利用上面初始化的formatter,把对话历史与推理历史格式化,用来输入给LLM做推理。注意在一次任务中,这个步骤有可能会被多次循环调用,除非输入问题被LLM直接回答(无需借助工具)。

   @step
    async def prepare_chat_history(
        self, ctx: Context, ev: PrepEvent
    )
 -> InputEvent:

        
        """
        将对话与推理历史组装成LLM的输入消息列表(通常是角色+内容)。
        推理历史包括:
        1. LLM输出的推理结果(直接回答问题、需要工具调用、观察工具调用结果后可以回答)
        2. 工具调用的结果
        """


        #获取历史消息
        chat_history = self.memory.get()

        print(f'\n------------当前消息历史------------')
        for idx, message in enumerate(chat_history, start=1):
            print(f'\n{idx}. {message}')

        current_reasoning = await ctx.get("current_reasoning", default=[])
        print('\n-------------当前推理历史------------')
        for idx, reasoning in enumerate(current_reasoning, start=1):
            print(f'\n{idx}. {reasoning}')

        #将历史用户消息与推理历史组装成列表
        llm_input = self.formatter.format(
            self.tools, chat_history, current_reasoning=current_reasoning
        )

        return InputEvent(input=llm_input)

【LLM调用:handle_llm_input

使用上一步骤准备的输入内容,调用LLM,并解析结果。以决定下一步动作(返回不同的事件),具体可以参考下面的代码及注释:

  @step
    async def handle_llm_input(
        self, ctx: Context, ev: InputEvent
    )
 -> ToolCallEvent | StopEvent:

        
        """
        调用LLM;
        解析输出结果, 获得推理结果;
        判断是结束(可以回答问题), 还是需要调用工具;
        """


        chat_history = ev.input

        #调用LLM
        response = await self.llm.achat(chat_history)

        try:

            #解析输出的推理结果
            reasoning_step = self.output_parser.parse(response.message.content)
            (await ctx.get("current_reasoning", default=[])).append(
                reasoning_step
            )

            #如果已经结束:输出结果,流程结束(可立即回答,或者观察工具调用结果后可以回答)
            if reasoning_step.is_done:
                self.memory.put(
                    ChatMessage(
                        role="assistant", content=reasoning_step.response
                    )
                )
                return StopEvent(
                    result={
                        "response": reasoning_step.response,
                        "sources": [*self.sources],
                        "reasoning": await ctx.get(
                            "current_reasoning", default=[]
                        ),
                    }
                )
            
            #如果无法回答,需要调用工具
            elif isinstance(reasoning_step, ActionReasoningStep):
                tool_name = reasoning_step.action
                tool_args = reasoning_step.action_input
                return ToolCallEvent(
                    tool_calls=[
                        ToolSelection(
                            tool_id="",
                            tool_name=tool_name,
                            tool_kwargs=tool_args,
                        )
                    ]
                )
        except Exception as e:
            (await ctx.get("current_reasoning", default=[])).append(
                ObservationReasoningStep(
                    observation=f"There was an error in parsing my reasoning: {e}"
                )
            )

        # 其他情况则进行下一次迭代,继续尝试
        return PrepEvent()

【工具调用:handle_tool_calls

这是智能体使用外部工具(Tools)的关键步骤。根据上一步骤LLM输出的工具调用需求,调用外部工具(可能有多次调用),并把返回结果放在推理历史中,用于下一次迭代。

   @step
    async def handle_tool_calls(
        self, ctx: Context, ev: ToolCallEvent
    )
 -> PrepEvent:

        
        """
        工具调用,将调用结果作为LLM的观察对象;
        并将观察内容添加到推理历史
        """


        tool_calls = ev.tool_calls
        tools_by_name = {tool.metadata.get_name(): tool for tool in self.tools}

        # 工具调用
        for tool_call in tool_calls:
            tool = tools_by_name.get(tool_call.tool_name)
            if not tool:
                (await ctx.get("current_reasoning", default=[])).append(
                    ObservationReasoningStep(
                        observation=f"Tool {tool_call.tool_name} does not exist"
                    )
                )
                continue

            try:

                #调用工具,并将工具调用结果作为观察对象,添加到推理历史
                tool_output = tool(**tool_call.tool_kwargs)
                self.sources.append(tool_output)
                (await ctx.get("current_reasoning", default=[])).append(
                    ObservationReasoningStep(observation=tool_output.content)
                )
            except Exception as e:
                (await ctx.get("current_reasoning", default=[])).append(
                    ObservationReasoningStep(
                        observation=f"Error calling tool {tool.metadata.get_name()}: {e}"
                    )
                )

        # 进入下一次迭代
        return PrepEvent()

【测试实现的ReAct Agent

现在,整个ReAct Agent的工作流就完成了,过程还是比较简单清晰的。当然这里也会利用到一些LlamaIndex提供的组件,比如用来封装LLM推理结果的xxxReasoningStep组件等。

我们来测试这个ReAct Agent组件,准备两个模拟工具(利用LlamaIndex中的FunctionTool快速构造基于函数的工具),然后创建一个Agent

from llama_index.core.tools import BaseTool, FunctionTool 

#模拟发送邮件
def send_email(subject: str, message: str, email: str) -> None:
    """用于发送电子邮件"""
    print(f"邮件已发送至 {email},主题为 {subject},内容为 {message}")

tool_send_mail = FunctionTool.from_defaults(fn=send_email,name='tool_send_mail',description='用于发送电子邮件')

#模拟客户查询
def query_customer(phone: str) -> str:
    """用于查询客户信息"""
    result = f"该客户信息为:\n姓名: 张三\n积分: 50000分\n邮件: test@gmail.com"
    return result

tool_customer = FunctionTool.from_defaults(fn=query_customer,name='tool_customer',description='用于查询客户信息,包括姓名、积分与邮件')

agent = ReActAgent(
    llm=OpenAI(model="gpt-4o-mini"), tools=[tool_send_mail,tool_customer], timeout=120, verbose=True
)

现在调用这个Agent,我们发出一个比较复杂的请求,来看看会发生什么:

async def main():
    ret = await agent.run(input="给客户13688888888发电子邮件,通知他最新的积分")
    print(ret["response"])

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

这里的任务很显然需要借助两次工具调用,一次查询客户信息,一次发送邮件,我们从输出中来观察最后一次的迭代信息:

注意到这里的推理历史,完整的反应了LLM的“思考”过程:首先需要查询客户信息(调用tool_customer);然后观察到客户信息(工具调用结果);判断需要发送邮件(调用tool_send_mail);最后观察返回结果后结束流程。这是一个完整的符合ReAct模式(推理-行动-观察)的工作流,也证明了这里基于Workflows构建的ReAct Agent的可用性。

04

结束语

至此我们对LlamaIndex所推出的新特性Workflows已经有了较为全面的认识,很显然,这是一个与LangChain的LangGraph相似的另一种智能体开发底层框架,两者都面向复杂的智能体/RAG应用工作流,但又采取了不同的设计思想,至于哪个更好或许是见仁见智的问题,但对于大量LLM应用的开发者来说,的确又多了一个强大的工具,相信随着后续的迭代,Workflows也会越来越强大。

END






53AI,企业落地应用大模型首选服务商

产品:大模型应用平台+智能体定制开发+落地咨询服务

承诺:先做场景POC验证,看到效果再签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

与创始人交个朋友

回到顶部

 
扫码咨询