Skip to content

Commit

Permalink
Merge pull request #2876 from InfinityPacer/feature/event
Browse files Browse the repository at this point in the history
feat(auth): enhance auxiliary authentication
  • Loading branch information
jxxghp authored Oct 19, 2024
2 parents 616b15e + c7b2778 commit 69c0229
Show file tree
Hide file tree
Showing 10 changed files with 302 additions and 167 deletions.
60 changes: 13 additions & 47 deletions app/api/endpoints/login.py
Original file line number Diff line number Diff line change
@@ -1,83 +1,49 @@
import secrets
from datetime import timedelta
from typing import Any, List

from fastapi import APIRouter, Depends, Form, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

from app import schemas
from app.chain.tmdb import TmdbChain
from app.chain.user import UserChain
from app.core import security
from app.core.config import settings
from app.core.security import get_password_hash
from app.db import get_db
from app.db.models.user import User
from app.helper.sites import SitesHelper
from app.log import logger
from app.utils.web import WebUtils

router = APIRouter()


@router.post("/access-token", summary="获取token", response_model=schemas.Token)
async def login_access_token(
db: Session = Depends(get_db),
form_data: OAuth2PasswordRequestForm = Depends(),
otp_password: str = Form(None)
) -> Any:
"""
获取认证Token
"""
# 检查数据库
success, user = User.authenticate(
db=db,
name=form_data.username,
password=form_data.password,
otp_password=otp_password
)
success, user_or_message = UserChain().user_authenticate(username=form_data.username,
password=form_data.password,
mfa_code=otp_password)

if not success:
# 认证不成功
if not user:
if not settings.AUXILIARY_AUTH_ENABLE:
logger.warn(f"用户 {form_data.username} 登录失败!")
raise HTTPException(status_code=401, detail="用户名、密码或二次校验码不正确")
else:
# 如果找不到用户并开启了辅助认证
logger.warn(f"登录用户 {form_data.username} 本地不存在,尝试辅助认证 ...")
success = UserChain().user_authenticate(form_data.username, form_data.password)
if not success:
logger.warn(f"用户 {form_data.username} 登录失败!")
raise HTTPException(status_code=401, detail="用户名、密码、二次校验码不正确")
else:
logger.info(f"用户 {form_data.username} 辅助认证成功,以普通用户登录...")
# 加入用户信息表
logger.info(f"创建用户: {form_data.username}")
user = User(name=form_data.username, is_active=True,
is_superuser=False, hashed_password=get_password_hash(secrets.token_urlsafe(16)))
user.create(db)
else:
# 用户存在,但认证失败
logger.warn(f"用户 {user.name} 登录失败!")
raise HTTPException(status_code=401, detail="用户名、密码或二次校验码不正确")
elif user and not user.is_active:
raise HTTPException(status_code=403, detail="用户未启用")
logger.info(f"用户 {user.name} 登录成功!")
raise HTTPException(status_code=401, detail=user_or_message)

level = SitesHelper().auth_level
return schemas.Token(
access_token=security.create_access_token(
userid=user.id,
username=user.name,
super_user=user.is_superuser,
userid=user_or_message.id,
username=user_or_message.name,
super_user=user_or_message.is_superuser,
expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES),
level=level
),
token_type="bearer",
super_user=user.is_superuser,
user_id=user.id,
user_name=user.name,
avatar=user.avatar,
super_user=user_or_message.is_superuser,
user_id=user_or_message.id,
user_name=user_or_message.name,
avatar=user_or_message.avatar,
level=level
)

Expand Down
272 changes: 212 additions & 60 deletions app/chain/user.py
Original file line number Diff line number Diff line change
@@ -1,80 +1,232 @@
import secrets
from typing import Optional, Tuple, Union

from app.chain import ChainBase
from app.core.config import settings
from app.core.security import get_password_hash, verify_password
from app.db.models.user import User
from app.db.user_oper import UserOper
from app.log import logger
from app.schemas.event import AuthPassedInterceptData, AuthVerificationData
from app.schemas.event import AuthCredentials, AuthInterceptCredentials
from app.schemas.types import ChainEventType
from app.utils.otp import OtpUtils
from app.utils.singleton import Singleton

PASSWORD_INVALID_CREDENTIALS_MESSAGE = "用户名或密码或二次校验码不正确"


class UserChain(ChainBase):
class UserChain(ChainBase, metaclass=Singleton):
"""
用户链
用户链,处理多种认证协议
"""

