diff --git a/axis/vapix/interfaces/port_management.py b/axis/vapix/interfaces/port_management.py index 82523da6..0c5371ef 100644 --- a/axis/vapix/interfaces/port_management.py +++ b/axis/vapix/interfaces/port_management.py @@ -1,89 +1,69 @@ """I/O Port Management API. -The I/O port management API makes it possible to retrieve information about the ports and apply product dependent configurations +The I/O port management API makes it possible to retrieve +information about the ports and apply product dependent configurations """ -import attr - -from ..models.port_management import Port, PortSequence -from ..models.port_management import Sequence # noqa: F401 -from ..models.port_management import SetPort # noqa: F401 -from .api import APIItems, Body - -URL = "/axis-cgi/io/portmanagement.cgi" - -API_DISCOVERY_ID = "io-port-management" -API_VERSION = "1.0" - - -class IoPortManagement(APIItems): +from ..models.api_discovery import ApiId +from ..models.port_management import ( + API_VERSION, + GetPortsRequest, + GetPortsResponse, + GetSupportedVersionsRequest, + GetSupportedVersionsResponse, + Port, + PortConfiguration, + Sequence, + SetPortsRequest, + SetStateSequenceRequest, +) +from .api_handler import ApiHandler + + +class IoPortManagement(ApiHandler): """I/O port management for Axis devices.""" - item_cls = Port - path = URL + api_id = ApiId.IO_PORT_MANAGEMENT + default_api_version = API_VERSION - async def update(self) -> None: - """Refresh data.""" - raw = await self.get_ports() - self.process_raw(raw) + async def _api_request(self) -> dict[str, Port]: + """Get default data of I/O port management.""" + return await self.get_ports() - @staticmethod - def pre_process_raw(raw: dict) -> dict: - """Return a dictionary of ports.""" - if not raw: - return {} + async def get_ports(self) -> dict[str, Port]: + """List all APIs registered on API Discovery service.""" + bytes_data = await self.vapix.new_request(GetPortsRequest()) + print(bytes_data) + return GetPortsResponse.decode(bytes_data).data - if raw.get("data", {}).get("numberOfPorts", 0) == 0: - return {} - - ports = raw["data"]["items"] - return {port["port"]: port for port in ports} - - async def get_ports(self) -> dict: - """Retrieve information about all ports on the device and their capabilities.""" - return await self.vapix.request( - "post", - URL, - json=attr.asdict( - Body("getPorts", API_VERSION), - filter=attr.filters.exclude(attr.fields(Body).params), - ), - ) - - async def set_ports(self, ports: list) -> None: + async def set_ports(self, ports: list[PortConfiguration]) -> None: """Configure one or more ports. Some of the available options are: * Setting a nice name that can be used in the user interface. - * Configuring the states and what constitutes a normal and triggered state respectively. + * Configuring the states and what constitutes a normal + and triggered state respectively. This will make triggers activate in either open or closed circuits. - The reason the change is treated as a nice name is because it doesn’t affect the underlying behavior of the port. - Devices with configurable ports can change the direction to either input or output. + The reason the change is treated as a nice name is because it doesn’t + affect the underlying behavior of the port. + Devices with configurable ports can change the direction + to either input or output. """ - await self.vapix.request( - "post", - URL, - json=attr.asdict( - Body("setPorts", API_VERSION, params=ports), - filter=lambda attr, value: value is not None, - ), - ) + await self.vapix.new_request(SetPortsRequest(ports)) - async def set_state_sequence(self, sequence: PortSequence) -> None: + async def set_state_sequence(self, port_id: str, sequence: list[Sequence]) -> None: """Apply a sequence of state changes with a delay in milliseconds between states.""" - await self.vapix.request( - "post", - URL, - json=attr.asdict(Body("setStateSequence", API_VERSION, params=sequence)), - ) + await self.vapix.new_request(SetStateSequenceRequest(port_id, sequence)) + + async def get_supported_versions(self) -> list[str]: + """List supported API versions.""" + bytes_data = await self.vapix.new_request(GetSupportedVersionsRequest()) + return GetSupportedVersionsResponse.decode(bytes_data).data + + async def open(self, port_id: str) -> None: + """Shortcut method to open a port.""" + await self.set_ports([PortConfiguration(port_id, state="open")]) - async def get_supported_versions(self) -> dict: - """Retrieve a list of supported API versions.""" - return await self.vapix.request( - "post", - URL, - json=attr.asdict( - Body("getSupportedVersions", API_VERSION), - filter=attr.filters.include(attr.fields(Body).method), - ), - ) + async def close(self, port_id: str) -> None: + """Shortcut method to close a port.""" + await self.set_ports([PortConfiguration(port_id, state="closed")]) diff --git a/axis/vapix/models/port_management.py b/axis/vapix/models/port_management.py index 7bf4e8af..3d52088c 100644 --- a/axis/vapix/models/port_management.py +++ b/axis/vapix/models/port_management.py @@ -1,109 +1,334 @@ """I/O Port Management API. -The I/O port management API makes it possible to retrieve information about the ports and apply product dependent configurations +The I/O port management API makes it possible to retrieve +information about the ports and apply product dependent configurations """ -import attr +from dataclasses import dataclass +from typing import Literal, TypedDict -from ..interfaces.api import Body -from .api import APIItem +import orjson +from typing_extensions import NotRequired, Self -URL = "/axis-cgi/io/portmanagement.cgi" +from .api import CONTEXT, ApiItem, ApiRequest, ApiResponse -API_DISCOVERY_ID = "io-port-management" API_VERSION = "1.0" -@attr.s -class SetPort: - """Port configuration class.""" +class ErrorDataT(TypedDict): + """Error data in response.""" - port: str = attr.ib() - usage: str = attr.ib(default=None) - direction: str = attr.ib(default=None) - name: str = attr.ib(default=None) - normalState: str = attr.ib(default=None) - state: str = attr.ib(default=None) + code: int + message: str -@attr.s -class PortSequence: - """Port sequence class.""" +class PortItemT(TypedDict): + """""" + + port: str + configurable: bool + usage: str + name: str + direction: str + state: str + normalState: str + + +class PortDataT(TypedDict): + """""" + + numberOfPorts: int + items: list[PortItemT] + + +class SequenceT(TypedDict): + """""" + + state: Literal["open", "closed"] + time: int + + +class GetPortsResponseT(TypedDict): + """""" + + apiVersion: str + context: str + method: str + data: PortDataT + error: NotRequired[ErrorDataT] + + +class ApiVersionsT(TypedDict): + """List of supported API versions.""" + + apiVersions: list[str] + + +class GetSupportedVersionsResponseT(TypedDict): + """ListApis response.""" + + apiVersion: str + context: str + method: str + data: ApiVersionsT + error: NotRequired[ErrorDataT] - port: str = attr.ib() - sequence: list = attr.ib(factory=list) +error_codes = { + 1000: "Invalid parameter value specified", + 2002: "HTTP request type not supported. Only POST is supported", + 2003: "Requested API version is not supported", + 2004: "Method not supported", + 4000: "Invalid JSON", + 4002: "Required parameter missing or invalid", + 8000: "Internal error", +} -@attr.s + +@dataclass +class PortConfiguration: + """Port configuration used with set ports.""" + + port: str + usage: str | None = None + direction: str | None = None + name: str | None = None + normal_state: str | None = None + state: str | None = None + + def to_dict(self) -> dict[str, str]: + """Convert to dictionary with populated fields.""" + data: dict[str, str] = {"port": self.port} + if self.usage is not None: + data["usage"] = self.usage + if self.direction is not None: + data["direction"] = self.direction + if self.name is not None: + data["name"] = self.name + if self.normal_state is not None: + data["normalState"] = self.normal_state + if self.state is not None: + data["state"] = self.state + return data + + +@dataclass class Sequence: """Sequence class.""" - state: str = attr.ib() - time: int = attr.ib() + state: Literal["open", "closed"] + time: int + def to_dict(self) -> SequenceT: + """Convert to dictionary.""" + return {"state": self.state, "time": self.time} + + +@dataclass +class PortSequence: + """Port sequence class.""" -class Port(APIItem): + port: str + sequence: list[Sequence] + + +@dataclass +class Port(ApiItem): """I/O port management port.""" - @property - def configurable(self) -> bool: - """Is port configurable.""" - return self.raw["configurable"] + configurable: bool + """Is port configurable.""" - @property - def direction(self) -> str: - """Direction of port. + direction: str + """Direction of port. - . - """ - return self.raw["direction"] + . + """ - @property - def name(self) -> str: - """Name of port.""" - return self.raw["name"] + name: str + """Name of port.""" + + normalState: str + """Port normal state. + + . + """ + + state: str + """State of port. + + . + """ + + usage: str + """Usage of port.""" + + @classmethod + def from_dict(cls, data: PortItemT) -> Self: + """Create port object from dict.""" + return cls( + id=data["port"], + configurable=data["configurable"], + direction=data["direction"], + name=data["name"], + normalState=data["normalState"], + state=data["state"], + usage=data["usage"], + ) + + @classmethod + def from_list(cls, data: list[PortItemT]) -> dict[str, Self]: + """Create port objects from list.""" + ports = [cls.from_dict(item) for item in data] + return {port.id: port for port in ports} + + +@dataclass +class GetPortsRequest(ApiRequest): + """Request object for listing ports.""" + + method = "post" + path = "/axis-cgi/io/portmanagement.cgi" + content_type = "application/json" + error_codes = error_codes + + api_version: str = API_VERSION + context: str = CONTEXT @property - def normalState(self) -> str: - """Port normal state. + def content(self) -> bytes: + """Initialize request data.""" + return orjson.dumps( + { + "apiVersion": self.api_version, + "context": self.context, + "method": "getPorts", + } + ) + + +@dataclass +class GetPortsResponse(ApiResponse[dict[str, Port]]): + """Response object for listing ports.""" + + api_version: str + context: str + method: str + data: dict[str, Port] + # error: ErrorDataT | None = None + + @classmethod + def decode(cls, bytes_data: bytes) -> Self: + """Prepare API description dictionary.""" + data: GetPortsResponseT = orjson.loads(bytes_data) + return cls( + api_version=data["apiVersion"], + context=data["context"], + method=data["method"], + data=Port.from_list(data["data"]["items"]), + ) + + +@dataclass +class SetPortsRequest(ApiRequest): + """Request object for configuring ports.""" - . - """ - return self.raw["normalState"] + method = "post" + path = "/axis-cgi/io/portmanagement.cgi" + content_type = "application/json" + error_codes = error_codes + + port_config: list[PortConfiguration] | PortConfiguration + + api_version: str = API_VERSION + context: str = CONTEXT @property - def port(self) -> str: - """Index of port.""" - return self.raw["port"] + def content(self) -> bytes: + """Initialize request data.""" + if not isinstance(self.port_config, list): + self.port_config = [self.port_config] + ports: list[dict[str, str]] = [port.to_dict() for port in self.port_config] + + return orjson.dumps( + { + "apiVersion": self.api_version, + "context": self.context, + "method": "setPorts", + "params": ports, + } + ) + + +@dataclass +class SetStateSequenceRequest(ApiRequest): + """Request object for configuring ports.""" + + method = "post" + path = "/axis-cgi/io/portmanagement.cgi" + content_type = "application/json" + error_codes = error_codes + + port: str + sequence: list[Sequence] + + api_version: str = API_VERSION + context: str = CONTEXT @property - def state(self) -> str: - """State of port. + def content(self) -> bytes: + """Initialize request data.""" + sequence = [item.to_dict() for item in self.sequence] - . - """ - return self.raw["state"] + return orjson.dumps( + { + "apiVersion": self.api_version, + "context": self.context, + "method": "setStateSequence", + "params": {"port": self.port, "sequence": sequence}, + } + ) + + +@dataclass +class GetSupportedVersionsRequest(ApiRequest): + """Request object for listing supported API versions.""" + + method = "post" + path = "/axis-cgi/io/portmanagement.cgi" + content_type = "application/json" + error_codes = error_codes + + context: str = CONTEXT @property - def usage(self) -> str: - """Usage of port.""" - return self.raw["usage"] - - async def set_state(self, set_port: SetPort) -> None: - """Set port state.""" - await self._request( - "post", - URL, - json=attr.asdict( - Body("setPorts", API_VERSION, params=[set_port]), - filter=lambda attr, value: value is not None, - ), + def content(self) -> bytes: + """Initialize request data.""" + return orjson.dumps( + { + "context": self.context, + "method": "getSupportedVersions", + } ) - async def open(self) -> None: - """Open port.""" - await self.set_state(SetPort(self.port, state="open")) - async def close(self) -> None: - """Close port.""" - await self.set_state(SetPort(self.port, state="closed")) +@dataclass +class GetSupportedVersionsResponse(ApiResponse[list[str]]): + """Response object for supported versions.""" + + api_version: str + context: str + method: str + data: list[str] + # error: ErrorDataT | None = None + + @classmethod + def decode(cls, bytes_data: bytes) -> Self: + """Prepare API description dictionary.""" + data: GetSupportedVersionsResponseT = orjson.loads(bytes_data) + return cls( + api_version=data["apiVersion"], + context=data["context"], + method=data["method"], + data=data.get("data", {}).get("apiVersions", []), + ) diff --git a/axis/vapix/vapix.py b/axis/vapix/vapix.py index f14b284f..1804ec21 100644 --- a/axis/vapix/vapix.py +++ b/axis/vapix/vapix.py @@ -28,10 +28,7 @@ from .interfaces.param_cgi import Params from .interfaces.pir_sensor_configuration import PirSensorConfigurationHandler from .interfaces.port_cgi import Ports -from .interfaces.port_management import ( - API_DISCOVERY_ID as IO_PORT_MANAGEMENT_ID, - IoPortManagement, -) +from .interfaces.port_management import IoPortManagement from .interfaces.ptz import PtzControl from .interfaces.pwdgrp_cgi import Users from .interfaces.stream_profiles import StreamProfilesHandler @@ -62,7 +59,7 @@ def __init__(self, device: "AxisDevice") -> None: self.motion_guard: MotionGuard | None = None self.object_analytics: ObjectAnalytics | None = None self.params: Params | None = None - self.ports: IoPortManagement | Ports | None = None + self._ports: Ports | None = None self.ptz: PtzControl | None = None self.user_groups: UserGroups | None = None self.users: Users | None = None @@ -70,6 +67,7 @@ def __init__(self, device: "AxisDevice") -> None: self.api_discovery: ApiDiscoveryHandler = ApiDiscoveryHandler(self) self.basic_device_info = BasicDeviceInfoHandler(self) + self.io_port_management = IoPortManagement(self) self.light_control = LightHandler(self) self.mqtt = MqttClientHandler(self) self.pir_sensor_configuration = PirSensorConfigurationHandler(self) @@ -118,6 +116,13 @@ def streaming_profiles(self) -> list: return list(self.stream_profiles.values()) return self.params.stream_profiles # type: ignore[union-attr] + @property + def ports(self) -> IoPortManagement | Ports: + """Temporary port property.""" + if not self.io_port_management.supported() and self._ports is not None: + return self._ports + return self.io_port_management + async def initialize(self) -> None: """Initialize Vapix functions.""" await self.initialize_api_discovery() @@ -143,11 +148,6 @@ async def initialize_api_discovery(self) -> None: except PathNotFound: # Device doesn't support API discovery return - tasks = [] - - if IO_PORT_MANAGEMENT_ID in self.api_discovery: - tasks.append(self._initialize_api_attribute(IoPortManagement, "ports")) - async def do_api_request(api: ApiHandler) -> None: """Try update of API.""" try: @@ -165,6 +165,11 @@ async def do_api_request(api: ApiHandler) -> None: self.stream_profiles, self.view_areas, ) + if self.io_port_management.supported(): + apis += (self.io_port_management,) + + tasks = [] + for api in apis: if not api.supported(): continue @@ -189,7 +194,7 @@ async def initialize_param_cgi(self, preload_data: bool = True) -> None: if not self.basic_device_info.supported(): tasks.append(self.params.update_brand()) - if not self.ports: + if not self.io_port_management.supported(): tasks.append(self.params.update_ports()) if not self.stream_profiles.supported(): @@ -207,8 +212,8 @@ async def initialize_param_cgi(self, preload_data: bool = True) -> None: except Unauthorized: # Probably a viewer account pass - if not self.ports: - self.ports = Ports(self) + if not self.io_port_management.supported(): + self._ports = Ports(self) if not self.ptz and self.params.ptz: self.ptz = PtzControl(self) diff --git a/tests/test_port_management.py b/tests/test_port_management.py index 9dee5138..23eca25e 100644 --- a/tests/test_port_management.py +++ b/tests/test_port_management.py @@ -8,12 +8,8 @@ import pytest import respx -from axis.vapix.interfaces.port_management import ( - IoPortManagement, - PortSequence, - Sequence, - SetPort, -) +from axis.vapix.interfaces.port_management import IoPortManagement +from axis.vapix.models.port_management import PortConfiguration, Sequence from .conftest import HOST @@ -47,7 +43,6 @@ async def test_get_ports(io_port_management): item = io_port_management["0"] assert item.id == "0" - assert item.port == "0" assert item.name == "PIR sensor" assert item.configurable is False assert item.usage == "" @@ -55,7 +50,7 @@ async def test_get_ports(io_port_management): assert item.state == "open" assert item.normalState == "open" - await item.open() + await io_port_management.open("0") assert route.called assert route.calls.last.request.method == "POST" @@ -67,7 +62,7 @@ async def test_get_ports(io_port_management): "params": [{"port": "0", "state": "open"}], } - await item.close() + await io_port_management.close("0") assert route.called assert route.calls.last.request.method == "POST" @@ -86,7 +81,7 @@ async def test_set_ports(io_port_management): """Test set_ports call.""" route = respx.post(f"http://{HOST}:80/axis-cgi/io/portmanagement.cgi") - await io_port_management.set_ports([SetPort("0", state="closed")]) + await io_port_management.set_ports([PortConfiguration("0", state="closed")]) assert route.called assert route.calls.last.request.method == "POST" @@ -106,7 +101,7 @@ async def test_set_state_sequence(io_port_management): route = respx.post(f"http://{HOST}:80/axis-cgi/io/portmanagement.cgi") await io_port_management.set_state_sequence( - PortSequence("0", [Sequence("open", 3000), Sequence("closed", 5000)]) + "0", [Sequence("open", 3000), Sequence("closed", 5000)] ) assert route.called @@ -140,9 +135,10 @@ async def test_get_supported_versions(io_port_management): assert route.calls.last.request.method == "POST" assert route.calls.last.request.url.path == "/axis-cgi/io/portmanagement.cgi" assert json.loads(route.calls.last.request.content) == { - "method": "getSupportedVersions" + "method": "getSupportedVersions", + "context": "Axis library", } - assert response["data"] == {"apiVersions": ["1.0"]} + assert response == ["1.0"] response_getPorts = { @@ -166,6 +162,8 @@ async def test_get_supported_versions(io_port_management): } response_getSupportedVersions = { + "apiVersion": "1.0", + "context": "", "method": "getSupportedVersions", "data": {"apiVersions": ["1.0"]}, } diff --git a/tests/test_vapix.py b/tests/test_vapix.py index 3310e16c..a8b2ace0 100644 --- a/tests/test_vapix.py +++ b/tests/test_vapix.py @@ -188,7 +188,8 @@ async def test_initialize_api_discovery_unauthorized(vapix: Vapix): await vapix.initialize_api_discovery() assert len(vapix.basic_device_info) == 0 - assert vapix.ports is None + assert len(vapix.ports) == 0 + assert vapix.ports == vapix.io_port_management assert vapix.light_control is not None assert vapix.mqtt is not None assert len(vapix.stream_profiles) == 0