AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


Dify 中的 Bearer Token 与 API-Key 鉴权方式
发布日期:2024-11-05 07:34:02 浏览次数: 1660 来源:NLP工程化


本文使用 Dify v0.10.2 版本,在 Dify 中包括 Bearer Token 与 API-Key 鉴权这 2 种方式。console(URL 前缀/console/api)和 web(URL 前缀/api)蓝图使用的是 Bearer Token 鉴权方式,而 service_api(URL 前缀/v1)蓝图使用的是 API-Key 鉴权方式。console 蓝图通过 login_required 装饰器[1]实现,web 蓝图通过继承 WebApiResource,即 validate_jwt_token 装饰器实现,而 service_api 通过 validate_app_token 装饰器实现。

一.service_api 蓝图与 API-Key 鉴权

API-Key 与 Bearer Token 鉴权区别,如下所示:

特性API-KeyBearer Token
定义一种用于识别调用者身份的密钥一种令牌,表示用户已经通过认证
格式通常为简单的字符串通常为 "Bearer <token>" 格式
存储位置可以在请求头、URL 查询参数或请求体中通常存储在请求头中
认证过程直接使用 API-Key需要通过认证流程获取令牌
有效期通常没有有效期或长期有效通常有有效期(如几分钟到几小时)
安全性相对较低,容易被截获更高,通过过期和刷新机制提高安全性
使用场景简单的 API 访问用户身份验证和授权
撤销机制需要手动更换或删除 API-Key可以通过令牌失效或刷新实现
适用性适用于无状态 API 调用适用于需要用户会话的应用

1.service_api 蓝图

简单理解,只有这一个地方的 API 使用的是 API-Key 鉴权方式,其它地方均使用 earer Token 鉴权方式。

2.validate_app_token 装饰器

该装饰器 validate_app_token 用于验证应用程序的 API 令牌,检查相关的应用和租户状态,并根据需要提取用户信息,为后续的视图函数提供必要的上下文。

(1)函数定义和参数

def validate_app_token(view: Optional[Callable] = None, *, fetch_user_arg: Optional[FetchUserArg] = None):
  • 定义一个装饰器 validate_app_token,接受一个可选的视图函数 view 和一个关键字参数 fetch_user_arg
  • fetch_user_arg 是一个可选参数,用于指定如何从请求中获取用户 ID。

(2)装饰器内部定义

def decorator(view_func):
  • 定义内部函数 decorator,它接受要装饰的视图函数 view_func

(3)装饰的视图函数

@wraps(view_func)
def decorated_view(*args, **kwargs):
  • 使用 functools.wraps 保留原始视图函数的元数据。
  • 定义内部函数 decorated_view,接受任意位置和关键字参数。

(4)获取和验证 API 令牌

api_token = validate_and_get_api_token("app")
  • 调用函数 validate_and_get_api_token 验证并获取 API 令牌,类型为 "app"。

(5)查询应用模型

app_model = db.session.query(App).filter(App.id == api_token.app_id).first()
if not app_model:
raise Forbidden("The app no longer exists.")
  • 查询数据库,获取与 api_token 中的 app_id 关联的应用模型 app_model
  • 如果未找到该应用模型,抛出 Forbidden 异常,说明应用不存在。

(6)检查应用状态

if app_model.status != "normal":
raise Forbidden("The app's status is abnormal.")
  • 检查应用的状态是否为正常。如果不是,则抛出 Forbidden 异常,说明应用状态异常。
if not app_model.enable_api:
raise Forbidden("The app's API service has been disabled.")
  • 检查应用的 API 服务是否启用。如果禁用,则抛出 Forbidden 异常。

(7)查询租户模型

tenant = db.session.query(Tenant).filter(Tenant.id == app_model.tenant_id).first()
if tenant.status == TenantStatus.ARCHIVE:
raise Forbidden("The workspace's status is archived.")
  • 查询与 app_model 相关联的租户模型 tenant,并检查租户的状态。
  • 如果租户状态为归档(ARCHIVE),则抛出 Forbidden 异常。

(8)将应用模型传递给视图函数

kwargs["app_model"] = app_model
  • app_model 添加到 kwargs 中,以便传递给被装饰的视图函数。

(9)获取用户 ID

if fetch_user_arg:
if fetch_user_arg.fetch_from == WhereisUserArg.QUERY:
user_id = request.args.get("user")
elif fetch_user_arg.fetch_from == WhereisUserArg.JSON:
user_id = request.get_json().get("user")
elif fetch_user_arg.fetch_from == WhereisUserArg.FORM:
user_id = request.form.get("user")
else:
# use default-user
user_id = None
  • 如果 fetch_user_arg 被提供,根据其 fetch_from 属性从不同来源获取用户 ID:
    • 如果是查询参数(QUERY),从请求参数中获取。
    • 如果是 JSON 体(JSON),从请求 JSON 中获取。
    • 如果是表单数据(FORM),从请求表单中获取。
    • 如果没有指定来源,则 user_id 设置为 None