def user_authenticate(self, name: str, password: str) -> bool:
"""
辅助完成用户认证。
def __init__(self):
super().__init__()
self.user_oper = UserOper()

:param name: 用户名
:param password: 密码
:return: 认证成功时返回 True,否则返回 False
def user_authenticate(
self,
username: Optional[str] = None,
password: Optional[str] = None,
mfa_code: Optional[str] = None,
code: Optional[str] = None,
grant_type: str = "password"
) -> Union[Tuple[bool, Optional[str]], Tuple[bool, Optional[User]]]:
"""
logger.debug(f"开始对用户 {name} 通过系统预置渠道进行辅助认证")
auth_data = AuthVerificationData(name=name, password=password)
# 尝试通过默认的认证模块认证
try:
result = self.run_module("user_authenticate", auth_data=auth_data)
if result:
return self._process_auth_success(name, result)
except Exception as e:
logger.error(f"认证模块运行出错:{e}")
return False
认证用户,根据不同的 grant_type 处理不同的认证流程
# 如果预置的认证未通过,则触发 AuthVerification 事件
logger.debug(f"用户 {name} 未通过系统预置渠道认证,触发认证事件")
event = self.eventmanager.send_event(
etype=ChainEventType.AuthVerification,
data=auth_data
:param username: 用户名,适用于 "password" grant_type
:param password: 用户密码,适用于 "password" grant_type
:param mfa_code: 一次性密码,适用于 "password" grant_type
:param code: 授权码,适用于 "authorization_code" grant_type
:param grant_type: 认证类型,如 "password", "authorization_code", "client_credentials"
:return:
- 对于成功的认证,返回 (True, User)
- 对于失败的认证,返回 (False, "错误信息")
"""
credentials = AuthCredentials(
username=username,
password=password,
mfa_code=mfa_code,
code=code,
grant_type=grant_type
)
if not event:
return False
if event and event.event_data:
try:
return self._process_auth_success(name, event.event_data)
except Exception as e:
logger.error(f"AuthVerificationData 数据验证失败:{e}")
return False
logger.debug(f"开始使用 {grant_type} 认证,对用户 {username} 进行身份校验")
if credentials.grant_type == "password":
# Password 认证
success, user_or_message = self.password_authenticate(credentials=credentials)
if success:
# 如果用户启用了二次验证码,则进一步验证
if not self._verify_mfa(user_or_message, credentials.mfa_code):
return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE
logger.info(f"用户 {username} 通过密码认证成功")
return True, user_or_message
else:
# 用户不存在或密码错误,考虑辅助认证
if settings.AUXILIARY_AUTH_ENABLE:
# 检查是否因为用户被禁用
user = self.user_oper.get_by_name(name=username)
if user and not user.is_active:
logger.info(f"用户 {username} 已被禁用,跳过后续辅助认证")
return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE

# 认证失败
logger.warning(f"用户 {name} 辅助认证失败")
return False
logger.warning("密码认证失败,尝试通过外部服务进行辅助认证 ...")
aux_success, aux_user_or_message = self.auxiliary_authenticate(credentials=credentials)
if aux_success:
# 辅助认证成功后再验证二次验证码
if not self._verify_mfa(aux_user_or_message, credentials.mfa_code):
return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE
return True, aux_user_or_message
else:
return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE
else:
logger.debug(f"辅助认证未启用,用户 {username} 认证失败")
return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE
else:
# 处理其他认证类型的分支
if settings.AUXILIARY_AUTH_ENABLE:
aux_success, aux_user_or_message = self.auxiliary_authenticate(credentials=credentials)
if aux_success:
logger.info(f"用户 {username} 辅助认证成功")
return True, aux_user_or_message
else:
logger.warning(f"用户 {username} 辅助认证失败")
return False, "认证失败"
else:
logger.debug(f"辅助认证未启用,认证类型 {grant_type} 未实现")
return False, "不支持的认证类型"

def _process_auth_success(self, name: str, data: AuthVerificationData) -> bool:
def password_authenticate(self, credentials: AuthCredentials) -> Tuple[bool, Union[User, str]]:
"""
处理认证成功后的逻辑,记录日志并处理拦截事件。
密码认证
:param name: 用户名
:param data: 认证返回的数据,包含 token、channel 和 service
:return: 成功返回 True,若被拦截返回 False
:param credentials: 认证凭证,包含用户名、密码以及可选的 MFA 认证码
:return:
- 成功时返回 (True, User),其中 User 是认证通过的用户对象
- 失败时返回 (False, "错误信息")
"""
token, channel, service = data.token, data.channel, data.service
if token and channel and service:
# 匿名化 token
anonymized_token = f"{token[:len(token) // 2]}****"
logger.info(f"用户 {name} 通过渠道 {channel},服务: {service} 认证成功,token: {anonymized_token}")

# 触发认证通过的拦截事件
intercept_event = self.eventmanager.send_event(
etype=ChainEventType.AuthPassedIntercept,
data=AuthPassedInterceptData(name=name, channel=channel, service=service, token=token)
)

if intercept_event and intercept_event.event_data:
intercept_data: AuthPassedInterceptData = intercept_event.event_data
if intercept_data.cancel:
logger.info(
f"认证被拦截,用户: {name},渠道: {channel},服务: {service},拦截源: {intercept_data.source}")
return False
if not credentials or credentials.grant_type != "password":
logger.debug("密码认证失败,认证类型不匹配")
return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE

user = self.user_oper.get_by_name(name=credentials.username)
if not user:
logger.debug(f"密码认证失败,用户 {credentials.username} 不存在")
return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE

if not user.is_active:
logger.debug(f"密码认证失败,用户 {credentials.username} 已被禁用")
return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE

if not verify_password(credentials.password, str(user.hashed_password)):
logger.debug(f"密码认证失败,用户 {credentials.username} 的密码验证不通过")
return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE

return True, user

def auxiliary_authenticate(self, credentials: AuthCredentials) -> Tuple[bool, Union[User, str]]:
"""
辅助用户认证
:param credentials: 认证凭证,包含必要的认证信息
:return:
- 成功时返回 (True, User),其中 User 是认证通过的用户对象
- 失败时返回 (False, "错误信息")
"""
if not credentials:
return False, "认证凭证无效"

logger.debug(f"尝试通过系统模块进行辅助认证,用户: {credentials.username}")
result = self.run_module("user_authenticate", credentials=credentials)

if not result:
logger.debug(f"通过系统模块辅助认证失败,尝试触发 {ChainEventType.AuthVerification} 事件")
event = self.eventmanager.send_event(etype=ChainEventType.AuthVerification, data=credentials)
if not event or not event.event_data:
logger.error(f"{credentials.grant_type} 辅助认证失败,未返回有效数据")
return False, f"{credentials.grant_type} 辅助认证事件失败或无效"

credentials = event.event_data # 使用事件返回的认证数据
else:
logger.info(f"通过系统模块辅助认证成功,用户: {credentials.username}")
credentials = result # 使用模块认证返回的认证数据

# 处理认证成功的逻辑
success = self._process_auth_success(username=credentials.username, credentials=credentials)
if success:
logger.info(f"用户 {credentials.username} 辅助认证通过")
return True, self.user_oper.get_by_name(credentials.username)
else:
logger.warning(f"用户 {credentials.username} 辅助认证未通过")
return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE

@staticmethod
def _verify_mfa(user: User, mfa_code: Optional[str]) -> bool:
"""
验证 MFA(二次验证码)
:param user: 用户对象
:param mfa_code: 二次验证码
:return: 如果验证成功返回 True,否则返回 False
"""
if not user.is_otp:
return True
if not mfa_code:
logger.debug(f"用户 {user.name} 缺少 MFA 认证码")
return False
if not OtpUtils.check(str(user.otp_secret), mfa_code):
logger.debug(f"用户 {user.name} 的 MFA 认证失败")
return False
return True

def _process_auth_success(self, username: str, credentials: AuthCredentials) -> bool:
"""
处理辅助认证成功的逻辑,返回用户对象或创建新用户
:param username: 用户名
:param credentials: 认证凭证,包含 token、channel、service 等信息
:return:
- 如果认证成功并且用户存在或已创建,返回 User 对象
- 如果认证被拦截或失败,返回 None
"""
if not username:
logger.debug(f"未能获取到对应的用户信息, {credentials.grant_type} 认证不通过")
return False

token, channel, service = credentials.token, credentials.channel, credentials.service
if not all([token, channel, service]):
logger.debug(f"用户 {username} 未通过 {credentials.grant_type} 认证,必要信息不足")
return False

# 触发认证通过的拦截事件
intercept_event = self.eventmanager.send_event(
etype=ChainEventType.AuthIntercept,
data=AuthInterceptCredentials(username=username, channel=channel, service=service, token=token)
)

logger.warning(f"用户 {name} 未通过辅助认证")
return False
if intercept_event and intercept_event.event_data:
intercept_data: AuthInterceptCredentials = intercept_event.event_data
if intercept_data.cancel:
logger.warning(
f"认证被拦截,用户:{username},渠道:{channel},服务:{service},拦截源:{intercept_data.source}")
return False

# 检查用户是否存在,如果不存在且当前为密码认证时则创建新用户
user = self.user_oper.get_by_name(name=username)
if user:
anonymized_token = f"{token[:len(token) // 2]}********"
logger.info(
f"认证类型:{credentials.grant_type},用户:{username},渠道:{channel},"
f"服务:{service} 认证成功,token:{anonymized_token}")
return False
else:
if credentials.grant_type == "password":
self.user_oper.add(name=username, is_active=True, is_superuser=False,
hashed_password=get_password_hash(secrets.token_urlsafe(16)))
logger.info(f"用户 {username} 不存在,已通过 {credentials.grant_type} 认证并已创建普通用户")
return True
else:
logger.warning(
f"认证类型:{credentials.grant_type},用户:{username},渠道:{channel},"
f"服务:{service} 认证不通过,未能在本地找到对应的用户信息")
return False
2 changes: 1 addition & 1 deletion app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class Config:
CONFIG_DIR: Optional[str] = None
# 超级管理员
SUPERUSER: str = "admin"
# 辅助认证,允许通过外部服务(如媒体服务器/插件等)认证并创建用户
# 辅助认证,允许通过外部服务进行认证、单点登录以及自动创建用户
AUXILIARY_AUTH_ENABLE: bool = False
# API密钥,需要更换
API_TOKEN: Optional[str] = None
Expand Down
Loading

0 comments on commit 69c0229

Please sign in to comment.