AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


Multi-Agent实战:构建复杂的数据处理与可视化系统
发布日期:2024-09-18 18:12:23 浏览次数: 1545



在当今复杂的业务场景中,单一的人工智能模型(LLM,Large Language Model)往往难以应对多样化的数据处理与分析需求。为了提升系统的灵活性和效率,Multi-Agent系统应运而生。本文将以Langchain框架中的多LLM Agent系统为例,介绍如何构建一个处理用户请求、收集数据、生成图表并最终反馈结果的复杂系统。在本文中,我们将实现一个处理用户查询印度过去五年GDP数据的Multi-Agent系统。

Multi-Agent系统概述

Multi-Agent系统主要由以下几个关键部分组成:

  1. 研究员(Researcher Agent):负责搜集用户所需的数据。通过调用外部搜索引擎或数据库,收集并整理相关信息。

  2. 路由器(Router Agent):根据当前状态和消息内容,决定信息流向。它是系统的中枢,确保各Agent之间的无缝衔接。

  3. 图表生成器(Chart Generator Agent):负责将收集到的数据转换为可视化图表,以便于用户理解。

  4. 工具调用器(Call Tool Agent):用于执行各种外部工具或脚本,如Python脚本执行器,用于数据处理或生成图表。

  5. 状态管理器(Agent State):维护每个Agent的状态信息,包括消息记录、发送者等,以便实现跨Agent的上下文记忆。



我们选择了Langchain框架来构建这个系统,因为它提供了丰富的工具集和灵活的架构支持。此外,我们使用了ChatOpenAI的GPT-4o-mini模型作为底层语言模型,以支持智能体的自然语言理解和生成能力。

Multi-Agent工作流程

1、初始请求

用户通过系统接口提交一个请求,如“查询印度过去五年的GDP数据,并生成图表”。

2、数据收集

研究员Agent接收请求后,开始搜集相关数据。它可能通过调用搜索引擎API(如Tavily工具),检索到印度过去五年每年的GDP数据。收集到的数据以特定格式(如JSON)返回,并附加到全局状态中。

3、路由决策

路由器Agent根据当前状态(即全局状态中的消息列表)和最后一条消息的内容,决定下一步的行动。如果研究员Agent已经收集到足够的数据,并且没有直接生成图表的能力,路由器Agent会指示图表生成器Agent接管任务。

4、图表生成

图表生成器Agent接收到任务后,开始将收集到的数据转换为可视化图表。它可能会首先选择一种合适的图表类型(如折线图、柱状图等),然后使用Python的matplotlib库等工具生成图表。生成的图表将以图片形式保存,并附加到全局状态中。

5、工具调用与执行

在图表生成过程中,图表生成器Agent可能需要调用外部工具(如Python脚本执行器)来辅助生成图表。工具调用器Agent负责执行这些外部工具,并将执行结果返回给图表生成器Agent。

6、结果汇总与呈现

当图表生成完成后,图表生成器Agent会将图表和相关的GDP数据汇总成最终答案,并通过系统接口呈现给用户。最终答案可能包含图表图片、GDP数据表格、数据来源等详细信息。

Multi-Agent系统实现

可以参考 基于LangGraph构建LLM Agent

1、安装依赖

pip install -U langchain langchain_openai langsmith pandas langchain_experimental matplotlib langgraph langchain_core

2、创建代理

from langchain_core.messages import (BaseMessage,HumanMessage,ToolMessage,)from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import END, StateGraph, START

def create_agent(llm, tools, system_message: str):"""Create an agent."""prompt = ChatPromptTemplate.from_messages([("system","You are a helpful AI assistant, collaborating with other assistants."" Use the provided tools to progress towards answering the question."" If you are unable to fully answer, that's OK, another assistant with different tools "" will help where you left off. Execute what you can to make progress."" If you or any of the other assistants have the final answer or deliverable,"" prefix your response with FINAL ANSWER so the team knows to stop."" You have access to the following tools: {tool_names}.\n{system_message}",),MessagesPlaceholder(variable_name="messages"),])prompt = prompt.partial(system_message=system_message)prompt = prompt.partial(tool_names=", ".join([tool.name for tool in tools]))return prompt | llm.bind_tools(tools)

这是创建LLM Agent的通用函数,通过给定一组tools和对应agent的指定描述来创建对应的agent

3、添加工具

