Skip to content

Commit

Permalink
Merge pull request #283 from intuitem/feat/implement-knox-token-authe…
Browse files Browse the repository at this point in the history
…ntication

Implement django-rest-knox token authentication
  • Loading branch information
ab-smith authored Apr 18, 2024
2 parents 46f5cae + 14c0ac5 commit 208783c
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 134 deletions.
121 changes: 85 additions & 36 deletions backend/app_tests/api/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from knox.auth import AuthToken
import pytest
import json
import re
Expand Down Expand Up @@ -28,20 +29,32 @@ def get_object_urn(object_name: str, resolved: bool = True):
return f"{reverse(LIBRARIES_ENDPOINT)}{urn}/" if resolved else eval(urn)

@pytest.mark.django_db
def get_test_client_and_folder(authenticated_client, role: str, test_folder_name: str, assigned_folder_name: str = "test"):
def get_test_client_and_folder(
authenticated_client,
role: str,
test_folder_name: str,
assigned_folder_name: str = "test",
):
"""Get an authenticated client with a specific role and the folder associated to the role"""
from iam.models import Folder, User, UserGroup

EndpointTestsQueries.Auth.create_object(
authenticated_client, "Folders", Folder, {"name": assigned_folder_name},
item_search_field="name"
authenticated_client,
"Folders",
Folder,
{"name": assigned_folder_name},
item_search_field="name",
)
assigned_folder = test_folder = Folder.objects.get(name=assigned_folder_name)

if test_folder_name != assigned_folder_name:
EndpointTestsQueries.Auth.create_object(
authenticated_client, "Folders", Folder, {"name": test_folder_name}, base_count=1,
item_search_field="name"
authenticated_client,
"Folders",
Folder,
{"name": test_folder_name},
base_count=1,
item_search_field="name",
)
test_folder = Folder.objects.get(name=test_folder_name)

Expand All @@ -51,25 +64,39 @@ def get_test_client_and_folder(authenticated_client, role: str, test_folder_name
folder=Folder.objects.get(name=GROUPS_PERMISSIONS[role]["folder"]),
).user_set.add(user)
client = APIClient()
client.force_login(user)
_auth_token = AuthToken.objects.create(user=user)
auth_token = _auth_token[1]
client.credentials(HTTP_AUTHORIZATION=f"Token {auth_token}")
return client, test_folder, assigned_folder

def expected_request_response(
action: str, object: str, scope: str, user_group: str, expected_status: int = status.HTTP_200_OK
action: str,
object: str,
scope: str,
user_group: str,
expected_status: int = status.HTTP_200_OK,
):
"""Get the expected request response for a specific action on an object for a specific user group"""
perm_name = f"{action}_{get_singular_name(object).lower().replace(' ', '')}"

