Skip to content

Commit

Permalink
Rework user groups (#278)
Browse files Browse the repository at this point in the history
* Get interface to work using ApiHandler

* Break apart into model file as well

* Simplify
  • Loading branch information
Kane610 authored Oct 13, 2023
1 parent 0da3816 commit ed6f597
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 129 deletions.
65 changes: 11 additions & 54 deletions axis/vapix/interfaces/user_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,62 +3,19 @@
Figure out what access rights an account has.
"""

from .api import APIItem, APIItems
from ..models.pwdgrp_cgi import User
from ..models.user_group import GetUserGroupRequest, GetUserGroupResponse
from .api_handler import ApiHandler

URL = "/axis-cgi/usergroup.cgi"

ADMIN = "admin"
OPERATOR = "operator"
VIEWER = "viewer"
PTZ = "ptz"

UNKNOWN = "unknown"


class UserGroups(APIItems):
class UserGroups(ApiHandler[User]):
"""User group access rights for Axis devices."""

item_cls = APIItem
path = URL

@staticmethod
def pre_process_raw(raw: str) -> dict: # type: ignore[override]
"""Process raw group list to generate a full list of what is and isnt supported."""
raw_list = raw.splitlines()

group_list = []
if len(raw_list) == 2:
group_list = raw_list[1].split()

return {group: group in group_list for group in [ADMIN, OPERATOR, VIEWER, PTZ]}

@property
def privileges(self) -> str:
"""Return highest privileged role supported."""
if self.admin:
return ADMIN
if self.operator:
return OPERATOR
if self.viewer:
return VIEWER
return UNKNOWN

@property
def admin(self) -> bool:
"""Is user admin."""
return self[ADMIN].raw # type: ignore[return-value]

@property
def operator(self) -> bool:
"""Is user operator."""
return self[OPERATOR].raw # type: ignore[return-value]

@property
def viewer(self) -> bool:
"""Is user viewer."""
return self[VIEWER].raw # type: ignore[return-value]
async def _api_request(self) -> dict[str, User]:
"""Get API data method defined by subsclass."""
return await self.get_user_groups()

@property
def ptz(self) -> bool:
"""Is user ptz."""
return self[PTZ].raw # type: ignore[return-value]
async def get_user_groups(self) -> dict[str, User]:
"""Retrieve privilege rights for current user."""
bytes_data = await self.vapix.new_request(GetUserGroupRequest())
return GetUserGroupResponse.decode(bytes_data).data
19 changes: 19 additions & 0 deletions axis/vapix/models/pwdgrp_cgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class SecondaryGroup(enum.Enum):
VIEWER = "viewer"
VIEWER_PTZ = "viewer:ptz"

UNKNOWN = "unknown"


REGEX_USER = re.compile(r"^[A-Z0-9]{1,14}$", re.IGNORECASE)
REGEX_PASS = re.compile(r"^[x20-x7e]{1,64}$")
Expand Down Expand Up @@ -52,6 +54,23 @@ def name(self) -> str:
"""User name."""
return self.id

@property
def privileges(self) -> SecondaryGroup:
"""Return highest privileged role supported."""
if self.admin and self.ptz:
return SecondaryGroup.ADMIN_PTZ
if self.admin:
return SecondaryGroup.ADMIN
if self.operator and self.ptz:
return SecondaryGroup.OPERATOR_PTZ
if self.operator:
return SecondaryGroup.OPERATOR
if self.viewer and self.ptz:
return SecondaryGroup.VIEWER_PTZ
if self.viewer:
return SecondaryGroup.VIEWER
return SecondaryGroup.UNKNOWN

@classmethod
def decode(cls, data: UserGroupsT) -> Self:
"""Create object from dict."""
Expand Down
46 changes: 46 additions & 0 deletions axis/vapix/models/user_group.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""User group API.
Figure out what access rights an account has.
"""

from dataclasses import dataclass

from typing_extensions import Self

from .api import ApiRequest, ApiResponse
from .pwdgrp_cgi import User, UserGroupsT


@dataclass
class GetUserGroupRequest(ApiRequest):
"""Request object for listing users."""

method = "get"
path = "/axis-cgi/usergroup.cgi"
content_type = "text/plain"


@dataclass
class GetUserGroupResponse(ApiResponse[dict[str, User]]):
"""Response object for listing ports."""

@classmethod
def decode(cls, bytes_data: bytes) -> Self:
"""Prepare API description dictionary."""
data: list[str] = bytes_data.decode().splitlines()

if len(data) == 0:
return cls(data={})

group_list = []
if len(data) == 2:
group_list = data[1].split()

user: UserGroupsT = {
"user": data[0],
"admin": "admin" in group_list,
"operator": "operator" in group_list,
"viewer": "viewer" in group_list,
"ptz": "ptz" in group_list,
}
return cls(data={"0": User.decode(user)})
29 changes: 11 additions & 18 deletions axis/vapix/vapix.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@
from .interfaces.ptz import PtzControl
from .interfaces.pwdgrp_cgi import Users
from .interfaces.stream_profiles import StreamProfilesHandler
from .interfaces.user_groups import UNKNOWN, UserGroups
from .interfaces.user_groups import UserGroups
from .interfaces.view_areas import ViewAreaHandler
from .models.api import ApiRequest
from .models.pwdgrp_cgi import SecondaryGroup

if TYPE_CHECKING:
from ..device import AxisDevice
Expand All @@ -61,10 +62,10 @@ def __init__(self, device: "AxisDevice") -> None:
self.params: Params | None = None
self._ports: Ports | None = None
self.ptz: PtzControl | None = None
self.user_groups: UserGroups | None = None
self.vmd4: Vmd4 | None = None

self.users = Users(self)
self.user_groups = UserGroups(self)

self.api_discovery: ApiDiscoveryHandler = ApiDiscoveryHandler(self)
self.basic_device_info = BasicDeviceInfoHandler(self)
Expand Down Expand Up @@ -104,11 +105,11 @@ def serial_number(self) -> str:
return self.params.system_serialnumber # type: ignore[union-attr]

@property
def access_rights(self) -> str:
def access_rights(self) -> SecondaryGroup:
"""Access rights with the account."""
if self.user_groups:
return self.user_groups.privileges
return UNKNOWN
if user := self.user_groups.get("0"):
return user.privileges
return SecondaryGroup.UNKNOWN

@property
def streaming_profiles(self) -> list:
Expand Down Expand Up @@ -264,25 +265,17 @@ async def load_user_groups(self) -> None:
If information is available from pwdgrp.cgi use that.
"""
user_groups = ""
if self.users and self.device.config.username in self.users:
user = self.users[self.device.config.username]
user_groups = (
f"{user.name}\n" # type: ignore[attr-defined]
+ ("admin " if user.admin else "") # type: ignore[attr-defined]
+ ("operator " if user.operator else "") # type: ignore[attr-defined]
+ ("viewer " if user.viewer else "") # type: ignore[attr-defined]
+ ("ptz" if user.ptz else "") # type: ignore[attr-defined]
)
user_groups = {}
if len(self.users) > 0 and self.device.config.username in self.users:
user_groups = {"0": self.users[self.device.config.username]}

self.user_groups = UserGroups(self)
if not user_groups:
try:
await self.user_groups.update()
return
except PathNotFound:
pass
self.user_groups.process_raw(user_groups)
self.user_groups._items = user_groups

async def request(
self,
Expand Down
73 changes: 34 additions & 39 deletions tests/test_user_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,98 +6,93 @@
import pytest
import respx

from axis.vapix.interfaces.user_groups import URL, UserGroups
from axis.vapix.interfaces.user_groups import UserGroups
from axis.vapix.models.pwdgrp_cgi import SecondaryGroup

from .conftest import HOST


@pytest.fixture
def user_groups(axis_device) -> UserGroups:
"""Return the user_groups mock object."""
return UserGroups(axis_device.vapix, "")
return UserGroups(axis_device.vapix)


@respx.mock
@pytest.mark.asyncio
async def test_empty_response(user_groups):
"""Test get_supported_versions."""
respx.get(f"http://{HOST}:80{URL}").respond(
respx.get(f"http://{HOST}:80/axis-cgi/usergroup.cgi").respond(
text="",
headers={"Content-Type": "text/plain"},
)
await user_groups.update()

assert user_groups.privileges == "unknown"
assert not user_groups.admin
assert not user_groups.operator
assert not user_groups.viewer
assert not user_groups.ptz
assert user_groups.get("0") is None


@respx.mock
@pytest.mark.asyncio
async def test_root_user(user_groups):
"""Test get_supported_versions."""
respx.get(f"http://{HOST}:80{URL}").respond(
respx.get(f"http://{HOST}:80/axis-cgi/usergroup.cgi").respond(
text="root\nroot admin operator ptz viewer\n",
headers={"Content-Type": "text/plain"},
)
await user_groups.update()

assert user_groups.privileges == "admin"

assert user_groups.admin
assert user_groups.operator
assert user_groups.viewer
assert user_groups.ptz
assert (user := user_groups.get("0"))
assert user.privileges == SecondaryGroup.ADMIN_PTZ
assert user.admin
assert user.operator
assert user.viewer
assert user.ptz


@respx.mock
@pytest.mark.asyncio
async def test_admin_user(user_groups):
"""Test get_supported_versions."""
respx.get(f"http://{HOST}:80{URL}").respond(
text="administrator\nusers admin operator ptz viewer\n",
respx.get(f"http://{HOST}:80/axis-cgi/usergroup.cgi").respond(
text="administrator\nusers admin operator viewer\n",
headers={"Content-Type": "text/plain"},
)
await user_groups.update()

assert user_groups.privileges == "admin"
assert user_groups.admin
assert user_groups.operator
assert user_groups.viewer
assert user_groups.ptz
assert (user := user_groups.get("0"))
assert user.privileges == SecondaryGroup.ADMIN
assert user.admin
assert user.operator
assert user.viewer
assert not user.ptz


@respx.mock
@pytest.mark.asyncio
async def test_operator_user(user_groups):
"""Test get_supported_versions."""
respx.get(f"http://{HOST}:80{URL}").respond(
respx.get(f"http://{HOST}:80/axis-cgi/usergroup.cgi").respond(
text="operator\nusers operator viewer\n",
headers={"Content-Type": "text/plain"},
)
await user_groups.update()

assert user_groups.privileges == "operator"
assert not user_groups.admin
assert user_groups.operator
assert user_groups.viewer
assert not user_groups.ptz
assert (user := user_groups.get("0"))
assert user.privileges == SecondaryGroup.OPERATOR
assert not user.admin
assert user.operator
assert user.viewer
assert not user.ptz


@respx.mock
@pytest.mark.asyncio
async def test_viewer_user(user_groups):
"""Test get_supported_versions."""
respx.get(f"http://{HOST}:80{URL}").respond(
respx.get(f"http://{HOST}:80/axis-cgi/usergroup.cgi").respond(
text="viewer\nusers viewer\n",
headers={"Content-Type": "text/plain"},
)
await user_groups.update()

assert user_groups.privileges == "viewer"
assert not user_groups.admin
assert not user_groups.operator
assert user_groups.viewer
assert not user_groups.ptz
assert (user := user_groups.get("0"))
assert user.privileges == SecondaryGroup.VIEWER
assert not user.admin
assert not user.operator
assert user.viewer
assert not user.ptz
Loading

0 comments on commit ed6f597

Please sign in to comment.