Skip to content

Commit

Permalink
Add and consolidate parameter handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
Kane610 committed Dec 20, 2023
1 parent e87a190 commit d3abfa7
Show file tree
Hide file tree
Showing 16 changed files with 267 additions and 48 deletions.
57 changes: 55 additions & 2 deletions axis/vapix/interfaces/api_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import (
TYPE_CHECKING,
Any,
Callable,
Generic,
ItemsView,
Iterator,
Expand All @@ -17,15 +18,67 @@

from ..models.api import ApiItemT

CallbackType = Callable[[str], None]
SubscriptionType = CallbackType
UnsubscribeType = Callable[[], None]

class ApiHandler(ABC, Generic[ApiItemT]):
ID_FILTER_ALL = "*"


class SubscriptionHandler(ABC):
"""Manage subscription and notification to subscribers."""

def __init__(self) -> None:
"""Initialize subscription handler."""
self._subscribers: dict[str, list[SubscriptionType]] = {ID_FILTER_ALL: []}

def signal_subscribers(self, obj_id: str) -> None:
"""Signal subscribers."""
subscribers: list[SubscriptionType] = (
self._subscribers.get(obj_id, []) + self._subscribers[ID_FILTER_ALL]
)
for callback in subscribers:
callback(obj_id)

def subscribe(
self,
callback: CallbackType,
id_filter: tuple[str] | str | None = None,
) -> UnsubscribeType:
"""Subscribe to added events."""
subscription = callback

_id_filter: tuple[str]
if id_filter is None:
_id_filter = (ID_FILTER_ALL,)
elif isinstance(id_filter, str):
_id_filter = (id_filter,)

for obj_id in _id_filter:
if obj_id not in self._subscribers:
self._subscribers[obj_id] = []
self._subscribers[obj_id].append(subscription)

def unsubscribe() -> None:
for obj_id in _id_filter:
if obj_id not in self._subscribers:
continue
if subscription not in self._subscribers[obj_id]:
continue
self._subscribers[obj_id].remove(subscription)

return unsubscribe


class ApiHandler(SubscriptionHandler, Generic[ApiItemT]):
"""Base class for a map of API Items."""

api_id: "ApiId"
default_api_version: str | None = None

def __init__(self, vapix: "Vapix") -> None:
"""Initialize API items."""
super().__init__()
self.vapix = vapix
self._items: dict[str, ApiItemT] = {}
self.initialized = False
Expand All @@ -45,7 +98,7 @@ def api_version(self) -> str | None:

@abstractmethod
async def _api_request(self) -> dict[str, ApiItemT]:
"""Get API data method defined by subsclass."""
"""Get API data method defined by subclass."""

async def update(self) -> None:
"""Refresh data."""
Expand Down
2 changes: 1 addition & 1 deletion axis/vapix/interfaces/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class MqttClientHandler(ApiHandler[Any]):
default_api_version = API_VERSION

async def _api_request(self) -> dict[str, None]:
"""Get API data method defined by subsclass."""
"""Get API data method defined by subclass."""
raise NotImplementedError

async def configure_client(self, client_config: ClientConfig) -> None:
Expand Down
19 changes: 19 additions & 0 deletions axis/vapix/interfaces/parameters/brand.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Brand parameters."""

from typing import cast

from ...models.parameters.brand import BrandParam, BrandT
from .param_handler import ParamHandler


class BrandParameterHandler(ParamHandler[BrandParam]):
"""Handler for brand parameters."""

parameter_group = "Brand"

def get_params(self) -> dict[str, BrandParam]:
"""Retrieve brand properties."""
params = {}
if data := self.vapix.params.get_param(self.parameter_group):
params["0"] = BrandParam.decode(cast(BrandT, data))
return params
17 changes: 17 additions & 0 deletions axis/vapix/interfaces/parameters/image.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""Image parameters."""

from ...models.parameters.image import ImageParam
from .param_handler import ParamHandler


class ImageParameterHandler(ParamHandler[ImageParam]):
"""Handler for image parameters."""

parameter_group = "Image"

def get_params(self) -> dict[str, ImageParam]:
"""Retrieve brand properties."""
params = {}
if data := self.vapix.params.get_param(self.parameter_group):
params["0"] = ImageParam.decode(data)
return params
17 changes: 17 additions & 0 deletions axis/vapix/interfaces/parameters/io_port.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""I/O port parameters."""

from ...models.parameters.io_port import GetPortsResponse, Port
from .param_handler import ParamHandler


class IOPortParameterHandler(ParamHandler[Port]):
"""Handler for I/O port parameters."""

parameter_group = "IOPort"

def get_params(self) -> dict[str, Port]:
"""Retrieve I/O port parameters."""
params = {}
if data := self.vapix.params.get_param(self.parameter_group):
params.update(GetPortsResponse.from_dict(data).data)
return params
75 changes: 47 additions & 28 deletions axis/vapix/interfaces/parameters/param_cgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,24 @@
Lists Brand, Image, Ports, Properties, PTZ, Stream profiles.
"""

from typing import Any, cast
from typing import TYPE_CHECKING, Any

from ...models.parameters.brand import BrandParam, BrandT
from ...models.parameters.brand import BrandParam
from ...models.parameters.image import ImageParam
from ...models.parameters.param_cgi import ParamRequest, params_to_dict
from ...models.parameters.properties import PropertyParam
from ...models.parameters.stream_profile import StreamProfileParam
from ...models.stream_profile import StreamProfile
from ..api_handler import ApiHandler
from .brand import BrandParameterHandler
from .image import ImageParameterHandler
from .io_port import IOPortParameterHandler
from .properties import PropertyParameterHandler
from .ptz import PtzParameterHandler
from .stream_profile import StreamProfileParameterHandler

if TYPE_CHECKING:
from ...vapix import Vapix

PROPERTY = "Properties.API.HTTP.Version=3"

Expand Down Expand Up @@ -44,6 +53,17 @@
class Params(ApiHandler[Any]):
"""Represents all parameters of param.cgi."""

def __init__(self, vapix: "Vapix") -> None:
"""Initialize API items."""
super().__init__(vapix)

self.brand_handler = BrandParameterHandler(self)
self.image_handler = ImageParameterHandler(self)
self.io_port_handler = IOPortParameterHandler(self)
self.property_handler = PropertyParameterHandler(self)
self.ptz_handler = PtzParameterHandler(self)
self.stream_profile_handler = StreamProfileParameterHandler(self)

async def _api_request(self) -> dict[str, Any]:
"""Refresh data."""
await self.update()
Expand All @@ -58,8 +78,10 @@ async def update(self, group: str = "") -> None:
bytes_data = await self.vapix.new_request(ParamRequest(group))
data = params_to_dict(bytes_data.decode())
root = self._items.setdefault("root", {})
if "root" in data:
root.update(data["root"])
if objects := data.get("root"):
root.update(objects)
for obj_id in objects.keys():
self.signal_subscribers(obj_id)

def get_param(self, group: str) -> dict[str, Any]:
"""Get parameter group."""
Expand All @@ -69,39 +91,49 @@ def get_param(self, group: str) -> dict[str, Any]:

async def update_brand(self) -> None:
"""Update brand group of parameters."""
await self.update("Brand")
await self.brand_handler.update()

@property
def brand(self) -> BrandParam:
def brand(self) -> BrandParam | None:
"""Provide brand parameters."""
return BrandParam.decode(cast(BrandT, self.get_param("Brand")))
return self.brand_handler.get_params().get("0")

# Image

async def update_image(self) -> None:
"""Update image group of parameters."""
await self.update("Image")
await self.image_handler.update()

@property
def image_params(self) -> ImageParam:
"""Provide image parameters."""
return self.image_handler.get_params().get("0")
return ImageParam.decode(self.get_param("Image"))

@property
def image_sources(self) -> dict[str, Any]:
"""Image source information."""
return self.get_param("Image")
data = {}
if params := self.image_handler.get_params().get("0"):
data = params.data
return data

# Properties

async def update_properties(self) -> None:
"""Update properties group of parameters."""
await self.update("Properties")
await self.property_handler.update()
# await self.update("Properties")

@property
def properties(self) -> PropertyParam:
"""Provide property parameters."""
return PropertyParam.decode(self.get_param("Properties"))
return self.property_handler.get_params().get("0")
# data = {}
# if params := self.property_handler.get_params().get("0"):
# data = params
# return data
# return PropertyParam.decode(self.get_param("Properties"))

# PTZ

Expand All @@ -118,32 +150,19 @@ def ptz_data(self) -> dict[str, Any]:

async def update_stream_profiles(self) -> None:
"""Update stream profiles group of parameters."""
await self.update("StreamProfile")
await self.stream_profile_handler.update()

@property
def stream_profiles_params(self) -> StreamProfileParam:
"""Provide stream profiles parameters."""
return StreamProfileParam.decode(self.get_param("StreamProfile"))
return self.stream_profile_handler.get_params().get("0")

@property
def stream_profiles_max_groups(self) -> int:
"""Maximum number of supported stream profiles."""
return self.get_param("StreamProfile").get("MaxGroups", 0)
return self.stream_profile_handler.get_params().get("0").max_groups

@property
def stream_profiles(self) -> list[StreamProfile]:
"""Return a list of stream profiles."""
if not (data := self.get_param("StreamProfile")):
return []

profiles = dict(data)
del profiles["MaxGroups"]

return [
StreamProfile(
id=str(profile["Name"]),
description=str(profile["Description"]),
parameters=str(profile["Parameters"]),
)
for profile in profiles.values()
]
return self.stream_profile_handler.get_params().get("0").stream_profiles
44 changes: 44 additions & 0 deletions axis/vapix/interfaces/parameters/param_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""Parameter handler sub class of API handler.
Generalises parameter specific handling like
- Subscribing to new data
- Defining parameter group
"""

from abc import abstractmethod
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from .param_cgi import Params

from ...models.api import ApiItemT
from ...models.api_discovery import ApiId
from ..api_handler import ApiHandler


class ParamHandler(ApiHandler[ApiItemT]):
"""Base class for a map of API Items."""

parameter_group: str
api_id = ApiId.PARAM_CGI

def __init__(self, param_handler: "Params") -> None:
"""Initialize API items."""
super().__init__(param_handler.vapix)
param_handler.subscribe(self.update_params, self.parameter_group)

@abstractmethod
def get_params(self) -> dict[str, ApiItemT]:
"""Retrieve parameters from param_cgi class."""

def update_params(self, obj_id: str) -> None:
"""Update parameter data.
Callback from parameter handler subscription.
"""
self._items = self.get_params()

async def _api_request(self) -> dict[str, ApiItemT]:
"""Get API data method defined by subclass."""
await self.vapix.params.update(self.parameter_group)
return self.get_params()
17 changes: 17 additions & 0 deletions axis/vapix/interfaces/parameters/properties.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""Property parameters."""

from ...models.parameters.properties import PropertyParam
from .param_handler import ParamHandler


class PropertyParameterHandler(ParamHandler[PropertyParam]):
"""Handler for property parameters."""

parameter_group = "Properties"

def get_params(self) -> dict[str, PropertyParam]:
"""Retrieve brand properties."""
params = {}
if data := self.vapix.params.get_param(self.parameter_group):
params["0"] = PropertyParam.decode(data)
return params
17 changes: 17 additions & 0 deletions axis/vapix/interfaces/parameters/ptz.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""PTZ parameters."""

from ...models.parameters.ptz import GetPtzResponse, PtzItem
from .param_handler import ParamHandler


class PtzParameterHandler(ParamHandler[PtzItem]):
"""Handler for PTZ parameters."""

parameter_group = "PTZ"

def get_params(self) -> dict[str, PtzItem]:
"""Retrieve brand properties."""
params = {}
if data := self.vapix.params.get_param(self.parameter_group):
params.update(GetPtzResponse.from_dict(data).data)
return params
Loading

0 comments on commit d3abfa7

Please sign in to comment.