From 6a70bafd95f06b7a5fee9b6c99033dc32f72cd1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20P=C3=B6lzl?= Date: Fri, 13 Oct 2023 14:51:55 +0200 Subject: [PATCH] Added logic and test cases for generating and clearing tokens programmatically --- django_rest_passwordreset/views.py | 121 ++++++++++++++++------------- tests/test/test_auth_test_case.py | 45 ++++++++++- 2 files changed, 109 insertions(+), 57 deletions(-) diff --git a/django_rest_passwordreset/views.py b/django_rest_passwordreset/views.py index a2a4480..541a8fa 100644 --- a/django_rest_passwordreset/views.py +++ b/django_rest_passwordreset/views.py @@ -47,6 +47,60 @@ def _unicode_ci_compare(s1, s2): return normalized1.casefold() == normalized2.casefold() +def clear_expired_tokens(): + """ + Delete all existing expired tokens + """ + password_reset_token_validation_time = get_password_reset_token_expiry_time() + + # datetime.now minus expiry hours + now_minus_expiry_time = timezone.now() - timedelta(hours=password_reset_token_validation_time) + + # delete all tokens where created_at < now - 24 hours + clear_expired(now_minus_expiry_time) + + +def generate_token_for_email(email, user_agent='', ip_address=''): + # find a user by email address (case-insensitive search) + users = User.objects.filter(**{'{}__iexact'.format(get_password_reset_lookup_field()): email}) + + active_user_found = False + + # iterate over all users and check if there is any user that is active + # also check whether the password can be changed (is usable), as there could be users that are not allowed + # to change their password (e.g., LDAP user) + for user in users: + if user.eligible_for_reset(): + active_user_found = True + break + + # No active user found, raise a ValidationError + # but not if DJANGO_REST_PASSWORDRESET_NO_INFORMATION_LEAKAGE == True + if not active_user_found and not getattr(settings, 'DJANGO_REST_PASSWORDRESET_NO_INFORMATION_LEAKAGE', False): + raise exceptions.ValidationError({ + 'email': [_( + "We couldn't find an account associated with that email. Please try a different e-mail address.")], + }) + + # last but not least: iterate over all users that are active and can change their password + # and create a Reset Password Token and send a signal with the created token + for user in users: + if user.eligible_for_reset() and _unicode_ci_compare(email, getattr(user, get_password_reset_lookup_field())): + password_reset_tokens = user.password_reset_tokens.all() + + # check if the user already has a token + if password_reset_tokens.count(): + # yes, already has a token, re-use this token + return password_reset_tokens.first() + + # no token exists, generate a new token + return ResetPasswordToken.objects.create( + user=user, + user_agent=user_agent, + ip_address=ip_address, + ) + + class ResetPasswordValidateToken(GenericAPIView): """ An Api View which provides a method to verify that a token is valid @@ -128,61 +182,18 @@ class ResetPasswordRequestToken(GenericAPIView): def post(self, request, *args, **kwargs): serializer = self.serializer_class(data=request.data) serializer.is_valid(raise_exception=True) - email = serializer.validated_data['email'] - - # before we continue, delete all existing expired tokens - password_reset_token_validation_time = get_password_reset_token_expiry_time() - - # datetime.now minus expiry hours - now_minus_expiry_time = timezone.now() - timedelta(hours=password_reset_token_validation_time) - - # delete all tokens where created_at < now - 24 hours - clear_expired(now_minus_expiry_time) - - # find a user by email address (case insensitive search) - users = User.objects.filter(**{'{}__iexact'.format(get_password_reset_lookup_field()): email}) - - active_user_found = False - - # iterate over all users and check if there is any user that is active - # also check whether the password can be changed (is useable), as there could be users that are not allowed - # to change their password (e.g., LDAP user) - for user in users: - if user.eligible_for_reset(): - active_user_found = True - break - - # No active user found, raise a validation error - # but not if DJANGO_REST_PASSWORDRESET_NO_INFORMATION_LEAKAGE == True - if not active_user_found and not getattr(settings, 'DJANGO_REST_PASSWORDRESET_NO_INFORMATION_LEAKAGE', False): - raise exceptions.ValidationError({ - 'email': [_( - "We couldn't find an account associated with that email. Please try a different e-mail address.")], - }) - - # last but not least: iterate over all users that are active and can change their password - # and create a Reset Password Token and send a signal with the created token - for user in users: - if user.eligible_for_reset() and \ - _unicode_ci_compare(email, getattr(user, get_password_reset_lookup_field())): - # define the token as none for now - token = None - - # check if the user already has a token - if user.password_reset_tokens.all().count() > 0: - # yes, already has a token, re-use this token - token = user.password_reset_tokens.all()[0] - else: - # no token exists, generate a new token - token = ResetPasswordToken.objects.create( - user=user, - user_agent=request.META.get(HTTP_USER_AGENT_HEADER, ''), - ip_address=request.META.get(HTTP_IP_ADDRESS_HEADER, ''), - ) - # send a signal that the password token was created - # let whoever receives this signal handle sending the email for the password reset - reset_password_token_created.send(sender=self.__class__, instance=self, reset_password_token=token) - # done + + clear_expired_tokens() + token = generate_token_for_email( + email=serializer.validated_data['email'], + user_agent=request.META.get(HTTP_USER_AGENT_HEADER, ''), + ip_address=request.META.get(HTTP_IP_ADDRESS_HEADER, ''), + ) + + # send a signal that the password token was created + # let whoever receives this signal handle sending the email for the password reset + reset_password_token_created.send(sender=self.__class__, instance=self, reset_password_token=token) + return Response({'status': 'OK'}) diff --git a/tests/test/test_auth_test_case.py b/tests/test/test_auth_test_case.py index aee413d..868049d 100644 --- a/tests/test/test_auth_test_case.py +++ b/tests/test/test_auth_test_case.py @@ -1,11 +1,14 @@ import json +from datetime import timedelta from django.contrib.auth.models import User from django.test import override_settings +from django.utils import timezone from rest_framework import status from rest_framework.test import APITestCase -from django_rest_passwordreset.models import ResetPasswordToken +from django_rest_passwordreset.models import ResetPasswordToken, get_password_reset_token_expiry_time +from django_rest_passwordreset.views import clear_expired_tokens, generate_token_for_email from tests.test.helpers import HelperMixin, patch @@ -67,7 +70,7 @@ def test_validate_token(self, mock_reset_password_token_created): # there should be one token self.assertEqual(ResetPasswordToken.objects.all().count(), 1) - # try to login with the old username/password (should work) + # try to log in with the old username/password (should work) self.assertTrue( self.django_check_login("user1", "secret1"), msg="User 1 should still be able to login with the old credentials" @@ -386,3 +389,41 @@ def test_user_without_password_where_not_required(self, mock_reset_password_toke self.django_check_login("user4", "new_secret"), msg="User 4 should be able to login with the modified credentials" ) + + def test_clear_expired_tokens(self): + """ Tests clearance of expired tokens """ + + password_reset_token_validation_time = get_password_reset_token_expiry_time() + + # there should be zero tokens + self.assertEqual(ResetPasswordToken.objects.all().count(), 0) + + # request a new token + response = self.rest_do_request_reset_token(email="user1@mail.com") + self.assertEqual(response.status_code, status.HTTP_200_OK) + + # there should be one token + self.assertEqual(ResetPasswordToken.objects.all().count(), 1) + + # let the token expire + token = ResetPasswordToken.objects.all().first() + token.created_at = timezone.now() - timedelta(hours=password_reset_token_validation_time) + token.save() + + # clear expired tokens + clear_expired_tokens() + + # there should be zero tokens + self.assertEqual(ResetPasswordToken.objects.all().count(), 0) + + def test_generate_token_for_email(self): + """ Tests generating tokens for a specific email address programmatically """ + + # there should be zero tokens + self.assertEqual(ResetPasswordToken.objects.all().count(), 0) + + # request a new token + generate_token_for_email(email="user1@mail.com") + + # there should be one token + self.assertEqual(ResetPasswordToken.objects.all().count(), 1)