微信扫码
与创始人交个朋友
我要投稿
async def send(
    self,
    message: AgentMessage,
    recipient: Agent,
    reviewer: Optional[Agent] = None,
    request_reply: Optional[bool] = True,
    is_recovery: Optional[bool] = False,
    silent: Optional[bool] = False,
    is_retry_chat: bool = False,
    last_speaker_name: Optional[str] = None,
) -> None:
    """Send a message to recipient agent.

    Args:
        message(AgentMessage): the message to be sent.
        recipient(Agent): the recipient agent.
        reviewer(Agent): the reviewer agent. Optional, defaults to None.
        request_reply(bool): whether to request a reply. Defaults to True.
        is_recovery(bool): whether the message is a recovery message.
        silent(bool): whether to be silent. Defaults to False.
        is_retry_chat(bool): presumably marks the message as part of a
            retried conversation -- TODO confirm against the implementation.
        last_speaker_name(str): presumably the name of the agent that spoke
            last -- TODO confirm against the implementation.

    Returns:
        None
    """
async def receive(
    self,
    message: AgentMessage,
    sender: Agent,
    reviewer: Optional[Agent] = None,
    request_reply: Optional[bool] = None,
    silent: Optional[bool] = False,
    is_recovery: Optional[bool] = False,
    is_retry_chat: bool = False,
    last_speaker_name: Optional[str] = None,
) -> None:
    """Receive a message from another agent.

    Args:
        message(AgentMessage): the received message.
        sender(Agent): the sender agent.
        reviewer(Agent): the reviewer agent. Optional, defaults to None.
        request_reply(bool): whether to request a reply. Defaults to None
            (no explicit choice made by the caller).
        silent(bool): whether to be silent.
        is_recovery(bool): whether the message is a recovery message.
        is_retry_chat(bool): presumably marks the message as part of a
            retried conversation -- TODO confirm against the implementation.
        last_speaker_name(str): presumably the name of the agent that spoke
            last -- TODO confirm against the implementation.

    Returns:
        None
    """
回答生成
async def generate_reply(
    self,
    received_message: AgentMessage,
    sender: Agent,
    reviewer: Optional[Agent] = None,
    rely_messages: Optional[List[AgentMessage]] = None,
    is_retry_chat: bool = False,
    last_speaker_name: Optional[str] = None,
    **kwargs,
) -> AgentMessage:
    """Generate a reply based on the received messages.

    Args:
        received_message(AgentMessage): the received message.
        sender: sender of an Agent instance.
        reviewer: reviewer of an Agent instance.
        rely_messages: a list of messages received.
        is_retry_chat(bool): presumably marks the request as part of a
            retried conversation -- TODO confirm against the implementation.
        last_speaker_name(str): presumably the name of the agent that spoke
            last -- TODO confirm against the implementation.
        **kwargs: extra keyword arguments; their meaning is not visible here.

    Returns:
        AgentMessage: the generated reply. If None, no reply is generated.

    NOTE(review): the return annotation is ``AgentMessage`` (not Optional),
    while the sentence above mentions a ``None`` result -- confirm which one
    the implementations actually honour.
    """
