From 82db204333289025ad20be5816376f930186d404 Mon Sep 17 00:00:00 2001 From: Robert Svensson Date: Sun, 29 Oct 2023 22:27:18 +0100 Subject: [PATCH] Improve ports --- axis/vapix/interfaces/param_cgi.py | 41 ++--- axis/vapix/interfaces/port_cgi.py | 65 ++++--- axis/vapix/models/api.py | 5 + axis/vapix/models/port_cgi.py | 266 +++++++++++++++++++++-------- axis/vapix/vapix.py | 14 +- tests/test_param_cgi.py | 33 ++-- tests/test_port_cgi.py | 26 +-- 7 files changed, 284 insertions(+), 166 deletions(-) diff --git a/axis/vapix/interfaces/param_cgi.py b/axis/vapix/interfaces/param_cgi.py index 1ec0c379..b4b068a2 100644 --- a/axis/vapix/interfaces/param_cgi.py +++ b/axis/vapix/interfaces/param_cgi.py @@ -18,6 +18,7 @@ PTZParam, StreamProfileParam, ) +from ..models.port_cgi import GetPortsRequest, ListInputRequest, ListOutputRequest from ..models.stream_profile import StreamProfile from .api import APIItems @@ -239,11 +240,13 @@ def image_sources(self) -> dict: async def update_ports(self) -> None: """Update port groups of parameters.""" - await asyncio.gather( - self.update(INPUT), - self.update(IOPORT), - self.update(OUTPUT), + bytes_list = await asyncio.gather( + self.vapix.new_request(ListInputRequest()), + self.vapix.new_request(GetPortsRequest()), + self.vapix.new_request(ListOutputRequest()), ) + for bytes_data in bytes_list: + self.process_raw(bytes_data.decode()) @property def port_params(self) -> PortParam: @@ -262,33 +265,15 @@ def nbrofoutput(self) -> int: return int(self[OUTPUT]["NbrOfOutputs"]) # type: ignore @property - def ports(self) -> dict: + def ports(self) -> bytes: """Create a smaller dictionary containing all ports.""" if IOPORT not in self: - return {} - - attributes = ( - "Usage", - "Configurable", - "Direction", - "Input.Name", - "Input.Trig", - "Output.Active", - "Output.Button", - "Output.DelayTime", - "Output.Mode", - "Output.Name", - "Output.PulseTime", - ) - - ports = self.process_dynamic_group( - self[IOPORT], # type: ignore[arg-type] - "I", - attributes, - range(self.nbrofinput + self.nbrofoutput), - ) + return b"" - return ports + io_port = "" + for k, v in self[IOPORT].raw.items(): + io_port += f"root.IOPort.{k}={v}\n" + return io_port.encode() # Properties diff --git a/axis/vapix/interfaces/port_cgi.py b/axis/vapix/interfaces/port_cgi.py index e0ccdbbd..e3095f80 100644 --- a/axis/vapix/interfaces/port_cgi.py +++ b/axis/vapix/interfaces/port_cgi.py @@ -8,36 +8,47 @@ Virtual input API. """ -from ..models.port_cgi import ACTION_HIGH # noqa: F401 -from ..models.port_cgi import ACTION_LOW # noqa: F401 -from ..models.port_cgi import DIRECTION_IN # noqa: F401 -from ..models.port_cgi import DIRECTION_OUT # noqa: F401 -from ..models.port_cgi import Port -from ..models.port_cgi import URL # noqa: F401 -from .api import APIItems +from ..models.port_cgi import ( + GetPortsRequest, + GetPortsResponse, + Port, + PortAction, + PortActionRequest, + PortDirection, +) +from .api_handler import ApiHandler PROPERTY = "Properties.API.HTTP.Version=3" -class Ports(APIItems): +class Ports(ApiHandler): """Represents all ports of io/port.cgi.""" - item_cls = Port - path = "" - - def __init__(self, vapix) -> None: - """Initialize port cgi manager.""" - super().__init__(vapix, vapix.params.ports) - - async def update(self) -> None: - """Refresh data.""" - await self.vapix.params.update_ports() - self.process_raw(self.vapix.params.ports) - - @staticmethod - def pre_process_raw(ports: dict) -> dict: - """Pre process ports for process raw. - - Index needs to be a string. - """ - return {str(k): v for k, v in ports.items()} + async def _api_request(self) -> dict[str, Port]: + """Get API data method defined by subsclass.""" + return await self.get_ports() + + async def get_ports(self) -> dict[str, Port]: + """Retrieve privilege rights for current user.""" + bytes_data = await self.vapix.new_request(GetPortsRequest()) + return GetPortsResponse.decode(bytes_data).data + + def process_ports(self) -> dict[str, Port]: + """Process ports.""" + assert self.vapix.params is not None + return GetPortsResponse.decode(self.vapix.params.ports).data + + async def action(self, id: str, action: PortAction) -> None: + """Activate or deactivate an output.""" + port = self[id] + if port.direction != PortDirection.OUT: + return + await self.vapix.new_request(PortActionRequest(id, action.value)) + + async def open(self, id: str) -> None: + """Open port.""" + await self.action(id, PortAction.LOW) + + async def close(self, id: str) -> None: + """Close port.""" + await self.action(id, PortAction.HIGH) diff --git a/axis/vapix/models/api.py b/axis/vapix/models/api.py index 934d0da4..36530aa4 100644 --- a/axis/vapix/models/api.py +++ b/axis/vapix/models/api.py @@ -61,6 +61,11 @@ def data(self) -> dict[str, str] | None: """Request data.""" return None + @property + def params(self) -> dict[str, str] | None: + """Request query parameters.""" + return None + class APIItem: """Base class for all end points using APIItems class.""" diff --git a/axis/vapix/models/port_cgi.py b/axis/vapix/models/port_cgi.py index 4e025910..d0b08683 100644 --- a/axis/vapix/models/port_cgi.py +++ b/axis/vapix/models/port_cgi.py @@ -8,93 +8,209 @@ Virtual input API. """ -from typing import Callable -from urllib.parse import quote +from dataclasses import dataclass +import enum +from typing import Any, TypedDict -PROPERTY = "Properties.API.HTTP.Version=3" +from typing_extensions import NotRequired, Self -URL = "/axis-cgi/io/port.cgi" +from .api import ApiItem, ApiRequest, ApiResponse -ACTION_HIGH = "/" -ACTION_LOW = "\\" -DIRECTION_IN = "input" -DIRECTION_OUT = "output" +class InputPortT(TypedDict): + """Input port representation.""" + Name: str + Trig: str -class Port: - """Represents a port.""" - def __init__(self, id: str, raw: dict, request: Callable) -> None: - """Initialize port.""" - self.id = id - self.raw = raw - self._request = request +class OutputPortT(TypedDict): + """Output port representation.""" - @property - def configurable(self) -> bool: - """Is port configurable.""" - return self.raw.get("Configurable", False) + Active: str + Button: str + DelayTime: str + Mode: str + Name: str + PulseTime: str - @property - def direction(self) -> str: - """Port is configured to act as input or output. - Read-only for non-configurable ports. - """ - return self.raw.get("Direction", DIRECTION_IN) +class PortItemT(TypedDict): + """Port representation.""" - @property - def input_trig(self) -> str: - """When port should trigger. + Configurable: NotRequired[bool] + Direction: str + Usage: str + Input: NotRequired[InputPortT] + Output: NotRequired[OutputPortT] - closed=The input port triggers when the circuit is closed. - open=The input port triggers when the circuit is open. - """ - return self.raw.get("Input.Trig", "") - @property - def name(self) -> str: - """Return name relevant to direction.""" - if self.direction == DIRECTION_IN: - return self.raw.get("Input.Name", "") - return self.raw.get("Output.Name", "") +def params_to_dict( + params: str, starts_with: str | tuple[str, ...] | None = None +) -> dict[str, Any]: + """Convert params to dictionary.""" + param_dict: dict[str, Any] = {} + for line in params.splitlines(): + if starts_with is not None and not line.startswith(starts_with): + continue + travel(line, param_dict) + return param_dict + + +def travel(data: str, store: dict[str, Any]) -> None: + """Travel through data and add new keys and value to store. + + travel("root.IOPort.I1.Output.Active=closed", {})) + {'root': {'IOPort': {'I1': {'Output': {'Active': 'closed'}}}}} + """ + k, _, v = data.partition(".") # "root", ".", "IOPort.I1.Output.Active=closed" + travel(v, store.setdefault(k, {})) if v else store.update(dict([k.split("=")])) + + +class PortAction(enum.Enum): + """Port action.""" + + HIGH = "/" + LOW = "\\" + UNKNOWN = "unknown" + + @classmethod + def _missing_(cls, value: object) -> "PortAction": + """Set default enum member if an unknown value is provided.""" + return cls.UNKNOWN + + +class PortDirection(enum.Enum): + """Port action.""" + + IN = "input" + OUT = "output" + UNKNOWN = "unknown" + + @classmethod + def _missing_(cls, value: object) -> "PortDirection": + """Set default enum member if an unknown value is provided.""" + return cls.UNKNOWN + + +@dataclass +class Port(ApiItem): + """Represents a IO port.""" + + configurable: bool + """Is port configurable.""" + + direction: PortDirection + """Port is configured to act as input or output. + + Read-only for non-configurable ports. + """ + + input_trig: str + """When port should trigger. + + closed=The input port triggers when the circuit is closed. + open=The input port triggers when the circuit is open. + """ + + name: str + """Return name relevant to direction.""" + + output_active: str + """When is output port state active. + + closed=The output port is active when the circuit is closed. + open=The output port is active when the circuit is open. + """ + + @classmethod + def decode(cls, id: str, data: PortItemT) -> Self: + """Decode dict to class object.""" + direction = PortDirection(data.get("Direction", "input")) + name = ( + data.get("Input", {}).get("Name", "") + if direction == PortDirection.IN + else data.get("Output", {}).get("Name", "") + ) + return cls( + id=id, + configurable=data.get("Configurable") == "yes", + direction=direction, + input_trig=data.get("Input", {}).get("Trig", ""), + name=name, + output_active=data.get("Output", {}).get("Active", ""), + ) + + @classmethod + def from_dict(cls, data: dict[str, PortItemT]) -> dict[str, Self]: + """Create objects from dict.""" + ports = [cls.decode(k, v) for k, v in data.items()] + return {port.id: port for port in ports} + + +@dataclass +class GetPortsRequest(ApiRequest): + """Request object for listing IO ports.""" + + method = "get" + path = "/axis-cgi/param.cgi?action=list&group=root.IOPort" + content_type = "text/plain" + + +@dataclass +class GetPortsResponse(ApiResponse[dict[str, Port]]): + """Response object for listing ports.""" + + @classmethod + def decode(cls, bytes_data: bytes) -> Self: + """Prepare API description dictionary.""" + data = bytes_data.decode() + ports = params_to_dict(data, "root.IOPort").get("root", {}).get("IOPort", {}) + return cls(Port.from_dict({k[1:]: v for k, v in ports.items()})) + + +@dataclass +class ListOutputRequest(ApiRequest): + """Request object for listing number of outputs.""" + + method = "get" + path = "/axis-cgi/param.cgi?action=list&group=root.Output" + content_type = "text/plain" + + +@dataclass +class ListInputRequest(ApiRequest): + """Request object for listing number of inputs.""" + + method = "get" + path = "/axis-cgi/param.cgi?action=list&group=root.Input" + content_type = "text/plain" + + +@dataclass +class PortActionRequest(ApiRequest): + r"""Request object for activate or deactivate an output. + + Use the option to activate/deactivate the port for a + limited period of time. + = Port name. Default: Name from Output.Name + = Action character. /=active, \=inactive + = Delay before the next action. Unit: milliseconds + Note: The :, / and \ characters must be percent-encoded in the URI. + See Percent encoding. + Example: + To set output 1 to active, use 1:/. + In the URI, the action argument becomes action=1%3A%2F + """ + + method = "get" + path = "/axis-cgi/io/port.cgi" + content_type = "text/plain" + + port: str + action: str @property - def output_active(self) -> str: - """When is output port state active. - - closed=The output port is active when the circuit is closed. - open=The output port is active when the circuit is open. - """ - return self.raw.get("Output.Active", "") - - async def action(self, action: str) -> None: - r"""Activate or deactivate an output. - - Use the option to activate/deactivate the port for a - limited period of time. - = Port name. Default: Name from Output.Name - = Action character. /=active, \=inactive - = Delay before the next action. Unit: milliseconds - Note: The :, / and \ characters must be percent-encoded in the URI. - See Percent encoding. - Example: - To set output 1 to active, use 1:/. - In the URI, the action argument becomes action=1%3A%2F - """ - if not self.direction == DIRECTION_OUT: - return - - port_action = quote(f"{int(self.id) + 1}:{action}", safe="") - url = URL + f"?action={port_action}" - - await self._request("get", url) - - async def open(self) -> None: - """Open port.""" - await self.action(ACTION_LOW) - - async def close(self) -> None: - """Close port.""" - await self.action(ACTION_HIGH) + def params(self) -> dict[str, str]: + """Request query parameters.""" + return {"action": f"{int(self.port) + 1}:{self.action}"} diff --git a/axis/vapix/vapix.py b/axis/vapix/vapix.py index e0d1434f..1a4d34f3 100644 --- a/axis/vapix/vapix.py +++ b/axis/vapix/vapix.py @@ -60,7 +60,6 @@ def __init__(self, device: "AxisDevice") -> None: self.motion_guard: MotionGuard | None = None self.object_analytics: ObjectAnalytics | None = None self.params: Params | None = None - self._ports: Ports | None = None self.ptz: PtzControl | None = None self.vmd4: Vmd4 | None = None @@ -76,6 +75,8 @@ def __init__(self, device: "AxisDevice") -> None: self.stream_profiles = StreamProfilesHandler(self) self.view_areas = ViewAreaHandler(self) + self.port_cgi = Ports(self) + @property def firmware_version(self) -> str: """Firmware version of device.""" @@ -121,8 +122,8 @@ def streaming_profiles(self) -> list: @property def ports(self) -> IoPortManagement | Ports: """Temporary port property.""" - if not self.io_port_management.supported() and self._ports is not None: - return self._ports + if not self.io_port_management.supported(): + return self.port_cgi return self.io_port_management async def initialize(self) -> None: @@ -214,7 +215,7 @@ async def initialize_param_cgi(self, preload_data: bool = True) -> None: pass if not self.io_port_management.supported(): - self._ports = Ports(self) + self.port_cgi._items = self.port_cgi.process_ports() if not self.ptz and self.params.ptz: self.ptz = PtzControl(self) @@ -335,6 +336,7 @@ async def new_request(self, api_request: ApiRequest) -> bytes: path=api_request.path, content=api_request.content, data=api_request.data, + params=api_request.params, ) async def do_request( @@ -343,10 +345,11 @@ async def do_request( path: str, content: bytes | None = None, data: dict[str, str] | None = None, + params: dict[str, str] | None = None, ) -> bytes: """Make a request to the device.""" url = self.device.config.url + path - LOGGER.debug("%s, %s, %s, %s", method, url, content, data) + LOGGER.debug("%s, %s, %s, %s %s", method, url, content, data, params) try: response = await self.device.config.session.request( @@ -354,6 +357,7 @@ async def do_request( url, content=content, data=data, + params=params, auth=self.auth, timeout=TIME_OUT, ) diff --git a/tests/test_param_cgi.py b/tests/test_param_cgi.py index e9d4f4b9..d0b36a0d 100644 --- a/tests/test_param_cgi.py +++ b/tests/test_param_cgi.py @@ -148,14 +148,10 @@ async def test_params(params: Params): # Ports assert params.nbrofinput == 1 assert params.nbrofoutput == 0 - assert params.ports == { - 0: { - "Configurable": False, - "Direction": "input", - "Input.Name": "PIR sensor", - "Input.Trig": "closed", - } - } + assert params.ports == ( + b"root.IOPort.I0.Configurable=no\nroot.IOPort.I0.Direction=input\n" + + b"root.IOPort.I0.Input.Name=PIR sensor\nroot.IOPort.I0.Input.Trig=closed\n" + ) # Properties property_params = params.property_params @@ -385,17 +381,18 @@ async def test_update_ports(params: Params): assert output_route.calls.last.request.url.path == "/axis-cgi/param.cgi" assert port_param.nbr_of_input == params.nbrofinput == 1 - assert ( - port_param.ports - == params.ports - == { - 0: { - "Configurable": False, - "Direction": "input", - "Input.Name": "PIR sensor", - "Input.Trig": "closed", - } + assert port_param.ports == { + 0: { + "Configurable": False, + "Direction": "input", + "Input.Name": "PIR sensor", + "Input.Trig": "closed", } + } + assert ( + params.ports + == b"root.IOPort.I0.Configurable=no\nroot.IOPort.I0.Direction=input\n" + + b"root.IOPort.I0.Input.Name=PIR sensor\nroot.IOPort.I0.Input.Trig=closed\n" ) assert port_param.nbr_of_output == params.nbrofoutput == 0 diff --git a/tests/test_port_cgi.py b/tests/test_port_cgi.py index 2e73d986..edf90be4 100644 --- a/tests/test_port_cgi.py +++ b/tests/test_port_cgi.py @@ -7,7 +7,8 @@ import respx from axis.vapix.interfaces.param_cgi import Params -from axis.vapix.interfaces.port_cgi import ACTION_LOW, Ports +from axis.vapix.interfaces.port_cgi import Ports +from axis.vapix.models.port_cgi import PortAction, PortDirection from .conftest import HOST @@ -20,7 +21,6 @@ def ports(axis_device) -> Ports: @respx.mock -@pytest.mark.asyncio async def test_ports(ports): """Test that different types of ports work.""" update_ports_route = respx.route( @@ -63,43 +63,43 @@ async def test_ports(ports): await ports.update() - assert update_ports_route.call_count == 3 + assert update_ports_route.call_count == 1 + # assert update_ports_route.call_count == 3 assert ports["0"].id == "0" assert ports["0"].configurable is False - assert ports["0"].direction == "input" + assert ports["0"].direction == PortDirection.IN assert ports["0"].name == "" - await ports["0"].action(action=ACTION_LOW) + await ports.action("0", action=PortAction.LOW) assert not action_low_route.called assert ports["1"].id == "1" assert ports["1"].configurable is False - assert ports["1"].direction == "input" + assert ports["1"].direction == PortDirection.IN assert ports["1"].name == "PIR sensor" assert ports["1"].input_trig == "closed" assert ports["2"].id == "2" assert ports["2"].configurable is False - assert ports["2"].direction == "input" + assert ports["2"].direction == PortDirection.IN assert ports["2"].name == "" assert ports["2"].output_active == "closed" assert ports["3"].id == "3" assert ports["3"].configurable is False - assert ports["3"].direction == "output" + assert ports["3"].direction == PortDirection.OUT assert ports["3"].name == "Tampering" assert ports["3"].output_active == "open" - await ports["3"].close() - + await ports.close("3") assert action_low_route.called assert action_low_route.calls.last.request.method == "GET" assert action_low_route.calls.last.request.url.path == "/axis-cgi/io/port.cgi" assert action_low_route.calls.last.request.url.query.decode() == "action=4%3A%2F" - await ports["3"].open() + await ports.open("3") assert action_high_route.called assert action_high_route.calls.last.request.method == "GET" assert action_high_route.calls.last.request.url.path == "/axis-cgi/io/port.cgi" @@ -107,7 +107,6 @@ async def test_ports(ports): @respx.mock -@pytest.mark.asyncio async def test_no_ports(ports): """Test that no ports also work.""" route = respx.route(url__startswith=f"http://{HOST}/axis-cgi/param.cgi").respond( @@ -117,5 +116,6 @@ async def test_no_ports(ports): await ports.update() - assert route.call_count == 3 + assert route.call_count == 1 + # assert route.call_count == 3 assert len(ports.values()) == 0