if perm_name in GROUPS_PERMISSIONS[user_group]["perms"]:
# User has permission to perform the action
if (GROUPS_PERMISSIONS[user_group]["folder"] == "Global") or (scope == GROUPS_PERMISSIONS[user_group]["folder"]) or (scope == "Global"):
if (
(GROUPS_PERMISSIONS[user_group]["folder"] == "Global")
or (scope == GROUPS_PERMISSIONS[user_group]["folder"])
or (scope == "Global")
):
# User has access to the domain
return False, expected_status, "ok"
else:
return False, expected_status, "outside_scope"
else:
# User has not permission to perform the action
if (GROUPS_PERMISSIONS[user_group]["folder"] == "Global") or (scope == GROUPS_PERMISSIONS[user_group]["folder"]) or (scope == "Global"):
if (
(GROUPS_PERMISSIONS[user_group]["folder"] == "Global")
or (scope == GROUPS_PERMISSIONS[user_group]["folder"])
or (scope == "Global")
):
# User has access to the domain
return True, status.HTTP_403_FORBIDDEN, "permission_denied"
else:
Expand Down Expand Up @@ -101,7 +128,7 @@ def get_object(
response = client.get(url)

assert (
response.status_code == status.HTTP_403_FORBIDDEN
response.status_code == status.HTTP_401_UNAUTHORIZED
), f"{verbose_name} are accessible without authentication"
assert response.json() == {
"detail": "Authentication credentials were not provided."
Expand Down Expand Up @@ -133,7 +160,7 @@ def get_object(
response = client.get(url)

assert (
response.status_code == status.HTTP_403_FORBIDDEN
response.status_code == status.HTTP_401_UNAUTHORIZED
), f"{verbose_name} are accessible without authentication"
assert response.json() == {
"detail": "Authentication credentials were not provided."
Expand All @@ -159,7 +186,7 @@ def create_object(

# Asserts that the user was not created
assert (
response.status_code == status.HTTP_403_FORBIDDEN
response.status_code == status.HTTP_401_UNAUTHORIZED
), f"{verbose_name} can be created without authentication"
assert response.json() == {
"detail": "Authentication credentials were not provided."
Expand Down Expand Up @@ -215,7 +242,7 @@ def update_object(

# Asserts that the user was not updated
assert (
response.status_code == status.HTTP_403_FORBIDDEN
response.status_code == status.HTTP_401_UNAUTHORIZED
), f"{verbose_name} can be updated without authentication"
assert response.json() == {
"detail": "Authentication credentials were not provided."
Expand Down Expand Up @@ -267,7 +294,7 @@ def delete_object(

# Asserts that the user was not deleted
assert (
response.status_code == status.HTTP_403_FORBIDDEN
response.status_code == status.HTTP_401_UNAUTHORIZED
), f"{verbose_name} can be deleted without authentication"
assert response.json() == {
"detail": "Authentication credentials were not provided."
Expand All @@ -293,7 +320,7 @@ def import_object(client, verbose_name: str, urn: str = None):

# Asserts that the object was imported successfully
assert (
response.status_code == status.HTTP_403_FORBIDDEN
response.status_code == status.HTTP_401_UNAUTHORIZED
), f"{verbose_name} can be imported without authentication"
assert response.json() == {
"detail": "Authentication credentials were not provided."
Expand Down Expand Up @@ -330,11 +357,13 @@ def get_object(
user_perm_fails, user_perm_expected_status, user_perm_reason = None, 0, None

if user_group:
scope = scope or str(build_params.get("folder", None)) # if the scope is not provided, try to get it from the build_params
scope = scope or str(
build_params.get("folder", None)
) # if the scope is not provided, try to get it from the build_params
(
user_perm_fails,
user_perm_expected_status,
user_perm_reason
user_perm_reason,
) = EndpointTestsUtils.expected_request_response(
"view", verbose_name, scope, user_group, expected_status
)
Expand Down Expand Up @@ -434,7 +463,11 @@ def get_object(
response.json()["count"] == base_count + 1
), f"{verbose_name} are not accessible with authentication"

if not (fails or user_perm_fails) and user_perm_reason != "outside_scope" and len(response.json()["results"]) != 0:
if (
not (fails or user_perm_fails)
and user_perm_reason != "outside_scope"
and len(response.json()["results"]) != 0
):
params = {**build_params, **test_params}
if len(response.json()["results"]) > 0 and item_search_field:
response_item = [
Expand Down Expand Up @@ -479,7 +512,7 @@ def get_object_options(
(
user_perm_fails,
user_perm_expected_status,
_
_,
) = EndpointTestsUtils.expected_request_response(
"view", verbose_name, scope, user_group, expected_status
)
Expand Down Expand Up @@ -542,11 +575,13 @@ def create_object(
user_perm_fails, user_perm_expected_status, user_perm_reason = None, 0, None

if user_group:
scope = scope or str(build_params.get("folder", None)) # if the scope is not provided, try to get it from the build_params
scope = scope or str(
build_params.get("folder", None)
) # if the scope is not provided, try to get it from the build_params
(
user_perm_fails,
user_perm_expected_status,
user_perm_reason
user_perm_reason,
) = EndpointTestsUtils.expected_request_response(
"add", verbose_name, scope, user_group, expected_status
)
Expand Down Expand Up @@ -589,8 +624,9 @@ def create_object(
if not (fails or user_perm_fails):
if user_perm_reason == "outside_scope":
assert (
response.json()['folder'] == 'You do not have permission to create objects in this folder'
), f"{verbose_name} can be created outside the domain"
response.json()["folder"]
== "You do not have permission to create objects in this folder"
), f"{verbose_name} can be created outside the domain"
else:
for key, value in build_params.items():
if key == "attachment":
Expand All @@ -605,9 +641,9 @@ def create_object(
), f"{verbose_name} {key.replace('_', ' ')} returned by the API after object creation don't match the provided {key.replace('_', ' ')}"

# Checks that the object was created in the database
assert (
object.objects.filter(id=response.json()["id"]).exists()
), f"{verbose_name} created with the API are not saved in the database"
assert object.objects.filter(
id=response.json()["id"]
).exists(), f"{verbose_name} created with the API are not saved in the database"

# Uses the API endpoint to assert that the created object is accessible
response = authenticated_client.get(url)
Expand Down Expand Up @@ -670,11 +706,13 @@ def update_object(
user_perm_fails, user_perm_expected_status, user_perm_reason = None, 0, None

if user_group:
scope = scope or str(build_params.get("folder", None)) # if the scope is not provided, try to get it from the build_params
scope = scope or str(
build_params.get("folder", None)
) # if the scope is not provided, try to get it from the build_params
(
user_perm_fails,
user_perm_expected_status,
user_perm_reason
user_perm_reason,
) = EndpointTestsUtils.expected_request_response(
"change", verbose_name, scope, user_group, expected_status
)
Expand Down Expand Up @@ -705,14 +743,18 @@ def update_object(

response = authenticated_client.get(url)

view_perms = EndpointTestsUtils.expected_request_response("view", verbose_name, scope, user_group)
view_perms = EndpointTestsUtils.expected_request_response(
"view", verbose_name, scope, user_group
)
if not user_group or view_perms[:2] == (False, status.HTTP_200_OK):
if view_perms[2] == "outside_scope":
assert (
response.status_code == status.HTTP_404_NOT_FOUND
), f"{verbose_name} object detail can be accessed outside the domain"
else:
if (verbose_name is not "Users"): # Users don't have permission to view users details
if (
verbose_name is not "Users"
): # Users don't have permission to view users details
assert (
response.status_code == status.HTTP_200_OK
), f"{verbose_name} object detail can not be accessed with permission"
Expand All @@ -724,7 +766,8 @@ def update_object(
if not (fails or user_perm_fails):
if view_perms[2] == "outside_scope":
assert (
response.json() == {'detail': f'No {object.__name__} matches the given query.'}
response.json()
== {"detail": f"No {object.__name__} matches the given query."}
), f"{verbose_name} object detail can be accessed outside the domain"
else:
for key, value in {**build_params, **test_build_params}.items():
Expand Down Expand Up @@ -801,11 +844,13 @@ def delete_object(
user_perm_fails, user_perm_expected_status, user_perm_reason = None, 0, None

if user_group:
scope = scope or str(build_params.get("folder", None)) # if the scope is not provided, try to get it from the build_params
scope = scope or str(
build_params.get("folder", None)
) # if the scope is not provided, try to get it from the build_params
(
user_perm_fails,
user_perm_expected_status,
user_perm_reason
user_perm_reason,
) = EndpointTestsUtils.expected_request_response(
"delete", verbose_name, scope, user_group, expected_status
)
Expand Down Expand Up @@ -838,14 +883,18 @@ def delete_object(
# Asserts that the objects exists
response = authenticated_client.get(url)

view_perms = EndpointTestsUtils.expected_request_response("view", verbose_name, scope, user_group)
view_perms = EndpointTestsUtils.expected_request_response(
"view", verbose_name, scope, user_group
)
if not user_group or view_perms[:2] == (False, status.HTTP_200_OK):
if view_perms[2] == "outside_scope":
assert (
response.status_code == status.HTTP_404_NOT_FOUND
), f"{verbose_name} object detail can be accessed outside the domain"
else:
if (verbose_name is not "Users"): # Users don't have permission to view users details
if (
verbose_name is not "Users"
): # Users don't have permission to view users details
assert (
response.status_code == status.HTTP_200_OK
), f"{verbose_name} object detail can not be accessed with permission"
Expand Down Expand Up @@ -908,7 +957,7 @@ def import_object(
(
user_perm_fails,
user_perm_expected_status,
user_perm_reason
user_perm_reason,
) = EndpointTestsUtils.expected_request_response(
"add", "library", scope, user_group, expected_status
)
Expand Down
19 changes: 15 additions & 4 deletions backend/app_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from test_vars import GROUPS_PERMISSIONS
from iam.models import User, UserGroup
from core.apps import startup
from knox.auth import AuthToken


class Test(dict):
Expand All @@ -30,14 +31,24 @@ def authenticated_client(app_config):
admin = User.objects.create_superuser("[email protected]")
UserGroup.objects.get(name="BI-UG-ADM").user_set.add(admin)
client = APIClient()
client.force_login(admin)
_auth_token = AuthToken.objects.create(user=admin)
auth_token = _auth_token[1]
client.credentials(HTTP_AUTHORIZATION=f"Token {auth_token}")
return client


@pytest.fixture(
params=[(role, folder) for role in GROUPS_PERMISSIONS.keys() for folder in ["test", "test_outside_domain"]],
ids=[GROUPS_PERMISSIONS[key]["name"]+folder_name for key in GROUPS_PERMISSIONS.keys() for folder_name in ["", "_outside_domain"]]
)
params=[
(role, folder)
for role in GROUPS_PERMISSIONS.keys()
for folder in ["test", "test_outside_domain"]
],
ids=[
GROUPS_PERMISSIONS[key]["name"] + folder_name
for key in GROUPS_PERMISSIONS.keys()
for folder_name in ["", "_outside_domain"]
],
)
def test(authenticated_client, request) -> Test:
"""Get the elements used by the tests such as client and associated folder"""
client, folder, assigned_folder = EndpointTestsUtils.get_test_client_and_folder(
Expand Down
Loading

0 comments on commit 208783c

Please sign in to comment.