Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement mechanism for deactivating users #23

Merged
merged 11 commits into from
Feb 2, 2024
10 changes: 9 additions & 1 deletion ldap_jwt_auth/auth/jwt_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
import jwt
from cryptography.hazmat.primitives import serialization

from ldap_jwt_auth.auth.authentication import Authentication
from ldap_jwt_auth.core.config import config
from ldap_jwt_auth.core.constants import PRIVATE_KEY, PUBLIC_KEY
from ldap_jwt_auth.core.exceptions import InvalidJWTError, JWTRefreshError
from ldap_jwt_auth.core.exceptions import InvalidJWTError, JWTRefreshError, UserNotActiveError

logger = logging.getLogger()

Expand Down Expand Up @@ -54,8 +55,15 @@ def refresh_access_token(self, access_token: str, refresh_token: str):
"""
logger.info("Refreshing access token")
self.verify_token(refresh_token)

try:
payload = self._get_jwt_payload(access_token, {"verify_exp": False})

authentication = Authentication()
joelvdavies marked this conversation as resolved.
Show resolved Hide resolved
username = payload["username"]
if not authentication.is_user_active(username):
raise UserNotActiveError(f"The provided username '{username}' is not part of the active usernames")

payload["exp"] = datetime.now(timezone.utc) + timedelta(
minutes=config.authentication.access_token_validity_minutes
)
Expand Down
6 changes: 5 additions & 1 deletion ldap_jwt_auth/routers/refresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from fastapi.responses import JSONResponse

from ldap_jwt_auth.auth.jwt_handler import JWTHandler
from ldap_jwt_auth.core.exceptions import JWTRefreshError, InvalidJWTError
from ldap_jwt_auth.core.exceptions import JWTRefreshError, InvalidJWTError, ActiveUsernamesFileNotFoundError

logger = logging.getLogger()

Expand Down Expand Up @@ -37,3 +37,7 @@ def refresh_access_token(
message = "Unable to refresh access token"
logger.exception(message)
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=message) from exc
except ActiveUsernamesFileNotFoundError as exc:
message = "Something went wrong"
logger.exception(message)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=message) from exc
23 changes: 21 additions & 2 deletions test/unit/auth/test_jwt_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,25 +86,44 @@ def test_get_refresh_token(datetime_mock):
assert refresh_token == EXPECTED_REFRESH_TOKEN


@patch("ldap_jwt_auth.auth.jwt_handler.Authentication.is_user_active")
@patch("ldap_jwt_auth.auth.jwt_handler.datetime")
def test_refresh_access_token(datetime_mock):
def test_refresh_access_token(datetime_mock, is_user_active_mock):
"""
Test refreshing an expired access token with a valid refresh token.
"""
datetime_mock.now.return_value = mock_datetime_now()
is_user_active_mock.return_value = True

jwt_handler = JWTHandler()
access_token = jwt_handler.refresh_access_token(EXPIRED_ACCESS_TOKEN, VALID_REFRESH_TOKEN)

assert access_token == EXPECTED_ACCESS_TOKEN


@patch("ldap_jwt_auth.auth.jwt_handler.Authentication.is_user_active")
def test_refresh_access_token_with_not_active_username(is_user_active_mock):
"""
Test refreshing an access token when username is not active.
:param is_user_active_mock:
:return:
"""
is_user_active_mock.return_value = False

jwt_handler = JWTHandler()
with pytest.raises(JWTRefreshError) as exc:
jwt_handler.refresh_access_token(EXPIRED_ACCESS_TOKEN, VALID_REFRESH_TOKEN)
assert str(exc.value) == "Unable to refresh access token"


@patch("ldap_jwt_auth.auth.jwt_handler.Authentication.is_user_active")
@patch("ldap_jwt_auth.auth.jwt_handler.datetime")
def test_refresh_access_token_with_valid_access_token(datetime_mock):
def test_refresh_access_token_with_valid_access_token(datetime_mock, is_user_active_mock):
"""
Test refreshing a valid access token with a valid refresh token.
"""
datetime_mock.now.return_value = mock_datetime_now()
is_user_active_mock.return_value = True

jwt_handler = JWTHandler()
access_token = jwt_handler.refresh_access_token(VALID_ACCESS_TOKEN, VALID_REFRESH_TOKEN)
Expand Down