# 资源基础类
class Resource(ABC, Generic[P]):
"""Resource for the agent."""
# 数据库资源对象
class DBResource(Resource[P], Generic[P]):
#知识资源对象(将召回对象作为资源绑定)
class RetrieverResource(Resource[ResourceParameters]):
#知识库空间资源对象(将DBGPT的知识库空间作为资源对象)
class KnowledgeSpaceRetrieverResource(RetrieverResource):
# 资源包(将多个资源变成一个资源包的方式绑定引用)
class ResourcePack(Resource[PackResourceParameters]):
# 内置工具资源
class ToolPack(ResourcePack):
# 插件工具资源包,可加载Autogpt插件
class PluginToolPack(ToolPack):
class AutoGPTPluginToolPack(ToolPack):
# 内置工具定义和使用方法
def list_dbgpt_support_models(
model_type: Annotated[
str, Doc("The model type, LLM(Large Language Model) and EMBEDDING).")
] = "LLM",
) -> str:
...
def get_current_host_cpu_status() -> str:
...
description="Baidu search and return the results as a markdown string. Please set "
"number of results not less than 8 for rich search results.",
)
def baidu_search(
query: Annotated[str, Doc("The search query.")],
num_results: Annotated[int, Doc("The number of search results to return.")] = 8,
) -> str:
...
llm_client = OpenAILLMClient(model_alias="gpt-3.5-turbo")
context: AgentContext = AgentContext(conv_id="test456")
agent_memory = AgentMemory()
tools = ToolPack([simple_calculator, count_directory_files])
prompt_template: PromptTemplate = prompt_service.get_template(
prompt_code=record.prompt_template
)
await ToolAssistantAgent()
.bind(context) #agent 运行上下文 会话id、应用名、推理参数等
.bind(LLMConfig(llm_client=llm_client)) #当前agent使用的模型服务
.bind(agent_memory) # 绑定当前agent的记忆对象
.bind(prompt_template) # 绑定Agent的prompt 覆盖角色定义 暂时依赖Prompt模块,后续改造为面向API
.bind(tools)# 绑定当前agent要使用的资源
.build() #Agent准备检查和预加载等工作
# 默认短期记忆 默认使用 ShortTermMemory(buffer_size=5) 内存队列作为存储
agent_memory = AgentMemory(gpts_memory=self.memory)
# 短期记忆
class ShortTermMemory(Memory, Generic[T])
# 长期记忆
class LongTermMemory(Memory, Generic[T])
embedding_factory = EmbeddingFactory.get_instance(CFG.SYSTEM_APP)
embedding_fn = embedding_factory.create(
model_name=EMBEDDING_MODEL_CONFIG[CFG.EMBEDDING_MODEL]
)
vstore_name = f"_chroma_agent_memory_{dbgpts_name}_{conv_id}"
# Just use chroma store now
vector_store_connector = VectorStoreConnector(
vector_store_type=CFG.VECTOR_STORE_TYPE,
vector_store_config=VectorStoreConfig(
name=vstore_name, embedding_fn=embedding_fn
),
)
memory = HybridMemory[AgentMemoryFragment].from_chroma(
vstore_name=vstore_name,
embeddings=embedding_fn,
)
# 感知记忆
class SensoryMemory(Memory, Generic[T])
# 混合记忆
class HybridMemory(Memory, Generic[T])
# 增强短期记忆
class EnhancedShortTermMemory(ShortTermMemory[T])
需要按会话id进行初始化和关闭:
self.memory.init({conv_id})
try:
# 这里开始一个Agent的对话
await user_proxy.initiate_chat(
recipient=tool_engineer,
reviewer=user_proxy,
message="Calculate the product of 10 and 99",
)
finally:
await self.memory.clear({conv_id}))
## 外部通过集体记忆对象的通道获取Agent的对话消息,支持流式输出
async def chat_messages(
    self,
    conv_id: str,
    user_code: Optional[str] = None,
    system_app: Optional[str] = None,
):
    """Stream the messages of one conversation from the collective memory.

    The queue is looked up again on every iteration via
    ``self.memory.queue(conv_id)``; streaming stops when that lookup returns
    a falsy value or when the ``"[DONE]"`` sentinel is read.

    Args:
        conv_id: conversation id whose message queue is consumed.
        user_code: not used by this function body.
        system_app: not used by this function body.

    Yields:
        Every item read from the queue, excluding the ``"[DONE]"`` sentinel.
    """
    while True:
        queue = self.memory.queue(conv_id)
        if not queue:
            break
        item = await queue.get()
        # Acknowledge every fetched item (not only the sentinel) so that a
        # producer awaiting queue.join() is not blocked forever.
        queue.task_done()
        if item == "[DONE]":
            break
        yield item
        # Brief pause between items, as in the original loop.
        await asyncio.sleep(0.005)
