diff --git a/pyproject.toml b/pyproject.toml index b39af5c..f7f7e4f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,8 @@ ignore = [ "RUF012", # treats a link with the word "password" in it as a hardcoded password "S105", + # the limit is too low and dumb. We don't use squared monitors anymore + "E501", ] select = [ # pyflakes @@ -59,6 +61,12 @@ fixable = [ [tool.ruff.lint.isort] combine-as-imports = true lines-between-types = 1 +known-first-party = [ + "accounts", + "central_command", + "commons", + "persistence", +] [tool.mypy] show_column_numbers = true @@ -66,6 +74,7 @@ show_error_codes = true # XXX: add new rules here check_untyped_defs = true +warn_unused_ignores = true plugins = [ "mypy_django_plugin.main", diff --git a/src/accounts/api/serializers.py b/src/accounts/api/serializers.py index 52a7a33..67093ce 100644 --- a/src/accounts/api/serializers.py +++ b/src/accounts/api/serializers.py @@ -22,6 +22,12 @@ class Meta: fields = ("unique_identifier", "username", "password", "email") extra_kwargs = {"password": {"write_only": True}} + def validate_password(self, value): + # Validate the password using Django's built-in validators + temp_user = Account(**self.initial_data) + validate_password(value, temp_user) + return value + def create(self, validated_data): """Create and return a new account""" account: Account = Account.objects.create_user(**validated_data) @@ -59,25 +65,14 @@ class VerifyAccountSerializer(serializers.Serializer): unique_identifier = serializers.CharField() verification_token = serializers.UUIDField() - def validate(self, data): - account = Account.objects.get(unique_identifier=data["unique_identifier"]) - - data_token = data["verification_token"] - account_token = account.verification_token - - if account_token != data_token: - raise serializers.ValidationError( - "Verification token seems invalid or maybe outdated. Try requesting a new one." - ) - return account - class ResetPasswordSerializer(serializers.Serializer): password = serializers.CharField(style={"input_type": "password"}) def validate_password(self, value): # Validate the password using Django's built-in validators - validate_password(value) + temp_user = Account(**self.initial_data) + validate_password(value, temp_user) return value diff --git a/src/accounts/api/views.py b/src/accounts/api/views.py index 898239d..bcb2d20 100644 --- a/src/accounts/api/views.py +++ b/src/accounts/api/views.py @@ -4,8 +4,6 @@ from urllib.parse import urljoin from uuid import uuid4 -from commons.error_response import ErrorResponse -from commons.mail_wrapper import send_email_with_template from django.conf import settings from django.contrib.auth import authenticate from django.core.exceptions import ObjectDoesNotExist, PermissionDenied @@ -19,6 +17,9 @@ from rest_framework.serializers import ValidationError from rest_framework.views import APIView +from commons.error_response import ErrorResponse +from commons.mail_wrapper import send_email_with_template + from ..models import Account, AccountConfirmation, PasswordResetRequestModel from .serializers import ( ConfirmAccountSerializer, @@ -89,13 +90,13 @@ def post(self, request): account: Account | None = authenticate(email=email, password=password) # type: ignore[assignment] if account is None: - return ErrorResponse("Unable to login with provided credentials.", status.HTTP_400_BAD_REQUEST) + return ErrorResponse("Unable to login with provided credentials.", status.HTTP_401_UNAUTHORIZED) if not account.is_confirmed: - return ErrorResponse("You must confirm your email before attempting to login.", status.HTTP_400_BAD_REQUEST) + return ErrorResponse("You must confirm your email before attempting to login.", status.HTTP_401_UNAUTHORIZED) if not account.is_active: - return ErrorResponse("Account is suspended.", status.HTTP_400_BAD_REQUEST) + return ErrorResponse("Account is suspended.", status.HTTP_401_UNAUTHORIZED) return Response( { @@ -203,12 +204,17 @@ class VerifyAccountView(GenericAPIView): def post(self, request): serializer = self.get_serializer(data=request.data) + if not serializer.is_valid(): + return ErrorResponse(serializer.errors, status.HTTP_400_BAD_REQUEST) + try: - serializer.is_valid(raise_exception=True) - except ValidationError as e: - return ErrorResponse(str(e), e.status_code) + account = Account.objects.get(unique_identifier=serializer.validated_data["unique_identifier"]) + except Account.DoesNotExist: + return ErrorResponse("Either token or unique_identifier are invalid.", status.HTTP_400_BAD_REQUEST) + + if account.verification_token != serializer.validated_data["verification_token"]: + return ErrorResponse("Either token or unique_identifier are invalid.", status.HTTP_400_BAD_REQUEST) - account = Account.objects.get(unique_identifier=serializer.data["unique_identifier"]) public_data = PublicAccountDataSerializer(account).data return Response(public_data, status=status.HTTP_200_OK) diff --git a/src/accounts/custom_attribute_similarity_validator.py b/src/accounts/custom_attribute_similarity_validator.py new file mode 100644 index 0000000..7d73e37 --- /dev/null +++ b/src/accounts/custom_attribute_similarity_validator.py @@ -0,0 +1,36 @@ +from django.contrib.auth.password_validation import UserAttributeSimilarityValidator +from django.core.exceptions import ValidationError +from django.db.models import Model + +from accounts.models import Account + + +class CustomUserAttributeSimilarityValidator(UserAttributeSimilarityValidator): + def validate(self, password: str, user: Model | None = None): + if user is None: + return + + if not isinstance(user, Account): + raise Exception( + f"User is not an instance of Account: {user}. Check if the right user model is being set at settings.py." + ) + + custom_attributes = [ + user.username, + user.email, + user.unique_identifier, + ] + + for attribute in custom_attributes: + if not attribute or not password: + continue + + password = password.lower() + attribute = attribute.lower() + + if password in attribute or attribute in password: + raise ValidationError( + "The password is too similar to the %(verbose_name)s.", + code="password_too_similar", + params={"verbose_name": attribute}, + ) diff --git a/src/accounts/models.py b/src/accounts/models.py index 93c7de9..8339662 100644 --- a/src/accounts/models.py +++ b/src/accounts/models.py @@ -3,12 +3,13 @@ from urllib.parse import urljoin from uuid import uuid4 -from commons.mail_wrapper import send_email_with_template from django.conf import settings from django.contrib.auth.models import AbstractUser from django.db import models from django.utils import timezone +from commons.mail_wrapper import send_email_with_template + from .validators import AccountNameValidator, UsernameValidator diff --git a/src/central_command/settings.py b/src/central_command/settings.py index edada5a..09aec74 100644 --- a/src/central_command/settings.py +++ b/src/central_command/settings.py @@ -162,6 +162,9 @@ { "NAME": "django.contrib.auth.password_validation.NumericPasswordValidator", }, + { + "NAME": "accounts.custom_attribute_similarity_validator.CustomUserAttributeSimilarityValidator", + }, ] # Internationalization diff --git a/src/commons/error_response.py b/src/commons/error_response.py index 8008208..36e7f60 100644 --- a/src/commons/error_response.py +++ b/src/commons/error_response.py @@ -18,11 +18,11 @@ def custom_exception_handler(exc, context): """ response = exception_handler(exc, context) - if response is not None: - response.data["status_code"] = response.status_code - else: + if response is None: logger.error("An unhandled error occurred: %s", exc) logger.error("Context: %s", context) return ErrorResponse("Something went wrong on our end. Please try again later.") + response.data["status_code"] = response.status_code + return response diff --git a/src/tests/accounts/__init__.py b/src/tests/accounts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/tests/accounts/endpoints/__init__.py b/src/tests/accounts/endpoints/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/tests/accounts/endpoints/test_login_credentials.py b/src/tests/accounts/endpoints/test_login_credentials.py new file mode 100644 index 0000000..e4cbe3b --- /dev/null +++ b/src/tests/accounts/endpoints/test_login_credentials.py @@ -0,0 +1,57 @@ +from django.urls import reverse +from rest_framework import status +from rest_framework.test import APITestCase + +from accounts.models import Account + + +class LoginCredentialsTest(APITestCase): + def setUp(self): + self.url = reverse("account:login-credentials") + self.valid_account = Account.objects.create_user( + username="validUser", + email="validUser@valid.com", + unique_identifier="validUser", + ) + + self.valid_account.set_password("aValidPss963") + self.valid_account.is_confirmed = True + self.valid_account.save() + self.valid_login_data = { + "email": "validUser@valid.com", + "password": "aValidPss963", + } + + def test_login_with_valid_credentials(self): + response = self.client.post(self.url, self.valid_login_data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + + def test_login_with_invalid_credentials(self): + data = { + "email": "wrongUser@invalid.com", + "password": "wrongPassword", + } + response = self.client.post(self.url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_login_with_unconfirmed_account(self): + self.valid_account.is_confirmed = False + self.valid_account.save() + + response = self.client.post(self.url, self.valid_login_data, format="json") + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_login_with_inactive_account(self): + self.valid_account.is_active = False + self.valid_account.save() + + response = self.client.post(self.url, self.valid_login_data, format="json") + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_login_with_invalid_email(self): + data = { + "email": "invalidEmail", + "password": "aValidPss963", + } + response = self.client.post(self.url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) diff --git a/src/tests/accounts/endpoints/test_login_token.py b/src/tests/accounts/endpoints/test_login_token.py new file mode 100644 index 0000000..fef0233 --- /dev/null +++ b/src/tests/accounts/endpoints/test_login_token.py @@ -0,0 +1,41 @@ +from django.urls import reverse +from rest_framework import status +from rest_framework.test import APITestCase + +from accounts.models import Account + + +class LoginTokenTest(APITestCase): + def setUp(self): + self.url = reverse("account:login-token") + self.valid_account = Account.objects.create_user( + username="validUser", + email="validUser@valid.com", + unique_identifier="validUser", + ) + + self.valid_account.set_password("aValidPss963") + self.valid_account.is_confirmed = True + self.valid_account.save() + self.valid_login_data = { + "email": "validUser@valid.com", + "password": "aValidPss963", + } + + # needs to log in with credentials to get the token + response = self.client.post(reverse("account:login-credentials"), self.valid_login_data, format="json") + self.token = response.data["token"] + + def test_login_with_valid_token(self): + self.client.credentials(HTTP_AUTHORIZATION=f"Token {self.token}") + response = self.client.post(self.url, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + + def test_login_with_invalid_token(self): + self.client.credentials(HTTP_AUTHORIZATION="Token invalidToken") + response = self.client.post(self.url, format="json") + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_login_without_token(self): + response = self.client.post(self.url, format="json") + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) diff --git a/src/tests/accounts/endpoints/test_register.py b/src/tests/accounts/endpoints/test_register.py new file mode 100644 index 0000000..9c712fe --- /dev/null +++ b/src/tests/accounts/endpoints/test_register.py @@ -0,0 +1,103 @@ +from django.urls import reverse +from rest_framework import status +from rest_framework.test import APITestCase + +from accounts.models import Account + + +class RegisterTest(APITestCase): + def setUp(self): + self.url = reverse("account:register") + self.valid_data = { + "email": "validUser@valid.com", + "password": "aValidPss963", + "unique_identifier": "validUser", + "username": "validUser", + } + + def test_register_valid_account(self): + response = self.client.post(self.url, self.valid_data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + created_account = Account.objects.get(email=self.valid_data["email"]) + self.assertIsNotNone(created_account) + self.assertEqual(created_account.username, self.valid_data["username"]) + self.assertEqual(created_account.email, self.valid_data["email"]) + self.assertEqual(created_account.unique_identifier, self.valid_data["unique_identifier"]) + + def test_register_with_invalid_email(self): + data = self.valid_data.copy() + data["email"] = "invalidEmail" + response = self.client.post(self.url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + def test_register_with_too_common_password(self): + data = self.valid_data.copy() + data["password"] = "password" + response = self.client.post(self.url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + def test_register_with_password_same_as_unique_identifier(self): + data = self.valid_data.copy() + data["password"] = "validUser" # assuming 'validUser' is the unique_identifier + response = self.client.post(self.url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + def test_register_with_too_common_admin_password(self): + data = self.valid_data.copy() + data["password"] = "admin123" + response = self.client.post(self.url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + def test_register_with_too_short_password(self): + data = self.valid_data.copy() + data["password"] = "short" + response = self.client.post(self.url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + def test_register_with_valid_password(self): + data = self.valid_data.copy() + data["password"] = "aValidPss963" + response = self.client.post(self.url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + + def test_register_with_existing_email(self): + data = self.valid_data.copy() + response = self.client.post(self.url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + + response = self.client.post(self.url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + def test_register_with_existing_unique_identifier(self): + data = self.valid_data.copy() + response = self.client.post(self.url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + + data["email"] = "another-email@email.com" + response = self.client.post(self.url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + def test_register_with_existing_username(self): + data = self.valid_data.copy() + response = self.client.post(self.url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + + data["email"] = "another-email@mail.com" + data["unique_identifier"] = "anotherUnique" + response = self.client.post(self.url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) # existing username is allowed + + def test_register_with_missing_field(self): + data = self.valid_data.copy() + for field in data: + data = self.valid_data.copy() + data.pop(field) + response = self.client.post(self.url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + def test_confirmed_state_at_registering(self): + data = self.valid_data.copy() + response = self.client.post(self.url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + created_account = Account.objects.get(email=self.valid_data["email"]) + self.assertFalse(created_account.is_confirmed) diff --git a/src/tests/accounts/endpoints/test_reset_password.py b/src/tests/accounts/endpoints/test_reset_password.py new file mode 100644 index 0000000..ffda7e4 --- /dev/null +++ b/src/tests/accounts/endpoints/test_reset_password.py @@ -0,0 +1,63 @@ +from django.urls import reverse +from rest_framework import status +from rest_framework.test import APITestCase + +from accounts.models import Account, PasswordResetRequestModel + + +def get_reset_url(token): + return reverse("account:reset-password-token", kwargs={"reset_token": token}) + + +class PasswordResetTest(APITestCase): + def setUp(self): + self.valid_account = Account.objects.create_user( + username="validUser", + email="validUser@valid.com", + unique_identifier="validUser", + ) + self.valid_account.set_password("aValidPss963") + self.valid_account.is_confirmed = True + self.valid_account.save() + + self.valid_login_data = { + "email": "validUser@valid.com", + "password": "aValidPss963", + } + + self.url_request = reverse("account:reset-password") + + def test_request_password_reset_with_valid_email(self): + data = {"email": "validUser@valid.com"} + + response = self.client.post(self.url_request, data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + + self.assertEqual(PasswordResetRequestModel.objects.count(), 1) + password_reset_request = PasswordResetRequestModel.objects.first() + self.assertIsNotNone(password_reset_request) + + def test_request_password_reset_with_invalid_email(self): + data = {"email": "invalid@mail.com"} + + response = self.client.post(self.url_request, data, format="json") + self.assertEqual( + response.status_code, status.HTTP_200_OK + ) # We don't want to give away if an email is valid or not + self.assertEqual(PasswordResetRequestModel.objects.count(), 0) # No password reset request should be created + + def test_reset_password_with_valid_token(self): + data = {"email": "validUser@valid.com"} + + self.client.post(self.url_request, data, format="json") + reset_token = PasswordResetRequestModel.objects.first().token # type: ignore + url = get_reset_url(reset_token) + + data = {"password": "newPss963!"} + + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertTrue(Account.objects.count(), 1) # there is only one account in the database + self.assertEqual(PasswordResetRequestModel.objects.count(), 0) # The reset token should be deleted after use + account = Account.objects.first() + self.assertTrue(account.check_password(data["password"])) # type: ignore diff --git a/src/tests/accounts/endpoints/test_server_verification_token.py b/src/tests/accounts/endpoints/test_server_verification_token.py new file mode 100644 index 0000000..bd50d1a --- /dev/null +++ b/src/tests/accounts/endpoints/test_server_verification_token.py @@ -0,0 +1,65 @@ +from django.urls import reverse +from rest_framework import status +from rest_framework.test import APITestCase + +from accounts.models import Account + + +class ServerVerificationTokenTest(APITestCase): + def setUp(self): + self.valid_account = Account.objects.create_user( + username="validUser", + email="validUser@valid.com", + unique_identifier="validUser", + ) + self.valid_account.set_password("aValidPss963") + self.valid_account.is_confirmed = True + self.valid_account.save() + + self.valid_login_data = { + "email": "validUser@valid.com", + "password": "aValidPss963", + } + + self.url_request = reverse("account:request-verification-token") + self.url_verify = reverse("account:verify-account") + + response = self.client.post(reverse("account:login-credentials"), self.valid_login_data, format="json") + self.token = response.data["token"] + + def test_request_verification_token(self): + self.client.credentials(HTTP_AUTHORIZATION=f"Token {self.token}") + response = self.client.get(self.url_request, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + + def test_verify_account(self): + self.client.credentials(HTTP_AUTHORIZATION=f"Token {self.token}") + response = self.client.get(self.url_request, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn("verification_token", response.data) + verification_token = response.data["verification_token"] + + data = {"verification_token": verification_token, "unique_identifier": self.valid_account.unique_identifier} + response = self.client.post(self.url_verify, data, format="json") + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + def test_get_ver_token_without_auth(self): + response = self.client.get(self.url_request, format="json") + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_verify_with_invalid_token(self): + + data = {"verification_token": "invalidToken", "unique_identifier": self.valid_account.unique_identifier} + response = self.client.post(self.url_verify, data, format="json") + + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + def test_verify_with_invalid_identifier(self): + self.client.credentials(HTTP_AUTHORIZATION=f"Token {self.token}") + response = self.client.get(self.url_request, format="json") + + data = {"verification_token": response.data["verification_token"], "unique_identifier": "invalidIdentifier"} + response = self.client.post(self.url_verify, data, format="json") + + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) diff --git a/src/tests/test_username_validation.py b/src/tests/test_username_validation.py index d9fbb9c..c808040 100644 --- a/src/tests/test_username_validation.py +++ b/src/tests/test_username_validation.py @@ -1,6 +1,7 @@ -from accounts.validators import IDENTIFIER_REGEX, USERNAME_REGEX from django.test import TestCase +from accounts.validators import IDENTIFIER_REGEX, USERNAME_REGEX + class IdentifierRegexTest(TestCase): # a valid identifier has at least 3 characters with letters, numbers, dashes, underscores, dots and no other special