Skip to content

Commit

Permalink
Rework pwdgrp.cgi (#275)
Browse files Browse the repository at this point in the history
* Fix requests

* Fix interface

* Improve typing

* User enum for secondary group
Clean up imports

* Minor clean up
  • Loading branch information
Kane610 authored Oct 12, 2023
1 parent bf7d3c5 commit 03236fb
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 128 deletions.
109 changes: 29 additions & 80 deletions axis/vapix/interfaces/pwdgrp_cgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,103 +13,52 @@
comment: The comment field of the account.
"""

import re
from typing import Dict

from ..models.pwdgrp_cgi import ADMIN, OPERATOR, PTZ, VIEWER, User
from .api import APIItems

PROPERTY = "Properties.API.HTTP.Version=3"

URL = "/axis-cgi/pwdgrp.cgi"
URL_GET = URL + "?action=get"

SGRP_VIEWER = VIEWER
SGRP_OPERATOR = "{}:{}".format(VIEWER, OPERATOR)
SGRP_ADMIN = "{}:{}:{}".format(VIEWER, OPERATOR, ADMIN)

REGEX_USER = re.compile(r"^[A-Z0-9]{1,14}$", re.IGNORECASE)
REGEX_PASS = re.compile(r"^[x20-x7e]{1,64}$")
REGEX_STRING = re.compile(r"[A-Z0-9]+", re.IGNORECASE)


class Users(APIItems):
from ..models.pwdgrp_cgi import (
CreateUserRequest,
DeleteUserRequest,
GetUsersRequest,
GetUsersResponse,
ModifyUserRequest,
SecondaryGroup,
User,
)
from .api_handler import ApiHandler


class Users(ApiHandler):
"""Represents all users of a device."""

item_cls = User
path = URL_GET

async def update(self) -> None:
"""Update list of current users."""
users = await self.list()
self.process_raw(users)

@staticmethod
def pre_process_raw(raw: str) -> dict: # type: ignore[override]
"""Pre-process raw string.
Prepare users to work with APIItems.
Create booleans with user levels.
"""
if "=" not in raw:
return {}

raw_dict: Dict[str, str] = dict(
group.split("=", 1) for group in raw.splitlines() # type: ignore
)

raw_users = ["root"] + REGEX_STRING.findall(raw_dict["users"])

users = {
user: {
group: user in REGEX_STRING.findall(raw_dict[group])
for group in [ADMIN, OPERATOR, VIEWER, PTZ]
}
for user in raw_users
}
async def _api_request(self) -> dict[str, User]:
"""Get default data of basic device information."""
return await self.list()

return users

async def list(self) -> str:
async def list(self) -> dict[str, User]:
"""List current users."""
data = {"action": "get"}
return await self.vapix.request("post", URL, data=data)
data = await self.vapix.new_request(GetUsersRequest())
return GetUsersResponse.decode(data).data

async def create(
self, user: str, *, pwd: str, sgrp: str, comment: str | None = None
self,
user: str,
*,
pwd: str,
sgrp: SecondaryGroup,
comment: str | None = None,
) -> None:
"""Create new user."""
data = {"action": "add", "user": user, "pwd": pwd, "grp": "users", "sgrp": sgrp}

if comment:
data["comment"] = comment

await self.vapix.request("post", URL, data=data)
await self.vapix.new_request(CreateUserRequest(user, pwd, sgrp, comment))

async def modify(
self,
user: str,
*,
pwd: str | None = None,
sgrp: str | None = None,
sgrp: SecondaryGroup | None = None,
comment: str | None = None,
) -> None:
"""Update user."""
data = {"action": "update", "user": user}

if pwd:
data["pwd"] = pwd

if sgrp:
data["sgrp"] = sgrp

if comment:
data["comment"] = comment

await self.vapix.request("post", URL, data=data)
await self.vapix.new_request(ModifyUserRequest(user, pwd, sgrp, comment))

async def delete(self, user: str) -> None:
"""Remove user."""
data = {"action": "remove", "user": user}

await self.vapix.request("post", URL, data=data)
await self.vapix.new_request(DeleteUserRequest(user))
6 changes: 5 additions & 1 deletion axis/vapix/interfaces/user_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
Figure out what access rights an account has.
"""

from ..models.pwdgrp_cgi import ADMIN, OPERATOR, PTZ, VIEWER
from .api import APIItem, APIItems

URL = "/axis-cgi/usergroup.cgi"

ADMIN = "admin"
OPERATOR = "operator"
VIEWER = "viewer"
PTZ = "ptz"

UNKNOWN = "unknown"


Expand Down
9 changes: 7 additions & 2 deletions axis/vapix/models/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,14 @@ class ApiRequest(ABC):
path: str = field(init=False)

@property
@abstractmethod
def content(self) -> bytes:
def content(self) -> bytes | None:
"""Request content."""
return None

@property
def data(self) -> dict[str, str] | None:
"""Request data."""
return None


class APIItem:
Expand Down
191 changes: 172 additions & 19 deletions axis/vapix/models/pwdgrp_cgi.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,190 @@
"""Axis Vapix user management user class."""

from .api import APIItem
from dataclasses import dataclass
import enum
import re

ADMIN = "admin"
OPERATOR = "operator"
VIEWER = "viewer"
PTZ = "ptz"
from typing_extensions import Self, TypedDict

from .api import ApiItem, ApiRequest, ApiResponse

class User(APIItem):
"""Represents a user."""

class SecondaryGroup(enum.Enum):
"""Supported user secondary groups.
Defines the user access rights for the account.
"""

ADMIN = "viewer:operator:admin"
ADMIN_PTZ = "viewer:operator:admin:ptz"
OPERATOR = "viewer:operator"
OPERATOR_PTZ = "viewer:operator:ptz"
VIEWER = "viewer"
VIEWER_PTZ = "viewer:ptz"


REGEX_USER = re.compile(r"^[A-Z0-9]{1,14}$", re.IGNORECASE)
REGEX_PASS = re.compile(r"^[x20-x7e]{1,64}$")
REGEX_STRING = re.compile(r"[A-Z0-9]+", re.IGNORECASE)


class UserGroupsT(TypedDict):
"""Groups user belongs to."""

user: str
admin: bool
operator: bool
viewer: bool
ptz: bool


@dataclass
class User(ApiItem):
"""Represents a user and the groups it belongs to."""

admin: bool
operator: bool
viewer: bool
ptz: bool

@property
def name(self) -> str:
"""User name."""
return self.id

@classmethod
def decode(cls, data: UserGroupsT) -> Self:
"""Create object from dict."""
return cls(
id=data["user"],
admin=data["admin"],
operator=data["operator"],
viewer=data["viewer"],
ptz=data["ptz"],
)

@classmethod
def from_list(cls, data: list[UserGroupsT]) -> dict[str, Self]:
"""Create objects from list."""
users = [cls.decode(item) for item in data]
return {user.id: user for user in users}


@dataclass
class GetUsersRequest(ApiRequest):
"""Request object for listing users."""

method = "post"
path = "/axis-cgi/pwdgrp.cgi"
content_type = "text/plain"

@property
def admin(self) -> bool:
"""Is user part of admin group."""
return self.raw[ADMIN]
def data(self) -> dict[str, str]:
"""Request data."""
return {"action": "get"}


@dataclass
class GetUsersResponse(ApiResponse[dict[str, User]]):
"""Response object for listing ports."""

@classmethod
def decode(cls, bytes_data: bytes) -> Self:
"""Prepare API description dictionary."""
if "=" not in (string_data := bytes_data.decode()):
return cls(data={})

data: dict[str, str] = dict(
group.split("=", 1) for group in string_data.splitlines()
)

user_list = ["root"] + REGEX_STRING.findall(data["users"])

users: list[UserGroupsT] = [
{
"user": user,
"admin": user in REGEX_STRING.findall(data["admin"]),
"operator": user in REGEX_STRING.findall(data["operator"]),
"viewer": user in REGEX_STRING.findall(data["viewer"]),
"ptz": user in REGEX_STRING.findall(data["ptz"]),
}
for user in user_list
]

return cls(data=User.from_list(users))


@dataclass
class CreateUserRequest(ApiRequest):
"""Request object for creating a user."""

method = "post"
path = "/axis-cgi/pwdgrp.cgi"
content_type = "text/plain"

user: str
pwd: str
sgrp: SecondaryGroup
comment: str | None = None

@property
def operator(self) -> bool:
"""Is user part of operator group."""
return self.raw[OPERATOR]
def data(self) -> dict[str, str]:
"""Request data."""
data = {
"action": "add",
"user": self.user,
"pwd": self.pwd,
"grp": "users",
"sgrp": self.sgrp.value,
}

if self.comment is not None:
data["comment"] = self.comment

return data


@dataclass
class ModifyUserRequest(ApiRequest):
"""Request object for modifying a user."""

method = "post"
path = "/axis-cgi/pwdgrp.cgi"
content_type = "text/plain"

user: str
pwd: str | None = None
sgrp: SecondaryGroup | None = None
comment: str | None = None

@property
def viewer(self) -> bool:
"""Is user part of viewer group."""
return self.raw[VIEWER]
def data(self) -> dict[str, str]:
"""Request data."""
data = {"action": "update", "user": self.user}

if self.pwd is not None:
data["pwd"] = self.pwd

if self.sgrp:
data["sgrp"] = self.sgrp.value

if self.comment:
data["comment"] = self.comment

return data


@dataclass
class DeleteUserRequest(ApiRequest):
"""Request object for deleting a user."""

method = "post"
path = "/axis-cgi/pwdgrp.cgi"
content_type = "text/plain"

user: str

@property
def ptz(self) -> bool:
"""Is user part of PTZ group."""
return self.raw[PTZ]
def data(self) -> dict[str, str]:
"""Request data."""
return {"action": "remove", "user": self.user}
Loading

0 comments on commit 03236fb

Please sign in to comment.