From 36bef5ba33705406fe2493841fc002edda63d343 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 22 Dec 2021 00:03:39 -0700 Subject: [PATCH] Fix Legacy CCT Controllers (#262) --- README.md | 1 + flux_led/base_device.py | 18 ++---- flux_led/device.py | 6 +- flux_led/fluxled.py | 10 ++++ flux_led/models_db.py | 21 +++++++ flux_led/protocol.py | 10 ++-- flux_led/scanner.py | 11 +++- tests.py | 125 ++++++++++++++++++++++++++++++++++++++-- tests_aio.py | 10 ++++ 9 files changed, 185 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index c2822059..ac8ab7c4 100644 --- a/README.md +++ b/README.md @@ -157,6 +157,7 @@ The following models have been tested with library. | Model | Description | Notes | | ----- | --------------------------- | ------------------------------- | +| 0x03 | Legacy CCT Controller | Original protocol | | 0x04 | UFO Controller RGBW | | | 0x06 | Controller RGBW | | | 0x07 | Controller RGBCW | | diff --git a/flux_led/base_device.py b/flux_led/base_device.py index 60a77af8..31284627 100644 --- a/flux_led/base_device.py +++ b/flux_led/base_device.py @@ -209,7 +209,7 @@ def _whites_are_temp_brightness(self) -> bool: @property def model(self) -> str: """Return the human readable model description.""" - if self._discovery and self._discovery[ATTR_MODEL_DESCRIPTION]: + if self._discovery and self._discovery.get(ATTR_MODEL_DESCRIPTION): return f"{self._discovery[ATTR_MODEL_DESCRIPTION]} (0x{self.model_num:02X})" return f"{self.model_data.description} (0x{self.model_num:02X})" @@ -221,7 +221,7 @@ def version_num(self) -> int: if hasattr(raw_state, "version_number"): assert isinstance(raw_state, LEDENETRawState) return raw_state.version_number - return 1 + return 0 # old devices report as 0 @property def preset_pattern_num(self) -> int: @@ -279,10 +279,7 @@ def white_active(self) -> bool: """Any white channel is active.""" assert self.raw_state is not None raw_state = self.raw_state - if hasattr(raw_state, "cool_white"): - assert isinstance(raw_state, LEDENETRawState) - return bool(raw_state.warm_white or raw_state.cool_white) - return bool(raw_state.warm_white) + return bool(raw_state.warm_white or raw_state.cool_white) @property def color_active(self) -> bool: @@ -419,7 +416,6 @@ def _named_effect(self) -> Optional[str]: def cool_white(self) -> int: assert self.raw_state is not None if self._rgbwwprotocol: - assert isinstance(self.raw_state, LEDENETRawState) return self.raw_state.cool_white return 0 @@ -463,7 +459,6 @@ def brightness(self) -> int: if color_mode == COLOR_MODE_RGBW: return round((v_255 + raw_state.warm_white) / 2) if color_mode == COLOR_MODE_RGBWW: - assert isinstance(raw_state, LEDENETRawState) return round((v_255 + raw_state.warm_white + raw_state.cool_white) / 3) # Default color mode (RGB) @@ -692,7 +687,6 @@ def getWhiteTemperature(self) -> Tuple[int, int]: # Assume input temperature of between 2700 and 6500 Kelvin, and scale # the warm and cold LEDs linearly to provide that assert self.raw_state is not None - assert isinstance(self.raw_state, LEDENETRawState) raw_state = self.raw_state temp, brightness = white_levels_to_color_temp( raw_state.warm_white, raw_state.cool_white @@ -726,8 +720,8 @@ def getRgbww(self) -> Tuple[int, int, int, int, int]: @property def rgbww(self) -> Tuple[int, int, int, int, int]: """Returns red,green,blue,warm,cool.""" - assert isinstance(self.raw_state, LEDENETRawState) raw_state = self.raw_state + assert raw_state is not None return ( raw_state.red, raw_state.green, @@ -745,8 +739,8 @@ def getRgbcw(self) -> Tuple[int, int, int, int, int]: @property def rgbcw(self) -> Tuple[int, int, int, int, int]: """Returns red,green,blue,cool,warm.""" - assert isinstance(self.raw_state, LEDENETRawState) raw_state = self.raw_state + assert raw_state is not None return ( raw_state.red, raw_state.green, @@ -758,8 +752,8 @@ def rgbcw(self) -> Tuple[int, int, int, int, int]: def getCCT(self) -> Tuple[int, int]: if self.color_mode != COLOR_MODE_CCT: return (255, 255) - assert isinstance(self.raw_state, LEDENETRawState) raw_state = self.raw_state + assert raw_state is not None return (raw_state.warm_white, raw_state.cool_white) @property diff --git a/flux_led/device.py b/flux_led/device.py index 968f83af..ced4ce91 100644 --- a/flux_led/device.py +++ b/flux_led/device.py @@ -290,8 +290,10 @@ def _determine_protocol(self) -> bytearray: full_msg = rx + self._read_msg( protocol.state_response_length - read_bytes ) - if protocol.is_valid_state_response(full_msg): - self._set_protocol_from_msg(full_msg, protocol.name) + if not protocol.is_valid_state_response(full_msg): + self.close() + continue + self._set_protocol_from_msg(full_msg, protocol.name) return full_msg raise Exception("Cannot determine protocol") diff --git a/flux_led/fluxled.py b/flux_led/fluxled.py index 1d641f0f..12005904 100644 --- a/flux_led/fluxled.py +++ b/flux_led/fluxled.py @@ -394,6 +394,13 @@ def parseArgs() -> Tuple[Values, Any]: # noqa: C901 other_group = OptionGroup(parser, "Other options") parser.add_option_group(info_group) + info_group.add_option( + "--debug", + action="store_true", + dest="debug", + default=False, + help="Enable debug logging", + ) info_group.add_option( "-e", "--examples", @@ -588,6 +595,9 @@ def parseArgs() -> Tuple[Values, Any]: # noqa: C901 parser.usage = "usage: %prog [-sS10cwdkpCiltThe] [addr1 [addr2 [addr3] ...]." (options, args) = parser.parse_args() + if options.debug: + logging.basicConfig(level=logging.DEBUG) + if options.showexamples: showUsageExamples() sys.exit(0) diff --git a/flux_led/models_db.py b/flux_led/models_db.py index 4dd907d2..3450ea55 100755 --- a/flux_led/models_db.py +++ b/flux_led/models_db.py @@ -242,6 +242,11 @@ class LEDENETHardware: chip=LEDENETChip.HFLPB100, rf_remote=False, # unverified ), + LEDENETHardware( + model="HF-A11-ZJ002", + chip=LEDENETChip.HFLPB100, + rf_remote=False, # unverified + ), LEDENETHardware( model="HF-LPB100", # reported on older UFO chip=LEDENETChip.HFLPB100, @@ -281,6 +286,22 @@ class LEDENETHardware: channel_map={}, microphone=False, ), + LEDENETModel( + model_num=0x03, + models=["HF-A11-ZJ002"], + description="Legacy Controller CCT", + always_writes_white_and_colors=False, # Formerly rgbwprotocol + protocols=[MinVersionProtocol(0, PROTOCOL_LEDENET_ORIGINAL)], + mode_to_color_mode={}, + color_modes={COLOR_MODE_CCT}, # Formerly rgbwcapable + channel_map={ + STATE_WARM_WHITE: STATE_RED, + STATE_RED: STATE_WARM_WHITE, + STATE_COOL_WHITE: STATE_GREEN, + STATE_GREEN: STATE_COOL_WHITE, + }, + microphone=False, + ), LEDENETModel( model_num=0x04, models=[ diff --git a/flux_led/protocol.py b/flux_led/protocol.py index 6a6a4725..073fb798 100755 --- a/flux_led/protocol.py +++ b/flux_led/protocol.py @@ -150,6 +150,7 @@ class LEDENETOriginalRawState(NamedTuple): blue: int warm_white: int check_sum: int + cool_white: int # typical response: @@ -477,11 +478,7 @@ def is_valid_power_state_response(self, msg: bytes) -> bool: def is_valid_state_response(self, raw_state: bytes) -> bool: """Check if a state response is valid.""" - return ( - len(raw_state) == self.state_response_length - and raw_state[0] == 0x66 - and raw_state[1] == 0x01 - ) + return len(raw_state) == self.state_response_length and raw_state[0] == 0x66 def construct_state_query(self) -> bytearray: """The bytes to send for a query request.""" @@ -521,7 +518,8 @@ def construct_message(self, raw_bytes: bytearray) -> bytearray: def named_raw_state(self, raw_state: bytes) -> LEDENETOriginalRawState: """Convert raw_state to a namedtuple.""" - return LEDENETOriginalRawState(*raw_state) + raw_bytearray = bytearray([*raw_state, 0]) + return LEDENETOriginalRawState(*raw_bytearray) class ProtocolLEDENET8Byte(ProtocolBase): diff --git a/flux_led/scanner.py b/flux_led/scanner.py index 04d19438..e0eee4a5 100644 --- a/flux_led/scanner.py +++ b/flux_led/scanner.py @@ -50,11 +50,16 @@ class FluxLEDDiscovery(TypedDict): remote_access_port: Optional[int] # the remote access port -def create_udp_socket() -> socket.socket: +def create_udp_socket(discovery_port: int) -> socket.socket: """Create a udp socket used for communicating with the device.""" sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - sock.bind(("", 0)) + try: + # Legacy devices require source port to be the discovery port + sock.bind(("", discovery_port)) + except OSError: + _LOGGER.debug("Port %s is not available: %s", discovery_port) + sock.bind(("", 0)) sock.setblocking(False) return sock @@ -167,7 +172,7 @@ def getBulbInfo(self) -> List[FluxLEDDiscovery]: return self.found_bulbs def _create_socket(self) -> socket.socket: - return create_udp_socket() + return create_udp_socket(self.DISCOVERY_PORT) def _destination_from_address(self, address: Optional[str]) -> Tuple[str, int]: if address is None: diff --git a/tests.py b/tests.py index 88927106..a0d1f9df 100644 --- a/tests.py +++ b/tests.py @@ -1359,7 +1359,7 @@ def read_data(expected): self.assertEqual( light.__str__(), - "ON [Color: (1, 25, 80) Brightness: 31% raw state: 102,1,35,65,33,8,1,25,80,1,153,]", + "ON [Color: (1, 25, 80) Brightness: 31% raw state: 102,1,35,65,33,8,1,25,80,1,153,0,]", ) self.assertEqual(light.protocol, PROTOCOL_LEDENET_ORIGINAL) self.assertEqual(light.is_on, True) @@ -1381,7 +1381,7 @@ def read_data(expected): self.assertEqual( light.__str__(), - "OFF [Color: (1, 25, 80) Brightness: 31% raw state: 102,1,36,65,33,8,1,25,80,1,153,]", + "OFF [Color: (1, 25, 80) Brightness: 31% raw state: 102,1,36,65,33,8,1,25,80,1,153,0,]", ) self.assertEqual(light.protocol, PROTOCOL_LEDENET_ORIGINAL) self.assertEqual(light.is_on, False) @@ -1403,7 +1403,7 @@ def read_data(expected): self.assertEqual( light.__str__(), - "ON [Color: (1, 25, 80) Brightness: 31% raw state: 102,1,35,65,33,8,1,25,80,1,153,]", + "ON [Color: (1, 25, 80) Brightness: 31% raw state: 102,1,35,65,33,8,1,25,80,1,153,0,]", ) self.assertEqual(light.protocol, PROTOCOL_LEDENET_ORIGINAL) self.assertEqual(light.is_on, True) @@ -1412,7 +1412,124 @@ def read_data(expected): self.assertEqual(light.cool_white, 0) self.assertEqual(light.brightness, 80) self.assertEqual(light.getRgb(), (1, 25, 80)) - self.assertEqual(light.version_num, 1) + self.assertEqual(light.version_num, 0) + + @patch("flux_led.WifiLedBulb._send_msg") + @patch("flux_led.WifiLedBulb._read_msg") + @patch("flux_led.WifiLedBulb.connect") + def test_original_ledenet_cct(self, mock_connect, mock_read, mock_send): + calls = 0 + + def read_data(expected): + nonlocal calls + calls += 1 + if calls == 1: + self.assertEqual(expected, 2) + return bytearray(b"") + if calls == 2: + self.assertEqual(expected, 2) + return bytearray(b"f\x03") + if calls == 3: + self.assertEqual(expected, 9) + return bytearray(b"#A!\x08\xff\x80*\x01\x99") + if calls == 4: + self.assertEqual(expected, 11) + return bytearray(b"f\x03#A!\x08\x01\x19P\x01\x99") + if calls == 5: # ready turn off response + self.assertEqual(expected, 4) + return bytearray(b"\x0fq#\xa3") + if calls == 6: + self.assertEqual(expected, 11) + return bytearray(b"f\x03$A!\x08\x01\x19P\x01\x99") + if calls == 7: # ready turn on response + self.assertEqual(expected, 4) + return bytearray(b"\x0fq#\xa3") + if calls == 8: + self.assertEqual(expected, 11) + return bytearray(b"f\x03#A!\x08\x01\x19P\x01\x99") + + mock_read.side_effect = read_data + light = flux_led.WifiLedBulb("192.168.1.164") + assert light.color_modes == {COLOR_MODE_CCT} + self.assertEqual(light.model_num, 0x03) + self.assertEqual(light.model, "Legacy Controller CCT (0x03)") + self.assertEqual(light.dimmable_effects, False) + self.assertEqual(light.requires_turn_on, True) + self.assertEqual(light.white_active, True) + self.assertEqual(light._protocol.power_push_updates, False) + self.assertEqual(light._protocol.state_push_updates, False) + + self.assertEqual(mock_read.call_count, 3) + self.assertEqual(mock_send.call_count, 2) + self.assertEqual(mock_send.call_args, mock.call(bytearray(b"\xef\x01w"))) + + light.setWhiteTemperature(2700, 255) + self.assertEqual(mock_read.call_count, 3) + self.assertEqual(mock_send.call_count, 3) + self.assertEqual( + mock_send.call_args, mock.call(bytearray((b"V\xff\x00\x00\xaa"))) + ) + + light._transition_complete_time = 0 + light.update_state() + self.assertEqual(mock_read.call_count, 4) + self.assertEqual(mock_send.call_count, 4) + self.assertEqual(mock_send.call_args, mock.call(bytearray(b"\xef\x01w"))) + + self.assertEqual( + light.__str__(), + "ON [CCT: 6354K Brightness: 10% raw state: 102,3,35,65,33,8,1,0,80,1,153,25,]", + ) + self.assertEqual(light.protocol, PROTOCOL_LEDENET_ORIGINAL) + self.assertEqual(light.is_on, True) + self.assertEqual(light.mode, "ww") + self.assertEqual(light.warm_white, 0) + self.assertEqual(light.brightness, 26) + + light.turnOff() + self.assertEqual(mock_read.call_count, 5) + self.assertEqual(mock_send.call_count, 5) + self.assertEqual(mock_send.call_args, mock.call(bytearray(b"\xcc$3"))) + + light._transition_complete_time = 0 + light.update_state() + self.assertEqual(mock_read.call_count, 6) + self.assertEqual(mock_send.call_count, 6) + self.assertEqual(mock_send.call_args, mock.call(bytearray(b"\xef\x01w"))) + + self.assertEqual( + light.__str__(), + "OFF [CCT: 6354K Brightness: 10% raw state: 102,3,36,65,33,8,1,0,80,1,153,25,]", + ) + self.assertEqual(light.protocol, PROTOCOL_LEDENET_ORIGINAL) + self.assertEqual(light.is_on, False) + self.assertEqual(light.mode, "ww") + self.assertEqual(light.cool_white, 0) + self.assertEqual(light.warm_white, 0) + self.assertEqual(light.brightness, 26) + + light.turnOn() + self.assertEqual(mock_read.call_count, 7) + self.assertEqual(mock_send.call_count, 7) + self.assertEqual(mock_send.call_args, mock.call(bytearray(b"\xcc#3"))) + + light._transition_complete_time = 0 + light.update_state() + self.assertEqual(mock_read.call_count, 8) + self.assertEqual(mock_send.call_count, 8) + self.assertEqual(mock_send.call_args, mock.call(bytearray(b"\xef\x01w"))) + + self.assertEqual( + light.__str__(), + "ON [CCT: 6354K Brightness: 10% raw state: 102,3,35,65,33,8,1,0,80,1,153,25,]", + ) + self.assertEqual(light.protocol, PROTOCOL_LEDENET_ORIGINAL) + self.assertEqual(light.is_on, True) + self.assertEqual(light.mode, "ww") + self.assertEqual(light.warm_white, 0) + self.assertEqual(light.cool_white, 0) + self.assertEqual(light.brightness, 26) + self.assertEqual(light.version_num, 0) @patch("flux_led.WifiLedBulb._send_msg") @patch("flux_led.WifiLedBulb._read_msg") diff --git a/tests_aio.py b/tests_aio.py index 946cba2a..3f84e2bf 100644 --- a/tests_aio.py +++ b/tests_aio.py @@ -7,6 +7,7 @@ import pytest from flux_led import aiodevice, aioscanner +from flux_led.scanner import create_udp_socket from flux_led.aio import AIOWifiLedBulb from flux_led.aioprotocol import AIOLEDENETProtocol from flux_led.aioscanner import AIOBulbScanner, LEDENETDiscovery @@ -1778,6 +1779,15 @@ async def test_async_scanner_times_out_with_nothing_specific_address( assert data == [] +@pytest.mark.asyncio +async def test_async_scanner_falls_back_to_any_source_port_if_socket_in_use(): + """Test port fallback.""" + hold_socket = create_udp_socket(AIOBulbScanner.DISCOVERY_PORT) + assert hold_socket.getsockname() == ("0.0.0.0", 48899) + random_socket = create_udp_socket(AIOBulbScanner.DISCOVERY_PORT) + assert random_socket.getsockname() != ("0.0.0.0", 48899) + + @pytest.mark.asyncio async def test_async_scanner_enable_remote_access(mock_discovery_aio_protocol): """Test scanner enabling remote access with a specific address."""