(10)检查用户 ID 的有效性

if not user_id and fetch_user_arg.required:
raise ValueError("Arg user must be provided.")
  • 如果 user_id 不存在且 fetch_user_arg 指定了该参数是必需的,则抛出 ValueError
if user_id:
user_id = str(user_id)

如果 user_id 存在,则将其转换为字符串。

(11)创建或更新用户信息

kwargs["end_user"] = create_or_update_end_user_for_user_id(app_model, user_id)
  • 调用函数 create_or_update_end_user_for_user_id,根据 app_modeluser_id 创建或更新用户信息,并将结果存入 kwargs 中。

(12)返回视图函数的结果

return view_func(*args, **kwargs)
  • 调用被装饰的视图函数,传递原始参数和 kwargs,并返回结果。

(13)装饰器的返回

return decorated_view
  • 返回装饰后的视图函数 decorated_view

(14)检查视图函数参数

if view is None:
return decorator
else:
return decorator(view)
  • 如果 view 参数为 None,则返回 decorator 函数;否则将视图函数 view 传递给 decorator 并返回。

二.console 蓝图与 Bearer Token 鉴权(用户登录操作)

源码位置:dify\api\controllers\console\auth\login.py

该接口返回结果示例,如下所示:

{
"result": "success",
"data": {
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiZWJiZjI1YmUtZWM4NS00ZDY4LTlkZDItNWJjMmVlMjg1NWU0IiwiZXhwIjoxNzMwNjg5Mjk4LCJpc3MiOiJTRUxGX0hPU1RFRCIsInN1YiI6IkNvbnNvbGUgQVBJIFBhc3Nwb3J0In0.EPfEaeoDY8NVeS7TDC95-B0rvFYjH9BVqiFY9IwYcVQ",
"refresh_token": "32a914bb7d7dac9b1b7b5ba98d6584007acbcba4638374d8e0171877f5c9eee34eae400917aadc433d7801181367579dd72894ae8f512aaba00929ab98bf509f"
}
}

1.AccountService.authenticate()方法

account = AccountService.authenticate(args["email"], args["password"])该方法实现了基于邮箱和密码的账户认证流程,包括账户查询、状态检查、密码哈希、首次登录处理和状态更新。如果所有条件都满足,最后将返回一个有效的 Account 对象。

@staticmethod
def authenticate(email: str, password: str, invite_token: Optional[str] = None) -> Account:
<em>"""authenticate account with email and password"""</em>

<em></em>account = Account.query.filter_by(email=email).first()
if not account:
raise AccountNotFoundError()

if account.status == AccountStatus.BANNED.value:
raise AccountLoginError("Account is banned.")

if password and invite_token and account.password is None:
# if invite_token is valid, set password and password_salt
salt = secrets.token_bytes(16)
base64_salt = base64.b64encode(salt).decode()
password_hashed = hash_password(password, salt)
base64_password_hashed = base64.b64encode(password_hashed).decode()
account.password = base64_password_hashed
account.password_salt = base64_salt

if account.password is None or not compare_password(password, account.password, account.password_salt):
raise AccountPasswordError("Invalid email or password.")

if account.status == AccountStatus.PENDING.value:
account.status = AccountStatus.ACTIVE.value
account.initialized_at = datetime.now(timezone.utc).replace(tzinfo=None)

db.session.commit()

return account

生成 16 字节的随机盐值,并将其编码为 Base64 字符串,以便后续用于密码哈希。对用户提供的密码进行哈希处理,使用生成的盐值,然后将哈希结果编码为 Base64 字符串。将哈希后的密码和盐值存储到账户对象中。

2. AccountService.login()方法

token_pair = AccountService.login(account=account, ip_address=extract_remote_ip(request))

@staticmethod
def login(account: Account, *, ip_address: Optional[str] = None) -> TokenPair:
if ip_address:
AccountService.update_login_info(account=account, ip_address=ip_address)

if account.status == AccountStatus.PENDING.value:
account.status = AccountStatus.ACTIVE.value
db.session.commit()

access_token = AccountService.get_account_jwt_token(account=account)
refresh_token = _generate_refresh_token()

AccountService._store_refresh_token(refresh_token, account.id)

return TokenPair(access_token=access_token, refresh_token=refresh_token)

(1)AccountService.get_account_jwt_token(account=account)

该方法主要是生成 jwt token 的过程,如下所示:

@staticmethod
def get_account_jwt_token(account: Account) -> str:
exp_dt = datetime.now(timezone.utc) + timedelta(minutes=dify_config.ACCESS_TOKEN_EXPIRE_MINUTES)
exp = int(exp_dt.timestamp())
payload = {
"user_id": account.id,
"exp": exp,
"iss": dify_config.EDITION,
"sub": "Console API Passport",
}

token = PassportService().issue(payload)# jwt.encode(payload, self.sk, algorithm="HS256")
return token

(2)_generate_refresh_token()

该方法主要是生成 refresh token 的过程,如下所示:

def _generate_refresh_token(length: int = 64):
token = secrets.token_hex(length)#返回一个随机的文本字符串,使用十六进制表示。
return token

(3)AccountService._store_refresh_token(refresh_token, account.id)

@staticmethod
def _store_refresh_token(refresh_token: str, account_id: str) -> None:
redis_client.setex(AccountService._get_refresh_token_key(refresh_token), REFRESH_TOKEN_EXPIRY, account_id)
redis_client.setex(
AccountService._get_account_refresh_token_key(account_id), REFRESH_TOKEN_EXPIRY, refresh_token
)

该方法的作用是将刷新令牌和账户 ID 以键值对的形式存储到 Redis 中,并设置过期时间,以便后续的身份验证和管理。当用户登录或需要刷新令牌时,可以快速查找这些信息。

3.AccountService.reset_login_error_rate_limit()方法

AccountService.reset_login_error_rate_limit(args["email"])

@staticmethod
def reset_login_error_rate_limit(email: str):
key = f"login_error_rate_limit:{email}"
redis_client.delete(key)

该方法用于重置指定邮箱的登录错误次数限制,通过删除与该邮箱相关的 Redis 键。

三.console 蓝图与 Bearer Token 鉴权(用户登出操作)

1./console/api/logout 接口

http://localhost:5001/console/api/logout

Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiZWJiZjI1YmUtZWM4NS00ZDY4LTlkZDItNWJjMmVlMjg1NWU0IiwiZXhwIjoxNzMwNjg1NTcyLCJpc3MiOiJTRUxGX0hPU1RFRCIsInN1YiI6IkNvbnNvbGUgQVBJIFBhc3Nwb3J0In0.2e_5wPvf8GU9JARduOiIc06loBlQv0mgUrfkbMceeSg

2.AccountService.logout()

源码位置:dify\api\controllers\console\auth\login.py

AccountService.logout(account=account)该方法用于用户登出操作,首先尝试从 Redis 中获取用户的刷新令牌,如果存在,则删除该令牌,以实现用户登出的效果。

@staticmethod
def logout(*, account: Account) -> None:
refresh_token = redis_client.get(AccountService._get_account_refresh_token_key(account.id))
if refresh_token:
AccountService._delete_refresh_token(refresh_token.decode("utf-8"), account.id)

使用 redis_client 从 Redis 数据库中获取与给定 account(账户)相关的刷新令牌(refresh token)。_get_account_refresh_token_key(account.id) 是一个静态方法,用于生成存储刷新令牌的键,account.id 是账户的唯一标识符。

如果获取到了刷新令牌,调用 AccountService_delete_refresh_token 方法来删除该令牌。refresh_token.decode("utf-8") 将字节串解码为字符串,传入方法以执行删除操作,同时传入 account.id 以确保删除的是对应账户的令牌。

四.Web 蓝图与 Bearer Token 鉴权

1.Web 蓝图

简单理解,将应用发布后的运行 Web 页面调用接口 URL 前缀为/api,如下所示:

2.validate_jwt_token 装饰器

这里面的类基本都继承自 WebApiResource 类,而 WebApiResource 类实现,如下所示:

class WebApiResource(Resource):
method_decorators = [validate_jwt_token]

validate_jwt_token 装饰器用于在调用视图函数之前验证 JWT。它会解码 JWT,提取出相关的用户信息,并将其传递给视图函数。如下所示:

def validate_jwt_token(view=None):
def decorator(view):
@wraps(view)
def decorated(*args, **kwargs):
app_model, end_user = decode_jwt_token()

return view(app_model, end_user, *args, **kwargs)

return decorated

if view:
return decorator(view)
return decorato


53AI,企业落地应用大模型首选服务商

产品:大模型应用平台+智能体定制开发+落地咨询服务

承诺:先做场景POC验证,看到效果再签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

与创始人交个朋友

回到顶部

 
扫码咨询