Skip to content

Commit

Permalink
Add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
anodos325 committed Dec 12, 2024
1 parent 65760bc commit 59b3fd1
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 15 deletions.
1 change: 0 additions & 1 deletion src/middlewared/middlewared/api/v25_04_0/privilege.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from middlewared.api.base import BaseModel, Excluded, excluded_field, ForUpdateMetaclass, NonEmptyString, SID
from .api_key import AllowListItem
from .group import GroupEntry

__all__ = ["PrivilegeEntry", "PrivilegeRoleEntry",
Expand Down
4 changes: 2 additions & 2 deletions src/middlewared/middlewared/plugins/account_/privilege.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async def do_create(self, data):
`ds_groups` is list of Directory Service group GIDs that will gain this privilege.
`allowlist` is a list of API endpoints allowed for this privilege.
`roles` is a list of roles to be assigned to the privilege
`web_shell` controls whether users with this privilege are allowed to log in to the Web UI.
"""
Expand Down Expand Up @@ -106,7 +106,7 @@ async def do_update(self, audit_callback, id_, data):
verrors = ValidationErrors()

if new["builtin_name"]:
for k in ["name", "allowlist", "roles"]:
for k in ["name", "roles"]:
if new[k] != old[k]:
verrors.add(f"privilege_update.{k}", "This field is read-only for built-in privileges")

Expand Down
25 changes: 13 additions & 12 deletions src/middlewared/middlewared/role.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class Role:
'SYSTEM_AUDIT_READ': Role(),
'SYSTEM_AUDIT_WRITE': Role(),

'FULL_ADMIN': Role(full_admin=True, builtin=False, stig=False),
'FULL_ADMIN': Role(full_admin=True, builtin=False, stig=None),

# Alert roles
'ALERT_LIST_READ': Role(),
Expand All @@ -86,17 +86,17 @@ class Role:

# VM roles
'VM_READ': Role(),
'VM_WRITE': Role(includes=['VM_READ'], stig=False),
'VM_WRITE': Role(includes=['VM_READ'], stig=None),
'VM_DEVICE_READ': Role(includes=['VM_READ']),
'VM_DEVICE_WRITE': Role(includes=['VM_WRITE', 'VM_DEVICE_READ'], stig=False),
'VM_DEVICE_WRITE': Role(includes=['VM_WRITE', 'VM_DEVICE_READ'], stig=None),

# JBOF roles
'JBOF_READ': Role(),
'JBOF_WRITE': Role(includes=['JBOF_READ']),

# Truecommand roles
'TRUECOMMAND_READ': Role(),
'TRUECOMMAND_WRITE': Role(includes=['TRUECOMMAND_READ'], stig=False),
'TRUECOMMAND_WRITE': Role(includes=['TRUECOMMAND_READ'], stig=None),

# Crypto roles
'CERTIFICATE_READ': Role(),
Expand All @@ -106,11 +106,11 @@ class Role:

# Apps roles
'CATALOG_READ': Role(),
'CATALOG_WRITE': Role(includes=['CATALOG_READ'], stig=False),
'CATALOG_WRITE': Role(includes=['CATALOG_READ'], stig=None),
'DOCKER_READ': Role(includes=[]),
'DOCKER_WRITE': Role(includes=['DOCKER_READ'], stig=False),
'DOCKER_WRITE': Role(includes=['DOCKER_READ'], stig=None),
'APPS_READ': Role(includes=['CATALOG_READ']),
'APPS_WRITE': Role(includes=['CATALOG_WRITE', 'APPS_READ'], stig=False),
'APPS_WRITE': Role(includes=['CATALOG_WRITE', 'APPS_READ'], stig=None),

# FTP roles
'SHARING_FTP_READ': Role(),
Expand Down Expand Up @@ -211,7 +211,7 @@ class Role:
'VIRT_GLOBAL_WRITE': Role(includes=['VIRT_GLOBAL_READ'], stig=None),
'VIRT_INSTANCE_READ': Role(),
'VIRT_INSTANCE_WRITE': Role(includes=['VIRT_INSTANCE_READ'], stig=None),
'VIRT_INSTANCE_DELETE': Role(stig=False),
'VIRT_INSTANCE_DELETE': Role(stig=None),
'VIRT_IMAGE_READ': Role(),
'VIRT_IMAGE_WRITE': Role(includes=['VIRT_IMAGE_READ'], stig=None),

Expand Down Expand Up @@ -280,7 +280,8 @@ def add_roles_to_method(self, method_name: str, roles: typing.Iterable[str]):
def register_event(self, event_name: str, roles: typing.Iterable[str], *, exist_ok: bool = False):
self.events.register_resource(event_name, roles, exist_ok)

def role_stig_check(self, role: Role, enabled_stig: STIGType) -> bool:
def role_stig_check(self, role_name: str, enabled_stig: STIGType) -> bool:
role = self.roles[role_name]
if role.stig is None:
return False

Expand All @@ -297,11 +298,11 @@ def roles_for_role(self, role: str, enabled_stig: STIGType | None) -> typing.Set

if self.roles[role].full_admin:
# Convert FULL_ADMIN to all stig-allowed roles.
return set([role_name for role_name, role in self.roles.items() if self.stig_check(role, enabled_stig)])
return set([role_name for role_name, role in self.roles.items() if self.role_stig_check(role_name, enabled_stig)])

return set.union({role}, *[
self.roles_for_role(included_role, stig_enabled)
for included_role in self.roles[role].includes if self.stig_check(included_role.stig, enabled_stig)
self.roles_for_role(included_role, enabled_stig)
for included_role in self.roles[role].includes if self.role_stig_check(included_role, enabled_stig)
])

def allowlist_for_role(self, role: str, enabled_stig: STIGType) -> typing.List[dict[str, str]]:
Expand Down
67 changes: 67 additions & 0 deletions tests/unit/test_role_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import pytest

from middlewared.role import RoleManager, ROLES
from middlewared.utils import security


FAKE_METHODS = [
('fakemethod1', ['READONLY_ADMIN']),
('fakemethod2', ['FILESYSTEM_DATA_READ']),
('fakemethod3', ['CATALOG_WRITE']),
('fakemethod4', ['DOCKER_READ']),
('fakemethod5', ['ACCOUNT_READ']),
('fakemethod6', ['ACCOUNT_WRITE']),
]

FAKE_METHODS_NOSTIG = frozenset(['fakemethod3'])
ALL_METHODS = frozenset([method for method, roles in FAKE_METHODS])
WRITE_METHODS = frozenset(['fakemethod3', 'fakemethod6'])
READONLY_ADMIN_METHODS = ALL_METHODS - WRITE_METHODS
FULL_ADMIN_STIG = ALL_METHODS - FAKE_METHODS_NOSTIG


@pytest.fixture(scope='module')
def nostig_roles():
# Generate list of expected roles that should be unavailble for STIG mode
PREFIXES = ('VM', 'TRUECOMMAND', 'CATALOG', 'DOCKER', 'APPS', 'VIRT', 'NOSTIG_PREFIX')
yield set([
role_name for
role_name in list(ROLES.keys()) if role_name.startswith(PREFIXES) and not role_name.endswith('READ')
]) | set(['FULL_ADMIN'])


@pytest.fixture(scope='module')
def role_manager():
""" A role manager populated with made up methods """
rm = RoleManager(roles=ROLES)
for method, roles in FAKE_METHODS:
rm.register_method(method_name=method, roles=roles)

yield rm


@pytest.mark.parametrize('role_name', list(ROLES.keys()))
def test__roles_have_correct_stig_assignment(nostig_roles, role_name):
role_to_check = ROLES[role_name]
assert type(role_to_check.stig) in (security.STIGType, type(None))

if role_to_check.stig is not None:
assert role_name not in nostig_roles
else:
assert role_name in nostig_roles


@pytest.mark.parametrize('role,method,enabled_stig_type,resources', [
('READONLY_ADMIN', 'CALL', None, READONLY_ADMIN_METHODS),
('READONLY_ADMIN', 'CALL', security.STIGType.GPOS, READONLY_ADMIN_METHODS),
('FULL_ADMIN', '*', None, {'*'}),
('FULL_ADMIN', 'CALL', security.STIGType.GPOS, FULL_ADMIN_STIG),
])
def test__roles_have_corect_allowlist(role_manager, role, method, enabled_stig_type, resources):
allowlist = role_manager.allowlist_for_role(role, enabled_stig_type)
allowlist_resources = set()
for entry in allowlist:
assert method == entry['method']
allowlist_resources.add(entry['resource'])

assert allowlist_resources == resources

0 comments on commit 59b3fd1

Please sign in to comment.