diff --git a/app/chain/user.py b/app/chain/user.py index 85e4da796..5a2650b84 100644 --- a/app/chain/user.py +++ b/app/chain/user.py @@ -51,7 +51,7 @@ def user_authenticate( code=code, grant_type=grant_type ) - logger.debug(f"开始使用 {grant_type} 认证,对用户 {username} 进行身份校验") + logger.debug(f"认证类型:{grant_type},开始准备对用户 {username} 进行身份校验") if credentials.grant_type == "password": # Password 认证 success, user_or_message = self.password_authenticate(credentials=credentials) @@ -64,12 +64,6 @@ def user_authenticate( else: # 用户不存在或密码错误,考虑辅助认证 if settings.AUXILIARY_AUTH_ENABLE: - # 检查是否因为用户被禁用 - user = self.user_oper.get_by_name(name=username) - if user and not user.is_active: - logger.info(f"用户 {username} 已被禁用,跳过后续辅助认证") - return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE - logger.warning("密码认证失败,尝试通过外部服务进行辅助认证 ...") aux_success, aux_user_or_message = self.auxiliary_authenticate(credentials=credentials) if aux_success: @@ -82,19 +76,19 @@ def user_authenticate( else: logger.debug(f"辅助认证未启用,用户 {username} 认证失败") return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE - else: + elif credentials.grant_type == "authorization_code": # 处理其他认证类型的分支 if settings.AUXILIARY_AUTH_ENABLE: aux_success, aux_user_or_message = self.auxiliary_authenticate(credentials=credentials) if aux_success: - logger.info(f"用户 {username} 辅助认证成功") return True, aux_user_or_message else: - logger.warning(f"用户 {username} 辅助认证失败") return False, "认证失败" else: - logger.debug(f"辅助认证未启用,认证类型 {grant_type} 未实现") - return False, "不支持的认证类型" + return False, "认证失败" + else: + logger.debug(f"辅助认证未启用,认证类型 {grant_type} 未实现") + return False, "不支持的认证类型" def password_authenticate(self, credentials: AuthCredentials) -> Tuple[bool, Union[User, str]]: """ @@ -106,20 +100,20 @@ def password_authenticate(self, credentials: AuthCredentials) -> Tuple[bool, Uni - 失败时返回 (False, "错误信息") """ if not credentials or credentials.grant_type != "password": - logger.debug("密码认证失败,认证类型不匹配") + logger.info("密码认证失败,认证类型不匹配") return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE user = self.user_oper.get_by_name(name=credentials.username) if not user: - logger.debug(f"密码认证失败,用户 {credentials.username} 不存在") + logger.info(f"密码认证失败,用户 {credentials.username} 不存在") return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE if not user.is_active: - logger.debug(f"密码认证失败,用户 {credentials.username} 已被禁用") + logger.info(f"密码认证失败,用户 {credentials.username} 已被禁用") return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE if not verify_password(credentials.password, str(user.hashed_password)): - logger.debug(f"密码认证失败,用户 {credentials.username} 的密码验证不通过") + logger.info(f"密码认证失败,用户 {credentials.username} 的密码验证不通过") return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE return True, user @@ -136,15 +130,22 @@ def auxiliary_authenticate(self, credentials: AuthCredentials) -> Tuple[bool, Un if not credentials: return False, "认证凭证无效" - logger.debug(f"尝试通过系统模块进行辅助认证,用户: {credentials.username}") + # 检查是否因为用户被禁用 + if credentials.username: + user = self.user_oper.get_by_name(name=credentials.username) + if user and not user.is_active: + logger.info(f"用户 {user.name} 已被禁用,跳过后续身份校验") + return False, PASSWORD_INVALID_CREDENTIALS_MESSAGE + + logger.debug(f"认证类型:{credentials.grant_type},尝试通过系统模块进行辅助认证,用户: {credentials.username}") result = self.run_module("user_authenticate", credentials=credentials) if not result: logger.debug(f"通过系统模块辅助认证失败,尝试触发 {ChainEventType.AuthVerification} 事件") event = self.eventmanager.send_event(etype=ChainEventType.AuthVerification, data=credentials) if not event or not event.event_data: - logger.error(f"{credentials.grant_type} 辅助认证失败,未返回有效数据") - return False, f"{credentials.grant_type} 辅助认证事件失败或无效" + logger.error(f"认证类型:{credentials.grant_type},辅助认证失败,未返回有效数据") + return False, f"认证类型:{credentials.grant_type},辅助认证事件失败或无效" credentials = event.event_data # 使用事件返回的认证数据 else: @@ -172,10 +173,10 @@ def _verify_mfa(user: User, mfa_code: Optional[str]) -> bool: if not user.is_otp: return True if not mfa_code: - logger.debug(f"用户 {user.name} 缺少 MFA 认证码") + logger.info(f"用户 {user.name} 缺少 MFA 认证码") return False if not OtpUtils.check(str(user.otp_secret), mfa_code): - logger.debug(f"用户 {user.name} 的 MFA 认证失败") + logger.info(f"用户 {user.name} 的 MFA 认证失败") return False return True @@ -190,12 +191,12 @@ def _process_auth_success(self, username: str, credentials: AuthCredentials) -> - 如果认证被拦截或失败,返回 None """ if not username: - logger.debug(f"未能获取到对应的用户信息, {credentials.grant_type} 认证不通过") + logger.info(f"未能获取到对应的用户信息,{credentials.grant_type} 认证不通过") return False token, channel, service = credentials.token, credentials.channel, credentials.service if not all([token, channel, service]): - logger.debug(f"用户 {username} 未通过 {credentials.grant_type} 认证,必要信息不足") + logger.info(f"用户 {username} 未通过 {credentials.grant_type} 认证,必要信息不足") return False # 触发认证通过的拦截事件 @@ -214,11 +215,15 @@ def _process_auth_success(self, username: str, credentials: AuthCredentials) -> # 检查用户是否存在,如果不存在且当前为密码认证时则创建新用户 user = self.user_oper.get_by_name(name=username) if user: + # 如果用户存在,但是已经被禁用,则直接响应 + if not user.is_active: + logger.info(f"辅助认证失败,用户 {username} 已被禁用") + return False anonymized_token = f"{token[:len(token) // 2]}********" logger.info( f"认证类型:{credentials.grant_type},用户:{username},渠道:{channel}," f"服务:{service} 认证成功,token:{anonymized_token}") - return False + return True else: if credentials.grant_type == "password": self.user_oper.add(name=username, is_active=True, is_superuser=False,