微信扫码
与创始人交个朋友
我要投稿
近些年,大语言模型领域的高速发展,涌现出了众多优秀的产品,能够解决很多实际的业务场景,大幅提升工作效率。各公司都纷纷搭建起了自己的大模型应用平台,来统一管理各种大语言模型,构建更加复杂的应用,满足各部门的业务需求。
最初,我们选择了 dify 项目提供的工作流引擎功能来构建大模型应用平台,实现将多个大语言模型组合起来,形成更加复杂的流程和应用。
dify 是2023 年开源的一款集工作流引擎、大模型对话应用、知识库管理等多种功能于一体的 AI 大模型应用平台,截止目前(2024 年 11 月)在 github 平台上已经获得了 5.3 万 star,是目前大模型应用基础服务中最热门的项目之一。
dify 拥有非常美观的界面,尤其对于工作流引擎功能来说,dify 提供了通过简单的拖拽来编排工作流的功能,十分易于使用。
在我们实际的使用过程中,我们发现 dify 存在一些不足,无法满足我们对于工作流引擎的需求。
在实际业务中,最重要的诉求之一,是能够实现任务的等待和触发机制,比如我们经常需要在AI大模型完成文本的生成后,进入到人工审核的环节,只有当人工审核通过后,才能继续进行下一步的工作。
在很多业务流程中,我们还可能需要等待某个外部事件的发生,比如订单的支付成功、外卖的接单成功,或者直到某个特定的时间节点,才需要触发继续进行下一步的工作。
此外,dify 对于工作流中的错误处理能力也比较有限,在工作流中,任何一个节点执行失败,都会导致整个工作流执行失败,无法进行错误处理和恢复。
对于整个工作流来说,有些节点并非是必须的,这些非必须的节点在执行失败后,我们往往希望能够跳过这些节点,继续执行后续的节点,而不是让整个工作流执行失败。除此之外,我们也希望在任务执行失败后,能够自动发起几次重试,尝试让工作流自动恢复,而不是需要人工介入后,手动发起整个工作流流程的重新执行。
在我们最初使用 dify 搭建大模型应用平台时,dify 项目并不支持多任务分支的并行执行,在一个工作流中,只能一个任务接着一个任务地顺序执行。
尽管在 0.8.0 版本中,dify 项目已经支持了多任务分支的并行执行,但整个工作流的每次执行都被限制在单机环境下。这使得工作流的不同任务节点无法在不同的机器上并行执行,如果运行的工作流中具有的任务节点过多,有可能会造成分布式集群资源使用的不均衡,某些单机资源占用过多。
另一方面,dify 工作流中循环任务的每次迭代都是在整个循环任务中串行执行的,迭代之间不能并发,造成了工作流执行性能上的瓶颈。比如需要把一篇文章分段后分别让大模型对每段进行总结,如果用 dify 实现这一功能,那么每一段都必须依次执行,而理想情况显然是对大模型同时发起多次调用,让大模型同时处理每一个分段。
基于上述 dify 工作流引擎项目的不足,我们尝试对 dify 项目进行改造,来满足实际业务场景的需要。
dify 的网站前端项目是基于 Typescript 的Next.js 框架开发的,相应的项目代码位于web
目录下。Next.js 是用于构建全栈 Web 应用程序的 React 框架,它用来帮助用户快速构建交互式、动态的 React 应用程序。
[web/]
├── app // 布局、页面和组件
│ ├── (commonLayout) // 整个应用通用的布局
│ ├── (shareLayout) // 在特定会话中共享的布局
│ ├── activate // 激活页面
│ ├── components // 页面和布局共享的组件
│ ├── install // 安装页面
│ ├── signin // 登录页面
│ └── styles // 全局共享的样式
├── assets // 静态资源
├── bin // 构建步骤运行的脚本
├── config // 可调整的设置和选项
├── context // 应用中不同部分使用的共享上下文
├── dictionaries // 语言特定的翻译文件
├── docker // 容器配置
├── hooks // 可重用的钩子
├── i18n // 国际化配置
├── models // 描述数据模型和 API 响应的形状
├── public // 如 favicon 等元资源
├── service // 定义 API 操作的形状
├── test
├── types // 函数参数和返回值的描述
└── utils // 共享的实用函数
dify 的服务端项目是基于 python 语言以及 Flask 框架开发的,项目代码被拆分为 controller、service、model、core 等几个部分,并放置在api
目录下的相应子目录中。
[api/]
├── constants // 用于整个代码库的常量设置。
├── controllers // API 路由定义和请求处理逻辑。
├── core // 核心应用编排、模型集成和工具。
├── docker // Docker 和容器化相关配置。
├── events // 事件处理和处理。
├── extensions // 与第三方框架/平台的扩展。
├── fields // 用于序列化/封装的字段定义。
├── libs // 可重用的库和助手。
├── migrations // 数据库迁移脚本。
├── models // 数据库模型和架构定义。
├── services // 指定业务逻辑。
├── storage // 私钥存储。
├── tasks // 异步任务和后台作业的处理。
└── tests
服务端对外提供的 api 接口的外层实现位于api/controllers
目录下,而 dify 实现的各模块的核心代码则位于api/core
目录下。除此以外,在项目运行的过程中,还会不断地触发不同类型的事件,这些事件类型的定义与管理就位于api/events
目录下;而 dify 对诸如登录、邮件发送、存储调用、任务分发队列等第三方组件的依赖和封装,则位于api/extensions
目录下。
按照具体的功能,在 api/core
目录中,不同的组件又被拆分到对应的子目录中:
[api/core/]
├── agent
├── app
├── callback_handler
├── embedding
├── entities
├── errors
├── extension
├── external_data_tool
├── file
├── helper
├── llm_generator
├── memory
├── model_runtime
├── moderation
├── ops
├── prompt
├── rag
├── tools
└── workflow
dify 的工作流引擎模块的实现代码位于api/core/workflow
目录下,其调用接口的入口则位于api/controllers/web/workflow.py
文件中。整个工作流执行的发起,就是调用workflow.py
中声明和定义的 /workflows/run
接口触发的。
当/workflows/run
接口被调用后,dify 服务端执行了以下步骤来完成整个工作流的处理:
WorkflowRunApi.post
方法,使用reqparse.RequestParser
完成对请求体与参数的解析工作;api/services/app_generate_service.py
中的generate
方法,对传入的app_model
参数进行匹配,根据模式选择具体的应用实现(比如工作流、对话等),从而调用具体应用的generate
方法。api/core/app/workflow/app_generator.py
中定义的generate
方法来生成工作流应用的执行对象并执行相应代码逻辑。在工作流应用的generate
方法中,dify 服务端又执行了以下流程:
WorkflowAppGenerator.generate
方法中,首先进行了参数的解析,并通过WorkflowAppConfigManager
获取到应用的配置,并拼装为工作流执行所需的WorkflowAppGenerateEntity
对象。WorkflowAppGenerator._generate
方法,dify 创建并初始化用于通信的WorkflowAppQueueManager
对象,并创建了工作线程,执行WorkflowAppGenerator._generate_worker
方法。WorkflowAppGenerator._generate_worker
方法中。在_generate_worker
中,dify 创建了WorkflowAppRunner
实例,来执行工作流。在WorkflowAppRunner
的run
方法中,dify 实现了工作流的执行:
VariablePool
,并调用_init_graph
方法初始化工作流 DAG 图完成了工作流的初始化,通过传入上述工作流相关信息,创建WorkflowEntry
实例,并调用WorkflowEntry
实例的run
方法,开始运行整个工作流。GraphEngine
的run
方法,将整个 DAG 图切分为若干 item 进行迭代处理,流式生成器AnswerStreamProcessor
的实例不断生成 item 并执行,触发不同事件的产生。WorkflowBasedAppRunner
的_handle_event
方法,将事件发布到队列中,实现事件的响应处理。当事件被发布后,监听事件的AppQueueManager
通过 yield message,完成消息的生成,最外层的WorkflowRunApi
就可以通过流式通信的方式,将事件消息发送给客户端。通过上述代码流程分析可知,dify 采用多线程模式处理并发请求。dify 服务端针对工作流的每次执行,都是一次基于Server-Sent Events 协议的 HTTP 请求,客户端通过调用 dify 服务端提供的/workflows/run
接口与 dify 服务端建立长连接后,dify 服务端会持续不断地将工作流执行过程中产生的状态变化以及数据消息作为一个个事件,通过流式通信的方式返回给客户端。
这样的流式通信机制,是 dify 服务端与客户端之间通信的核心机制,在这样的设计之下,让 dify 构建起来的应用与其他 AI 大语言模型的交互方式完全相同,从而能够让用户将一个工作流流程的执行或是对话流程的执行,都被封装为一个 AI 应用的执行,而无需关心其背后的具体实现。
但这样的设计,也带来了上述我们提到的不足,由于工作流引擎的每次执行,都需要与服务端建立一个长连接,因此,整个工作流的每一次完整执行,都必须在同一个服务器上完成,无法让工作流的不同任务节点分布在不同的服务器上并行执行。如果想要在这套机制之下实现任务的等待和触发,就需要让客户端与服务端保持的长连接一直处于打开状态,直到任务被触发并执行完成,这样长时间的长连接在生产环境中是不可靠的,同时也会造成资源的浪费,在并发量较大的情况下,服务端的压力会非常大,因此在生产环境中也是不可接受的。
从本质上,dify 的设计目的是将其实现的各种功能都封装成与 AI 大语言模型相同的交互模式,而我们在大模型应用平台的开发和设计中,对于工作流引擎的诉求则是高吞吐和高可用,因此,如果我们要实现工作流中任务的等待和触发、让不同的任务在分布式环境中不同的机器上并发执行等功能,就需要对 dify 项目的工作流引擎的基础设计与实现机制,以及通信模式进行改造,这样的改造工作量无疑是巨大的,相当于对 dify 项目进行了整体的重构,特别是考虑到 dify 当前正处于快速迭代期,版本迭代非常频繁,在版本之间,dify 本身代码的变更本就非常庞大,随着 dify 本身的版本迭代,我们的改造工作也会面临越来越大的合并难度,后续的维护成本也只会越来越高,最终必将面临要么放弃同步 dify 开源版本的升级,要么放弃改造 dify 项目这两条路。
在放弃改造 dify 项目后,我们选择了将 dify 的执行部分对接到开源工作流引擎 rill-flow 的方案。
rill-flow 是新浪微博开源的高性能、可扩展的分布式流程编排服务,核心设计目标是成为一款易用性、高并发、低延时的工作流引擎项目,是云原生分布式场景下解决复杂流程编排、大流量任务执行性能、AIGC应用快速集成的优秀方案。并且已经在微博得到大规模应用,日处理任务量达千万级,支撑了微博多个业务的核心流程。
rill-flow 的服务端是通过 java 语言与 SpringBoot 框架实现的工作流引擎项目,它的代码较为简洁清晰,被划分为若干 java 模块相互依赖。
[/]
├── rill-flow-common // 项目通用类型、异常、常量等
├── rill-flow-dag // 流程图调度器的实现
│ ├── olympicene-core // 调度器所依赖的基础类型及工具类
│ ├── olympicene-ddl // DAG 图的解析与转换
│ ├── olympicene-spring-boot-starter // SpringBoot的bean启动类
│ ├── olympicene-storage // DAG 图及相关存储
│ └── olympicene-traversal // 调度器与派发器的代码实现
├── rill-flow-service // web api 所需调用的接口定义
├── rill-flow-impl // web api 所需调用的接口实现
├── rill-flow-interfaces // 插件需要实现的统一接口定义
├── rill-flow-plugins // 派发器插件的具体实现
│ ├── aliyun-ai-plugin
│ └── chatgpt-plugin
├── rill-flow-trigger // 触发器实现
├── rill-flow-web // web api 入口
├── rill-flow-ui // web 前端代码
└── flow-graph // 核心功能微应用
rill-flow 还支持用户通过开源的PF4J 项目协议扩展自定义的派发器作为插件集成到 rill-flow 的执行环境中,从而让用户能够通过自定义的实现,完成各类协议任务的派发工作。
从项目架构来看,rill-flow 分为触发器、调度器、派发器、执行器等几个模块:
相比于 dify 项目的流式阻塞式通信模式,rill-flow 采用的异步非阻塞的架构设计能够更加高效地利用资源,客户端不需要和服务端保持长连接,而是通过异步查询的方式来获取工作流的执行结果。并且,rill-flow 本身已经提供了任务的阻塞与唤醒、并发调度、错误处理等功能,能够很好的满足我们的实际业务需要,因此我们最终决定以 rill-flow 项目作为大模型应用平台的工作流引擎核心组件。
另外,rill-flow 作为一个云原生应用,它被设计成可以直接被部署在分布式集群环境中,每个任务节点都被自动调度到集群中的空闲节点上进行执行,可以最大限度地利用分布式集群带来的并发性能提升。
基于上述考量,我们最终决定将工作流引擎组件从 dify 项目更换为 rill-flow 项目,但由于我们的大模型应用平台已经基于 dify 项目运行了一段时间,在 dify 的基础上已经构建出了很多实际的业务流程,并且,操作人员也已经充分适应了 dify 的界面和操作方式,如果直接切换到 rill-flow,无疑将大大增加学习成本,并且需要对所有现有的工作流进行重新构建,这将带来不容忽视的巨大工作量。于是我们开始考虑,是否可以有一种方案,能够让我们的大模型应用平台在平滑过渡到 rill-flow 的同时,最大限度地减少对现有平台用户的影响。
最终,我们开发了一个 DSL 转换器,能够将 dify 的 DSL 描述文件转换为 rill-flow 的 DSL 描述,从而用户在大模型应用平台上能够通过 dify 的界面拖拽配置工作流描述,却在后台实际通过 rill-flow 来调度执行,这样一来,就实现了大模型应用平台无缝切换到 rill-flow 上的目标。通过 dify 界面上的“导出 DSL”选项,或者 dify 提供的 DSL 查询接口,我们都可以获取到某个工作流的描述文件。
DSL 转换器通过将 dify 导出的 DSL 描述转换为 rill-flow 项目支持的 DSL 描述,并导入到 rill-flow 服务中,实现让 rill-flow 调度派发,实现将业界先进的交互体验与高性能的工作流引擎结合的目的。
同时,dify 还提供了知识库、大模型的配置和使用等丰富的功能,在转换器改造过程中,我们仍然希望能够充分利用 dify 本身的任务执行能力。因此我们设计了新的架构,分为两个主要的流程改造:
在用户完成工作流的编辑后,大模型应用平台自动触发调用 DSL 转换服务,通过调用 dify 服务端应用的 DSL 导出接口,获取 dify 工作流的 DSL 描述,并执行格式转换工作,完成从 dify 的 DSL 描述转换为 rill-flow 项目所需的 DSL 描述的功能。
当用户触发任务的执行时,大模型应用平台网关通过调用 rill-flow 引擎的 API 接口,触发工作流的调度,基于 DSL 描述,最终,任务会通过调用 dify 后端服务的任务执行接口,完成任务的执行。
dify 与 rill-flow 一样,都使用 yaml 格式的 DSL 描述,二者的 DSL 描述都非常易于理解。与 rill-flow 不同,dify 的 DSL 中除了描述节点之间的依赖关系以及各节点的参数外,还描述了节点在界面上展示所需的位置信息。而 rill-flow 的 DSL 描述则较为纯粹,只包含了节点之间的依赖关系、输入与输出信息,以及可选的各节点的异常处理等信息,因此,将 dify 的 DSL 描述转换成 rill-flow 的 DSL 描述是相对比较简单的。
例如,下面是一个 dify 工作流的 DSL 描述:
app:
description:''
icon:?
icon_background:'#FFEAD5'
mode:workflow
name:hello
use_icon_as_answer_icon:false
kind:app
version:0.1.2
workflow:
conversation_variables:[]
environment_variables:[]
features:
file_upload:
allowed_file_extensions:
-.JPG
-.JPEG
-.PNG
-.GIF
-.WEBP
-.SVG
allowed_file_types:
-image
allowed_file_upload_methods:
-local_file
-remote_url
enabled:false
fileUploadConfig:
audio_file_size_limit:50
batch_count_limit:5
file_size_limit:15
image_file_size_limit:10
video_file_size_limit:100
image:
enabled:false
number_limits:3
transfer_methods:
-local_file
-remote_url
number_limits:3
opening_statement:''
retriever_resource:
enabled:true
sensitive_word_avoidance:
enabled:false
speech_to_text:
enabled:false
suggested_questions:[]
suggested_questions_after_answer:
enabled:false
text_to_speech:
enabled:false
language:''
voice:''
graph:
edges:
-data:
isInIteration:false
sourceType:start
targetType:http-request
id:1728717788734-source-1728717794809-target
source:'1728717788734'
sourceHandle:source
target:'1728717794809'
targetHandle:target
type:custom
zIndex:0
-data:
isInIteration:false
sourceType:http-request
targetType:end
id:1728717794809-source-1732851756085-target
source:'1728717794809'
sourceHandle:source
target:'1732851756085'
targetHandle:target
type:custom
zIndex:0
nodes:
-data:
desc:''
selected:false
title:开始
type:start
variables:[]
height:54
id:'1728717788734'
position:
x:80
y:282
positionAbsolute:
x:80
y:282
sourcePosition:right
targetPosition:left
type:custom
width:244
-data:
authorization:
config:null
type:no-auth
body:
data:
-type:text
value:''
type:none
desc:''
headers:''
method:get
params:wd:当前时间
selected:false
timeout:
max_connect_timeout:0
max_read_timeout:0
max_write_timeout:0
title:HTTP请求
type:http-request
url:https://www.baidu.com/s
variables:[]
height:94
id:'1728717794809'
position:
x:384
y:282
positionAbsolute:
x:384
y:282
selected:false
sourcePosition:right
targetPosition:left
type:custom
width:244
-data:
desc:''
outputs:
-value_selector:
-'1728717794809'
-body
variable:result
selected:false
title:结束
type:end
height:90
id:'1732851756085'
position:
x:688
y:282
positionAbsolute:
x:688
y:282
selected:true
sourcePosition:right
targetPosition:left
type:custom
width:244
viewport:
x:-20
y:1
zoom:1
它定义了一个包含有三个节点的工作流:
我们将它转换为 rill-flow 的 DSL yaml 则是:
workspace: "default"
dagName:"httpRequestDemo"
type:"flow"
tasks:
-name:"httpRequest"
title:"HTTP 请求"
category:"function"
resourceName:"http://www.baidu.com/s"
resourceProtocol:"http"
pattern:"task_sync"
input:
query.wd:"当前时间"
output:
end:$.httpRequest.result
rill-flow 的这个 DSL 描述了一个只有一个节点的工作流,它实现了对 http 接口的请求,并且输出返回 json 的 result key 对应的 数据。
由此可以看出,rill-flow 的 DSL 配置十分简洁,同时结合官方文档中提到的其他配置字段,能够实现非常强大的包含错误处理、失败重试、流式处理、分布式并发执行等多种功能,能够非常完美的支持大模型应用平台的各种场景。比如,如果我们想让上述工作流中的 HTTP 请求失败后,能够间隔 2 秒重试 3 次,那么我们只需要在上述 DSL 描述中添加 retry 字段即可:
workspace: "default"
dagName:"httpRequestDemo"
type:"flow"
tasks:
-name:"httpRequest"
title:"HTTP 请求"
category:"function"
resourceName:"http://www.baidu.com/s"
resourceProtocol:"http"
pattern:"task_sync"
input:
query.wd:"当前时间"
retry:
maxRetryTimes:3
intervalInSeconds:2
output:
end:$.httpRequest.result
同样的,我们也可以将某个任务节点的 tolerance 属性设置为 true,就可以将其设置为执行失败就跳过的非核心节点。
由于 dify 服务端不支持对整个流程图中的单个节点进行独立的调用和执行,因此,在上述 DSL 转换服务架构下, 需要对 dify 进行一定的改造,暴露出 dify 任务执行器的执行接口。
首先,在controllers/workflow/workflow.py
中增加工作流单个节点的执行接口,在这个接口中,通过直接调用api/core/workflow/workflow_entry.py
中的WorkflowEntry.single_step_run
方法,并传递单个任务执行所需的全部参数,实现单个任务节点的执行与事件生成。然后,通过WorkflowEntry.handle_special_values
方法接收相应事件结果并返回。
在 DSL 转换服务执行转换时,每个任务的调用目标地址都指向 dify 服务端的这个新增的单个节点执行接口,从而在工作流执行时,rill-flow 可以通过不断地调度和派发,完成整个工作流的执行。
通过上述改造,我们实现了用最低的改造成本,将 dify 的前端交互体验,以及大模型任务的执行能力,与 rill-flow 功能强大且高性能的调度能力结合到了一起。由于我们通过新增接口的方式对 dify 进行改造,对 dify 的代码几乎不具备任何侵入性,因此,随着 dify 项目的后续迭代,也不存在维护成本上升的问题。
下图中的工作流是一个典型的例子,首先,传入一段长文本,然后,代码执行节点通过 python 代码将文本切分为以 100 字符为单位的字符串列表,然后对这个列表进行迭代,让大模型对每个分段进行总结。
在改造前的 dify 项目中,上述流程的单次运行时间为 47.4 秒,而在改造后的运行结果中,上述流程的单次运行时间被缩短到了 19 秒。运行时间缩短了 60%,效果十分显著。除此以外,改造后的工作流还增加了工作流中任务的等待、触发,以及丰富的错误处理、重试、跳过等能力。
经过不断的发展和实践,我们的大模型应用平台从最初整合多个大模型 API 接口,提供简单的问答功能,并通过 dify 项目提供复杂工作流的流程编排能力,到后来将工作流引擎的执行切换到 rill-flow 项目,在保证前端操作体验不变的同时优化了工作流的性能表现和稳定性。
最终,我们在 dify 美观、易用的界面基础上,通过实现 DSL 转换器,实现了用户在 dify 界面上通过拖拽配置工作流,但后台实际通过 rill-flow 来调度执行,从而在最大限度上保留用户使用习惯并且又享受到了 rill-flow 带来的高性能、高可靠性的工作流引擎服务,充分发挥了 dify 与 rill-flow 两个开源项目各自的优势,最大程度上满足了我们大模型应用平台实际的业务场景需要。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2025-01-13
前后端源码部署:Dify v0.15.0 升级 v1.0.0-beta.1 的尝试
2025-01-11
Dify v1.0.0-beta:插件开启公测
2025-01-07
Dify v0.15.0:全新父子检索策略 - 更精准,更全面的知识检索
2024-12-27
【场景驱动】企业的哪些重复性任务,最适合用Coze循环节点来解决?——慢慢学AI146
2024-12-24
Coze,Dify,FastGPT,哪个更强?全方位对比分析来了!
2024-12-19
打开日本市场背后,Dify 是怎么做 AI 全球化的?
2024-12-15
有了 NewAPI 之后,Dify 的可玩儿性又高了
2024-12-06
太强大了!Coze史诗级提升,零门槛做自己的AI产品!
2024-04-25
2024-04-24
2024-07-16
2024-07-20
2024-05-08
2024-05-07
2024-05-09
2024-06-21
2024-12-24
2024-04-25