Skip to content

Commit

Permalink
Fix Legacy CCT Controllers (#262)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Dec 22, 2021
1 parent c758515 commit 36bef5b
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 27 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ The following models have been tested with library.

| Model | Description | Notes |
| ----- | --------------------------- | ------------------------------- |
| 0x03 | Legacy CCT Controller | Original protocol |
| 0x04 | UFO Controller RGBW | |
| 0x06 | Controller RGBW | |
| 0x07 | Controller RGBCW | |
Expand Down
18 changes: 6 additions & 12 deletions flux_led/base_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def _whites_are_temp_brightness(self) -> bool:
@property
def model(self) -> str:
"""Return the human readable model description."""
if self._discovery and self._discovery[ATTR_MODEL_DESCRIPTION]:
if self._discovery and self._discovery.get(ATTR_MODEL_DESCRIPTION):
return f"{self._discovery[ATTR_MODEL_DESCRIPTION]} (0x{self.model_num:02X})"
return f"{self.model_data.description} (0x{self.model_num:02X})"

Expand All @@ -221,7 +221,7 @@ def version_num(self) -> int:
if hasattr(raw_state, "version_number"):
assert isinstance(raw_state, LEDENETRawState)
return raw_state.version_number
return 1
return 0 # old devices report as 0

@property
def preset_pattern_num(self) -> int:
Expand Down Expand Up @@ -279,10 +279,7 @@ def white_active(self) -> bool:
"""Any white channel is active."""
assert self.raw_state is not None
raw_state = self.raw_state
if hasattr(raw_state, "cool_white"):
assert isinstance(raw_state, LEDENETRawState)
return bool(raw_state.warm_white or raw_state.cool_white)
return bool(raw_state.warm_white)
return bool(raw_state.warm_white or raw_state.cool_white)

@property
def color_active(self) -> bool:
Expand Down Expand Up @@ -419,7 +416,6 @@ def _named_effect(self) -> Optional[str]:
def cool_white(self) -> int:
assert self.raw_state is not None
if self._rgbwwprotocol:
assert isinstance(self.raw_state, LEDENETRawState)
return self.raw_state.cool_white
return 0

Expand Down Expand Up @@ -463,7 +459,6 @@ def brightness(self) -> int:
if color_mode == COLOR_MODE_RGBW:
return round((v_255 + raw_state.warm_white) / 2)
if color_mode == COLOR_MODE_RGBWW:
assert isinstance(raw_state, LEDENETRawState)
return round((v_255 + raw_state.warm_white + raw_state.cool_white) / 3)

# Default color mode (RGB)
Expand Down Expand Up @@ -692,7 +687,6 @@ def getWhiteTemperature(self) -> Tuple[int, int]:
# Assume input temperature of between 2700 and 6500 Kelvin, and scale
# the warm and cold LEDs linearly to provide that
assert self.raw_state is not None
assert isinstance(self.raw_state, LEDENETRawState)
raw_state = self.raw_state
temp, brightness = white_levels_to_color_temp(
raw_state.warm_white, raw_state.cool_white
Expand Down Expand Up @@ -726,8 +720,8 @@ def getRgbww(self) -> Tuple[int, int, int, int, int]:
@property
def rgbww(self) -> Tuple[int, int, int, int, int]:
"""Returns red,green,blue,warm,cool."""
assert isinstance(self.raw_state, LEDENETRawState)
raw_state = self.raw_state
assert raw_state is not None
return (
raw_state.red,
raw_state.green,
Expand All @@ -745,8 +739,8 @@ def getRgbcw(self) -> Tuple[int, int, int, int, int]:
@property
def rgbcw(self) -> Tuple[int, int, int, int, int]:
"""Returns red,green,blue,cool,warm."""
assert isinstance(self.raw_state, LEDENETRawState)
raw_state = self.raw_state
assert raw_state is not None
return (
raw_state.red,
raw_state.green,
Expand All @@ -758,8 +752,8 @@ def rgbcw(self) -> Tuple[int, int, int, int, int]:
def getCCT(self) -> Tuple[int, int]:
if self.color_mode != COLOR_MODE_CCT:
return (255, 255)
assert isinstance(self.raw_state, LEDENETRawState)
raw_state = self.raw_state
assert raw_state is not None
return (raw_state.warm_white, raw_state.cool_white)

@property
Expand Down
6 changes: 4 additions & 2 deletions flux_led/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,10 @@ def _determine_protocol(self) -> bytearray:
full_msg = rx + self._read_msg(
protocol.state_response_length - read_bytes
)
if protocol.is_valid_state_response(full_msg):
self._set_protocol_from_msg(full_msg, protocol.name)
if not protocol.is_valid_state_response(full_msg):
self.close()
continue
self._set_protocol_from_msg(full_msg, protocol.name)
return full_msg
raise Exception("Cannot determine protocol")

Expand Down
10 changes: 10 additions & 0 deletions flux_led/fluxled.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,13 @@ def parseArgs() -> Tuple[Values, Any]: # noqa: C901
other_group = OptionGroup(parser, "Other options")

parser.add_option_group(info_group)
info_group.add_option(
"--debug",
action="store_true",
dest="debug",
default=False,
help="Enable debug logging",
)
info_group.add_option(
"-e",
"--examples",
Expand Down Expand Up @@ -588,6 +595,9 @@ def parseArgs() -> Tuple[Values, Any]: # noqa: C901
parser.usage = "usage: %prog [-sS10cwdkpCiltThe] [addr1 [addr2 [addr3] ...]."
(options, args) = parser.parse_args()

if options.debug:
logging.basicConfig(level=logging.DEBUG)

if options.showexamples:
showUsageExamples()
sys.exit(0)
Expand Down
21 changes: 21 additions & 0 deletions flux_led/models_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ class LEDENETHardware:
chip=LEDENETChip.HFLPB100,
rf_remote=False, # unverified
),
LEDENETHardware(
model="HF-A11-ZJ002",
chip=LEDENETChip.HFLPB100,
rf_remote=False, # unverified
),
LEDENETHardware(
model="HF-LPB100", # reported on older UFO
chip=LEDENETChip.HFLPB100,
Expand Down Expand Up @@ -281,6 +286,22 @@ class LEDENETHardware:
channel_map={},
microphone=False,
),
LEDENETModel(
model_num=0x03,
models=["HF-A11-ZJ002"],
description="Legacy Controller CCT",
always_writes_white_and_colors=False, # Formerly rgbwprotocol
protocols=[MinVersionProtocol(0, PROTOCOL_LEDENET_ORIGINAL)],
mode_to_color_mode={},
color_modes={COLOR_MODE_CCT}, # Formerly rgbwcapable
channel_map={
STATE_WARM_WHITE: STATE_RED,
STATE_RED: STATE_WARM_WHITE,
STATE_COOL_WHITE: STATE_GREEN,
STATE_GREEN: STATE_COOL_WHITE,
},
microphone=False,
),
LEDENETModel(
model_num=0x04,
models=[
Expand Down
10 changes: 4 additions & 6 deletions flux_led/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class LEDENETOriginalRawState(NamedTuple):
blue: int
warm_white: int
check_sum: int
cool_white: int


# typical response:
Expand Down Expand Up @@ -477,11 +478,7 @@ def is_valid_power_state_response(self, msg: bytes) -> bool:

def is_valid_state_response(self, raw_state: bytes) -> bool:
"""Check if a state response is valid."""
return (
len(raw_state) == self.state_response_length
and raw_state[0] == 0x66
and raw_state[1] == 0x01
)
return len(raw_state) == self.state_response_length and raw_state[0] == 0x66

def construct_state_query(self) -> bytearray:
"""The bytes to send for a query request."""
Expand Down Expand Up @@ -521,7 +518,8 @@ def construct_message(self, raw_bytes: bytearray) -> bytearray:

def named_raw_state(self, raw_state: bytes) -> LEDENETOriginalRawState:
"""Convert raw_state to a namedtuple."""
return LEDENETOriginalRawState(*raw_state)
raw_bytearray = bytearray([*raw_state, 0])
return LEDENETOriginalRawState(*raw_bytearray)


class ProtocolLEDENET8Byte(ProtocolBase):
Expand Down
11 changes: 8 additions & 3 deletions flux_led/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,16 @@ class FluxLEDDiscovery(TypedDict):
remote_access_port: Optional[int] # the remote access port


def create_udp_socket() -> socket.socket:
def create_udp_socket(discovery_port: int) -> socket.socket:
"""Create a udp socket used for communicating with the device."""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.bind(("", 0))
try:
# Legacy devices require source port to be the discovery port
sock.bind(("", discovery_port))
except OSError:
_LOGGER.debug("Port %s is not available: %s", discovery_port)
sock.bind(("", 0))
sock.setblocking(False)
return sock

Expand Down Expand Up @@ -167,7 +172,7 @@ def getBulbInfo(self) -> List[FluxLEDDiscovery]:
return self.found_bulbs

def _create_socket(self) -> socket.socket:
return create_udp_socket()
return create_udp_socket(self.DISCOVERY_PORT)

def _destination_from_address(self, address: Optional[str]) -> Tuple[str, int]:
if address is None:
Expand Down
125 changes: 121 additions & 4 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,7 @@ def read_data(expected):

self.assertEqual(
light.__str__(),
"ON [Color: (1, 25, 80) Brightness: 31% raw state: 102,1,35,65,33,8,1,25,80,1,153,]",
"ON [Color: (1, 25, 80) Brightness: 31% raw state: 102,1,35,65,33,8,1,25,80,1,153,0,]",
)
self.assertEqual(light.protocol, PROTOCOL_LEDENET_ORIGINAL)
self.assertEqual(light.is_on, True)
Expand All @@ -1381,7 +1381,7 @@ def read_data(expected):

self.assertEqual(
light.__str__(),
"OFF [Color: (1, 25, 80) Brightness: 31% raw state: 102,1,36,65,33,8,1,25,80,1,153,]",
"OFF [Color: (1, 25, 80) Brightness: 31% raw state: 102,1,36,65,33,8,1,25,80,1,153,0,]",
)
self.assertEqual(light.protocol, PROTOCOL_LEDENET_ORIGINAL)
self.assertEqual(light.is_on, False)
Expand All @@ -1403,7 +1403,7 @@ def read_data(expected):

self.assertEqual(
light.__str__(),
"ON [Color: (1, 25, 80) Brightness: 31% raw state: 102,1,35,65,33,8,1,25,80,1,153,]",
"ON [Color: (1, 25, 80) Brightness: 31% raw state: 102,1,35,65,33,8,1,25,80,1,153,0,]",
)
self.assertEqual(light.protocol, PROTOCOL_LEDENET_ORIGINAL)
self.assertEqual(light.is_on, True)
Expand All @@ -1412,7 +1412,124 @@ def read_data(expected):
self.assertEqual(light.cool_white, 0)
self.assertEqual(light.brightness, 80)
self.assertEqual(light.getRgb(), (1, 25, 80))
self.assertEqual(light.version_num, 1)
self.assertEqual(light.version_num, 0)

@patch("flux_led.WifiLedBulb._send_msg")
@patch("flux_led.WifiLedBulb._read_msg")
@patch("flux_led.WifiLedBulb.connect")
def test_original_ledenet_cct(self, mock_connect, mock_read, mock_send):
calls = 0

def read_data(expected):
nonlocal calls
calls += 1
if calls == 1:
self.assertEqual(expected, 2)
return bytearray(b"")
if calls == 2:
self.assertEqual(expected, 2)
return bytearray(b"f\x03")
if calls == 3:
self.assertEqual(expected, 9)
return bytearray(b"#A!\x08\xff\x80*\x01\x99")
if calls == 4:
self.assertEqual(expected, 11)
return bytearray(b"f\x03#A!\x08\x01\x19P\x01\x99")
if calls == 5: # ready turn off response
self.assertEqual(expected, 4)
return bytearray(b"\x0fq#\xa3")
if calls == 6:
self.assertEqual(expected, 11)
return bytearray(b"f\x03$A!\x08\x01\x19P\x01\x99")
if calls == 7: # ready turn on response
self.assertEqual(expected, 4)
return bytearray(b"\x0fq#\xa3")
if calls == 8:
self.assertEqual(expected, 11)
return bytearray(b"f\x03#A!\x08\x01\x19P\x01\x99")

mock_read.side_effect = read_data
light = flux_led.WifiLedBulb("192.168.1.164")
assert light.color_modes == {COLOR_MODE_CCT}
self.assertEqual(light.model_num, 0x03)
self.assertEqual(light.model, "Legacy Controller CCT (0x03)")
self.assertEqual(light.dimmable_effects, False)
self.assertEqual(light.requires_turn_on, True)
self.assertEqual(light.white_active, True)
self.assertEqual(light._protocol.power_push_updates, False)
self.assertEqual(light._protocol.state_push_updates, False)

self.assertEqual(mock_read.call_count, 3)
self.assertEqual(mock_send.call_count, 2)
self.assertEqual(mock_send.call_args, mock.call(bytearray(b"\xef\x01w")))

light.setWhiteTemperature(2700, 255)
self.assertEqual(mock_read.call_count, 3)
self.assertEqual(mock_send.call_count, 3)
self.assertEqual(
mock_send.call_args, mock.call(bytearray((b"V\xff\x00\x00\xaa")))
)

light._transition_complete_time = 0
light.update_state()
self.assertEqual(mock_read.call_count, 4)
self.assertEqual(mock_send.call_count, 4)
self.assertEqual(mock_send.call_args, mock.call(bytearray(b"\xef\x01w")))

self.assertEqual(
light.__str__(),
"ON [CCT: 6354K Brightness: 10% raw state: 102,3,35,65,33,8,1,0,80,1,153,25,]",
)
self.assertEqual(light.protocol, PROTOCOL_LEDENET_ORIGINAL)
self.assertEqual(light.is_on, True)
self.assertEqual(light.mode, "ww")
self.assertEqual(light.warm_white, 0)
self.assertEqual(light.brightness, 26)

light.turnOff()
self.assertEqual(mock_read.call_count, 5)
self.assertEqual(mock_send.call_count, 5)
self.assertEqual(mock_send.call_args, mock.call(bytearray(b"\xcc$3")))

light._transition_complete_time = 0
light.update_state()
self.assertEqual(mock_read.call_count, 6)
self.assertEqual(mock_send.call_count, 6)
self.assertEqual(mock_send.call_args, mock.call(bytearray(b"\xef\x01w")))

self.assertEqual(
light.__str__(),
"OFF [CCT: 6354K Brightness: 10% raw state: 102,3,36,65,33,8,1,0,80,1,153,25,]",
)
self.assertEqual(light.protocol, PROTOCOL_LEDENET_ORIGINAL)
self.assertEqual(light.is_on, False)
self.assertEqual(light.mode, "ww")
self.assertEqual(light.cool_white, 0)
self.assertEqual(light.warm_white, 0)
self.assertEqual(light.brightness, 26)

light.turnOn()
self.assertEqual(mock_read.call_count, 7)
self.assertEqual(mock_send.call_count, 7)
self.assertEqual(mock_send.call_args, mock.call(bytearray(b"\xcc#3")))

light._transition_complete_time = 0
light.update_state()
self.assertEqual(mock_read.call_count, 8)
self.assertEqual(mock_send.call_count, 8)
self.assertEqual(mock_send.call_args, mock.call(bytearray(b"\xef\x01w")))

self.assertEqual(
light.__str__(),
"ON [CCT: 6354K Brightness: 10% raw state: 102,3,35,65,33,8,1,0,80,1,153,25,]",
)
self.assertEqual(light.protocol, PROTOCOL_LEDENET_ORIGINAL)
self.assertEqual(light.is_on, True)
self.assertEqual(light.mode, "ww")
self.assertEqual(light.warm_white, 0)
self.assertEqual(light.cool_white, 0)
self.assertEqual(light.brightness, 26)
self.assertEqual(light.version_num, 0)

@patch("flux_led.WifiLedBulb._send_msg")
@patch("flux_led.WifiLedBulb._read_msg")
Expand Down
10 changes: 10 additions & 0 deletions tests_aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pytest

from flux_led import aiodevice, aioscanner
from flux_led.scanner import create_udp_socket
from flux_led.aio import AIOWifiLedBulb
from flux_led.aioprotocol import AIOLEDENETProtocol
from flux_led.aioscanner import AIOBulbScanner, LEDENETDiscovery
Expand Down Expand Up @@ -1778,6 +1779,15 @@ async def test_async_scanner_times_out_with_nothing_specific_address(
assert data == []


@pytest.mark.asyncio
async def test_async_scanner_falls_back_to_any_source_port_if_socket_in_use():
"""Test port fallback."""
hold_socket = create_udp_socket(AIOBulbScanner.DISCOVERY_PORT)
assert hold_socket.getsockname() == ("0.0.0.0", 48899)
random_socket = create_udp_socket(AIOBulbScanner.DISCOVERY_PORT)
assert random_socket.getsockname() != ("0.0.0.0", 48899)


@pytest.mark.asyncio
async def test_async_scanner_enable_remote_access(mock_discovery_aio_protocol):
"""Test scanner enabling remote access with a specific address."""
Expand Down

0 comments on commit 36bef5b

Please sign in to comment.