# 参考这个Action
class StartAppAction(Action[LinkAppInput]):
    """Action that starts a linked application through a team chat.

    NOTE(review): ``new_user_input`` and ``gpts_app`` are read in ``run`` but
    are not assigned in the visible body; presumably the logic that replaces
    the TODO below defines them -- confirm.
    """

    async def run(
        self,
        ai_message: str,
        resource: Optional[AgentResource] = None,
        rely_action_out: Optional[ActionOutput] = None,
        need_vis_render: bool = True,
        **kwargs,
    ) -> ActionOutput:
        """Start the linked application and report success.

        Args:
            ai_message: the model output; not read by the visible body.
            resource: not read by the visible body.
            rely_action_out: not read by the visible body.
            need_vis_render: not read by the visible body.
            **kwargs: read for ``conv_id``, ``user_input``, ``paren_agent``
                and ``init_message_rounds``.

        Returns:
            ActionOutput: always a successful output with empty content and
            no view once the team chat call returns.
        """
        conv_id = kwargs.get("conv_id")
        user_input = kwargs.get("user_input")
        paren_agent = kwargs.get("paren_agent")
        init_message_rounds = kwargs.get("init_message_rounds")
        # TODO put the logic that must run before the application starts here
        from dbgpt.serve.agent.agents.controller import multi_agents
        # The calling agent's memory is passed along and the agent itself is
        # given as the link sender.
        await multi_agents.agent_team_chat_new(
            new_user_input if new_user_input else user_input,
            conv_id,
            gpts_app,
            paren_agent.memory,
            False,
            link_sender=paren_agent,
            app_link_start=True,
            init_message_rounds=init_message_rounds,
        )
        return ActionOutput(
            is_exe_success=True, content="", view=None, have_retry=False
        )
# 参考这个Action
class LinkAppAction(Action[LinkAppInput]):
    """Action that names the next speaker when the current agent returns.

    NOTE(review): ``app_link_param`` is read in ``run`` but is not assigned in
    the visible body; presumably it comes from the parsing step behind the
    TODO below -- confirm.
    """

    async def run(
        self,
        ai_message: str,
        resource: Optional[AgentResource] = None,
        rely_action_out: Optional[ActionOutput] = None,
        need_vis_render: bool = True,
        **kwargs,
    ) -> ActionOutput:
        """Return a successful output that routes to the next speaker.

        Returns:
            ActionOutput: content is ``app_link_param`` serialized as JSON,
            view is the rendered form of the same data, and
            ``next_speakers`` holds the selected role name.
        """
        # TODO parse, from the model output, the role name of the Agent to go to next
        role = "xxxx"
        # When the current Agent returns, specify who speaks next.
        return ActionOutput(
            is_exe_success=True,
            content=json.dumps(app_link_param, ensure_ascii=False),
            view=await self.render_protocal.display(content=app_link_param),
            next_speakers=[role],
        )
self._render_protocol = VisChart()view = await self.render_protocol.display(chart=json.loads(model_to_json(param)), data_df=data_df)
# 基类 和接口
class LLMStrategy:
# 默认使用当前模型服的默认模型
async def next_llm(self, excluded_models: Optional[List[str]] = None):
## 优先级策略的模型选择策略实现
class LLMStrategyPriority(LLMStrategy):
    """Model selection strategy that follows a configured priority list."""

    # Select (and, on retry, re-select) a model in configured priority order.
    async def next_llm(self, excluded_models: Optional[List[str]] = None) -> str:
        """Return next available llm model name.

        The strategy context (``self._context``) is parsed as a JSON list of
        model names in priority order; the first usable candidate returned by
        ``self._excluded_models`` is chosen.

        Args:
            excluded_models: model names that must not be selected; ``None``
                or an empty list excludes nothing.

        Returns:
            str: the ``model`` attribute of the first usable candidate.

        Raises:
            ValueError: when no context is configured, no candidate is left,
                or any step fails; the original error is attached as the
                cause.
        """
        try:
            if not excluded_models:
                excluded_models = []
            all_models = await self._llm_client.models()
            if not self._context:
                raise ValueError("No context provided for priority strategy!")
            priority: List[str] = json.loads(self._context)
            can_uses = self._excluded_models(all_models, excluded_models, priority)
            if not can_uses:
                raise ValueError("No model service available!")
            return can_uses[0].model
        except Exception as e:
            logger.error(f"{self.type} get next llm failed!{str(e)}")
            # Chain explicitly so the root cause stays visible in tracebacks.
            raise ValueError(f"Failed to allocate model service,{str(e)}!") from e
