diff --git a/src/arcam/fmj/__init__.py b/src/arcam/fmj/__init__.py index 6044e89..1900a11 100644 --- a/src/arcam/fmj/__init__.py +++ b/src/arcam/fmj/__init__.py @@ -4,44 +4,56 @@ import logging import re from asyncio.exceptions import IncompleteReadError -from typing import Dict, Iterable, Optional, SupportsBytes, Tuple, Type, TypeVar, Union, Set, Literal, SupportsIndex +from typing import ( + Dict, + Iterable, + Optional, + SupportsBytes, + Tuple, + Type, + TypeVar, + Union, + Set, + Literal, + SupportsIndex, +) import attr -PROTOCOL_STR = b'\x21' -PROTOCOL_ETR = b'\x0D' -PROTOCOL_EOF = b'' +PROTOCOL_STR = b"\x21" +PROTOCOL_ETR = b"\x0D" +PROTOCOL_EOF = b"" _LOGGER = logging.getLogger(__name__) _WRITE_TIMEOUT = 3 _READ_TIMEOUT = 3 + class ArcamException(Exception): pass + class ConnectionFailed(ArcamException): pass + class NotConnectedException(ArcamException): pass + class ResponseException(ArcamException): def __init__(self, ac=None, zn=None, cc=None, data=None): self.ac = ac self.zn = zn self.cc = cc self.data = data - super().__init__("'ac':{}, 'zn':{}, 'cc':{}, 'data':{}".format( - ac, zn, cc, data - )) + super().__init__( + "'ac':{}, 'zn':{}, 'cc':{}, 'data':{}".format(ac, zn, cc, data) + ) @staticmethod - def from_response(response: 'ResponsePacket'): - kwargs = { - 'zn': response.zn, - 'cc': response.cc, - 'data': response.data - } + def from_response(response: "ResponsePacket"): + kwargs = {"zn": response.zn, "cc": response.cc, "data": response.data} if response.ac == AnswerCodes.ZONE_INVALID: return InvalidZoneException(**kwargs) elif response.ac == AnswerCodes.COMMAND_NOT_RECOGNISED: @@ -55,43 +67,82 @@ def from_response(response: 'ResponsePacket'): else: return ResponseException(ac=response.ac, **kwargs) + class InvalidZoneException(ResponseException): def __init__(self, zn=None, cc=None, data=None): - super().__init__(ac=AnswerCodes.ZONE_INVALID, - zn=zn, cc=cc, data=data) + super().__init__(ac=AnswerCodes.ZONE_INVALID, zn=zn, cc=cc, data=data) + class CommandNotRecognised(ResponseException): def __init__(self, zn=None, cc=None, data=None): - super().__init__(ac=AnswerCodes.COMMAND_NOT_RECOGNISED, - zn=zn, cc=cc, data=data) + super().__init__(ac=AnswerCodes.COMMAND_NOT_RECOGNISED, zn=zn, cc=cc, data=data) + class ParameterNotRecognised(ResponseException): def __init__(self, zn=None, cc=None, data=None): - super().__init__(ac=AnswerCodes.PARAMETER_NOT_RECOGNISED, - zn=zn, cc=cc, data=data) + super().__init__( + ac=AnswerCodes.PARAMETER_NOT_RECOGNISED, zn=zn, cc=cc, data=data + ) + class CommandInvalidAtThisTime(ResponseException): def __init__(self, zn=None, cc=None, data=None): - super().__init__(ac=AnswerCodes.COMMAND_INVALID_AT_THIS_TIME, - zn=zn, cc=cc, data=data) + super().__init__( + ac=AnswerCodes.COMMAND_INVALID_AT_THIS_TIME, zn=zn, cc=cc, data=data + ) + class InvalidDataLength(ResponseException): def __init__(self, zn=None, cc=None, data=None): - super().__init__(ac=AnswerCodes.INVALID_DATA_LENGTH, - zn=zn, cc=cc, data=data) + super().__init__(ac=AnswerCodes.INVALID_DATA_LENGTH, zn=zn, cc=cc, data=data) + class InvalidPacket(ArcamException): pass + class NullPacket(ArcamException): pass + APIVERSION_450_SERIES = {"AVR380", "AVR450", "AVR750"} APIVERSION_860_SERIES = {"AV860", "AVR850", "AVR550", "AVR390", "SR250"} APIVERSION_SA_SERIES = {"SA10", "SA20", "SA30"} -APIVERSION_HDA_SERIES = {"AVR5", "AVR10", "AVR20", "AVR30", "AV40", "AVR11", "AVR21", "ARV31", "AV41", "SDP-55", "SDP-58"} -APIVERSION_HDA_PREMIUM_SERIES = {"AVR10", "AVR20", "AVR30", "AV40", "AVR11", "AVR21", "ARV31", "AV41", "SDP-55", "SDP-58"} -APIVERSION_HDA_MULTI_ZONE_SERIES = {"AVR20", "AVR30", "AV40", "AVR21", "ARV31", "AV41", "SDP-55", "SDP-58"} +APIVERSION_HDA_SERIES = { + "AVR5", + "AVR10", + "AVR20", + "AVR30", + "AV40", + "AVR11", + "AVR21", + "ARV31", + "AV41", + "SDP-55", + "SDP-58", +} +APIVERSION_HDA_PREMIUM_SERIES = { + "AVR10", + "AVR20", + "AVR30", + "AV40", + "AVR11", + "AVR21", + "ARV31", + "AV41", + "SDP-55", + "SDP-58", +} +APIVERSION_HDA_MULTI_ZONE_SERIES = { + "AVR20", + "AVR30", + "AV40", + "AVR21", + "ARV31", + "AV41", + "SDP-55", + "SDP-58", +} APIVERSION_PA_SERIES = {"PA720", "PA240", "PA410"} APIVERSION_ST_SERIES = {"ST60"} @@ -136,6 +187,7 @@ class NullPacket(ArcamException): APIVERSION_APP_SAFETY_SERIES = {"SA30"} + class ApiModel(enum.Enum): API450_SERIES = 1 API860_SERIES = 2 @@ -144,7 +196,10 @@ class ApiModel(enum.Enum): APIPA_SERIES = 5 APIST_SERIES = 6 + _T = TypeVar("_T", bound="IntOrTypeEnum") + + class IntOrTypeEnum(enum.IntEnum): version: Optional[Set[str]] @@ -166,17 +221,17 @@ def _create_member(cls, value): return pseudo_member def __new__(cls, value: int, version: Optional[set] = None): - obj = int.__new__(cls, value) - obj._value_ = value - obj.version = version - return obj + obj = int.__new__(cls, value) + obj._value_ = value + obj.version = version + return obj @classmethod def from_int(cls: Type[_T], value: int) -> _T: return cls(value) @classmethod - def from_bytes(cls: Type[_T], bytes: Union[Iterable[SupportsIndex], SupportsBytes], byteorder: Literal['little', 'big'] = 'big', *, signed: bool = False) -> _T: # type: ignore[override] + def from_bytes(cls: Type[_T], bytes: Union[Iterable[SupportsIndex], SupportsBytes], byteorder: Literal["little", "big"] = "big", *, signed: bool = False) -> _T: # type: ignore[override] return cls.from_int(int.from_bytes(bytes, byteorder=byteorder, signed=signed)) @@ -203,12 +258,10 @@ class CommandCodes(IntOrTypeEnum): CURRENT_SOURCE = 0x1D # Request HEADPHONES_OVERRIDE = 0x1F - # Input Commands VIDEO_SELECTION = 0x0A SELECT_ANALOG_DIGITAL = 0x0B - VIDEO_INPUT_TYPE = 0x0C # IMAX_ENHANCED on 860 and HDA Series (not AVR5) - + VIDEO_INPUT_TYPE = 0x0C # IMAX_ENHANCED on 860 and HDA Series (not AVR5) # Output Commands VOLUME = 0x0D # Set/Request @@ -217,8 +270,7 @@ class CommandCodes(IntOrTypeEnum): DECODE_MODE_STATUS_2CH = 0x10 # Request DECODE_MODE_STATUS_MCH = 0x11 # Request RDS_INFORMATION = 0x12 # Request - VIDEO_OUTPUT_RESOLUTION = 0x13 # Set/Request - + VIDEO_OUTPUT_RESOLUTION = 0x13 # Set/Request # Menu Command MENU = 0x14 # Request @@ -227,18 +279,16 @@ class CommandCodes(IntOrTypeEnum): DAB_STATION = 0x18 # Set/Request DAB_PROGRAM_TYPE_CATEGORY = 0x19 # Set/Request DLS_PDT_INFO = 0x1A # Request - PRESET_DETAIL = 0x1B # Request - NETWORK_PLAYBACK_STATUS = 0x1C, APIVERSION_HDA_SERIES # Request - + PRESET_DETAIL = 0x1B # Request + NETWORK_PLAYBACK_STATUS = 0x1C, APIVERSION_HDA_SERIES # Request # Network Command - # Setup TREBLE_EQUALIZATION = 0x35 BASS_EQUALIZATION = 0x36 ROOM_EQUALIZATION = 0x37 - DOLBY_VOLUME = 0x38 # DOLBY_AUDIO on HDA series + DOLBY_VOLUME = 0x38 # DOLBY_AUDIO on HDA series DOLBY_LEVELER = 0x39 DOLBY_VOLUME_CALIBRATION_OFFSET = 0x3A BALANCE = 0x3B @@ -291,8 +341,14 @@ class CommandCodes(IntOrTypeEnum): DC_OFFSET = 0x51, APIVERSION_AMP_DIAGNOSTICS_SERIES SHORT_CIRCUIT_STATUS = 0x52, APIVERSION_CLASS_G_SERIES TIMEOUT_COUNTER = 0x55, APIVERSION_AMP_DIAGNOSTICS_SERIES - LIFTER_TEMPERATURE = 0x56, APIVERSION_CLASS_G_SERIES # Bug in PA720 1.8 firmware - does not return sensor id - OUTPUT_TEMPERATURE = 0x57, APIVERSION_AMP_DIAGNOSTICS_SERIES # Bug in PA720 1.8 firmware - does not return sensor id + LIFTER_TEMPERATURE = ( + 0x56, + APIVERSION_CLASS_G_SERIES, + ) # Bug in PA720 1.8 firmware - does not return sensor id + OUTPUT_TEMPERATURE = ( + 0x57, + APIVERSION_AMP_DIAGNOSTICS_SERIES, + ) # Bug in PA720 1.8 firmware - does not return sensor id AUTO_SHUTDOWN_CONTROL = 0x58, APIVERSION_AMP_DIAGNOSTICS_SERIES # Status/Diagnostics @@ -304,7 +360,7 @@ class CommandCodes(IntOrTypeEnum): PROCESSOR_MODE_VOLUME = 0x5C, APIVERSION_SA_SERIES SYSTEM_STATUS = 0x5D, APIVERSION_AMP_DIAGNOSTICS_SERIES SYSTEM_MODEL = 0x5E, APIVERSION_AMP_DIAGNOSTICS_SERIES - DAC_FILTER = 0x61, APIVERSION_SA_SERIES # Clashes with AMPLIFIER_MODE on PA240 + DAC_FILTER = 0x61, APIVERSION_SA_SERIES # Clashes with AMPLIFIER_MODE on PA240 MAXIMUM_TURN_ON_VOLUME = 0x65, APIVERSION_APP_SAFETY_SERIES MAXIMUM_VOLUME = 0x66, APIVERSION_APP_SAFETY_SERIES MAXIMUM_STREAMING_VOLUME = 0x67, APIVERSION_APP_SAFETY_SERIES @@ -337,24 +393,37 @@ class SourceCodes(enum.Enum): NET_USB = enum.auto() @classmethod - def from_bytes(cls, data: bytes, model: ApiModel, zn: int) -> 'SourceCodes': + def from_bytes(cls, data: bytes, model: ApiModel, zn: int) -> "SourceCodes": try: table = SOURCE_CODES[(model, zn)] except KeyError: - raise ValueError("Unknown source map for model {} and zone {}".format(model, zn)) + raise ValueError( + "Unknown source map for model {} and zone {}".format(model, zn) + ) for key, value in table.items(): if value == data: return key - raise ValueError("Unknown source code for model {} and zone {} and value {!r}".format(model, zn, data)) + raise ValueError( + "Unknown source code for model {} and zone {} and value {!r}".format( + model, zn, data + ) + ) def to_bytes(self, model: ApiModel, zn: int): try: table = SOURCE_CODES[(model, zn)] except KeyError: - raise ValueError("Unknown source map for model {} and zone {}".format(model, zn)) + raise ValueError( + "Unknown source map for model {} and zone {}".format(model, zn) + ) if data := table.get(self): return data - raise ValueError("Unknown byte code for model {} and zone {} and value {}".format(model, zn, self)) + raise ValueError( + "Unknown byte code for model {} and zone {} and value {}".format( + model, zn, self + ) + ) + class MenuCodes(IntOrTypeEnum): NONE = 0x00 @@ -388,6 +457,7 @@ class DecodeMode2CH(IntOrTypeEnum): AURO_MATIC_3D = 0x0F, APIVERSION_AURO_SERIES AURO_2D = 0x10, APIVERSION_AURO_SERIES + class DecodeModeMCH(IntOrTypeEnum): STEREO_DOWNMIX = 0x01 MULTI_CHANNEL = 0x02 @@ -509,10 +579,8 @@ class DecodeModeMCH(IntOrTypeEnum): (ApiModel.API860_SERIES, 1): { DecodeModeMCH.STEREO_DOWNMIX: bytes([16, 107]), DecodeModeMCH.MULTI_CHANNEL: bytes([16, 106]), - # We map to DTS_NEURAL_X DecodeModeMCH.DOLBY_D_EX_OR_DTS_ES: bytes([16, 113]), - DecodeModeMCH.DOLBY_SURROUND: bytes([16, 110]), DecodeModeMCH.DTS_VIRTUAL_X: bytes([16, 115]), }, @@ -520,17 +588,13 @@ class DecodeModeMCH(IntOrTypeEnum): (ApiModel.APIHDA_SERIES, 1): { DecodeModeMCH.STEREO_DOWNMIX: bytes([16, 107]), DecodeModeMCH.MULTI_CHANNEL: bytes([16, 106]), - # We map to DTS_NEURAL_X DecodeModeMCH.DOLBY_D_EX_OR_DTS_ES: bytes([16, 113]), - DecodeModeMCH.DOLBY_SURROUND: bytes([16, 110]), - DecodeModeMCH.DOLBY_VIRTUAL_HEIGHT: bytes([16, 115]), DecodeModeMCH.AURO_NATIVE: bytes([16, 103]), DecodeModeMCH.AURO_MATIC_3D: bytes([16, 71]), DecodeModeMCH.AURO_2D: bytes([16, 104]), - }, (ApiModel.APIHDA_SERIES, 2): {}, (ApiModel.APISA_SERIES, 1): {}, @@ -539,7 +603,7 @@ class DecodeModeMCH(IntOrTypeEnum): (ApiModel.APIPA_SERIES, 2): {}, } -RC5CODE_DECODE_MODE_2CH: Dict[Tuple[ApiModel, int], Dict[DecodeMode2CH, bytes]] = { +RC5CODE_DECODE_MODE_2CH: Dict[Tuple[ApiModel, int], Dict[DecodeMode2CH, bytes]] = { (ApiModel.API450_SERIES, 1): { DecodeMode2CH.STEREO: bytes([16, 107]), DecodeMode2CH.DOLBY_PLII_IIx_MOVIE: bytes([16, 103]), @@ -611,7 +675,7 @@ class DecodeModeMCH(IntOrTypeEnum): SourceCodes.PVR: bytes([23, 15]), SourceCodes.USB: bytes([23, 18]), SourceCodes.NET: bytes([23, 19]), - SourceCodes.FOLLOW_ZONE_1: bytes([16, 20]) + SourceCodes.FOLLOW_ZONE_1: bytes([16, 20]), }, (ApiModel.API860_SERIES, 1): { SourceCodes.STB: bytes([16, 100]), @@ -643,7 +707,7 @@ class DecodeModeMCH(IntOrTypeEnum): SourceCodes.NET: bytes([23, 19]), SourceCodes.SAT: bytes([23, 20]), SourceCodes.VCR: bytes([23, 21]), - SourceCodes.FOLLOW_ZONE_1: bytes([16, 20]) + SourceCodes.FOLLOW_ZONE_1: bytes([16, 20]), }, (ApiModel.APIHDA_SERIES, 1): { SourceCodes.STB: bytes([16, 100]), @@ -690,7 +754,7 @@ class DecodeModeMCH(IntOrTypeEnum): SourceCodes.NET: bytes([16, 92]), SourceCodes.USB: bytes([16, 93]), SourceCodes.GAME: bytes([16, 97]), - SourceCodes.ARC_ERC: bytes([16, 125]) + SourceCodes.ARC_ERC: bytes([16, 125]), }, (ApiModel.APISA_SERIES, 2): { SourceCodes.PHONO: bytes([16, 117]), @@ -704,7 +768,7 @@ class DecodeModeMCH(IntOrTypeEnum): SourceCodes.NET: bytes([16, 92]), SourceCodes.USB: bytes([16, 93]), SourceCodes.GAME: bytes([16, 97]), - SourceCodes.ARC_ERC: bytes([16, 125]) + SourceCodes.ARC_ERC: bytes([16, 125]), }, (ApiModel.APIST_SERIES, 1): { SourceCodes.DIG1: bytes([21, 94]), @@ -712,7 +776,7 @@ class DecodeModeMCH(IntOrTypeEnum): SourceCodes.DIG3: bytes([21, 27]), SourceCodes.DIG4: bytes([21, 97]), SourceCodes.USB: bytes([21, 93]), - SourceCodes.NET: bytes([21, 92]) + SourceCodes.NET: bytes([21, 92]), }, (ApiModel.APIST_SERIES, 2): {}, (ApiModel.APIPA_SERIES, 1): {}, @@ -720,38 +784,14 @@ class DecodeModeMCH(IntOrTypeEnum): } RC5CODE_POWER = { - (ApiModel.API450_SERIES, 1): { - True: bytes([16, 123]), - False: bytes([16, 124]) - }, - (ApiModel.API450_SERIES, 2): { - True: bytes([23, 123]), - False: bytes([23, 124]) - }, - (ApiModel.API860_SERIES, 1): { - True: bytes([16, 123]), - False: bytes([16, 124]) - }, - (ApiModel.API860_SERIES, 2): { - True: bytes([23, 123]), - False: bytes([23, 124]) - }, - (ApiModel.APIHDA_SERIES, 1): { - True: bytes([16, 123]), - False: bytes([16, 124]) - }, - (ApiModel.APIHDA_SERIES, 2): { - True: bytes([23, 123]), - False: bytes([23, 124]) - }, - (ApiModel.APISA_SERIES, 1): { - True: bytes([16, 123]), - False: bytes([16, 124]) - }, - (ApiModel.APISA_SERIES, 2): { - True: bytes([16, 123]), - False: bytes([16, 124]) - } + (ApiModel.API450_SERIES, 1): {True: bytes([16, 123]), False: bytes([16, 124])}, + (ApiModel.API450_SERIES, 2): {True: bytes([23, 123]), False: bytes([23, 124])}, + (ApiModel.API860_SERIES, 1): {True: bytes([16, 123]), False: bytes([16, 124])}, + (ApiModel.API860_SERIES, 2): {True: bytes([23, 123]), False: bytes([23, 124])}, + (ApiModel.APIHDA_SERIES, 1): {True: bytes([16, 123]), False: bytes([16, 124])}, + (ApiModel.APIHDA_SERIES, 2): {True: bytes([23, 123]), False: bytes([23, 124])}, + (ApiModel.APISA_SERIES, 1): {True: bytes([16, 123]), False: bytes([16, 124])}, + (ApiModel.APISA_SERIES, 2): {True: bytes([16, 123]), False: bytes([16, 124])}, } RC5CODE_MUTE = { @@ -786,7 +826,7 @@ class DecodeModeMCH(IntOrTypeEnum): (ApiModel.APISA_SERIES, 2): { True: bytes([16, 26]), False: bytes([16, 120]), - } + }, } RC5CODE_VOLUME = { @@ -825,9 +865,10 @@ class DecodeModeMCH(IntOrTypeEnum): (ApiModel.APIST_SERIES, 1): { True: bytes([21, 86]), False: bytes([21, 85]), - } + }, } + class IncomingAudioFormat(IntOrTypeEnum): PCM = 0x00 ANALOGUE_DIRECT = 0x01 @@ -857,6 +898,7 @@ class IncomingAudioFormat(IntOrTypeEnum): class IncomingAudioConfig(IntOrTypeEnum): """List of possible audio configurations.""" + MONO = 0x01 CENTER_ONLY = 0x01 STEREO_ONLY = 0x02 @@ -865,22 +907,24 @@ class IncomingAudioConfig(IntOrTypeEnum): class PresetType(IntOrTypeEnum): """List of possible audio configurations.""" + AM_FREQUENCY = 0x00 FM_FREQUENCY = 0x01 FM_RDS_NAME = 0x02 DAB = 0x03 + @attr.s -class PresetDetail(): +class PresetDetail: index = attr.ib(type=int) type = attr.ib(type=Union[PresetType, int]) name = attr.ib(type=str) @staticmethod - def from_bytes(data: bytes) -> 'PresetDetail': + def from_bytes(data: bytes) -> "PresetDetail": type = PresetType.from_int(data[1]) if type == PresetType.FM_RDS_NAME or type == PresetType.DAB: - name = data[2:].decode('utf8').rstrip() + name = data[2:].decode("utf8").rstrip() elif type == PresetType.FM_FREQUENCY: name = f"{data[2]}.{data[3]:2} MHz" elif type == PresetType.AM_FREQUENCY: @@ -889,80 +933,80 @@ def from_bytes(data: bytes) -> 'PresetDetail': name = str(data[2:]) return PresetDetail(data[0], type, name) + @attr.s -class ResponsePacket(): +class ResponsePacket: """Represent a response from device.""" + zn = attr.ib(type=int) cc = attr.ib(type=int) ac = attr.ib(type=int) data = attr.ib(type=bytes) - def respons_to(self, request: Union['AmxDuetRequest', 'CommandPacket']): + def respons_to(self, request: Union["AmxDuetRequest", "CommandPacket"]): if not isinstance(request, CommandPacket): return False - return (self.zn == request.zn and - self.cc == request.cc) + return self.zn == request.zn and self.cc == request.cc @staticmethod - def from_bytes(data: bytes) -> 'ResponsePacket': + def from_bytes(data: bytes) -> "ResponsePacket": if len(data) < 6: raise InvalidPacket("Packet to short {!r}".format(data)) - if data[4] != len(data)-6: + if data[4] != len(data) - 6: raise InvalidPacket("Invalid length in data {!r}".format(data)) return ResponsePacket( data[1], CommandCodes.from_int(data[2]), AnswerCodes.from_int(data[3]), - data[5:5+data[4]]) + data[5 : 5 + data[4]], + ) def to_bytes(self): - return bytes([ - *PROTOCOL_STR, - self.zn, - self.cc, - self.ac, - len(self.data), - *self.data, - *PROTOCOL_ETR - ]) + return bytes( + [ + *PROTOCOL_STR, + self.zn, + self.cc, + self.ac, + len(self.data), + *self.data, + *PROTOCOL_ETR, + ] + ) + @attr.s -class CommandPacket(): +class CommandPacket: """Represent a command sent to device.""" + zn = attr.ib(type=int) cc = attr.ib(type=int) data = attr.ib(type=bytes) def to_bytes(self): - return bytes([ - *PROTOCOL_STR, - self.zn, - self.cc, - len(self.data), - *self.data, - *PROTOCOL_ETR - ]) + return bytes( + [*PROTOCOL_STR, self.zn, self.cc, len(self.data), *self.data, *PROTOCOL_ETR] + ) @staticmethod - def from_bytes(data: bytes) -> 'CommandPacket': + def from_bytes(data: bytes) -> "CommandPacket": if len(data) < 5: raise InvalidPacket("Packet to short {!r}".format(data)) - if data[3] != len(data)-5: + if data[3] != len(data) - 5: raise InvalidPacket("Invalid length in data {!r}".format(data)) return CommandPacket( - data[1], - CommandCodes.from_int(data[2]), - data[4:4+data[3]]) + data[1], CommandCodes.from_int(data[2]), data[4 : 4 + data[3]] + ) + @attr.s -class AmxDuetRequest(): - +class AmxDuetRequest: @staticmethod - def from_bytes(data: bytes) -> 'AmxDuetRequest': + def from_bytes(data: bytes) -> "AmxDuetRequest": if not data == b"AMX\r": raise InvalidPacket("Packet is not a amx request {!r}".format(data)) return AmxDuetRequest() @@ -970,9 +1014,9 @@ def from_bytes(data: bytes) -> 'AmxDuetRequest': def to_bytes(self): return b"AMX\r" -@attr.s -class AmxDuetResponse(): +@attr.s +class AmxDuetResponse: values = attr.ib(type=dict) @property @@ -997,7 +1041,7 @@ def respons_to(self, packet: Union[AmxDuetRequest, CommandPacket]): return True @staticmethod - def from_bytes(data: bytes) -> 'AmxDuetResponse': + def from_bytes(data: bytes) -> "AmxDuetResponse": if not data.startswith(b"AMXB"): raise InvalidPacket("Packet is not a amx response {!r}".format(data)) @@ -1005,10 +1049,11 @@ def from_bytes(data: bytes) -> 'AmxDuetResponse': return AmxDuetResponse(dict(tags)) def to_bytes(self): - res = "AMXB" + "".join([ - f"<{key}={value}>" - for key, value in self.values.items() - ]) + "\r" + res = ( + "AMXB" + + "".join([f"<{key}={value}>" for key, value in self.values.items()]) + + "\r" + ) return res.encode("ASCII") @@ -1020,9 +1065,9 @@ async def _read_delimited(reader: asyncio.StreamReader, header_len) -> Optional[ return None if start == PROTOCOL_STR: - header = await reader.read(header_len-1) + header = await reader.read(header_len - 1) data_len = await reader.read(1) - data = await reader.read(int.from_bytes(data_len, 'big')) + data = await reader.read(int.from_bytes(data_len, "big")) etr = await reader.read(1) if etr != PROTOCOL_ETR: @@ -1034,16 +1079,16 @@ async def _read_delimited(reader: asyncio.StreamReader, header_len) -> Optional[ header = await reader.read(4) if header != b"^AMX": raise InvalidPacket("Unexpected AMX header: {!r}".format(header)) - + data = await reader.readuntil(PROTOCOL_ETR) - packet = bytes([*b"AMX", *data]) + packet = bytes([*b"AMX", *data]) elif start == b"A": header = await reader.read(2) if header != b"MX": raise InvalidPacket("Unexpected AMX header") data = await reader.readuntil(PROTOCOL_ETR) - packet = bytes([*start, *header, *data]) + packet = bytes([*start, *header, *data]) elif start == b"\x00": raise NullPacket() else: @@ -1061,7 +1106,9 @@ async def _read_delimited(reader: asyncio.StreamReader, header_len) -> Optional[ raise ConnectionFailed() from exception -async def _read_response(reader: asyncio.StreamReader) -> Optional[Union[ResponsePacket, AmxDuetResponse]]: +async def _read_response( + reader: asyncio.StreamReader, +) -> Optional[Union[ResponsePacket, AmxDuetResponse]]: data = await _read_delimited(reader, 4) if not data: return None @@ -1072,7 +1119,9 @@ async def _read_response(reader: asyncio.StreamReader) -> Optional[Union[Respons return ResponsePacket.from_bytes(data) -async def read_response(reader: asyncio.StreamReader) -> Optional[Union[ResponsePacket, AmxDuetResponse]]: +async def read_response( + reader: asyncio.StreamReader, +) -> Optional[Union[ResponsePacket, AmxDuetResponse]]: while True: try: data = await _read_response(reader) @@ -1085,7 +1134,9 @@ async def read_response(reader: asyncio.StreamReader) -> Optional[Union[Response return data -async def _read_command(reader: asyncio.StreamReader) -> Optional[Union[CommandPacket, AmxDuetRequest]]: +async def _read_command( + reader: asyncio.StreamReader, +) -> Optional[Union[CommandPacket, AmxDuetRequest]]: data = await _read_delimited(reader, 3) if not data: return None @@ -1095,7 +1146,9 @@ async def _read_command(reader: asyncio.StreamReader) -> Optional[Union[CommandP return CommandPacket.from_bytes(data) -async def read_command(reader: asyncio.StreamReader) -> Optional[Union[CommandPacket, AmxDuetRequest]]: +async def read_command( + reader: asyncio.StreamReader, +) -> Optional[Union[CommandPacket, AmxDuetRequest]]: while True: try: data = await _read_command(reader) @@ -1105,11 +1158,10 @@ async def read_command(reader: asyncio.StreamReader) -> Optional[Union[CommandPa return data -async def write_packet(writer: asyncio.StreamWriter, - packet: Union[CommandPacket, - ResponsePacket, - AmxDuetRequest, - AmxDuetResponse]) -> None: +async def write_packet( + writer: asyncio.StreamWriter, + packet: Union[CommandPacket, ResponsePacket, AmxDuetRequest, AmxDuetResponse], +) -> None: try: data = packet.to_bytes() writer.write(data) diff --git a/src/arcam/fmj/client.py b/src/arcam/fmj/client.py index 458b02d..bd02c49 100644 --- a/src/arcam/fmj/client.py +++ b/src/arcam/fmj/client.py @@ -19,7 +19,7 @@ ResponseException, ResponsePacket, read_response, - write_packet + write_packet, ) from .utils import Throttle, async_retry @@ -28,7 +28,8 @@ _REQUEST_THROTTLE = 0.2 _HEARTBEAT_INTERVAL = timedelta(seconds=5) -_HEARTBEAT_TIMEOUT = _HEARTBEAT_INTERVAL + _HEARTBEAT_INTERVAL +_HEARTBEAT_TIMEOUT = _HEARTBEAT_INTERVAL + _HEARTBEAT_INTERVAL + class Client: def __init__(self, host: str, port: int) -> None: @@ -63,8 +64,7 @@ async def _process_heartbeat(self, writer: StreamWriter): else: _LOGGER.debug("Sending ping") await write_packet( - writer, - CommandPacket(1, CommandCodes.POWER, bytes([0xF0])) + writer, CommandPacket(1, CommandCodes.POWER, bytes([0xF0])) ) self._timestamp = datetime.now() @@ -73,8 +73,7 @@ async def _process_data(self, reader: StreamReader): while True: try: packet = await asyncio.wait_for( - read_response(reader), - _HEARTBEAT_TIMEOUT.total_seconds() + read_response(reader), _HEARTBEAT_TIMEOUT.total_seconds() ) except asyncio.TimeoutError as exception: _LOGGER.warning("Missed all pings") @@ -119,7 +118,8 @@ async def start(self) -> None: _LOGGER.debug("Connecting to %s:%d", self._host, self._port) try: self._reader, self._writer = await asyncio.open_connection( - self._host, self._port) + self._host, self._port + ) except ConnectionError as exception: raise ConnectionFailed() from exception except OSError as exception: @@ -148,11 +148,15 @@ async def request_raw(self, request: AmxDuetRequest) -> AmxDuetResponse: ... @async_retry(2, asyncio.TimeoutError) - async def request_raw(self, request: Union[CommandPacket, AmxDuetRequest]) -> Union[ResponsePacket, AmxDuetResponse]: + async def request_raw( + self, request: Union[CommandPacket, AmxDuetRequest] + ) -> Union[ResponsePacket, AmxDuetResponse]: if not self._writer: raise NotConnectedException() - writer = self._writer # keep copy around if stopped by another task - future: 'asyncio.Future[Union[ResponsePacket, AmxDuetResponse]]' = asyncio.Future() + writer = self._writer # keep copy around if stopped by another task + future: "asyncio.Future[Union[ResponsePacket, AmxDuetResponse]]" = ( + asyncio.Future() + ) def listen(response: Union[ResponsePacket, AmxDuetResponse]): if response.respons_to(request): @@ -168,9 +172,7 @@ async def req() -> Union[ResponsePacket, AmxDuetResponse]: self._timestamp = datetime.now() return await future - return await asyncio.wait_for( - req(), - _REQUEST_TIMEOUT.total_seconds()) + return await asyncio.wait_for(req(), _REQUEST_TIMEOUT.total_seconds()) async def send(self, zn: int, cc: int, data: bytes) -> None: if not self._writer: @@ -196,9 +198,7 @@ def __init__(self, client: Client): async def __aenter__(self) -> Client: await self._client.start() - self._task = asyncio.create_task( - self._client.process() - ) + self._task = asyncio.create_task(self._client.process()) return self._client async def __aexit__(self, exc_type, exc_val, exc_tb): diff --git a/src/arcam/fmj/console.py b/src/arcam/fmj/console.py index f585d57..28513ec 100644 --- a/src/arcam/fmj/console.py +++ b/src/arcam/fmj/console.py @@ -3,49 +3,72 @@ import logging import sys -from . import APIVERSION_450_SERIES, APIVERSION_860_SERIES, APIVERSION_HDA_SERIES, ApiModel, CommandCodes, CommandInvalidAtThisTime, SourceCodes, IncomingAudioFormat, IncomingAudioConfig, DecodeMode2CH, DecodeModeMCH, CommandNotRecognised, _LOGGER, ResponsePacket, AnswerCodes, RC5CODE_SOURCE, RC5CODE_DECODE_MODE_2CH, RC5CODE_DECODE_MODE_MCH +from . import ( + APIVERSION_450_SERIES, + APIVERSION_860_SERIES, + APIVERSION_HDA_SERIES, + ApiModel, + CommandCodes, + CommandInvalidAtThisTime, + SourceCodes, + IncomingAudioFormat, + IncomingAudioConfig, + DecodeMode2CH, + DecodeModeMCH, + CommandNotRecognised, + _LOGGER, + ResponsePacket, + AnswerCodes, + RC5CODE_SOURCE, + RC5CODE_DECODE_MODE_2CH, + RC5CODE_DECODE_MODE_MCH, +) from .client import Client, ClientContext from .server import Server, ServerContext from .state import State # pylint: disable=invalid-name + def auto_int(x): return int(x, 0) + def auto_bytes(x): print(x) return bytes.decode(x) + def auto_source(x): return SourceCodes[x] -parser = argparse.ArgumentParser(description='Communicate with arcam receivers.') -parser.add_argument('--verbose', action='store_true') + +parser = argparse.ArgumentParser(description="Communicate with arcam receivers.") +parser.add_argument("--verbose", action="store_true") subparsers = parser.add_subparsers(dest="subcommand") -parser_state = subparsers.add_parser('state') -parser_state.add_argument('--host', required=True) -parser_state.add_argument('--port', default=50000) -parser_state.add_argument('--zone', default=1, type=int) -parser_state.add_argument('--volume', type=int) -parser_state.add_argument('--source', type=auto_source) -parser_state.add_argument('--monitor', action='store_true') -parser_state.add_argument('--power-on', action=argparse.BooleanOptionalAction) -parser_state.add_argument('--power-off', action=argparse.BooleanOptionalAction) - -parser_client = subparsers.add_parser('client') -parser_client.add_argument('--host', required=True) -parser_client.add_argument('--port', default=50000) -parser_client.add_argument('--zone', default=1, type=int) -parser_client.add_argument('--command', type=auto_int) -parser_client.add_argument('--data', nargs='+', default=[0xF0], type=auto_int) - -parser_server = subparsers.add_parser('server') -parser_server.add_argument('--host', default='localhost') -parser_server.add_argument('--port', default=50000) -parser_server.add_argument('--model', default="AVR450") +parser_state = subparsers.add_parser("state") +parser_state.add_argument("--host", required=True) +parser_state.add_argument("--port", default=50000) +parser_state.add_argument("--zone", default=1, type=int) +parser_state.add_argument("--volume", type=int) +parser_state.add_argument("--source", type=auto_source) +parser_state.add_argument("--monitor", action="store_true") +parser_state.add_argument("--power-on", action=argparse.BooleanOptionalAction) +parser_state.add_argument("--power-off", action=argparse.BooleanOptionalAction) + +parser_client = subparsers.add_parser("client") +parser_client.add_argument("--host", required=True) +parser_client.add_argument("--port", default=50000) +parser_client.add_argument("--zone", default=1, type=int) +parser_client.add_argument("--command", type=auto_int) +parser_client.add_argument("--data", nargs="+", default=[0xF0], type=auto_int) + +parser_server = subparsers.add_parser("server") +parser_server.add_argument("--host", default="localhost") +parser_server.add_argument("--port", default=50000) +parser_server.add_argument("--model", default="AVR450") async def run_client(args): @@ -54,6 +77,7 @@ async def run_client(args): result = await client.request(args.zone, args.command, bytes(args.data)) print(result) + async def run_state(args): client = Client(args.host, args.port) async with ClientContext(client): @@ -65,7 +89,7 @@ async def run_state(args): if args.source is not None: await state.set_source(args.source) - + if args.power_on is not None: await state.set_power(True) @@ -87,7 +111,6 @@ async def run_state(args): async def run_server(args): class DummyServer(Server): - def __init__(self, host, port, model): super().__init__(host, port, model) @@ -104,40 +127,72 @@ def __init__(self, host, port, model): self._volume = bytes([10]) self._source = bytes([SourceCodes.PVR]) - self._audio_format = bytes([IncomingAudioFormat.PCM, IncomingAudioConfig.STEREO_ONLY]) - self._decode_mode_2ch = bytes([next(iter(RC5CODE_DECODE_MODE_2CH[rc5_key]))]) - self._decode_mode_mch = bytes([next(iter(RC5CODE_DECODE_MODE_MCH[rc5_key]))]) - self._tuner_preset = b'\0xff' + self._audio_format = bytes( + [IncomingAudioFormat.PCM, IncomingAudioConfig.STEREO_ONLY] + ) + self._decode_mode_2ch = bytes( + [next(iter(RC5CODE_DECODE_MODE_2CH[rc5_key]))] + ) + self._decode_mode_mch = bytes( + [next(iter(RC5CODE_DECODE_MODE_MCH[rc5_key]))] + ) + self._tuner_preset = b"\0xff" self._presets = { - b'\x01': b'\x03SR P1 ', - b'\x02': b'\x03SR Klass', - b'\x03' : b'\x03P3 Star ', - b'\x04': b'\x02SR P4 ', - b'\x05': b'\x02SR P4 ', - b'\x06': b'\x01jP', + b"\x01": b"\x03SR P1 ", + b"\x02": b"\x03SR Klass", + b"\x03": b"\x03P3 Star ", + b"\x04": b"\x02SR P4 ", + b"\x05": b"\x02SR P4 ", + b"\x06": b"\x01jP", } def invert_rc5(data): - return { - value: key - for key, value in data[rc5_key].items() - } + return {value: key for key, value in data[rc5_key].items()} self._source_rc5 = invert_rc5(RC5CODE_SOURCE) self._decode_mode_2ch_rc5 = invert_rc5(RC5CODE_DECODE_MODE_2CH) self._decode_mode_mch_rc5 = invert_rc5(RC5CODE_DECODE_MODE_MCH) - self.register_handler(0x01, CommandCodes.POWER, bytes([0xF0]), self.get_power) - self.register_handler(0x01, CommandCodes.VOLUME, bytes([0xF0]), self.get_volume) + self.register_handler( + 0x01, CommandCodes.POWER, bytes([0xF0]), self.get_power + ) + self.register_handler( + 0x01, CommandCodes.VOLUME, bytes([0xF0]), self.get_volume + ) self.register_handler(0x01, CommandCodes.VOLUME, None, self.set_volume) - self.register_handler(0x01, CommandCodes.CURRENT_SOURCE, bytes([0xF0]), self.get_source) - self.register_handler(0x01, CommandCodes.INCOMING_AUDIO_FORMAT, bytes([0xF0]), self.get_incoming_audio_format) - self.register_handler(0x01, CommandCodes.DECODE_MODE_STATUS_2CH, bytes([0xF0]), self.get_decode_mode_2ch) - self.register_handler(0x01, CommandCodes.DECODE_MODE_STATUS_MCH, bytes([0xF0]), self.get_decode_mode_mch) - self.register_handler(0x01, CommandCodes.SIMULATE_RC5_IR_COMMAND, None, self.ir_command) - self.register_handler(0x01, CommandCodes.PRESET_DETAIL, None, self.get_preset_detail) - self.register_handler(0x01, CommandCodes.TUNER_PRESET, bytes([0xF0]), self.get_tuner_preset) - self.register_handler(0x01, CommandCodes.TUNER_PRESET, None, self.set_tuner_preset) + self.register_handler( + 0x01, CommandCodes.CURRENT_SOURCE, bytes([0xF0]), self.get_source + ) + self.register_handler( + 0x01, + CommandCodes.INCOMING_AUDIO_FORMAT, + bytes([0xF0]), + self.get_incoming_audio_format, + ) + self.register_handler( + 0x01, + CommandCodes.DECODE_MODE_STATUS_2CH, + bytes([0xF0]), + self.get_decode_mode_2ch, + ) + self.register_handler( + 0x01, + CommandCodes.DECODE_MODE_STATUS_MCH, + bytes([0xF0]), + self.get_decode_mode_mch, + ) + self.register_handler( + 0x01, CommandCodes.SIMULATE_RC5_IR_COMMAND, None, self.ir_command + ) + self.register_handler( + 0x01, CommandCodes.PRESET_DETAIL, None, self.get_preset_detail + ) + self.register_handler( + 0x01, CommandCodes.TUNER_PRESET, bytes([0xF0]), self.get_tuner_preset + ) + self.register_handler( + 0x01, CommandCodes.TUNER_PRESET, None, self.set_tuner_preset + ) def get_power(self, **kwargs): return bytes([1]) @@ -158,7 +213,7 @@ def set_source(self, data, **kwargs): def ir_command(self, data, **kwargs): status = None - + source = self._source_rc5.get(data) if source: self.set_source(bytes([source])) @@ -167,14 +222,14 @@ def ir_command(self, data, **kwargs): zn=0x01, cc=CommandCodes.SIMULATE_RC5_IR_COMMAND, ac=AnswerCodes.STATUS_UPDATE, - data=data + data=data, ), ResponsePacket( zn=0x01, cc=CommandCodes.CURRENT_SOURCE, ac=AnswerCodes.STATUS_UPDATE, - data=bytes([source]) - ) + data=bytes([source]), + ), ] decode_mode_2ch = self._decode_mode_2ch_rc5.get(data) if decode_mode_2ch: @@ -184,14 +239,14 @@ def ir_command(self, data, **kwargs): zn=0x01, cc=CommandCodes.SIMULATE_RC5_IR_COMMAND, ac=AnswerCodes.STATUS_UPDATE, - data=data + data=data, ), ResponsePacket( zn=0x01, cc=CommandCodes.DECODE_MODE_STATUS_2CH, ac=AnswerCodes.STATUS_UPDATE, - data=self._decode_mode_2ch - ) + data=self._decode_mode_2ch, + ), ] decode_mode_mch = self._decode_mode_mch_rc5.get(data) @@ -202,14 +257,14 @@ def ir_command(self, data, **kwargs): zn=0x01, cc=CommandCodes.SIMULATE_RC5_IR_COMMAND, ac=AnswerCodes.STATUS_UPDATE, - data=data + data=data, ), ResponsePacket( zn=0x01, cc=CommandCodes.DECODE_MODE_STATUS_MCH, ac=AnswerCodes.STATUS_UPDATE, - data=self._decode_mode_mch - ) + data=self._decode_mode_mch, + ), ] raise CommandNotRecognised() @@ -242,8 +297,8 @@ def get_preset_detail(self, data, **kwargs): while True: await asyncio.sleep(delay=1) -def main(): +def main(): args = parser.parse_args() if args.verbose: @@ -252,16 +307,19 @@ def main(): channel = logging.StreamHandler(sys.stdout) channel.setLevel(logging.DEBUG) - formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) channel.setFormatter(formatter) root.addHandler(channel) - if args.subcommand == 'client': + if args.subcommand == "client": asyncio.run(run_client(args)) - elif args.subcommand == 'state': + elif args.subcommand == "state": asyncio.run(run_state(args)) - elif args.subcommand == 'server': + elif args.subcommand == "server": asyncio.run(run_server(args)) + if __name__ == "__main__": main() diff --git a/src/arcam/fmj/server.py b/src/arcam/fmj/server.py index 0bf068e..b8e5d10 100644 --- a/src/arcam/fmj/server.py +++ b/src/arcam/fmj/server.py @@ -12,24 +12,29 @@ ResponseException, ResponsePacket, read_command, - write_packet + write_packet, ) _LOGGER = logging.getLogger(__name__) -class Server(): + +class Server: def __init__(self, host: str, port: int, model: str) -> None: self._server: Optional[asyncio.AbstractServer] = None self._host = host self._port = port - self._handlers: Dict[Union[Tuple[int, int], Tuple[int, int, bytes]], Callable] = dict() + self._handlers: Dict[ + Union[Tuple[int, int], Tuple[int, int, bytes]], Callable + ] = dict() self._tasks: List[asyncio.Task] = list() - self._amxduet = AmxDuetResponse({ - "Device-SDKClass": "Receiver", - "Device-Make": "ARCAM", - "Device-Model": model, - "Device-Revision": "x.y.z" - }) + self._amxduet = AmxDuetResponse( + { + "Device-SDKClass": "Receiver", + "Device-Make": "ARCAM", + "Device-Model": model, + "Device-Revision": "x.y.z", + } + ) async def process(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): _LOGGER.debug("Client connected") @@ -42,7 +47,9 @@ async def process(self, reader: asyncio.StreamReader, writer: asyncio.StreamWrit _LOGGER.debug("Client disconnected") self._tasks.remove(task) - async def process_runner(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + async def process_runner( + self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter + ): while True: request = await read_command(reader) if request is None: @@ -64,32 +71,20 @@ async def process_request(self, request: Union[CommandPacket, AmxDuetRequest]): try: if handler: - data = handler( - zn=request.zn, - cc=request.cc, - data=request.data) + data = handler(zn=request.zn, cc=request.cc, data=request.data) if isinstance(data, bytes): response = [ ResponsePacket( - request.zn, - request.cc, - AnswerCodes.STATUS_UPDATE, - data) + request.zn, request.cc, AnswerCodes.STATUS_UPDATE, data + ) ] else: response = data else: raise CommandNotRecognised() except ResponseException as e: - response = [ - ResponsePacket( - request.zn, - request.cc, - e.ac, - e.data or bytes() - ) - ] + response = [ResponsePacket(request.zn, request.cc, e.ac, e.data or bytes())] return response @@ -101,10 +96,7 @@ def register_handler(self, zn, cc, data, fun): async def start(self): _LOGGER.debug("Starting server") - self._server = await asyncio.start_server( - self.process, - self._host, - self._port) + self._server = await asyncio.start_server(self.process, self._host, self._port) return self async def stop(self): @@ -121,7 +113,7 @@ async def stop(self): await asyncio.wait(self._tasks) -class ServerContext(): +class ServerContext: def __init__(self, server: Server): self._server = server diff --git a/src/arcam/fmj/state.py b/src/arcam/fmj/state.py index f0a3b82..dd24780 100644 --- a/src/arcam/fmj/state.py +++ b/src/arcam/fmj/state.py @@ -43,11 +43,14 @@ _LOGGER = logging.getLogger(__name__) _T = TypeVar("_T") -class State(): + +class State: _state: Dict[int, Optional[bytes]] _presets: Dict[int, PresetDetail] - def __init__(self, client: Client, zn: int, api_model: ApiModel = ApiModel.API450_SERIES) -> None: + def __init__( + self, client: Client, zn: int, api_model: ApiModel = ApiModel.API450_SERIES + ) -> None: self._zn = zn self._client = client self._state = dict() @@ -72,23 +75,25 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): def to_dict(self) -> Dict[str, Any]: return { - 'POWER': self.get_power(), - 'VOLUME': self.get_volume(), - 'SOURCE': self.get_source(), - 'MUTE': self.get_mute(), - 'MENU': self.get_menu(), - 'INCOMING_AUDIO_FORMAT': self.get_incoming_audio_format(), - 'DECODE_MODE_2CH': self.get_decode_mode_2ch(), - 'DECODE_MODE_MCH': self.get_decode_mode_mch(), - 'DAB_STATION': self.get_dab_station(), - 'DLS_PDT': self.get_dls_pdt(), - 'RDS_INFORMATION': self.get_rds_information(), - 'TUNER_PRESET': self.get_tuner_preset(), - 'PRESET_DETAIL': self.get_preset_details(), + "POWER": self.get_power(), + "VOLUME": self.get_volume(), + "SOURCE": self.get_source(), + "MUTE": self.get_mute(), + "MENU": self.get_menu(), + "INCOMING_AUDIO_FORMAT": self.get_incoming_audio_format(), + "DECODE_MODE_2CH": self.get_decode_mode_2ch(), + "DECODE_MODE_MCH": self.get_decode_mode_mch(), + "DAB_STATION": self.get_dab_station(), + "DLS_PDT": self.get_dls_pdt(), + "RDS_INFORMATION": self.get_rds_information(), + "TUNER_PRESET": self.get_tuner_preset(), + "PRESET_DETAIL": self.get_preset_details(), } def __repr__(self) -> str: - return "State ({}) Amx ({})".format(self.to_dict(), self._amxduet.values if self._amxduet else {}) + return "State ({}) Amx ({})".format( + self.to_dict(), self._amxduet.values if self._amxduet else {} + ) def _listen(self, packet: Union[ResponsePacket, AmxDuetResponse]) -> None: if isinstance(packet, AmxDuetResponse): @@ -123,26 +128,39 @@ def revision(self) -> Optional[str]: return self._amxduet.device_revision return None - def get_rc5code(self, table: Dict[Tuple[ApiModel, int], Dict[_T, bytes]], value: _T) -> bytes: + def get_rc5code( + self, table: Dict[Tuple[ApiModel, int], Dict[_T, bytes]], value: _T + ) -> bytes: lookup = table.get((self._api_model, self._zn)) if not lookup: - raise ValueError("Unkown mapping for model {} and zone {}".format(self._api_model, self._zn)) + raise ValueError( + "Unkown mapping for model {} and zone {}".format( + self._api_model, self._zn + ) + ) command = lookup.get(value) if not command: - raise ValueError("Unkown command for model {} and zone {} and value {}".format(self._api_model, self._zn, value)) + raise ValueError( + "Unkown command for model {} and zone {} and value {}".format( + self._api_model, self._zn, value + ) + ) return command def get(self, cc): return self._state[cc] - def get_incoming_audio_format(self) -> Union[Tuple[IncomingAudioFormat, IncomingAudioConfig], Tuple[None, None]]: + def get_incoming_audio_format( + self, + ) -> Union[Tuple[IncomingAudioFormat, IncomingAudioConfig], Tuple[None, None]]: value = self._state.get(CommandCodes.INCOMING_AUDIO_FORMAT) if value is None: return None, None - return (IncomingAudioFormat.from_int(value[0]), - IncomingAudioConfig.from_int(value[1])) - + return ( + IncomingAudioFormat.from_int(value[0]), + IncomingAudioConfig.from_int(value[1]), + ) def get_decode_mode_2ch(self) -> Optional[DecodeMode2CH]: value = self._state.get(CommandCodes.DECODE_MODE_STATUS_2CH) @@ -153,7 +171,8 @@ def get_decode_mode_2ch(self) -> Optional[DecodeMode2CH]: async def set_decode_mode_2ch(self, mode: DecodeMode2CH) -> None: command = self.get_rc5code(RC5CODE_DECODE_MODE_2CH, mode) await self._client.request( - self._zn, CommandCodes.SIMULATE_RC5_IR_COMMAND, command) + self._zn, CommandCodes.SIMULATE_RC5_IR_COMMAND, command + ) def get_decode_mode_mch(self) -> Optional[DecodeModeMCH]: value = self._state.get(CommandCodes.DECODE_MODE_STATUS_MCH) @@ -164,7 +183,8 @@ def get_decode_mode_mch(self) -> Optional[DecodeModeMCH]: async def set_decode_mode_mch(self, mode: DecodeModeMCH) -> None: command = self.get_rc5code(RC5CODE_DECODE_MODE_MCH, mode) await self._client.request( - self._zn, CommandCodes.SIMULATE_RC5_IR_COMMAND, command) + self._zn, CommandCodes.SIMULATE_RC5_IR_COMMAND, command + ) def get_2ch(self) -> bool: """Return if source is 2 channel or not.""" @@ -185,13 +205,17 @@ def get_decode_mode(self) -> Optional[Union[DecodeModeMCH, DecodeMode2CH]]: else: return self.get_decode_mode_mch() - def get_decode_modes(self) -> Optional[Union[List[DecodeModeMCH], List[DecodeMode2CH]]]: + def get_decode_modes( + self, + ) -> Optional[Union[List[DecodeModeMCH], List[DecodeMode2CH]]]: if self.get_2ch(): return list(RC5CODE_DECODE_MODE_2CH[(self._api_model, self._zn)]) else: return list(RC5CODE_DECODE_MODE_MCH[(self._api_model, self._zn)]) - - async def set_decode_mode(self, mode: Union[str, DecodeModeMCH, DecodeMode2CH]) -> None: + + async def set_decode_mode( + self, mode: Union[str, DecodeModeMCH, DecodeMode2CH] + ) -> None: if self.get_2ch(): if isinstance(mode, str): mode = DecodeMode2CH[mode] @@ -209,7 +233,7 @@ def get_power(self) -> Optional[bool]: value = self._state.get(CommandCodes.POWER) if value is None: return None - return int.from_bytes(value, 'big') == 0x01 + return int.from_bytes(value, "big") == 0x01 async def set_power(self, power: bool) -> None: if self._api_model in POWER_WRITE_SUPPORTED: @@ -217,12 +241,14 @@ async def set_power(self, power: bool) -> None: if not power: self._state[CommandCodes.POWER] = bytes([0]) await self._client.request( - self._zn, CommandCodes.POWER, bytes([bool_to_hex])) + self._zn, CommandCodes.POWER, bytes([bool_to_hex]) + ) else: command = self.get_rc5code(RC5CODE_POWER, power) if power: await self._client.request( - self._zn, CommandCodes.SIMULATE_RC5_IR_COMMAND, command) + self._zn, CommandCodes.SIMULATE_RC5_IR_COMMAND, command + ) else: # seed with a response, since device might not # respond in timely fashion, so let's just @@ -230,7 +256,8 @@ async def set_power(self, power: bool) -> None: # back. self._state[CommandCodes.POWER] = bytes([0]) await self._client.send( - self._zn, CommandCodes.SIMULATE_RC5_IR_COMMAND, command) + self._zn, CommandCodes.SIMULATE_RC5_IR_COMMAND, command + ) def get_menu(self) -> Optional[MenuCodes]: value = self._state.get(CommandCodes.MENU) @@ -242,17 +269,19 @@ def get_mute(self) -> Optional[bool]: value = self._state.get(CommandCodes.MUTE) if value is None: return None - return int.from_bytes(value, 'big') == 0 + return int.from_bytes(value, "big") == 0 async def set_mute(self, mute: bool) -> None: if self._api_model in MUTE_WRITE_SUPPORTED: bool_to_hex = 0x00 if mute else 0x01 await self._client.request( - self._zn, CommandCodes.MUTE, bytes([bool_to_hex])) + self._zn, CommandCodes.MUTE, bytes([bool_to_hex]) + ) else: command = self.get_rc5code(RC5CODE_MUTE, mute) await self._client.request( - self._zn, CommandCodes.SIMULATE_RC5_IR_COMMAND, command) + self._zn, CommandCodes.SIMULATE_RC5_IR_COMMAND, command + ) def get_source(self) -> Optional[SourceCodes]: value = self._state.get(CommandCodes.CURRENT_SOURCE) @@ -269,22 +298,21 @@ def get_source_list(self) -> List[SourceCodes]: async def set_source(self, src: SourceCodes) -> None: if self._api_model in SOURCE_WRITE_SUPPORTED: value = src.to_bytes(self._api_model, self._zn) - await self._client.request( - self._zn, CommandCodes.CURRENT_SOURCE, value) + await self._client.request(self._zn, CommandCodes.CURRENT_SOURCE, value) else: command = self.get_rc5code(RC5CODE_SOURCE, src) await self._client.request( - self._zn, CommandCodes.SIMULATE_RC5_IR_COMMAND, command) + self._zn, CommandCodes.SIMULATE_RC5_IR_COMMAND, command + ) def get_volume(self) -> Optional[int]: value = self._state.get(CommandCodes.VOLUME) if value is None: return None - return int.from_bytes(value, 'big') + return int.from_bytes(value, "big") async def set_volume(self, volume: int) -> None: - await self._client.request( - self._zn, CommandCodes.VOLUME, bytes([volume])) + await self._client.request(self._zn, CommandCodes.VOLUME, bytes([volume])) async def inc_volume(self) -> None: if self._api_model in VOLUME_STEP_SUPPORTED: @@ -292,7 +320,8 @@ async def inc_volume(self) -> None: else: command = self.get_rc5code(RC5CODE_VOLUME, True) await self._client.request( - self._zn, CommandCodes.SIMULATE_RC5_IR_COMMAND, command) + self._zn, CommandCodes.SIMULATE_RC5_IR_COMMAND, command + ) async def dec_volume(self) -> None: if self._api_model in VOLUME_STEP_SUPPORTED: @@ -300,34 +329,35 @@ async def dec_volume(self) -> None: else: command = self.get_rc5code(RC5CODE_VOLUME, False) await self._client.request( - self._zn, CommandCodes.SIMULATE_RC5_IR_COMMAND, command) + self._zn, CommandCodes.SIMULATE_RC5_IR_COMMAND, command + ) def get_dab_station(self) -> Optional[str]: value = self._state.get(CommandCodes.DAB_STATION) if value is None: return None - return value.decode('utf8').rstrip() + return value.decode("utf8").rstrip() def get_dls_pdt(self) -> Optional[str]: value = self._state.get(CommandCodes.DLS_PDT_INFO) if value is None: return None - return value.decode('utf8').rstrip() + return value.decode("utf8").rstrip() def get_rds_information(self) -> Optional[str]: value = self._state.get(CommandCodes.RDS_INFORMATION) if value is None: return None - return value.decode('utf8').rstrip() + return value.decode("utf8").rstrip() async def set_tuner_preset(self, preset: int) -> None: await self._client.request(self._zn, CommandCodes.TUNER_PRESET, bytes([preset])) def get_tuner_preset(self) -> Optional[int]: value = self._state.get(CommandCodes.TUNER_PRESET) - if value is None or value == b'\xff': + if value is None or value == b"\xff": return None - return int.from_bytes(value, 'big') + return int.from_bytes(value, "big") def get_preset_details(self) -> Dict[int, PresetDetail]: return self._presets @@ -350,8 +380,10 @@ async def _update_presets() -> None: presets = {} for preset in range(1, 51): try: - data = await self._client.request(self._zn, CommandCodes.PRESET_DETAIL, bytes([preset])) - if data != b'\x00': + data = await self._client.request( + self._zn, CommandCodes.PRESET_DETAIL, bytes([preset]) + ) + if data != b"\x00": presets[preset] = PresetDetail.from_bytes(data) except CommandInvalidAtThisTime: break @@ -385,7 +417,7 @@ async def _update_amxduet() -> None: if data.device_model in APIVERSION_PA_SERIES: self._api_model = ApiModel.APIPA_SERIES - + if data.device_model in APIVERSION_ST_SERIES: self._api_model = ApiModel.APIST_SERIES @@ -400,22 +432,23 @@ async def _update_amxduet() -> None: if self._amxduet is None: await _update_amxduet() - await asyncio.gather(*[ - _update(CommandCodes.POWER), - _update(CommandCodes.VOLUME), - _update(CommandCodes.MUTE), - _update(CommandCodes.CURRENT_SOURCE), - _update(CommandCodes.MENU), - _update(CommandCodes.DECODE_MODE_STATUS_2CH), - _update(CommandCodes.DECODE_MODE_STATUS_MCH), - _update(CommandCodes.INCOMING_AUDIO_FORMAT), - _update(CommandCodes.DAB_STATION), - _update(CommandCodes.DLS_PDT_INFO), - _update(CommandCodes.RDS_INFORMATION), - _update(CommandCodes.TUNER_PRESET), - _update_presets(), - ]) + await asyncio.gather( + *[ + _update(CommandCodes.POWER), + _update(CommandCodes.VOLUME), + _update(CommandCodes.MUTE), + _update(CommandCodes.CURRENT_SOURCE), + _update(CommandCodes.MENU), + _update(CommandCodes.DECODE_MODE_STATUS_2CH), + _update(CommandCodes.DECODE_MODE_STATUS_MCH), + _update(CommandCodes.INCOMING_AUDIO_FORMAT), + _update(CommandCodes.DAB_STATION), + _update(CommandCodes.DLS_PDT_INFO), + _update(CommandCodes.RDS_INFORMATION), + _update(CommandCodes.TUNER_PRESET), + _update_presets(), + ] + ) else: if self._state: self._state = dict() - diff --git a/src/arcam/fmj/utils.py b/src/arcam/fmj/utils.py index 89b7c95..1012ce2 100644 --- a/src/arcam/fmj/utils.py +++ b/src/arcam/fmj/utils.py @@ -9,6 +9,7 @@ _LOGGER = logging.getLogger(__name__) + def async_retry(attempts=2, allowed_exceptions=()): def decorator(f): @functools.wraps(f) @@ -25,8 +26,10 @@ async def wrapper(*args, **kwargs): _LOGGER.warning("Retrying: %s %s", f, args) return wrapper + return decorator + class Throttle: def __init__(self, delay: float) -> None: self._timestamp = datetime.now() @@ -61,13 +64,19 @@ def get_possibly_invalid_xml(data) -> Any: return ElementTree.fromstring(data) except ElementTree.ParseError: _LOGGER.info("Device provide corrupt xml, trying with ampersand replacement") - data = re.sub(r'&(?![A-Za-z]+[0-9]*;|#[0-9]+;|#x[0-9a-fA-F]+;)', r'&', data) + data = re.sub(r"&(?![A-Za-z]+[0-9]*;|#[0-9]+;|#x[0-9a-fA-F]+;)", r"&", data) return ElementTree.fromstring(data) + def get_udn_from_xml(xml: Any) -> Optional[str]: - return xml.findtext("d:device/d:UDN", None, {"d": "urn:schemas-upnp-org:device-1-0"}) + return xml.findtext( + "d:device/d:UDN", None, {"d": "urn:schemas-upnp-org:device-1-0"} + ) + -async def get_uniqueid_from_device_description(session: aiohttp.ClientSession, url: str): +async def get_uniqueid_from_device_description( + session: aiohttp.ClientSession, url: str +): """Retrieve and extract unique id from url.""" try: async with session.get(url) as req: