微信扫码
和创始人交个朋友
我要投稿
掌握DeepSeek+dify知识库查询技巧,提升AI应用效率。核心内容:1. DeepSeek+dify知识库查询的两种方式介绍2. 基于代码执行模块直接查询数据库的流程梳理3. 基于Http请求调用封装接口查询数据库的接口开发指南
自从发了 DeepSeek+dify 本地知识库:真的太香了这篇以后,一直有小伙伴介绍在问我,怎么让在个ai应用客户端直接连接数据库查询。dify官方没有现成的组件可以直接用。
当时我想的是两种方式,一种是基于代码执行模块直接查询数据库,一种是基于Http请求,调用自己封装接口来查询数据库。
历史文章
DeepSeek+dify 本地知识库:高级应用Agent+工作流
想干这事之前先梳理下流程
要求:
python -m pip install pymysql flask
让kimi给我生成一个文章表,并且插入10条数据,我们可以告诉kimi,文章长度多大,这样内容可以丰富些。直接让kimi生成一个暴露接口查数据库的服务,有简单的优化了下,将下面的内容放入到
server.py
文件中
from flask import Flask, request, jsonify
import pymysql
app = Flask(__name__)
# 数据库配置
DATABASE_CONFIG = {
'host': '', # 自己的数据库地址
'user': '', # 自己数据库的账户
'password': '', #自己数据库的密码
'db': 'demo', # 自己数据库的库名
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor
}
@app.route('/query', methods=['POST'])
def query_database():
print("接收到请求")
# 获取关键字
keyword = request.json.get('keyword')
print("keyword为:"+keyword)
ifnot keyword:
return jsonify({"error": "Keyword is required"}), 400
# 参数化查询,避免 SQL 注入,修改成自己的库
query = "SELECT * FROM articles WHERE content LIKE %s"
params = ('%' + keyword + '%',)
try:
# 建立数据库连接
connection = pymysql.connect(**DATABASE_CONFIG)
with connection.cursor() as cursor:
# 执行查询
cursor.execute(query, params)
result = cursor.fetchall()
connection.commit()
connection.close()
ifnot result:
return"未查询到有效数据", 400
# 生成 Markdown 表格
markdown_table = generate_markdown_table(result)
return markdown_table, 200
except Exception as e:
return str(e), 500
def generate_markdown_table(results):
""" 生成 Markdown 表格 """
ifnot results:
return""
# 获取列名
columns = results[0].keys()
# 表头
table_md = "| " + " | ".join([col for col in columns]) + " |\n"
# 分隔线
table_md += "| " + " --- |" * len(columns) + "\n"
# 表格内容
for row in results:
table_md += "| " + " | ".join([str(cell) for cell in row.values()]) + " |\n"
return table_md
if __name__ == '__main__':
# 注意这里绑定本机的内容ip,省事点,就0.0.0.0即可。不要绑定127.0.0.1,docke内访问不到
app.run(host='10.1.0.65', port=8000)
启动服务
python .\server.py
创建一个空白应用。
在开始节点添加一个输入字段
context
添加一个LLM,把开始节点设置的context
字段作为上下文传入,并设置提示词提取关键词。
添加一个http请求节点,把我们在接口开发里的地址和接口名填写进去
2
,然后把大模型的输出作为关键词填写到请求body里3
,我们关闭重试机制4
。
这里要注意下:json的引号是中文的,最好在外面写好校验过了再放进去。在HTTP请求的输出变量里,我们只关注status_code 响应状态码和响应内容即可。
添加一个条件分支
1
,然后设置HTTP响应码为200的时候,连接到大模型。其他直接结束。
添加大模型,将HTTP请求的响应体作为上下文给大模型,输入提示词,让大模型根据知识,验证,并进行合理性的验证,最后结构化返回。
在结束节点中,我们把大模型整理的内容输出。
试运行效果。
由于difysandbox的安全限制
官方也有了对应的说明,见文档。 https://github.com/langgenius/dify-sandbox/blob/main/FAQ.mdss
一定要使用linux环境
、一定要使用linux环境
一定要使用linux环境
我从github上拉下代码以后,搜索``syscalls_amd64.go
一共有4个文件,
我用python,不是arm架构的,镜像都是linux的。
我们直接问kimi即可。
ps:这个问题丢给了ds和chatgpt都是瞎回答
一步步的问kimi,最后告诉我要添加哪些。整理以后添加到代码里。
var ALLOW_SYSCALLS = []int{
// file io
syscall.SYS_NEWFSTATAT, syscall.SYS_IOCTL, syscall.SYS_LSEEK, syscall.SYS_GETDENTS64,
syscall.SYS_WRITE, syscall.SYS_CLOSE, syscall.SYS_OPENAT, syscall.SYS_READ,
// thread
syscall.SYS_FUTEX,
// memory
syscall.SYS_MMAP, syscall.SYS_BRK, syscall.SYS_MPROTECT, syscall.SYS_MUNMAP, syscall.SYS_RT_SIGRETURN,
syscall.SYS_MREMAP,
// user/group
syscall.SYS_SETUID, syscall.SYS_SETGID, syscall.SYS_GETUID,
// process
syscall.SYS_GETPID, syscall.SYS_GETPPID, syscall.SYS_GETTID,
syscall.SYS_EXIT, syscall.SYS_EXIT_GROUP,
syscall.SYS_TGKILL, syscall.SYS_RT_SIGACTION, syscall.SYS_IOCTL,
syscall.SYS_SCHED_YIELD,
syscall.SYS_SET_ROBUST_LIST, syscall.SYS_GET_ROBUST_LIST, SYS_RSEQ,
// time
syscall.SYS_CLOCK_GETTIME, syscall.SYS_GETTIMEOFDAY, syscall.SYS_NANOSLEEP,
syscall.SYS_EPOLL_CREATE1,
syscall.SYS_EPOLL_CTL, syscall.SYS_CLOCK_NANOSLEEP, syscall.SYS_PSELECT6,
syscall.SYS_TIME,
syscall.SYS_RT_SIGPROCMASK, syscall.SYS_SIGALTSTACK, SYS_GETRANDOM,
//新增
5, 6, 7, 21, 41, 42, 44, 45, 51, 54, 55, 107, 137, 204, 281,
}
既然我们要操作在沙箱里操作mysql,那我们得在对应的环境中预装下mysql客户端。
在
1
对应的文件中添加2
对应的pymysql==1.1.1
,我直接安装最新版。
在readme中有操作步骤
### Steps
1. Clone the repository using `git clone https://github.com/langgenius/dify-sandbox` and navigate to the project directory.
2. Run ./install.sh to install the necessary dependencies.
3. Run ./build/build_[amd64|arm64].sh to build the sandbox binary.
4. Run ./main to start the server.
编译成功以后,打包镜像。因为我没有环境,直接模拟了下创建了一个main和env
目录
然后模拟打包镜像。在根目录中执行下面的命令
docker build -f docker/amd64/dockerfile -t dify-sandbox:local .
我在win上打包报了一堆错,都扔给kimi,一步步的解决。最后成功。
在我们的安装dify的的时候,有个dify/docker/ssrf_proxy
目录,找到squid.conf.template
在这里,你可以设置允许访问的网络,允许访问的端口,生产一定要最小权限
acl devnet src 10.1.0.0/24
acl devnet src 10.255.200.0/24
acl Safe_ports port 3306 # MySQL
acl Safe_ports port 5432 # Postgres
acl Safe_ports port 27017 # MongoDB
acl Safe_ports port 6379 # Redis
http_access allow devnet
devnet
为定义的规则集名称,后面跟自己的ip段设置,表示:10.255.200.1
到 10.255.200.254
-acl Safe_ports port
允许访问的端口http_access allow devnet
允许访问的规则集在dify的的docker目录中修改docker-compose.yaml
文件sandbox使用本地镜像。
sandbox:
#image: langgenius/dify-sandbox:0.2.10
image: dify-sandbox:local
restart: always
environment:
将image
由langgenius/dify-sandbox:0.2.10
改为了dify-sandbox:local
在docker目录下执行以下命令
# 销毁
docker compose down
# 重新部署
docker compose up -d
使用kimi生成了一个python代码
import sys
import pymysql
import os
def connect_to_database():
""" 连接到数据库,配置都从环境变量里取 """
try:
# 从环境变量或配置文件中获取数据库参数
host = os.getenv("DB_HOST", "localhost")
user = os.getenv("DB_USER", "root")
password = os.getenv("DB_PASSWORD", "password")
database = os.getenv("DB_NAME", "database_name")
conn = pymysql.connect(
host=host,
user=user,
password=password,
database=database,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor # 使用字典游标
)
return conn
except pymysql.MySQLError as err:
print(f"Error connecting to database: {err}")
returnNone
def execute_query(conn, query, params=None):
""" 执行 SQL 查询,并支持参数化查询 """
cursor = conn.cursor()
try:
if params:
cursor.execute(query, params)
else:
cursor.execute(query)
return cursor.fetchall()
except pymysql.MySQLError as err:
print(f"Error executing query: {err}")
returnNone
finally:
cursor.close()
def generate_markdown_table(results):
""" 生成 Markdown 表格 """
ifnot results:
return""
# 获取列名
columns = results[0].keys()
# 表头
table_md = "| " + " | ".join([col for col in columns]) + " |\n"
# 分隔线
table_md += "| " + " --- |" * len(columns) + "\n"
# 表格内容
for row in results:
table_md += "| " + " | ".join([str(cell) for cell in row.values()]) + " |\n"
return table_md
def main(arg1: str) -> dict:
# 参数化查询,避免 SQL 注入
query = "SELECT * FROM table_name WHERE column LIKE %s"
params = ('%' + arg1 + '%',)
# 连接到数据库
conn = connect_to_database()
ifnot conn:
sys.exit(1)
try:
# 执行查询
result = execute_query(conn, query, params)
if result isNone:
return {"result": [], "markdown": ""}
# 生成 Markdown 表格
markdown_table = generate_markdown_table(result)
return {
"result": result,
"markdown": markdown_table
}
except Exception as e:
print(f"Unexpected error: {e}")
return {"result": [], "markdown": ""}
finally:
# 确保数据库连接关闭
conn.close()
这两种方式,不管哪种都能实现查询数据库,但是有个问题,数据量小的时候性能还行,数据量大了,你查询一次就得耗时好久。
如果知识固定,也可以前置设置一个知识库把关键词和文章映射出来,这样大模型整理的时候,尽量的去往对应的关键词上靠。
?【三连好运 福利拉满】?
? 若本日推送有收获:
? 点赞 → 小手一抖,bug没有
? 在看 → 一点扩散,知识璀璨
? 收藏 → 代码永驻,防止迷路
? 分享 → 传递战友,功德+999
? 关注 → 追更不迷路,干货永同步
? 若有槽点想输出:
? 评论区已铺好红毯,等你来战!
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2025-01-02
2024-07-17
2025-01-03
2024-07-11
2024-08-13
2024-06-24
2024-07-13
2024-08-27
2024-06-10
2024-07-12
2025-02-13
2025-01-14
2025-01-10
2025-01-06
2025-01-02
2024-12-16
2024-12-10
2024-12-04