From 460d6a9f7aa04f2ec757b359e4b977c8d0d799cc Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 30 Dec 2021 20:28:57 -1000 Subject: [PATCH] Fix RGBW on SK6812RGBW strips (#287) --- flux_led/aiodevice.py | 5 +- flux_led/base_device.py | 49 +++---- flux_led/device.py | 7 +- flux_led/protocol.py | 297 ++++++++++++++++++++++------------------ tests_aio.py | 26 +++- 5 files changed, 211 insertions(+), 173 deletions(-) diff --git a/flux_led/aiodevice.py b/flux_led/aiodevice.py index e71d66e3..c1587bd6 100644 --- a/flux_led/aiodevice.py +++ b/flux_led/aiodevice.py @@ -319,11 +319,12 @@ async def async_set_levels( ) async def _async_process_levels_change( - self, msg: bytearray, updates: Dict[str, int] + self, msgs: List[bytearray], updates: Dict[str, int] ) -> None: """Process and send a levels change.""" self._set_transition_complete_time() - await self._async_send_msg(msg) + for msg in msgs: + await self._async_send_msg(msg) if updates: self._replace_raw_state(updates) diff --git a/flux_led/base_device.py b/flux_led/base_device.py index 1dd8b8ef..5ad2bae2 100644 --- a/flux_led/base_device.py +++ b/flux_led/base_device.py @@ -376,6 +376,12 @@ def color_modes(self) -> Set[str]: def _internal_color_modes(self) -> Set[str]: """The internal available color modes.""" assert self.raw_state is not None + if ( + self._device_config is not None + # Currently this is only the SK6812RGBW strips on 0xA3 + and self._device_config.operating_mode == COLOR_MODE_RGBW + ): + return {COLOR_MODE_RGBW} if not is_known_model(self.model_num): # Default mode is RGB return BASE_MODE_MAP.get(self.raw_state.mode & 0x0F, {DEFAULT_MODE}) @@ -953,7 +959,7 @@ def speed(self) -> int: def getSpeed(self) -> int: return self.speed - def _generate_random_levels_change(self) -> Tuple[bytearray, Dict[str, int]]: + def _generate_random_levels_change(self) -> Tuple[List[bytearray], Dict[str, int]]: """Generate a random levels change.""" channels = {STATE_WARM_WHITE} if COLOR_MODES_RGB.intersection(self.color_modes): @@ -972,7 +978,7 @@ def _generate_levels_change( channels: Dict[str, Optional[int]], persist: bool = True, brightness: Optional[int] = None, - ) -> Tuple[bytearray, Dict[str, int]]: + ) -> Tuple[List[bytearray], Dict[str, int]]: """Generate the levels change request.""" channel_map = self.model_data.channel_map if channel_map: @@ -995,16 +1001,16 @@ def _generate_levels_change( if brightness is not None and r is not None and g is not None and b is not None: (r, g, b) = self._calculateBrightness((r, g, b), brightness) - r_value = 0 if r is None else int(r) - g_value = 0 if g is None else int(g) - b_value = 0 if b is None else int(b) - w_value = 0 if w is None else int(w) + r_value = None if r is None else int(r) + g_value = None if g is None else int(g) + b_value = None if b is None else int(b) + w_value = None if w is None else int(w) # ProtocolLEDENET9Byte devices support two white outputs for cold and warm. if w2 is None: # If we're only setting a single white value, # we set the second output to be the same as the first w2_value = ( - int(w) if w is not None and self.color_mode != COLOR_MODE_CCT else 0 + int(w) if w is not None and self.color_mode != COLOR_MODE_CCT else None ) else: w2_value = int(w2) @@ -1017,36 +1023,19 @@ def _generate_levels_change( elif r is None and g is None and b is None: write_mode = LevelWriteMode.WHITES - _LOGGER.debug( - "%s: _generate_levels_change using %s: persist=%s r=%s/%s, g=%s/%s b=%s/%s, w=%s/%s w2=%s/%s write_mode=%s/%s", - self.ipaddr, - self.protocol, - persist, - r_value, - f"0x{r_value:02X}", - g_value, - f"0x{g_value:02X}", - b_value, - f"0x{b_value:02X}", - w_value, - f"0x{w_value:02X}", - w2_value, - f"0x{w2_value:02X}", - write_mode, - f"0x{write_mode.value:02X}", - ) - assert self._protocol is not None - msg = self._protocol.construct_levels_change( + msgs = self._protocol.construct_levels_change( persist, r_value, g_value, b_value, w_value, w2_value, write_mode ) updates = {} multi_mode = self.multi_color_mode if multi_mode or write_mode in WRITE_ALL_COLORS: - updates.update({"red": r_value, "green": g_value, "blue": b_value}) + updates.update( + {"red": r_value or 0, "green": g_value or 0, "blue": b_value or 0} + ) if multi_mode or write_mode in WRITE_ALL_WHITES: - updates.update({"warm_white": w_value, "cool_white": w2_value}) - return msg, updates + updates.update({"warm_white": w_value or 0, "cool_white": w2_value or 0}) + return msgs, updates def _set_transition_complete_time(self) -> None: """Set the time we expect the transition will be completed. diff --git a/flux_led/device.py b/flux_led/device.py index 6d767662..82140127 100644 --- a/flux_led/device.py +++ b/flux_led/device.py @@ -181,12 +181,15 @@ def set_levels( ) @_socket_retry(attempts=2) # type: ignore - def _process_levels_change(self, msg: bytearray, updates: Dict[str, int]) -> None: + def _process_levels_change( + self, msgs: List[bytearray], updates: Dict[str, int] + ) -> None: # send the message with self._lock: self._connect_if_disconnected() self._set_transition_complete_time() - self._send_msg(msg) + for msg in msgs: + self._send_msg(msg) if updates: self._replace_raw_state(updates) diff --git a/flux_led/protocol.py b/flux_led/protocol.py index 753616b5..18a54347 100755 --- a/flux_led/protocol.py +++ b/flux_led/protocol.py @@ -513,13 +513,13 @@ def construct_music_mode( def construct_levels_change( self, persist: int, - red: int, - green: int, - blue: int, - warm_white: int, - cool_white: int, + red: Optional[int], + green: Optional[int], + blue: Optional[int], + warm_white: Optional[int], + cool_white: Optional[int], write_mode: LevelWriteMode, - ) -> bytearray: + ) -> List[bytearray]: """The bytes to send for a level change request.""" @abstractmethod @@ -698,13 +698,13 @@ def construct_state_change(self, turn_on: int) -> bytearray: def construct_levels_change( self, persist: int, - red: int, - green: int, - blue: int, - warm_white: int, - cool_white: int, + red: Optional[int], + green: Optional[int], + blue: Optional[int], + warm_white: Optional[int], + cool_white: Optional[int], write_mode: LevelWriteMode, - ) -> bytearray: + ) -> List[bytearray]: """The bytes to send for a level change request.""" # sample message for original LEDENET protocol (w/o checksum at end) # 0 1 2 3 4 @@ -715,7 +715,11 @@ def construct_levels_change( # | | green # | red # head - return self.construct_message(bytearray([0x56, red, green, blue, 0xAA])) + return [ + self.construct_message( + bytearray([0x56, red or 0x00, green or 0x00, blue or 0x00, 0xAA]) + ) + ] def construct_message(self, raw_bytes: bytearray) -> bytearray: """Original protocol uses no checksum.""" @@ -736,13 +740,13 @@ def name(self) -> str: def construct_levels_change( self, persist: int, - red: int, - green: int, - blue: int, - warm_white: int, - cool_white: int, + red: Optional[int], + green: Optional[int], + blue: Optional[int], + warm_white: Optional[int], + cool_white: Optional[int], write_mode: LevelWriteMode, - ) -> bytearray: + ) -> List[bytearray]: """The bytes to send for a level change request.""" # sample message for original LEDENET protocol (w/o checksum at end) # 0 1 2 3 4 @@ -753,7 +757,9 @@ def construct_levels_change( # | | green # | red # head - return self.construct_message(bytearray([0x56, red, green, 0xAA])) + return [ + self.construct_message(bytearray([0x56, red or 0x00, green or 0x00, 0xAA])) + ] class ProtocolLEDENET8Byte(ProtocolBase): @@ -814,13 +820,13 @@ def construct_preset_pattern( def construct_levels_change( self, persist: int, - red: int, - green: int, - blue: int, - warm_white: int, - cool_white: int, + red: Optional[int], + green: Optional[int], + blue: Optional[int], + warm_white: Optional[int], + cool_white: Optional[int], write_mode: LevelWriteMode, - ) -> bytearray: + ) -> List[bytearray]: """The bytes to send for a level change request.""" # sample message for 8-byte protocols (w/ checksum at end) # 0 1 2 3 4 5 6 @@ -846,19 +852,21 @@ def construct_levels_change( # value (f0). # # For all other rgb and rgbw devices, the value is 00 - return self.construct_message( - bytearray( - [ - 0x31 if persist else 0x41, - red, - green, - blue, - warm_white, - write_mode.value, - 0x0F, - ] + return [ + self.construct_message( + bytearray( + [ + 0x31 if persist else 0x41, + red or 0x00, + green or 0x00, + blue or 0x00, + warm_white or 0x00, + write_mode.value, + 0x0F, + ] + ) ) - ) + ] def construct_message(self, raw_bytes: bytearray) -> bytearray: """Calculate checksum of byte array and add to end.""" @@ -1077,13 +1085,13 @@ def name(self) -> str: def construct_levels_change( self, persist: int, - red: int, - green: int, - blue: int, - warm_white: int, - cool_white: int, + red: Optional[int], + green: Optional[int], + blue: Optional[int], + warm_white: Optional[int], + cool_white: Optional[int], write_mode: LevelWriteMode, - ) -> bytearray: + ) -> List[bytearray]: """The bytes to send for a level change request.""" # sample message for 9-byte LEDENET protocol (w/ checksum at end) # 0 1 2 3 4 5 6 7 @@ -1098,20 +1106,22 @@ def construct_levels_change( # | red # persistence (31 for true / 41 for false) # - return self.construct_message( - bytearray( - [ - 0x31 if persist else 0x41, - red, - green, - blue, - warm_white, - cool_white, - write_mode.value, - 0x0F, - ] + return [ + self.construct_message( + bytearray( + [ + 0x31 if persist else 0x41, + red or 0x00, + green or 0x00, + blue or 0x00, + warm_white or 0x00, + cool_white or 0x00, + write_mode.value, + 0x0F, + ] + ) ) - ) + ] class ProtocolLEDENET9ByteAutoOn(ProtocolLEDENET9Byte): @@ -1341,36 +1351,43 @@ def construct_preset_pattern( def construct_levels_change( self, persist: int, - red: int, - green: int, - blue: int, - warm_white: int, - cool_white: int, + red: Optional[int], + green: Optional[int], + blue: Optional[int], + warm_white: Optional[int], + cool_white: Optional[int], write_mode: LevelWriteMode, - ) -> bytearray: + ) -> List[bytearray]: """The bytes to send for a level change request. white 41 01 ff ff ff 00 00 00 60 ff 00 00 9e """ preset_number = 0x01 # aka fixed color - return self.construct_message( - bytearray( - [ - 0x41, - preset_number, - red, - green, - blue, - 0x00, - 0x00, - 0x00, - 0x60, - 0xFF, - 0x00, - 0x00, - ] + msgs = [] + if red is not None or green is not None or blue is not None: + msgs.append( + self.construct_message( + bytearray( + [ + 0x41, + preset_number, + red or 0x00, + green or 0x00, + blue or 0x00, + 0x00, + 0x00, + 0x00, + 0x60, + 0xFF, + 0x00, + 0x00, + ] + ) + ) ) - ) + if warm_white is not None: + msgs.append(self.construct_message(bytearray([0x47, warm_white or 0x00]))) + return msgs def construct_music_mode( self, @@ -1666,13 +1683,13 @@ def construct_music_mode( def construct_levels_change( self, persist: int, - red: int, - green: int, - blue: int, - warm_white: int, - cool_white: int, + red: Optional[int], + green: Optional[int], + blue: Optional[int], + warm_white: Optional[int], + cool_white: Optional[int], write_mode: LevelWriteMode, - ) -> bytearray: + ) -> List[bytearray]: """The bytes to send for a level change request. b0 [unknown static?] b1 [unknown static?] b2 [unknown static?] b3 [unknown static?] 00 [unknown static?] 01 [unknown static?] 01 [unknown static?] 6a [incrementing sequence number] 00 [unknown static?] 0d [unknown, sometimes 0c] 41 [unknown static?] 02 [preset number] ff [foreground r] 00 [foreground g] 00 [foreground b] 00 [background red] ff [background green] 00 [background blue] 06 [speed or direction?] 00 [unknown static?] 00 [unknown static?] 00 [unknown static?] 47 [speed or direction?] cd [check sum] @@ -1714,12 +1731,12 @@ def construct_levels_change( Set Red b0b1b2b30001010d0034a0000600010000ff0000ff0002ff00000000ff00030000ff0000ff0004ff00000000ff00050000ff0000ff0006ff00000000ffaf67 """ - return self.construct_wrapped_message( - super().construct_levels_change( + return [ + self.construct_wrapped_message(msg, inner_pre_constructed=True) + for msg in super().construct_levels_change( persist, red, green, blue, warm_white, cool_white, write_mode - ), - inner_pre_constructed=True, - ) + ) + ] def construct_zone_change( self, @@ -1833,38 +1850,42 @@ def requires_turn_on(self) -> bool: def construct_levels_change( self, persist: int, - red: int, - green: int, - blue: int, - warm_white: int, - cool_white: int, + red: Optional[int], + green: Optional[int], + blue: Optional[int], + warm_white: Optional[int], + cool_white: Optional[int], write_mode: LevelWriteMode, - ) -> bytearray: + ) -> List[bytearray]: """The bytes to send for a level change request. b0 b1 b2 b3 00 01 01 52 00 09 35 b1 00 64 00 00 00 03 4d bd - 100% warm b0 b1 b2 b3 00 01 01 72 00 09 35 b1 64 64 00 00 00 03 b1 a5 - 100% cool b0 b1 b2 b3 00 01 01 9f 00 09 35 b1 64 32 00 00 00 03 7f 6e - 100% cool - dim 50% """ + assert warm_white is not None + assert cool_white is not None scaled_temp, brightness = white_levels_to_scaled_color_temp( warm_white, cool_white ) - return self.construct_wrapped_message( - bytearray( - [ - 0x35, - 0xB1, - scaled_temp, - # If the brightness goes below the precision the device - # will flip from cold to warm - max(self.MIN_BRIGHTNESS, brightness), - 0x00, - 0x00, - 0x00, - 0x03, - ] + return [ + self.construct_wrapped_message( + bytearray( + [ + 0x35, + 0xB1, + scaled_temp, + # If the brightness goes below the precision the device + # will flip from cold to warm + max(self.MIN_BRIGHTNESS, brightness), + 0x00, + 0x00, + 0x00, + 0x03, + ] + ) ) - ) + ] class ProtocolLEDENETAddressableChristmas(ProtocolLEDENETAddressableBase): @@ -1920,16 +1941,15 @@ def construct_preset_pattern( def construct_levels_change( self, persist: int, - red: int, - green: int, - blue: int, - warm_white: int, - cool_white: int, + red: Optional[int], + green: Optional[int], + blue: Optional[int], + warm_white: Optional[int], + cool_white: Optional[int], write_mode: LevelWriteMode, - ) -> bytearray: + ) -> List[bytearray]: """The bytes to send for a level change request. - Green 100%: b0b1b2b300010180000d3ba100646400000000000000a49d @@ -1977,25 +1997,30 @@ def construct_levels_change( Single - Blue - Brightness 50% 3b a1 78 64 32 00 00 00 00 00 00 00 ea """ + assert red is not None + assert green is not None + assert blue is not None h, s, v = colorsys.rgb_to_hsv(red / 255, green / 255, blue / 255) - return self.construct_wrapped_message( - bytearray( - [ - 0x3B, - 0xA1, - int(h * 180), - int(s * 100), - int(v * 100), - 0x00, - 0x00, - 0x00, - 0x00, - 0x00, - 0x00, - 0x00, - ] + return [ + self.construct_wrapped_message( + bytearray( + [ + 0x3B, + 0xA1, + int(h * 180), + int(s * 100), + int(v * 100), + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + ] + ) ) - ) + ] def construct_zone_change( self, diff --git a/tests_aio.py b/tests_aio.py index e92670f1..8a12f455 100644 --- a/tests_aio.py +++ b/tests_aio.py @@ -13,6 +13,7 @@ from flux_led.const import ( COLOR_MODE_CCT, COLOR_MODE_RGB, + COLOR_MODE_RGBW, COLOR_MODE_RGBWW, EFFECT_MUSIC, MultiColorEffects, @@ -782,6 +783,25 @@ def _updated_callback(*args, **kwargs): assert light.model_num == 0xA3 assert light.dimmable_effects is True assert light.requires_turn_on is False + assert light.color_mode == COLOR_MODE_RGBW + assert light.color_modes == {COLOR_MODE_RGBW} + transport.reset_mock() + + await light.async_set_levels(r=255, g=255, b=255, w=255) + assert transport.mock_calls == [ + call.write( + bytearray( + b"\xb0\xb1\xb2\xb3\x00\x01\x01\x01\x00\rA\x01\xff\xff\xff\x00\x00\x00`\xff\x00\x00\x9e\x12" + ) + ), + call.write(bytearray(b"\xb0\xb1\xb2\xb3\x00\x01\x01\x02\x00\x03G\xffFY")), + ] + + transport.reset_mock() + await light.async_set_levels(w=255) + assert transport.mock_calls == [ + call.write(bytearray(b"\xb0\xb1\xb2\xb3\x00\x01\x01\x03\x00\x03G\xffFZ")) + ] @pytest.mark.asyncio @@ -2087,7 +2107,7 @@ def _updated_callback(*args, **kwargs): assert transport.mock_calls[0][0] == "write" assert ( transport.mock_calls[0][1][0] - == b'\xb0\xb1\xb2\xb3\x00\x01\x01\x01\x00\x10*\x01\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x0f5C' + == b"\xb0\xb1\xb2\xb3\x00\x01\x01\x01\x00\x10*\x01\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x0f5C" ) transport.reset_mock() @@ -2095,7 +2115,7 @@ def _updated_callback(*args, **kwargs): assert transport.mock_calls[0][0] == "write" assert ( transport.mock_calls[0][1][0] - == b'\xb0\xb1\xb2\xb3\x00\x01\x01\x03\x00\x10*\x02\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x0f6G' + == b"\xb0\xb1\xb2\xb3\x00\x01\x01\x03\x00\x10*\x02\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x0f6G" ) transport.reset_mock() @@ -2103,7 +2123,7 @@ def _updated_callback(*args, **kwargs): assert transport.mock_calls[0][0] == "write" assert ( transport.mock_calls[0][1][0] - == b'\xb0\xb1\xb2\xb3\x00\x01\x01\x05\x00\x10*\x03\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x0f7K' + == b"\xb0\xb1\xb2\xb3\x00\x01\x01\x05\x00\x10*\x03\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x0f7K" )