Skip to content

Commit

Permalink
Search: Permissioned tribal audit access (#2807)
Browse files Browse the repository at this point in the history
* checkpoint

* add search view tests, refactor search fn

* lint

* first pass

* clean up is_public clause
  • Loading branch information
timoballard authored Nov 17, 2023
1 parent 1b8e05a commit 11eea7f
Show file tree
Hide file tree
Showing 11 changed files with 696 additions and 136 deletions.
43 changes: 43 additions & 0 deletions backend/dissemination/mixins.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import logging

from django.core.exceptions import PermissionDenied
from django.http import Http404

from dissemination.models import General
from users.permissions import can_read_tribal


logger = logging.getLogger(__name__)


class ReportAccessRequiredMixin:
def dispatch(self, request, *args, **kwargs):
report_id = kwargs["report_id"]
try:
general = General.objects.get(report_id=report_id)

if general.is_public:
return super().dispatch(request, *args, **kwargs)

if not request.user:
logger.debug(
f"denying anonymous user access to non-public report {report_id}"
)
raise PermissionDenied

if not request.user.is_authenticated:
logger.debug(
f"denying anonymous user access to non-public report {report_id}"
)
raise PermissionDenied

if not can_read_tribal(request.user):
logger.debug(
f"denying user {request.user.email} access to non-public report {report_id}"
)
raise PermissionDenied

return super().dispatch(request, *args, **kwargs)

except General.DoesNotExist:
raise Http404()
90 changes: 59 additions & 31 deletions backend/dissemination/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,20 @@ def search_general(
cog_or_oversight=None,
agency_name=None,
audit_years=None,
include_private=False,
):
query = Q(is_public=True)
query = Q()

if alns:
query.add(_get_aln_match_query(alns), Q.AND)
query.add(_get_aln_match_query(alns), Q.AND)
query.add(_get_names_match_query(names), Q.AND)
query.add(_get_uei_or_eins_match_query(uei_or_eins), Q.AND)
query.add(_get_start_date_match_query(start_date), Q.AND)
query.add(_get_end_date_match_query(end_date), Q.AND)
query.add(_get_cog_or_oversight_match_query(agency_name, cog_or_oversight), Q.AND)
query.add(_get_audit_years_match_query(audit_years), Q.AND)

if names:
query.add(_get_names_match_query(names), Q.AND)

if uei_or_eins:
uei_or_ein_match = Q(
Q(auditee_uei__in=uei_or_eins) | Q(auditee_ein__in=uei_or_eins)
)
query.add(uei_or_ein_match, Q.AND)

if start_date:
start_date_match = Q(fac_accepted_date__gte=start_date)
query.add(start_date_match, Q.AND)

if end_date:
end_date_match = Q(fac_accepted_date__lte=end_date)
query.add(end_date_match, Q.AND)

if cog_or_oversight:
if cog_or_oversight.lower() == "cog":
cog_match = Q(cognizant_agency__in=[agency_name])
query.add(cog_match, Q.AND)
elif cog_or_oversight.lower() == "oversight":
oversight_match = Q(oversight_agency__in=[agency_name])
query.add(oversight_match, Q.AND)

if audit_years:
fiscal_year_match = Q(audit_year__in=audit_years)
query.add(fiscal_year_match, Q.AND)
if not include_private:
query.add(Q(is_public=True), Q.AND)

results = General.objects.filter(query).order_by("-fac_accepted_date")

Expand All @@ -65,6 +45,10 @@ def _get_aln_match_query(alns):
# If there's a prefix and extention, search on both.
# 3. Add the report_ids from the identified awards to the search params.
"""

if not alns:
return Q()

# Split each ALN into (prefix, extention)
split_alns = set()
agency_numbers = set()
Expand Down Expand Up @@ -118,6 +102,9 @@ def _get_names_match_query(names):
"""
Given a list of (potential) names, return the query object that searches auditee and firm names.
"""
if not names:
return Q()

name_fields = [
"auditee_city",
"auditee_contact_name",
Expand All @@ -139,3 +126,44 @@ def _get_names_match_query(names):
names_match.add(Q(**{"%s__search" % field: names}), Q.OR)

return names_match


def _get_uei_or_eins_match_query(uei_or_eins):
if not uei_or_eins:
return Q()

uei_or_ein_match = Q(
Q(auditee_uei__in=uei_or_eins) | Q(auditee_ein__in=uei_or_eins)
)
return uei_or_ein_match


def _get_start_date_match_query(start_date):
if not start_date:
return Q()

return Q(fac_accepted_date__gte=start_date)


def _get_end_date_match_query(end_date):
if not end_date:
return Q()

return Q(fac_accepted_date__lte=end_date)


def _get_cog_or_oversight_match_query(agency_name, cog_or_oversight):
if not cog_or_oversight:
return Q()

if cog_or_oversight.lower() == "cog":
return Q(cognizant_agency__in=[agency_name])
elif cog_or_oversight.lower() == "oversight":
return Q(oversight_agency__in=[agency_name])


def _get_audit_years_match_query(audit_years):
if not audit_years:
return Q()

return Q(audit_year__in=audit_years)
107 changes: 107 additions & 0 deletions backend/dissemination/test_mixins.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from django.contrib.auth import get_user_model
from django.core.exceptions import PermissionDenied
from django.http.response import Http404
from django.test import TestCase
from django.test.client import RequestFactory
from django.views.generic import View

from dissemination.models import General
from dissemination.mixins import ReportAccessRequiredMixin

from users.models import Permission, UserPermission

from model_bakery import baker

User = get_user_model()


class ReportAccessRequiredMixinTests(TestCase):
class ViewStub(ReportAccessRequiredMixin, View):
def get(self, request, *args, **kwargs):
pass

def test_missing_report_id_raises(self):
request = RequestFactory().get("/")

self.assertRaises(KeyError, self.ViewStub().dispatch, request)

def test_nonexistent_report_raises(self):
request = RequestFactory().get("/")

self.assertRaises(
Http404,
self.ViewStub().dispatch,
request,
report_id="not-a-real-report-id",
)

def test_public_report_passes(self):
request = RequestFactory().get("/")

general = baker.make(General, is_public=True)

self.ViewStub().dispatch(request, report_id=general.report_id)

def test_non_public_raises_for_anonymous(self):
request = RequestFactory().get("/")
request.user = None

general = baker.make(General, is_public=False)

self.assertRaises(
PermissionDenied,
self.ViewStub().dispatch,
request,
report_id=general.report_id,
)

def test_non_public_raises_for_unpermissioned(self):
request = RequestFactory().get("/")

user = baker.make(User)
request.user = user

general = baker.make(General, is_public=False)

self.assertRaises(
PermissionDenied,
self.ViewStub().dispatch,
request,
report_id=general.report_id,
)

def test_non_public_passes_for_permissioned(self):
request = RequestFactory().get("/")

user = baker.make(User)
request.user = user

permission = Permission.objects.get(slug=Permission.PermissionType.READ_TRIBAL)
baker.make(UserPermission, user=user, email=user.email, permission=permission)

general = baker.make(General, is_public=False)

self.ViewStub().dispatch(request, report_id=general.report_id)

def test_public_passes_for_unpermissioned(self):
request = RequestFactory().get("/")

user = baker.make(User)
request.user = user

general = baker.make(General, is_public=True)

self.ViewStub().dispatch(request, report_id=general.report_id)

def test_public_passes_for_permissioned(self):
request = RequestFactory().get("/")

user = baker.make(User)
request.user = user

permission = Permission.objects.get(slug=Permission.PermissionType.READ_TRIBAL)
baker.make(UserPermission, user=user, email=user.email, permission=permission)

general = baker.make(General, is_public=True)

self.ViewStub().dispatch(request, report_id=general.report_id)
Loading

0 comments on commit 11eea7f

Please sign in to comment.