Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement django-rest-knox token authentication #283

Merged
merged 9 commits into from
Apr 18, 2024
Merged
121 changes: 85 additions & 36 deletions backend/app_tests/api/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from knox.auth import AuthToken
import pytest
import json
import re
Expand Down Expand Up @@ -28,20 +29,32 @@ def get_object_urn(object_name: str, resolved: bool = True):
return f"{reverse(LIBRARIES_ENDPOINT)}{urn}/" if resolved else eval(urn)

@pytest.mark.django_db
def get_test_client_and_folder(authenticated_client, role: str, test_folder_name: str, assigned_folder_name: str = "test"):
def get_test_client_and_folder(
authenticated_client,
role: str,
test_folder_name: str,
assigned_folder_name: str = "test",
):
"""Get an authenticated client with a specific role and the folder associated to the role"""
from iam.models import Folder, User, UserGroup

EndpointTestsQueries.Auth.create_object(
authenticated_client, "Folders", Folder, {"name": assigned_folder_name},
item_search_field="name"
authenticated_client,
"Folders",
Folder,
{"name": assigned_folder_name},
item_search_field="name",
)
assigned_folder = test_folder = Folder.objects.get(name=assigned_folder_name)

if test_folder_name != assigned_folder_name:
EndpointTestsQueries.Auth.create_object(
authenticated_client, "Folders", Folder, {"name": test_folder_name}, base_count=1,
item_search_field="name"
authenticated_client,
"Folders",
Folder,
{"name": test_folder_name},
base_count=1,
item_search_field="name",
)
test_folder = Folder.objects.get(name=test_folder_name)

Expand All @@ -51,25 +64,39 @@ def get_test_client_and_folder(authenticated_client, role: str, test_folder_name
folder=Folder.objects.get(name=GROUPS_PERMISSIONS[role]["folder"]),
).user_set.add(user)
client = APIClient()
client.force_login(user)
_auth_token = AuthToken.objects.create(user=user)
auth_token = _auth_token[1]
client.credentials(HTTP_AUTHORIZATION=f"Token {auth_token}")
return client, test_folder, assigned_folder

def expected_request_response(
action: str, object: str, scope: str, user_group: str, expected_status: int = status.HTTP_200_OK
action: str,
object: str,
scope: str,
user_group: str,
expected_status: int = status.HTTP_200_OK,
):
"""Get the expected request response for a specific action on an object for a specific user group"""
perm_name = f"{action}_{get_singular_name(object).lower().replace(' ', '')}"