class DataScientistAgent(ConversableAgent):
"""Data Scientist Agent."""
profile: ProfileConfig = ProfileConfig(
name=DynConfig(
"Edgar",
category="agent",
key="dbgpt_agent_expand_dashboard_assistant_agent_profile_name",
),
role=DynConfig(
"DataScientist",
category="agent",
key="dbgpt_agent_expand_dashboard_assistant_agent_profile_role",
),
goal=DynConfig(
"Use correct {{dialect}} SQL to analyze and resolve user "
"input targets based on the data structure information of the "
"database given in the resource.",
category="agent",
key="dbgpt_agent_expand_dashboard_assistant_agent_profile_goal",
),
constraints=DynConfig(
[
"Please ensure that the output is in the required format. "
"Please ensure that each analysis only outputs one analysis "
"result SQL, including as much analysis target content as possible.",
"If there is a recent message record, pay attention to refer to "
"the answers and execution results inside when analyzing, "
"and do not generate the same wrong answer.Please check carefully "
"to make sure the correct SQL is generated. Please strictly adhere "
"to the data structure definition given. The use of non-existing "
"fields is prohibited. Be careful not to confuse fields from "
"different tables, and you can perform multi-table related queries.",
"If the data and fields that need to be analyzed in the target are in "
"different tables, it is recommended to use multi-table correlation "
"queries first, and pay attention to the correlation between multiple "
"table structures.",
"It is prohibited to construct data yourself as query conditions. "
"Only the data values given by the famous songs in the input can "
"be used as query conditions.",
"Please select an appropriate one from the supported display methods "
"for data display. If no suitable display type is found, "
"use 'response_table' as default value. Supported display types: \n"
"{{ display_type }}",
],
category="agent",
key="dbgpt_agent_expand_dashboard_assistant_agent_profile_constraints",
),
desc=DynConfig(
"Use database resources to conduct data analysis, analyze SQL, and provide "
"recommended rendering methods.",
category="agent",
key="dbgpt_agent_expand_dashboard_assistant_agent_profile_desc",
),
)
async def thinking(self,messages: List[AgentMessage],sender: Optional[Agent] = None,prompt: Optional[str] = None,) -> Tuple[Optional[str], Optional[str]]:
def get_or_build_agent_memory(self, conv_id: str, dbgpts_name: str) -> AgentMemory:
    """Return the cached agent memory for an app/conversation pair, creating it once.

    Entries are cached in ``self.agent_memory_map`` under the key
    ``"{dbgpts_name}_{conv_id}"``.

    Args:
        conv_id: conversation id.
        dbgpts_name: application name used as the key prefix.

    Returns:
        AgentMemory: the cached instance, or a new one built on
        ``self.memory`` and stored in the cache.
    """
    memory_key = f"{dbgpts_name}_{conv_id}"
    if memory_key in self.agent_memory_map:
        return self.agent_memory_map[memory_key]

    # A vector-store-backed HybridMemory used to be built here; that path is
    # disabled, so its imports and commented-out code were dropped. The
    # default AgentMemory on top of the shared gpts memory is used instead.
    agent_memory = AgentMemory(gpts_memory=self.memory)
    self.agent_memory_map[memory_key] = agent_memory
    return agent_memory
class SqlInput(BaseModel):
    """SQL input model."""

    # Required: name of the chart rendering method chosen for the SQL result.
    display_type: str = Field(
        ...,
        description="The chart rendering method selected for SQL. If you don’t know "
        "what to output, just output 'response_table' uniformly.",
    )
    # Required: the SQL statement to execute.
    sql: str = Field(
        ..., description="Executable sql generated for the current target/problem"
    )
    # Required: summary of the reasoning, addressed to the user.
    thought: str = Field(..., description="Summary of thoughts to the user")
