This repository has been archived by the owner on May 13, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #560 from sbor23/feat-oidc-auth
Implement OIDC authentication
- Loading branch information
Showing
21 changed files
with
372 additions
and
238 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
import base64 | ||
import functools | ||
import hashlib | ||
|
||
import requests | ||
from django.conf import settings | ||
from django.core.cache import cache | ||
from django.core.exceptions import SuspiciousOperation | ||
from django.utils.encoding import force_bytes | ||
from mozilla_django_oidc.auth import LOGGER, OIDCAuthenticationBackend | ||
|
||
|
||
class TimedOIDCAuthenticationBackend(OIDCAuthenticationBackend): | ||
def get_introspection(self, access_token, id_token, payload): | ||
"""Return user details dictionary.""" | ||
|
||
basic = base64.b64encode( | ||
f"{settings.OIDC_OP_INTROSPECT_CLIENT_ID}:{settings.OIDC_OP_INTROSPECT_CLIENT_SECRET}".encode( | ||
"utf-8" | ||
) | ||
).decode() | ||
headers = { | ||
"Authorization": f"Basic {basic}", | ||
"Content-Type": "application/x-www-form-urlencoded", | ||
} | ||
response = requests.post( | ||
settings.OIDC_OP_INTROSPECT_ENDPOINT, | ||
verify=settings.OIDC_VERIFY_SSL, | ||
headers=headers, | ||
data={"token": access_token}, | ||
) | ||
response.raise_for_status() | ||
return response.json() | ||
|
||
def get_userinfo_or_introspection(self, access_token): | ||
try: | ||
claims = self.cached_request( | ||
self.get_userinfo, access_token, "auth.userinfo" | ||
) | ||
except requests.HTTPError as e: | ||
if not ( | ||
e.response.status_code in [401, 403] and settings.OIDC_CHECK_INTROSPECT | ||
): | ||
raise e | ||
|
||
# check introspection if userinfo fails (confidental client) | ||
claims = self.cached_request( | ||
self.get_introspection, access_token, "auth.introspection" | ||
) | ||
if "client_id" not in claims: | ||
raise SuspiciousOperation("client_id not present in introspection") | ||
|
||
return claims | ||
|
||
def get_or_create_user(self, access_token, id_token, payload): | ||
"""Verify claims and return user, otherwise raise an Exception.""" | ||
|
||
claims = self.get_userinfo_or_introspection(access_token) | ||
|
||
users = self.filter_users_by_claims(claims) | ||
|
||
if len(users) == 1: | ||
return users[0] | ||
elif settings.OIDC_CREATE_USER: | ||
return self.create_user(claims) | ||
else: | ||
LOGGER.debug( | ||
"Login failed: No user with username %s found, and " | ||
"OIDC_CREATE_USER is False", | ||
self.get_username(claims), | ||
) | ||
return None | ||
|
||
def filter_users_by_claims(self, claims): | ||
username = self.get_username(claims) | ||
return self.UserModel.objects.filter(username=username) | ||
|
||
def cached_request(self, method, token, cache_prefix): | ||
token_hash = hashlib.sha256(force_bytes(token)).hexdigest() | ||
|
||
func = functools.partial(method, token, None, None) | ||
|
||
return cache.get_or_set( | ||
f"{cache_prefix}.{token_hash}", | ||
func, | ||
timeout=settings.OIDC_BEARER_TOKEN_REVALIDATION_TIME, | ||
) | ||
|
||
def create_user(self, claims): | ||
"""Return object for a newly created user account.""" | ||
username = self.get_username(claims) | ||
email = claims.get(settings.OIDC_EMAIL_CLAIM, "") | ||
first_name = claims.get(settings.OIDC_FIRSTNAME_CLAIM, "") | ||
last_name = claims.get(settings.OIDC_LASTNAME_CLAIM, "") | ||
return self.UserModel.objects.create( | ||
username=username, email=email, first_name=first_name, last_name=last_name | ||
) | ||
|
||
def get_username(self, claims): | ||
try: | ||
return claims[settings.OIDC_USERNAME_CLAIM] | ||
except KeyError: | ||
raise SuspiciousOperation("Couldn't find username claim") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,15 +1,15 @@ | ||
import inspect | ||
|
||
import mockldap | ||
import pytest | ||
from django.contrib.auth import get_user_model | ||
from django.core.cache import cache | ||
from factory.base import FactoryMetaClass | ||
from pytest_factoryboy import register | ||
from rest_framework.test import APIClient | ||
|
||
from timed.employment import factories as employment_factories | ||
from timed.projects import factories as projects_factories | ||
from timed.subscription import factories as subscription_factories | ||
from timed.tests.client import JSONAPIClient | ||
from timed.tracking import factories as tracking_factories | ||
|
||
|
||
|
@@ -25,48 +25,9 @@ def register_module(module): | |
register_module(tracking_factories) | ||
|
||
|
||
@pytest.fixture(autouse=True, scope="session") | ||
def ldap_directory(): | ||
top = ("o=test", {"o": "test"}) | ||
people = ("ou=people,o=test", {"ou": "people"}) | ||
groups = ("ou=groups,o=test", {"ou": "groups"}) | ||
ldapuser = ( | ||
"uid=ldapuser,ou=people,o=test", | ||
{ | ||
"uid": ["ldapuser"], | ||
"objectClass": [ | ||
"person", | ||
"organizationalPerson", | ||
"inetOrgPerson", | ||
"posixAccount", | ||
], | ||
"userPassword": ["Test1234!"], | ||
"uidNumber": ["1000"], | ||
"gidNumber": ["1000"], | ||
"givenName": ["givenName"], | ||
"mail": ["[email protected]"], | ||
"sn": ["LdapUser"], | ||
}, | ||
) | ||
|
||
directory = dict([top, people, groups, ldapuser]) | ||
mock = mockldap.MockLdap(directory) | ||
mock.start() | ||
|
||
yield | ||
|
||
mock.stop() | ||
|
||
|
||
@pytest.fixture | ||
def client(db): | ||
return JSONAPIClient() | ||
|
||
|
||
@pytest.fixture | ||
def auth_client(db): | ||
"""Return instance of a JSONAPIClient that is logged in as test user.""" | ||
user = get_user_model().objects.create_user( | ||
def auth_user(db): | ||
return get_user_model().objects.create_user( | ||
username="user", | ||
password="123qweasd", | ||
first_name="Test", | ||
|
@@ -75,43 +36,63 @@ def auth_client(db): | |
is_staff=False, | ||
) | ||
|
||
client = JSONAPIClient() | ||
client.user = user | ||
client.login("user", "123qweasd") | ||
return client | ||
|
||
|
||
@pytest.fixture | ||
def admin_client(db): | ||
"""Return instance of a JSONAPIClient that is logged in as a staff user.""" | ||
user = get_user_model().objects.create_user( | ||
username="user", | ||
def admin_user(db): | ||
return get_user_model().objects.create_user( | ||
username="admin", | ||
password="123qweasd", | ||
first_name="Test", | ||
first_name="Admin", | ||
last_name="User", | ||
is_superuser=False, | ||
is_staff=True, | ||
) | ||
|
||
client = JSONAPIClient() | ||
client.user = user | ||
client.login("user", "123qweasd") | ||
return client | ||
|
||
|
||
@pytest.fixture | ||
def superadmin_client(db): | ||
"""Return instance of a JSONAPIClient that is logged in as superuser.""" | ||
user = get_user_model().objects.create_user( | ||
username="user", | ||
def superadmin_user(db): | ||
return get_user_model().objects.create_user( | ||
username="superadmin", | ||
password="123qweasd", | ||
first_name="Test", | ||
first_name="Superadmin", | ||
last_name="User", | ||
is_staff=True, | ||
is_superuser=True, | ||
is_staff=True, | ||
) | ||
|
||
client = JSONAPIClient() | ||
client.user = user | ||
client.login("user", "123qweasd") | ||
|
||
@pytest.fixture | ||
def client(): | ||
return APIClient() | ||
|
||
|
||
@pytest.fixture | ||
def auth_client(auth_user): | ||
"""Return instance of a APIClient that is logged in as test user.""" | ||
client = APIClient() | ||
client.force_authenticate(user=auth_user) | ||
client.user = auth_user | ||
return client | ||
|
||
|
||
@pytest.fixture | ||
def admin_client(admin_user): | ||
"""Return instance of a APIClient that is logged in as a staff user.""" | ||
client = APIClient() | ||
client.force_authenticate(user=admin_user) | ||
client.user = admin_user | ||
return client | ||
|
||
|
||
@pytest.fixture | ||
def superadmin_client(superadmin_user): | ||
"""Return instance of a APIClient that is logged in as superuser.""" | ||
client = APIClient() | ||
client.force_authenticate(user=superadmin_user) | ||
client.user = superadmin_user | ||
return client | ||
|
||
|
||
@pytest.fixture(scope="function", autouse=True) | ||
def _autoclear_cache(): | ||
cache.clear() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.