Skip to content

Commit

Permalink
Refactor to paramterized tests
Browse files Browse the repository at this point in the history
  • Loading branch information
conbrad committed Oct 24, 2024
1 parent eb6644c commit aafa044
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 94 deletions.
10 changes: 10 additions & 0 deletions api/app/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from datetime import timezone, datetime
import logging
from typing import Optional
from aiohttp import ClientSession
from unittest.mock import MagicMock
import requests
import pytest
Expand All @@ -17,6 +18,7 @@
default_aiobotocore_get_session,
default_mock_requests_get,
default_mock_requests_post,
default_mock_client_get,
default_mock_requests_session_get,
default_mock_requests_session_post,
)
Expand Down Expand Up @@ -185,6 +187,14 @@ def mock_requests_session(monkeypatch):
return monkeypatch


@pytest.fixture()
def mock_client_session(monkeypatch):
"""Patch all calls to aiohttp.ClientSession"""
monkeypatch.setattr(ClientSession, "get", default_mock_client_get)
monkeypatch.setattr(ClientSession, "post", default_mock_client_get)
return monkeypatch


@pytest.fixture(autouse=True)
def spy_access_logging(mocker: MockerFixture):
"""Spies on access audting logging for tests"""
Expand Down
27 changes: 0 additions & 27 deletions api/app/tests/test_auth.feature

This file was deleted.

105 changes: 38 additions & 67 deletions api/app/tests/test_auth.py
Original file line number Diff line number Diff line change
@@ -1,86 +1,57 @@
""" Functional testing for authentication """

from datetime import datetime, timezone
from aiohttp import ClientSession
from pytest_bdd import scenario, given, then, parsers
from fastapi.testclient import TestClient
from datetime import datetime, timezone
import pytest
import app.auth
import app.main
from app.tests import load_json_file
from app.tests.common import default_mock_client_get


@scenario('test_auth.feature', 'Handling unauthenticated users')
def test_auth_1st_scenario():
""" BDD Scenario #1. """


@given(parsers.parse(
"I am an unauthenticated user {token} when I {verb} a protected {endpoint} with {payload}"),
converters={'token': str, 'verb': str, 'endpoint': str, 'payload': load_json_file(__file__)},
target_fixture='response')
def given_unauthenticated_user(monkeypatch, token: str, endpoint: str, verb: str, payload: dict):
""" Request (post/get) {endpoint} request which is protected """
@pytest.mark.parametrize(
"token, status, endpoint, verb, payload",
[
("Basic token", 401, "/api/weather_models/GDPS/predictions/summaries/", "post", "test_auth_stations_payload.json"),
("just_token", 401, "/api/weather_models/GDPS/predictions/summaries/", "post", "test_auth_stations_payload.json"),
("Bearer token", 401, "/api/weather_models/GDPS/predictions/summaries/", "post", "test_auth_stations_payload.json"),
("just_token", 401, "/api/weather_models/GDPS/predictions/most_recent/", "post", "test_auth_stations_payload.json"),
("Bearer token", 401, "/api/weather_models/GDPS/predictions/most_recent/", "post", "test_auth_stations_payload.json"),
("just_token", 401, "/api/stations/details/", "get", "test_auth_stations_payload.json"),
],
)
@pytest.mark.usefixtures("mock_client_session")
def test_unauthenticated_requests(token, status, endpoint, verb, payload, spy_access_logging):
client = TestClient(app.main.app)
monkeypatch.setattr(ClientSession, 'get', default_mock_client_get)
if verb == 'post':
response = client.post(endpoint, headers={'Authorization': token}, json=payload)
elif verb == 'get':
response = client.get(endpoint, headers={'Authorization': token})
payload = load_json_file(__file__)(payload)
response = None
if verb == "post":
response = client.post(endpoint, headers={"Authorization": token}, json=payload)
else:
raise NotImplementedError('unexpected verb', verb)
return response
response = client.get(endpoint, headers={"Authorization": token})


@then("Unauthenticated access audit logs are created", converters={'endpoint': str})
def no_access_is_logged(spy_access_logging, endpoint):
"""Access audit logs are created"""
spy_access_logging.assert_called_once_with(None, False, endpoint)


@then(parsers.parse("I will get an error with {status} code"), converters={'status': int})
def status_code(response, status: int):
""" Assert that we receive the expected status code """
assert response.status_code == status
spy_access_logging.assert_called_once_with(None, False, endpoint)


@pytest.mark.parametrize(
"status, endpoint, verb, utc_time",
[
(200, "/api/weather_models/GDPS/predictions/summaries/", "post", 1618870929583),
(200, "/api/weather_models/GDPS/predictions/most_recent/", "post", 1618870929583),
(200, "/api/stations/details/", "get", 1618870929583),
],
)
@pytest.mark.usefixtures("mock_client_session")
@pytest.mark.usefixtures("mock_jwt_decode")
@scenario("test_auth.feature", "Verifying authenticated users")
def test_auth_2nd_scenario():
""" BDD Scenario #2. """


@then("Authenticated access audit logs are created")
def access_is_logged(spy_access_logging, endpoint):
"""Access audit logs are created"""
spy_access_logging.assert_called_once_with("test_username", True, endpoint)


@given(parsers.parse("utc_time: {utc_time}"), converters={'utc_time': int})
def given_utc_time(monkeypatch, utc_time: int):
""" Mock out utc time """
def test_authenticated_requests(status, endpoint, verb, utc_time, spy_access_logging, monkeypatch):
def mock_get_utc_now():
return datetime.fromtimestamp(utc_time / 1000, tz=timezone.utc)
monkeypatch.setattr(app.routers.stations, 'get_utc_now', mock_get_utc_now)


@given(parsers.parse("I am an authenticated user when I {verb} a protected {endpoint}"),
converters={'verb': str, 'endpoint': str},
target_fixture='response_2')
def given_authenticated_user(monkeypatch, endpoint: str, verb: str):
""" Request (post/get) {endpoint} request which is protected """
monkeypatch.setattr(app.routers.stations, "get_utc_now", mock_get_utc_now)
client = TestClient(app.main.app)
monkeypatch.setattr(ClientSession, 'get', default_mock_client_get)
if verb == 'post':
return client.post(endpoint, headers={"Authorization": "Bearer token"}, json={"stations": [838]})
if verb == 'get':
return client.get(
endpoint, headers={'Authorization': 'Bearer token'})
raise NotImplementedError('unexpected verb', verb)

response = None
if verb == "post":
response = client.post(endpoint, headers={"Authorization": "Bearer token"}, json={"stations": [838]})
if verb == "get":
response = client.get(endpoint, headers={"Authorization": "Bearer token"})

@then(parsers.parse("I shouldn't get an unauthorized error {status} code"), converters={'status': int})
def status_code_2(response_2, status: int):
""" Assert that we receive the expected status code """
assert response_2.status_code == status
assert response.status_code == status
spy_access_logging.assert_called_once_with("test_username", True, endpoint)

0 comments on commit aafa044

Please sign in to comment.