if perm_name in GROUPS_PERMISSIONS[user_group]["perms"]:
# User has permission to perform the action
if (GROUPS_PERMISSIONS[user_group]["folder"] == "Global") or (scope == GROUPS_PERMISSIONS[user_group]["folder"]) or (scope == "Global"):
if (
(GROUPS_PERMISSIONS[user_group]["folder"] == "Global")
or (scope == GROUPS_PERMISSIONS[user_group]["folder"])
or (scope == "Global")
):
# User has access to the domain
return False, expected_status, "ok"
else:
return False, expected_status, "outside_scope"
else:
# User has not permission to perform the action
if (GROUPS_PERMISSIONS[user_group]["folder"] == "Global") or (scope == GROUPS_PERMISSIONS[user_group]["folder"]) or (scope == "Global"):
if (
(GROUPS_PERMISSIONS[user_group]["folder"] == "Global")
or (scope == GROUPS_PERMISSIONS[user_group]["folder"])
or (scope == "Global")
):
# User has access to the domain
return True, status.HTTP_403_FORBIDDEN, "permission_denied"
else:
Expand Down Expand Up @@ -101,7 +128,7 @@ def get_object(
response = client.get(url)

assert (
response.status_code == status.HTTP_403_FORBIDDEN
response.status_code == status.HTTP_401_UNAUTHORIZED
), f"{verbose_name} are accessible without authentication"
assert response.json() == {
"detail": "Authentication credentials were not provided."
Expand Down Expand Up @@ -133,7 +160,7 @@ def get_object(
response = client.get(url)

assert (
response.status_code == status.HTTP_403_FORBIDDEN
response.status_code == status.HTTP_401_UNAUTHORIZED
), f"{verbose_name} are accessible without authentication"
assert response.json() == {
"detail": "Authentication credentials were not provided."
Expand All @@ -159,7 +186,7 @@ def create_object(

# Asserts that the user was not created
assert (
response.status_code == status.HTTP_403_FORBIDDEN
response.status_code == status.HTTP_401_UNAUTHORIZED
), f"{verbose_name} can be created without authentication"
assert response.json() == {
"detail": "Authentication credentials were not provided."
Expand Down Expand Up @@ -215,7 +242,7 @@ def update_object(

# Asserts that the user was not updated
assert (
response.status_code == status.HTTP_403_FORBIDDEN
response.status_code == status.HTTP_401_UNAUTHORIZED
), f"{verbose_name} can be updated without authentication"
assert response.json() == {
"detail": "Authentication credentials were not provided."
Expand Down Expand Up @@ -267,7 +294,7 @@ def delete_object(

# Asserts that the user was not deleted
assert (
response.status_code == status.HTTP_403_FORBIDDEN
response.status_code == status.HTTP_401_UNAUTHORIZED
), f"{verbose_name} can be deleted without authentication"
assert response.json() == {
"detail": "Authentication credentials were not provided."
Expand All @@ -293,7 +320,7 @@ def import_object(client, verbose_name: str, urn: str = None):

# Asserts that the object was imported successfully
assert (
response.status_code == status.HTTP_403_FORBIDDEN
response.status_code == status.HTTP_401_UNAUTHORIZED
), f"{verbose_name} can be imported without authentication"
assert response.json() == {
"detail": "Authentication credentials were not provided."
Expand Down Expand Up @@ -330,11 +357,13 @@ def get_object(
user_perm_fails, user_perm_expected_status, user_perm_reason = None, 0, None

if user_group:
scope = scope or str(build_params.get("folder", None)) # if the scope is not provided, try to get it from the build_params
scope = scope or str(
build_params.get("folder", None)
) # if the scope is not provided, try to get it from the build_params
(
user_perm_fails,
user_perm_expected_status,
user_perm_reason
user_perm_reason,
) = EndpointTestsUtils.expected_request_response(
"view", verbose_name, scope, user_group, expected_status
)
Expand Down Expand Up @@ -434,7 +463,11 @@ def get_object(
response.json()["count"] == base_count + 1
), f"{verbose_name} are not accessible with authentication"

if not (fails or user_perm_fails) and user_perm_reason != "outside_scope" and len(response.json()["results"]) != 0:
if (
not (fails or user_perm_fails)
and user_perm_reason != "outside_scope"
and len(response.json()["results"]) != 0
):
params = {**build_params, **test_params}
if len(response.json()["results"]) > 0 and item_search_field:
response_item = [
Expand Down Expand Up @@ -479,7 +512,7 @@ def get_object_options(
(
user_perm_fails,
user_perm_expected_status,
_
_,
) = EndpointTestsUtils.expected_request_response(
"view", verbose_name, scope, user_group, expected_status
)
Expand Down Expand Up @@ -542,11 +575,13 @@ def create_object(
user_perm_fails, user_perm_expected_status, user_perm_reason = None, 0, None

if user_group:
scope = scope or str(build_params.get("folder", None)) # if the scope is not provided, try to get it from the build_params
scope = scope or str(
build_params.get("folder", None)
) # if the scope is not provided, try to get it from the build_params
(
user_perm_fails,
user_perm_expected_status,
user_perm_reason
user_perm_reason,
) = EndpointTestsUtils.expected_request_response(
"add", verbose_name, scope, user_group, expected_status
)
Expand Down Expand Up @@ -589,8 +624,9 @@ def create_object(
if not (fails or user_perm_fails):
if user_perm_reason == "outside_scope":
assert (
response.json()['folder'] == 'You do not have permission to create objects in this folder'
), f"{verbose_name} can be created outside the domain"
response.json()["folder"]
== "You do not have permission to create objects in this folder"
), f"{verbose_name} can be created outside the domain"
else:
for key, value in build_params.items():
if key == "attachment":
Expand All @@ -605,9 +641,9 @@ def create_object(
), f"{verbose_name} {key.replace('_', ' ')} returned by the API after object creation don't match the provided {key.replace('_', ' ')}"

# Checks that the object was created in the database
assert (
object.objects.filter(id=response.json()["id"]).exists()
), f"{verbose_name} created with the API are not saved in the database"
assert object.objects.filter(
id=response.json()["id"]
).exists(), f"{verbose_name} created with the API are not saved in the database"

# Uses the API endpoint to assert that the created object is accessible
response = authenticated_client.get(url)
Expand Down Expand Up @@ -670,11 +706,13 @@ def update_object(
user_perm_fails, user_perm_expected_status, user_perm_reason = None, 0, None

if user_group:
scope = scope or str(build_params.get("folder", None)) # if the scope is not provided, try to get it from the build_params
scope = scope or str(
build_params.get("folder", None)
) # if the scope is not provided, try to get it from the build_params
(
user_perm_fails,
user_perm_expected_status,
user_perm_reason
user_perm_reason,
) = EndpointTestsUtils.expected_request_response(
"change", verbose_name, scope, user_group, expected_status
)
Expand Down Expand Up @@ -705,14 +743,18 @@ def update_object(

response = authenticated_client.get(url)

view_perms = EndpointTestsUtils.expected_request_response("view", verbose_name, scope, user_group)
view_perms = EndpointTestsUtils.expected_request_response(
"view", verbose_name, scope, user_group
)
if not user_group or view_perms[:2] == (False, status.HTTP_200_OK):
if view_perms[2] == "outside_scope":
assert (
response.status_code == status.HTTP_404_NOT_FOUND
), f"{verbose_name} object detail can be accessed outside the domain"
else:
if (verbose_name is not "Users"): # Users don't have permission to view users details
if (
verbose_name is not "Users"
): # Users don't have permission to view users details
assert (
response.status_code == status.HTTP_200_OK
), f"{verbose_name} object detail can not be accessed with permission"
Expand All @@ -724,7 +766,8 @@ def update_object(
if not (fails or user_perm_fails):
if view_perms[2] == "outside_scope":
assert (
response.json() == {'detail': f'No {object.__name__} matches the given query.'}
response.json()
== {"detail": f"No {object.__name__} matches the given query."}
), f"{verbose_name} object detail can be accessed outside the domain"
else:
for key, value in {**build_params, **test_build_params}.items():
Expand Down Expand Up @@ -801,11 +844,13 @@ def delete_object(
user_perm_fails, user_perm_expected_status, user_perm_reason = None, 0, None

if user_group:
scope = scope or str(build_params.get("folder", None)) # if the scope is not provided, try to get it from the build_params
scope = scope or str(
build_params.get("folder", None)
) # if the scope is not provided, try to get it from the build_params
(
user_perm_fails,
user_perm_expected_status,
user_perm_reason
user_perm_reason,
) = EndpointTestsUtils.expected_request_response(
"delete", verbose_name, scope, user_group, expected_status
)
Expand Down Expand Up @@ -838,14 +883,18 @@ def delete_object(
# Asserts that the objects exists
response = authenticated_client.get(url)

view_perms = EndpointTestsUtils.expected_request_response("view", verbose_name, scope, user_group)
view_perms = EndpointTestsUtils.expected_request_response(
"view", verbose_name, scope, user_group
)
if not user_group or view_perms[:2] == (False, status.HTTP_200_OK):
if view_perms[2] == "outside_scope":
assert (
response.status_code == status.HTTP_404_NOT_FOUND
), f"{verbose_name} object detail can be accessed outside the domain"
else:
if (verbose_name is not "Users"): # Users don't have permission to view users details
if (
verbose_name is not "Users"
): # Users don't have permission to view users details
assert (
response.status_code == status.HTTP_200_OK
), f"{verbose_name} object detail can not be accessed with permission"
Expand Down Expand Up @@ -908,7 +957,7 @@ def import_object(
(
user_perm_fails,
user_perm_expected_status,
user_perm_reason
user_perm_reason,
) = EndpointTestsUtils.expected_request_response(
"add", "library", scope, user_group, expected_status
)
Expand Down
19 changes: 15 additions & 4 deletions backend/app_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from test_vars import GROUPS_PERMISSIONS
from iam.models import User, UserGroup
from core.apps import startup
from knox.auth import AuthToken


class Test(dict):
Expand All @@ -30,14 +31,24 @@ def authenticated_client(app_config):
admin = User.objects.create_superuser("[email protected]")
UserGroup.objects.get(name="BI-UG-ADM").user_set.add(admin)
client = APIClient()
client.force_login(admin)
_auth_token = AuthToken.objects.create(user=admin)
auth_token = _auth_token[1]
client.credentials(HTTP_AUTHORIZATION=f"Token {auth_token}")
return client


@pytest.fixture(
params=[(role, folder) for role in GROUPS_PERMISSIONS.keys() for folder in ["test", "test_outside_domain"]],
ids=[GROUPS_PERMISSIONS[key]["name"]+folder_name for key in GROUPS_PERMISSIONS.keys() for folder_name in ["", "_outside_domain"]]
)
params=[
(role, folder)
for role in GROUPS_PERMISSIONS.keys()
for folder in ["test", "test_outside_domain"]
],
ids=[
GROUPS_PERMISSIONS[key]["name"] + folder_name
for key in GROUPS_PERMISSIONS.keys()
for folder_name in ["", "_outside_domain"]
],
)
def test(authenticated_client, request) -> Test:
"""Get the elements used by the tests such as client and associated folder"""
client, folder, assigned_folder = EndpointTestsUtils.get_test_client_and_folder(
Expand Down
Loading
Loading