class ChartAction(Action[SqlInput]):
    """Chart action class.

    Parses the model output into a ``SqlInput``, runs its SQL against the
    first bound database resource and renders the result as a chart view.
    """

    def __init__(self, **kwargs):
        """Chart action init."""
        super().__init__(**kwargs)
        # Visualization protocol used to render the query result.
        self._render_protocol = VisChart()

    def out_model_type(self):
        """Return the output model type."""
        return SqlInput

    async def run(
        self,
        ai_message: str,
        resource: Optional[AgentResource] = None,
        rely_action_out: Optional[ActionOutput] = None,
        need_vis_render: bool = True,
        **kwargs,
    ) -> ActionOutput:
        """Perform the action.

        Args:
            ai_message: model output, expected to convert into ``SqlInput``.
            resource: not read by this body (``self.resource`` is used).
            rely_action_out: not read by this body.
            need_vis_render: not read by this body.
            **kwargs: not read by this body.

        Returns:
            ActionOutput: on success, the JSON of the parsed input plus the
            rendered view; on a parsing or execution failure, an output with
            ``is_exe_success=False`` and an error message. Exceptions are
            logged and converted, not propagated.
        """
        # Step 1: convert the raw model output into the structured input.
        try:
            param: SqlInput = self._input_convert(ai_message, SqlInput)
        except Exception as e:
            logger.exception(f"{str(e)}! \n {ai_message}")
            return ActionOutput(
                is_exe_success=False,
                content="Error:The answer is not output in the required format.",
            )
        # Step 2: validate prerequisites, run the SQL and render the chart.
        try:
            if not self.resource_need:
                raise ValueError("The resource type is not found!")

            if not self.render_protocol:
                raise ValueError("The rendering protocol is not initialized!")

            db_resources: List[DBResource] = DBResource.from_resource(self.resource)
            if not db_resources:
                raise ValueError("The database resource is not found!")

            # Only the first database resource is queried.
            db = db_resources[0]
            data_df = await db.query_to_df(param.sql)
            view = await self.render_protocol.display(
                chart=json.loads(model_to_json(param)), data_df=data_df
            )

            return ActionOutput(
                is_exe_success=True,
                content=model_to_json(param),
                view=view,
                resource_type=self.resource_need.value,
                resource_value=db._db_name,
            )
        except Exception as e:
            # Any failure above (including the ValueErrors raised here) is
            # logged and reported back through the output content.
            logger.exception("Check your answers, the sql run failed!")
            return ActionOutput(
                is_exe_success=False,
                content=f"Error:Check your answers, the sql run failed!Reason:{str(e)}",
            )
class XXXAgent(ConversableAgent):
    """Template agent showing how to pass extra parameters to its Action."""

    ...
    ...

    # Extra execution parameters prepared for the Action.
    def prepare_act_param(
        self,
        received_message: Optional[AgentMessage],
        sender: Agent,
        rely_messages: Optional[List[AgentMessage]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Build the keyword arguments handed to the Action's ``run``.

        Args:
            received_message: the message being handled; may be ``None``.
            sender: the sending agent; not read by this body.
            rely_messages: messages the reply relies on, passed through.
            **kwargs: read for ``historical_dialogues``.

        Returns:
            Dict[str, Any]: ``user_input``, ``conv_id``, ``paren_agent``,
            ``rely_messages`` and ``historical_dialogues``.
        """
        historical_dialogues = kwargs.get("historical_dialogues")
        # The annotation allows None, so guard the attribute access instead of
        # failing with AttributeError.
        user_input = received_message.content if received_message is not None else None
        return {
            "user_input": user_input,
            "conv_id": self.agent_context.conv_id,
            "paren_agent": self,
            "rely_messages": rely_messages,
            "historical_dialogues": historical_dialogues,
        }

    # Resource loading method; by default the resource pack is converted,
    # through the resource class's methods, into resource input for the LLM.
    async def load_resource(self, question: str, is_retry_chat: bool = False, **kwargs):
        """Log the question for which resources are being loaded."""
        logger.info(f"DomainApi load_resource:{question}")
class XXXAction(Action[xxInput]):
    """Template action showing how to hand a question back to the user.

    NOTE(review): ``intent`` is produced by the elided part of ``run`` and is
    not visible here.
    """

    async def run(
        self,
        ai_message: str,
        resource: Optional[AgentResource] = None,
        rely_action_out: Optional[ActionOutput] = None,
        need_vis_render: bool = True,
        **kwargs,
    ) -> ActionOutput:
        """Return an output that asks the user a question instead of retrying."""
        ...
        return ActionOutput(
            is_exe_success=False, # signal that the current Agent's progress failed
            content=json.dumps(intent.to_dict(), ensure_ascii=False), # content of the question
            view=intent.ask_user if intent.ask_user else ai_message, # how the question is displayed (can be combined with GptVis to send the user a dynamic-form-like message)
            have_retry=False, # and proactively ask the user
            ask_user=True
        )
manager = AutoPlanChatManager()manager = (await manager.bind(context).bind(agent_memory).bind(llm_config).build())manager.hire(employees)user_proxy: UserProxyAgent = (await UserProxyAgent().bind(context).bind(agent_memory).build())await user_proxy.initiate_chat(recipient=manager,message=user_query,is_retry_chat=is_retry_chat,last_speaker_name=last_speaker_name,message_rounds=init_message_rounds,**ext_info,)
class AutoPlanChatManager(ManagerAgent):
    """A chat manager agent that can manage a team chat of multiple agents."""
class PlannerAgent(ConversableAgent):
    """Planner Agent."""
class AWELBaseManager(ManagerAgent, ABC):
    """AWEL base manager."""
## Agent相关算子
### Agent Flow触发器,无实际逻辑,Flow的特性必须从触发器开始
class AgentDummyTrigger(Trigger):
### Agent算子容器,拥有一致的输入输出,可以实现Agent Flow的自由拼接
class AWELAgentOperator(
MixinLLMOperator, MapOperator[AgentGenerateContext, AgentGenerateContext]
):
## Agent Flow特性算子
### 实现Agent Flow分支的算子
class AgentBranchOperator(BranchOperator[AgentGenerateContext, AgentGenerateContext]):
### 实现Agent Flow分支合并的算子
class AgentBranchJoinOperator(BranchJoinOperator[AgentGenerateContext]):
# 实际Agent在Flow里的绑定节点(把Agent作为Agent算子容器的资源)
class AWELAgent(BaseModel):
# Agent的绑定资源,将Agent的绑定资源作为Agent资源节点的资源节点
### Agent资源
class AWELAgentResource(AgentResource):
"""AWEL Agent Resource."""
### Agent知识库资源
class AWELAgentKnowledgeResource(AgentResource):
### Agent的Prompt资源
class AgentPrompt(BaseModel):
### Agent的模型配置资源
class AWELAgentConfig(LLMConfig):
53AI,企业落地应用大模型首选服务商
产品:大模型应用平台+智能体定制开发+落地咨询服务
承诺:先做场景POC验证,看到效果再签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2024-11-23
人生搜索引擎免费用,开源版哈利波特“冥想盆”登GitHub热榜,支持中文
2024-11-23
o1圈杀疯了,阿里又开源Marco-o1
2024-11-22
Kotaemon:开源基于文档检索的聊天系统(RAG Chat)
2024-11-22
不可思议!AirLLM 如何让 70B 大模型在 4GB GPU 上顺利推理?
2024-11-22
刚刚,OpenAI公开o1模型测试方法,人机协作时代!
2024-11-21
22.4K+ Star!Chatbox:你的终极AI桌面助手
2024-11-21
Magentic-One:微软开源多智能体系统,让 AI 自己动手解决问题
2024-11-21
阿里发布Qwen2.5-Turbo,支持100万Tokens上下文!
2024-05-06
2024-07-25
2024-08-13
2024-06-12
2024-07-11
2024-06-16
2024-07-20
2024-06-15
2024-07-25
2024-07-25
2024-11-22
2024-11-19
2024-11-13
2024-11-13
2024-10-07
2024-09-22
2024-09-20
2024-09-14