Skip to content

Commit

Permalink
Fix RGBW on SK6812RGBW strips (#287)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Dec 31, 2021
1 parent c59e8d7 commit 460d6a9
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 173 deletions.
5 changes: 3 additions & 2 deletions flux_led/aiodevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,12 @@ async def async_set_levels(
)

async def _async_process_levels_change(
self, msg: bytearray, updates: Dict[str, int]
self, msgs: List[bytearray], updates: Dict[str, int]
) -> None:
"""Process and send a levels change."""
self._set_transition_complete_time()
await self._async_send_msg(msg)
for msg in msgs:
await self._async_send_msg(msg)
if updates:
self._replace_raw_state(updates)

Expand Down
49 changes: 19 additions & 30 deletions flux_led/base_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,12 @@ def color_modes(self) -> Set[str]:
def _internal_color_modes(self) -> Set[str]:
"""The internal available color modes."""
assert self.raw_state is not None
if (
self._device_config is not None
# Currently this is only the SK6812RGBW strips on 0xA3
and self._device_config.operating_mode == COLOR_MODE_RGBW
):
return {COLOR_MODE_RGBW}
if not is_known_model(self.model_num):
# Default mode is RGB
return BASE_MODE_MAP.get(self.raw_state.mode & 0x0F, {DEFAULT_MODE})
Expand Down Expand Up @@ -953,7 +959,7 @@ def speed(self) -> int:
def getSpeed(self) -> int:
return self.speed

def _generate_random_levels_change(self) -> Tuple[bytearray, Dict[str, int]]:
def _generate_random_levels_change(self) -> Tuple[List[bytearray], Dict[str, int]]:
"""Generate a random levels change."""
channels = {STATE_WARM_WHITE}
if COLOR_MODES_RGB.intersection(self.color_modes):
Expand All @@ -972,7 +978,7 @@ def _generate_levels_change(
channels: Dict[str, Optional[int]],
persist: bool = True,
brightness: Optional[int] = None,
) -> Tuple[bytearray, Dict[str, int]]:
) -> Tuple[List[bytearray], Dict[str, int]]:
"""Generate the levels change request."""
channel_map = self.model_data.channel_map
if channel_map:
Expand All @@ -995,16 +1001,16 @@ def _generate_levels_change(
if brightness is not None and r is not None and g is not None and b is not None:
(r, g, b) = self._calculateBrightness((r, g, b), brightness)

r_value = 0 if r is None else int(r)
g_value = 0 if g is None else int(g)
b_value = 0 if b is None else int(b)
w_value = 0 if w is None else int(w)
r_value = None if r is None else int(r)
g_value = None if g is None else int(g)
b_value = None if b is None else int(b)
w_value = None if w is None else int(w)
# ProtocolLEDENET9Byte devices support two white outputs for cold and warm.
if w2 is None:
# If we're only setting a single white value,
# we set the second output to be the same as the first
w2_value = (
int(w) if w is not None and self.color_mode != COLOR_MODE_CCT else 0
int(w) if w is not None and self.color_mode != COLOR_MODE_CCT else None
)
else:
w2_value = int(w2)
Expand All @@ -1017,36 +1023,19 @@ def _generate_levels_change(
elif r is None and g is None and b is None:
write_mode = LevelWriteMode.WHITES

_LOGGER.debug(
"%s: _generate_levels_change using %s: persist=%s r=%s/%s, g=%s/%s b=%s/%s, w=%s/%s w2=%s/%s write_mode=%s/%s",
self.ipaddr,
self.protocol,
persist,
r_value,
f"0x{r_value:02X}",
g_value,
f"0x{g_value:02X}",
b_value,
f"0x{b_value:02X}",
w_value,
f"0x{w_value:02X}",
w2_value,
f"0x{w2_value:02X}",
write_mode,
f"0x{write_mode.value:02X}",
)

assert self._protocol is not None
msg = self._protocol.construct_levels_change(
msgs = self._protocol.construct_levels_change(
persist, r_value, g_value, b_value, w_value, w2_value, write_mode
)
updates = {}
multi_mode = self.multi_color_mode
if multi_mode or write_mode in WRITE_ALL_COLORS:
updates.update({"red": r_value, "green": g_value, "blue": b_value})
updates.update(
{"red": r_value or 0, "green": g_value or 0, "blue": b_value or 0}
)
if multi_mode or write_mode in WRITE_ALL_WHITES:
updates.update({"warm_white": w_value, "cool_white": w2_value})
return msg, updates
updates.update({"warm_white": w_value or 0, "cool_white": w2_value or 0})
return msgs, updates

def _set_transition_complete_time(self) -> None:
"""Set the time we expect the transition will be completed.
Expand Down
7 changes: 5 additions & 2 deletions flux_led/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,15 @@ def set_levels(
)

@_socket_retry(attempts=2) # type: ignore
def _process_levels_change(self, msg: bytearray, updates: Dict[str, int]) -> None:
def _process_levels_change(
self, msgs: List[bytearray], updates: Dict[str, int]
) -> None:
# send the message
with self._lock:
self._connect_if_disconnected()
self._set_transition_complete_time()
self._send_msg(msg)
for msg in msgs:
self._send_msg(msg)
if updates:
self._replace_raw_state(updates)

Expand Down
Loading

0 comments on commit 460d6a9

Please sign in to comment.