from typing import Annotated
from langchain_community.tools.tavily_search import TavilySearchResultsfrom langchain_core.tools import toolfrom langchain_experimental.utilities import PythonREPL
tavily_tool = TavilySearchResults(max_results=5)
# Warning: This executes code locally, which can be unsafe when not sandboxed
repl = PythonREPL()

@tooldef python_repl(code: Annotated[str, "The python code to execute to generate your chart."],):"""Use this to execute python code. If you want to see the output of a value,you should print it out with `print(...)`. This is visible to the user."""try:result = repl.run(code)except BaseException as e:return f"Failed to execute. Error: {repr(e)}"result_str = f"Successfully executed:\n```python\n{code}\n```\nStdout: {result}"return (result_str + "\n\nIf you have completed all tasks, respond with FINAL ANSWER.")

这里提供了2个tool,一个是用于搜索的tool,一个是tool运行时获取python函数并执行它

4、设置代理状态

我们这里提到的代理都是有状态的代理,有状态 LLM Agent会保留和更新交互中的上下文信息,从而动态适应其决策过程。这种架构有助于进行复杂的推理,支持顺序任务中的长期依赖关系。

import operatorfrom typing import Annotated, Sequence, TypedDict
from langchain_openai import ChatOpenAI

# This defines the object that is passed between each node# in the graph. We will create different nodes for each agent and toolclass AgentState(TypedDict):messages: Annotated[Sequence[BaseMessage], operator.add]sender: str

5、定义代理

import functools
from langchain_core.messages import AIMessage

# Helper function to create a node for a given agentdef agent_node(state, agent, name):result = agent.invoke(state)# We convert the agent output into a format that is suitable to append to the global stateif isinstance(result, ToolMessage):passelse:result = AIMessage(**result.dict(exclude={"type", "name"}), name=name)return {"messages": [result],# Since we have a strict workflow, we can# track the sender so we know who to pass to next."sender": name,}

llm = ChatOpenAI(model="gpt-4o-mini")
# Research agent and noderesearch_agent = create_agent(llm,[tavily_tool],system_message="You should provide accurate data for the chart_generator to use.",)research_node = functools.partial(agent_node, agent=research_agent, name="Researcher")
# chart_generatorchart_agent = create_agent(llm,[python_repl],system_message="Any charts you display will be visible by the user.",)chart_node = functools.partial(agent_node, agent=chart_agent, name="chart_generator")

from langgraph.prebuilt import ToolNode
tools = [tavily_tool, python_repl]tool_node = ToolNode(tools)

6、agent循环执行

这里用来判断当前是结束调用还是继续agent循环调用tool

# Either agent can decide to endfrom typing import Literal

def router(state) -> Literal["call_tool", "__end__", "continue"]:# This is the routermessages = state["messages"]last_message = messages[-1]if last_message.tool_calls:# The previous agent is invoking a toolreturn "call_tool"if "FINAL ANSWER" in last_message.content:# Any agent decided the work is donereturn "__end__"return "continue"

7、定义完整图

workflow = StateGraph(AgentState)
workflow.add_node("Researcher", research_node)workflow.add_node("chart_generator", chart_node)workflow.add_node("call_tool", tool_node)
workflow.add_conditional_edges("Researcher",router,{"continue": "chart_generator", "call_tool": "call_tool", "__end__": END},)workflow.add_conditional_edges("chart_generator",router,{"continue": "Researcher", "call_tool": "call_tool", "__end__": END},)
workflow.add_conditional_edges("call_tool",# Each agent node updates the 'sender' field# the tool calling node does not, meaning# this edge will route back to the original agent# who invoked the toollambda x: x["sender"],{"Researcher": "Researcher","chart_generator": "chart_generator",},)workflow.add_edge(START, "Researcher")graph = workflow.compile()

8、开始执行

workflow.add_edge(START, "Researcher")graph = workflow.compile()
events = graph.stream({"messages": [HumanMessage(content="Fetch India's GDP for past 5 years and plot a chart of it")],},# Maximum number of steps to take in the graph{"recursion_limit": 150},)for s in events:print(s)print("----")



53AI,企业落地应用大模型首选服务商

产品:大模型应用平台+智能体定制开发+落地咨询服务

承诺:先做场景POC验证,看到效果再签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

与创始人交个朋友

回到顶部

 
扫码咨询