Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: Ensure self_group name matches email during authentication #78

Merged
merged 10 commits into from
Nov 21, 2023
26 changes: 15 additions & 11 deletions backend/lambdas/api/authorizer/authorizer/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
import re
import time
import urllib.request

from datetime import datetime, timezone
from django.db import transaction
from typing import Tuple

from django.db import transaction
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Has black been run on the changed files? (this change may well be because black was run, but I figured I'd ask)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, either black or isort would have made this change, i did not make it manually

from jose import jwk, jwt
from jose.utils import base64url_decode

Expand Down Expand Up @@ -181,15 +180,13 @@ def _get_update_or_create_user(email: str) -> User:
"""
user = _get_user(email)

# If user is found, return the user (unless the user is soft-deleted)
if user:
# If user is soft-deleted, return None so that auth fails
if user.deleted:
return None
else:
return user
# If user is soft-deleted, return None so that auth fails
# this should never occur in practice because user email is modified with a suffix of "_DELETED_{timestamp}" at deletion, but it is ok to retain this check
if user and user.deleted:
return None

if EMAIL_DOMAIN_ALIASES:
# if user is not found by email, check for aliases
if not user and EMAIL_DOMAIN_ALIASES:
email_local_part = email.split("@")[0]
email_domain = email.split("@")[1]

Expand All @@ -213,7 +210,14 @@ def _get_update_or_create_user(email: str) -> User:
if user and not user.deleted:
user.email = email
user.save()
return user

# if user is found directly or via alias (and was not soft-deleted), ensure that user's self group is named correctly, then return user
if user and not user.deleted:
if user.self_group.name != user.email:
user.self_group.name = user.email
user.self_group.save()

return user

# Create the user since no match has been found at this point
return _create_user(email)
Expand Down
98 changes: 84 additions & 14 deletions backend/lambdas/api/authorizer/tests/test_get_user.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import authorizer.handlers
import copy
import unittest

from unittest.mock import patch

import authorizer.handlers

EMAIL_DOMAIN_ALIASES = [
{
"new_domain": "company1.com",
Expand All @@ -30,28 +30,51 @@
"email": "[email protected]",
"deleted": False,
"last_login": "2023-01-01 00:00:00.000000+00:00",
"self_group": {"name": "[email protected]"},
},
{
"id": 2,
"email": "[email protected]",
"deleted": False,
"last_login": "2023-01-01 00:00:00.000000+00:00",
"self_group": {"name": "[email protected]"},
},
{
"id": 3,
"email": "[email protected]",
"deleted": True,
"last_login": "2023-01-01 00:00:00.000000+00:00",
"self_group": {"name": "[email protected]"},
},
{
"id": 4,
"email": "[email protected]",
"deleted": False,
"last_login": "2023-01-01 00:00:00.000000+00:00",
"self_group": {"name": "[email protected]"},
},
{
"id": 5,
"email": "[email protected]",
"deleted": False,
"last_login": "2023-01-01 00:00:00.000000+00:00",
"self_group": {"name": "[email protected]"},
},
]


class MockGroup(object):
def __init__(self, **kwargs):
self.name = kwargs.get("name") or ""

def save(self):
pass

@classmethod
def create_self_group(cls, user):
user.self_group = MockGroup(name=user.email)


class MockUser(object):
users = []

Expand All @@ -62,6 +85,9 @@ def __init__(self, **kwargs):
self.email = kwargs.get("email")
self.deleted = kwargs.get("deleted") or False
self.last_login = kwargs.get("last_login") or ""
self_group = kwargs.get("self_group") or None
if self_group:
self.self_group = MockGroup(name=self_group.get("name"))

def save(self):
for user in MockUser.users:
Expand Down Expand Up @@ -93,8 +119,8 @@ def get(email: str, **kwargs):


@patch("authorizer.handlers.EMAIL_DOMAIN_ALIASES", EMAIL_DOMAIN_ALIASES)
@patch("authorizer.handlers.Group.create_self_group", lambda *x, **y: None)
@patch("authorizer.handlers.User", MockUser)
@patch("authorizer.handlers.Group", MockGroup)
class TestGetUser(unittest.TestCase):
def test_get_existing_user(self):
"""
Expand All @@ -103,7 +129,11 @@ def test_get_existing_user(self):
MockUser.users = copy.deepcopy(USERS)
email = "[email protected]"
user = _get_update_or_create_user(email=email)
self.assertTrue(user.__dict__.get("id") == 1 and user.__dict__.get("email") == email)
self.assertTrue(
user.__dict__.get("id") == 1
and user.__dict__.get("email") == email
and user.__dict__.get("self_group").name == email
)

def test_get_deleted_user(self):
"""
Expand All @@ -120,48 +150,83 @@ def test_get_nonexistent_user(self):
"""
MockUser.users = copy.deepcopy(USERS)
email = "[email protected]"
expected_userid = MockUser.users[-1].get("id") + 1
user = _get_update_or_create_user(email=email)
Copy link
Contributor Author

@breedenc breedenc Nov 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed this to a derived value which still tests that a new user was created, but does not require the expected ID value to be hard-coded into the unit test and updated in the case that unrelated tests are added.

self.assertTrue(user.__dict__.get("id") == 5 and user.__dict__.get("email") == email)
self.assertTrue(
user.__dict__.get("id") == expected_userid
and user.__dict__.get("email") == email
and user.__dict__.get("self_group").name == email
)

def test_get_user_with_new_email(self):
"""
User logs in with email "[email protected]" and has an existing account with email "[email protected]"
Existing account is found, and email is updated to the new email "[email protected]"
Existing account is found, and email and self group name are updated to the new email "[email protected]"
"""
MockUser.users = copy.deepcopy(USERS)
email = "[email protected]"
user = _get_update_or_create_user(email=email)
self.assertTrue(user.__dict__.get("id") == 1 and user.__dict__.get("email") == email)
self.assertTrue(
user.__dict__.get("id") == 1
and user.__dict__.get("email") == email
and user.__dict__.get("self_group").name == email
)

def test_get_user_with_new_email_with_transformation(self):
"""
User logs in with email "[email protected]" and has an existing account with email "[email protected]"
Existing account is found, and email is updated to the new email "[email protected]"
Existing account is found, and email and self group name are updated to the new email "[email protected]"
"""
MockUser.users = copy.deepcopy(USERS)
email = "[email protected]"
user = _get_update_or_create_user(email=email)
self.assertTrue(user.__dict__.get("id") == 2 and user.__dict__.get("email") == email)
self.assertTrue(
user.__dict__.get("id") == 2
and user.__dict__.get("email") == email
and user.__dict__.get("self_group").name == email
)

def test_get_user_with_new_email_with_transformation2(self):
"""
User logs in with email "[email protected]" and has an existing account with email "[email protected]"
Existing account is found, and email is updated to the new email "[email protected]"
Existing account is found, and email and self group name are updated to the new email "[email protected]"
"""
MockUser.users = copy.deepcopy(USERS)
email = "[email protected]"
user = _get_update_or_create_user(email=email)
self.assertTrue(user.__dict__.get("id") == 4 and user.__dict__.get("email") == email)
self.assertTrue(
user.__dict__.get("id") == 4
and user.__dict__.get("email") == email
and user.__dict__.get("self_group").name == email
)

def test_get_user_with_new_email_with_transformation3(self):
"""
User logs in with email "[email protected]" and has an existing account with email "[email protected]"
Existing account is found, and email is updated to the new email "[email protected]"
Existing account is found, and email and self group name are updated to the new email "[email protected]"
"""
MockUser.users = copy.deepcopy(USERS)
email = "[email protected]"
user = _get_update_or_create_user(email=email)
self.assertTrue(user.__dict__.get("id") == 1 and user.__dict__.get("email") == email)
self.assertTrue(
user.__dict__.get("id") == 1
and user.__dict__.get("email") == email
and user.__dict__.get("self_group").name == email
)

def test_get_user_with_email_and_self_group_mismatch(self):
"""
User logs in with email "[email protected]" and has an existing account with email "[email protected]", but self-group name does not match
Existing account is found, and self group name is updated to the new email "[email protected]"
"""
MockUser.users = copy.deepcopy(USERS)
email = "[email protected]"
user = _get_update_or_create_user(email=email)
self.assertTrue(
user.__dict__.get("id") == 5
and user.__dict__.get("email") == email
and user.__dict__.get("self_group").name == email
)

def test_get_user_with_new_email_and_deleted_old_user(self):
"""
Expand All @@ -170,5 +235,10 @@ def test_get_user_with_new_email_and_deleted_old_user(self):
"""
MockUser.users = copy.deepcopy(USERS)
email = "[email protected]"
expected_userid = MockUser.users[-1].get("id") + 1
user = _get_update_or_create_user(email=email)
self.assertTrue(user.__dict__.get("id") == 5 and user.__dict__.get("email") == email)
self.assertTrue(
user.__dict__.get("id") == expected_userid
and user.__dict__.get("email") == email
and user.__dict__.get("self_group").name == email
)
19 changes: 19 additions & 0 deletions backend/lambdas/api/repo/repo/util/authorize.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@

from artemisapi.response import response
from artemisdb.artemisdb.models import UserService
from artemislib.logging import Logger
from repo.util.parse_event import EventParser
from repo.util.scope import update_scope_cache, validate_scope_with_github
from repo.util.utils import auth

log = Logger(__name__)


def authorize(event_parser: EventParser) -> dict:
"""
Expand All @@ -24,17 +27,25 @@ def authorize(event_parser: EventParser) -> dict:

if not allowed and service_id == "github" and identity.principal_type != "group_api_key":
# If not allowed and not a group API key, check permission with GitHub
log.debug(
f"{identity.principal_id} does not have static or cached scope to {repo_id}, checking for Github access"
)
try:
github_account = UserService.objects.get(
user__email=identity.principal_id, user__deleted=False, service="github"
)
github_username = github_account.username
except UserService.DoesNotExist:
log.debug(f"Linked Github account not found for {identity.principal_id}")
return response({"message": f"Not authorized for {repo_id}"}, code=HTTPStatus.FORBIDDEN)

log.debug(f"Linked Github account found for {identity.principal_id}, validating Github access to {repo_id}")
validated = validate_scope_with_github(repo_id, service_id, github_username)

if validated:
log.debug(
f"Linked Github account found for {identity.principal_id} has access to {repo_id}, updating scope cache"
)
scope_cache += [f"github/{repo_id}"]
update_scope_cache(github_username, scope_cache)

Expand All @@ -58,8 +69,14 @@ def authorize(event_parser: EventParser) -> dict:
# the scopes from the self group so that they are given equal consideration
# and the group scope does not limit what was verified with GitHub but is
# still restricted by the key's own scope.

log.debug(f"Checking if {repo_id} in API key scope {identity.scope} for {identity.principal_id}")
identity.scope[0][1] += [f"github/{repo_id}"]
allowed = auth(repo_id, service_id, identity.scope)
if allowed:
log.debug(f"Repo {repo_id} in key scope")
else:
log.debug(f"Repo {repo_id} not in key scope")
else:
# Users have this scope structure:
# [
Expand Down Expand Up @@ -88,6 +105,8 @@ def authorize(event_parser: EventParser) -> dict:
if not allowed:
return response({"message": f"Not authorized for {repo_id}"}, code=HTTPStatus.FORBIDDEN)

log.debug(f"{identity.principal_id} authorized to {repo_id}")


def allowlist_is_denied(method: str, resource: str, service: str, repo: str, allowlist_denied: list):
if method in ["POST", "PUT", "DELETE"] and resource == "whitelist":
Expand Down
7 changes: 6 additions & 1 deletion backend/lambdas/api/repo/repo/util/scope.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json

import requests

from artemislib.datetime import get_utc_datetime
from artemislib.db_cache import DBLookupCache
from artemislib.logging import Logger
Expand Down Expand Up @@ -35,16 +34,22 @@ def validate_scope_with_github(repo_id: str, service_id: str, service_user: str)

headers = {"accept": "application/vnd.github.v3+json", "authorization": authorization}

log.debug(f"Attempting to validate permissions for github user {service_user} to repo {repo_id}")

r = requests.get(
f"https://api.github.com/repos/{repo_id}/collaborators/{service_user}/permission",
headers=headers,
)

log.debug(f"Github request returned status code: {r.status_code}")

permission = r.json().get("permission")

if not permission:
log.error(f"Error occurred attempting to validate permissions for github user {service_user} to repo {repo_id}")
log.error(f"Status: {r.status_code}")
log.error(f"Body: {r.text}")

log.debug(f"Permission returned for user {service_user} to {repo_id}: {permission}")

return permission in ["admin", "write", "read"]
Loading