Skip to content

Commit

Permalink
Switch cli to use asyncio so all bulbs on the network can be operated…
Browse files Browse the repository at this point in the history
… in parallel (#303)
  • Loading branch information
bdraco authored Jan 4, 2022
1 parent 9f6b908 commit 2524409
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 137 deletions.
3 changes: 3 additions & 0 deletions flux_led/aiodevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,9 @@ async def async_get_time(self) -> Optional[datetime]:
async def async_get_timers(self) -> Optional[List[LedTimer]]:
"""Get the timers."""
assert self._protocol is not None
if isinstance(self._protocol, ProtocolLEDENETOriginal):
led_timers: List[LedTimer] = []
return led_timers
await self._async_send_msg(self._protocol.construct_get_timers())
async with self._get_timers_lock:
self._get_timers_future = asyncio.Future()
Expand Down
302 changes: 167 additions & 135 deletions flux_led/fluxled.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,18 @@
"""

import asyncio
import datetime
import logging
from optparse import OptionGroup, OptionParser, Values
import sys
from typing import Any, List, Optional, Tuple

from .aio import AIOWifiLedBulb
from .aioscanner import AIOBulbScanner
from .const import ATTR_ID, ATTR_IPADDR
from .device import WifiLedBulb
from .pattern import PresetPattern
from .scanner import BulbScanner, FluxLEDDiscovery
from .scanner import FluxLEDDiscovery
from .timer import LedTimer
from .utils import utils

Expand Down Expand Up @@ -693,13 +695,159 @@ def parseArgs() -> Tuple[Values, Any]: # noqa: C901


# -------------------------------------------
def main() -> None: # noqa: C901


async def _async_run_commands( # noqa: C901
bulb: AIOWifiLedBulb, info: FluxLEDDiscovery, options: Any
) -> None:
"""Run requested commands on a bulb."""
buffer = ""

def buf_in(str: str) -> None:
nonlocal buffer
buffer += str + "\n"

if options.getclock:
buf_in(
"{} [{}] {}".format(info["id"], info["ipaddr"], await bulb.async_get_time())
)

if options.setclock:
await bulb.async_set_time()

if options.ww is not None:
if options.ww > 100:
buf_in("Input can not be higher than 100%")
else:
buf_in(f"Setting warm white mode, level: {options.ww}%")
await bulb.async_set_levels(w=options.ww, persist=not options.volatile)

if options.cw is not None:
if options.cw > 100:
buf_in("Input can not be higher than 100%")
else:
buf_in(f"Setting cold white mode, level: {options.cw}%")
await bulb.async_set_levels(w2=options.cw, persist=not options.volatile)

if options.cct is not None:
if options.cct[1] > 100:
buf_in("Brightness can not be higher than 100%")
elif options.cct[0] < 2700 or options.cct[0] > 6500:
buf_in("Color Temp must be between 2700 and 6500")
else:
buf_in(
"Setting LED temperature {}K and brightness: {}%".format(
options.cct[0], options.cct[1]
)
)
await bulb.async_set_white_temp(
options.cct[0], options.cct[1] * 2.55, persist=not options.volatile
)

if options.color is not None:
buf_in(
f"Setting color RGB:{options.color}",
)
name = utils.color_tuple_to_string(options.color)
if name is None:
buf_in()
else:
buf_in(f"[{name}]")
if any(i < 0 or i > 255 for i in options.color):
buf_in("Invalid value received must be between 0-255")
elif len(options.color) == 3:
await bulb.async_set_levels(
options.color[0],
options.color[1],
options.color[2],
persist=not options.volatile,
)
elif len(options.color) == 4:
await bulb.async_set_levels(
options.color[0],
options.color[1],
options.color[2],
options.color[3],
persist=not options.volatile,
)
elif len(options.color) == 5:
await bulb.async_set_levels(
options.color[0],
options.color[1],
options.color[2],
options.color[3],
options.color[4],
persist=not options.volatile,
)

elif options.custom is not None:
await bulb.async_set_custom_pattern(
options.custom[2], options.custom[1], options.custom[0]
)
buf_in(
"Setting custom pattern: {}, Speed={}%, {}".format(
options.custom[0], options.custom[1], options.custom[2]
)
)

elif options.preset is not None:
buf_in(
"Setting preset pattern: {}, Speed={}%".format(
PresetPattern.valtostr(options.preset[0]), options.preset[1]
)
)
await bulb.async_set_preset_pattern(options.preset[0], options.preset[1])

if options.on:
buf_in(f"Turning on bulb at {bulb.ipaddr}")
await bulb.async_turn_on()
elif options.off:
buf_in(f"Turning off bulb at {bulb.ipaddr}")
await bulb.async_turn_off()

if options.info:
buf_in("{} [{}] {} ({})".format(info["id"], info["ipaddr"], bulb, bulb.model))

if options.settimer:
empty_timers: List[LedTimer] = []
timers = await bulb.async_get_timers() or empty_timers
num = int(options.settimer[0])
buf_in(f"New Timer ---- #{num}: {options.new_timer}")
if options.new_timer.isExpired():
buf_in("[timer is already expired, will be deactivated]")
timers[num - 1] = options.new_timer
await bulb.async_set_timers(timers)

if options.showtimers:
show_timers = await bulb.async_get_timers()
num = 0
if show_timers:
for t in show_timers:
num += 1
buf_in(f" Timer #{num}: {t}")
buf_in("")

print(buffer.rstrip("\n"))


async def _async_process_bulb( # noqa: C901
info: FluxLEDDiscovery, options: Any
) -> None:
"""Process a bulb."""
bulb = AIOWifiLedBulb(info["ipaddr"], discovery=info)
await bulb.async_setup(lambda *args: None)
try:
await _async_run_commands(bulb, info, options)
finally:
await bulb.async_stop()


async def async_main() -> None: # noqa: C901
(options, args) = parseArgs()
scanner = BulbScanner()
scanner = AIOBulbScanner()

if options.scan:
scanner.scan(timeout=6)
await scanner.async_scan(timeout=6)
bulb_info_list = scanner.getBulbInfo()
# we have a list of buld info dicts
addrs = []
Expand All @@ -710,11 +858,11 @@ def main() -> None: # noqa: C901
print(f"{len(bulb_info_list)} bulbs found")
for b in bulb_info_list:
print(" {} {}".format(b["id"], b["ipaddr"]))
sys.exit(0)
return

elif options.info:
for addr in args:
scanner.scan(timeout=6, address=addr)
await scanner.async_scan(timeout=6, address=addr)
bulb_info_list = scanner.getBulbInfo()
found_addrs = {discovery[ATTR_IPADDR] for discovery in bulb_info_list}
for addr in args:
Expand All @@ -723,137 +871,21 @@ def main() -> None: # noqa: C901
bulb_info_list.append(FluxLEDDiscovery({ATTR_IPADDR: addr, ATTR_ID: "Unknown ID"})) # type: ignore

# now we have our bulb list, perform same operation on all of them
for info in bulb_info_list:
try:
bulb = WifiLedBulb(info["ipaddr"], discovery=info)
except Exception as e:
print("Unable to connect to bulb at [{}]: {}".format(info["ipaddr"], e))
continue

if options.getclock:
print("{} [{}] {}".format(info["id"], info["ipaddr"], bulb.getClock()))

if options.setclock:
bulb.setClock()

if options.protocol:
bulb.setProtocol(options.protocol)

if options.ww is not None:
if options.ww > 100:
print("Input can not be higher than 100%")
else:
print(f"Setting warm white mode, level: {options.ww}%")
bulb.setWarmWhite(options.ww, not options.volatile)

if options.cw is not None:
if options.cw > 100:
print("Input can not be higher than 100%")
else:
print(f"Setting cold white mode, level: {options.cw}%")
bulb.setColdWhite(options.cw, not options.volatile)

if options.cct is not None:
if options.cct[1] > 100:
print("Brightness can not be higher than 100%")
elif options.cct[0] < 2700 or options.cct[0] > 6500:
print("Color Temp must be between 2700 and 6500")
else:
print(
"Setting LED temperature {}K and brightness: {}%".format(
options.cct[0], options.cct[1]
)
)
bulb.setWhiteTemperature(
options.cct[0], options.cct[1] * 2.55, not options.volatile
)

if options.color is not None:
print(
f"Setting color RGB:{options.color}",
)
name = utils.color_tuple_to_string(options.color)
if name is None:
print()
else:
print(f"[{name}]")
if any(i < 0 or i > 255 for i in options.color):
print("Invalid value received must be between 0-255")
elif len(options.color) == 3:
bulb.setRgb(
options.color[0],
options.color[1],
options.color[2],
not options.volatile,
)
elif len(options.color) == 4:
bulb.setRgbw(
options.color[0],
options.color[1],
options.color[2],
options.color[3],
not options.volatile,
)
elif len(options.color) == 5:
bulb.setRgbw(
options.color[0],
options.color[1],
options.color[2],
options.color[3],
not options.volatile,
None,
options.color[4],
)

elif options.custom is not None:
bulb.setCustomPattern(
options.custom[2], options.custom[1], options.custom[0]
)
print(
"Setting custom pattern: {}, Speed={}%, {}".format(
options.custom[0], options.custom[1], options.custom[2]
)
)

elif options.preset is not None:
print(
"Setting preset pattern: {}, Speed={}%".format(
PresetPattern.valtostr(options.preset[0]), options.preset[1]
)
)
bulb.setPresetPattern(options.preset[0], options.preset[1])

if options.on:
print(f"Turning on bulb at {bulb.ipaddr}")
bulb.turnOn()
elif options.off:
print(f"Turning off bulb at {bulb.ipaddr}")
bulb.turnOff()

if options.info:
print(
"{} [{}] {} ({})".format(info["id"], info["ipaddr"], bulb, bulb.model)
)
tasks = [_async_process_bulb(info, options) for info in bulb_info_list]
results = await asyncio.gather(
*tasks,
return_exceptions=True,
)
for idx, info in enumerate(bulb_info_list):
if isinstance(results[idx], Exception):
print(f"Error while processing {info}: {results[idx]}")
return

if options.settimer:
timers = bulb.getTimers()
num = int(options.settimer[0])
print(f"New Timer ---- #{num}: {options.new_timer}")
if options.new_timer.isExpired():
print("[timer is already expired, will be deactivated]")
timers[num - 1] = options.new_timer
bulb.sendTimers(timers)

if options.showtimers:
timers = bulb.getTimers()
num = 0
for t in timers:
num += 1
print(f" Timer #{num}: {t}")
print("")

sys.exit(0)
def main() -> None:
asyncio.run(async_main())


if __name__ == "__main__":
main()
sys.exit(0)
4 changes: 2 additions & 2 deletions flux_led/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -2116,8 +2116,8 @@ def construct_levels_change(
b0 b1 b2 b3 00 01 01 72 00 09 35 b1 64 64 00 00 00 03 b1 a5 - 100% cool
b0 b1 b2 b3 00 01 01 9f 00 09 35 b1 64 32 00 00 00 03 7f 6e - 100% cool - dim 50%
"""
assert warm_white is not None
assert cool_white is not None
assert warm_white is not None, "CCT devices must set a warm white value"
assert cool_white is not None, "CCT devices must set a cool white value"
scaled_temp, brightness = white_levels_to_scaled_color_temp(
warm_white, cool_white
)
Expand Down

0 comments on commit 2524409

Please sign in to comment.