Skip to content

Commit

Permalink
further improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
jabdoa2 committed Oct 31, 2021
1 parent 9493e98 commit c2df1ba
Showing 1 changed file with 45 additions and 32 deletions.
77 changes: 45 additions & 32 deletions mpf/platforms/pkone/pkone.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,23 +198,24 @@ def _parse_coil_number(self, number: str) -> PKONECoilNumber:
try:
board_id_str, coil_num_str = number.split("-")
except ValueError:
raise AssertionError("Invalid coil number {}".format(number))
self.raise_config_error("Invalid coil number {}".format(number), 2)
raise

board_id = int(board_id_str)
coil_num = int(coil_num_str)

if board_id not in self.pkone_extensions:
raise AssertionError("PKONE Extension {} does not exist for coil {}".format(board_id, number))
self.raise_config_error("PKONE Extension {} does not exist for coil {}".format(board_id, number), 3)

if coil_num == 0:
raise AssertionError("PKONE coil numbering begins with 1. Coil: {}".format(number))
self.raise_config_error("PKONE coil numbering begins with 1. Coil: {}".format(number), 4)

coil_count = self.pkone_extensions[board_id].coil_count
if coil_count < coil_num or coil_num < 1:
raise AssertionError(
self.raise_config_error(
"PKONE Extension {board_id} only has {coil_count} coils "
"({first_coil} - {last_coil}). Coil: {number}".format(
board_id=board_id, coil_count=coil_count, first_coil=1, last_coil=coil_count, number=number))
board_id=board_id, coil_count=coil_count, first_coil=1, last_coil=coil_count, number=number), 5)

return PKONECoilNumber(board_id, coil_num)

Expand All @@ -233,22 +234,21 @@ def configure_driver(self, config: DriverConfig, number: str, platform_settings:
platform_settings = deepcopy(platform_settings)

if not self.controller_connection:
raise AssertionError('A request was made to configure a PKONE coil, but no '
'connection to a PKONE controller is available')
self.raise_config_error('A request was made to configure a PKONE coil, but no '
'connection to a PKONE controller is available', 6)

if not number:
raise AssertionError("Coil number is required")
self.raise_config_error("Coil number is required", 7)

coil_number = self._parse_coil_number(str(number))
return PKONECoil(config, self, coil_number, platform_settings)

@staticmethod
def _check_coil_switch_combination(coil: DriverSettings, switch: SwitchSettings):
def _check_coil_switch_combination(self, coil: DriverSettings, switch: SwitchSettings):
"""Check to see if the coil/switch combination is legal for hardware rules."""
# coil and switch must be on the same extension board (same board address id)
if switch.hw_switch.number.board_address_id != coil.hw_driver.number.board_address_id:
raise AssertionError("Coil {} and switch {} are on different boards. Cannot apply hardware rule!".format(
coil.hw_driver.number, switch.hw_switch.number))
self.raise_config_error("Coil {} and switch {} are on different boards. Cannot apply hardware rule!".format(
coil.hw_driver.number, switch.hw_switch.number), 8)

def clear_hw_rule(self, switch: SwitchSettings, coil: DriverSettings):
"""Clear a hardware rule.
Expand Down Expand Up @@ -341,21 +341,23 @@ def _parse_servo_number(self, number: str) -> PKONEServoNumber:
try:
board_id_str, servo_num_str = number.split("-")
except ValueError:
raise AssertionError("Invalid servo number {}".format(number))
self.raise_config_error("Invalid servo number {}".format(number), 9)
raise

board_id = int(board_id_str)
servo_num = int(servo_num_str)

if board_id not in self.pkone_extensions:
raise AssertionError("PKONE Extension {} does not exist for servo {}".format(board_id, number))
self.raise_config_error("PKONE Extension {} does not exist for servo {}".format(board_id, number), 10)

# Servos are numbered in sequence immediately after the highest coil number
driver_count = self.pkone_extensions[board_id].coil_count
servo_count = self.pkone_extensions[board_id].servo_count
if servo_num <= driver_count or servo_num > driver_count + servo_count:
raise AssertionError("PKONE Extension {} supports {} servos ({} - {}). "
"Servo: {} is not a valid number.".format(
board_id, servo_count, driver_count + 1, driver_count + servo_count, number))
self.raise_config_error("PKONE Extension {} supports {} servos ({} - {}). "
"Servo: {} is not a valid number.".format(
board_id, servo_count, driver_count + 1, driver_count + servo_count, number),
11)

return PKONEServoNumber(board_id, servo_num)

Expand All @@ -373,20 +375,21 @@ def _parse_switch_number(self, number: str) -> PKONESwitchNumber:
try:
board_id_str, switch_num_str = number.split("-")
except ValueError:
raise AssertionError("Invalid switch number {}".format(number))
self.raise_config_error("Invalid switch number {}".format(number), 12)
raise

board_id = int(board_id_str)
switch_num = int(switch_num_str)

if board_id not in self.pkone_extensions:
raise AssertionError("PKONE Extension {} does not exist for switch {}".format(board_id, number))
self.raise_config_error("PKONE Extension {} does not exist for switch {}".format(board_id, number), 13)

if switch_num == 0:
raise AssertionError("PKONE switch numbering begins with 1. Switch: {}".format(number))
self.raise_config_error("PKONE switch numbering begins with 1. Switch: {}".format(number), 14)

if self.pkone_extensions[board_id].switch_count < switch_num:
raise AssertionError("PKONE Extension {} only has {} switches. Switch: {}".format(
board_id, self.pkone_extensions[board_id].switch_count, number))
self.raise_config_error("PKONE Extension {} only has {} switches. Switch: {}".format(
board_id, self.pkone_extensions[board_id].switch_count, number), 15)

return PKONESwitchNumber(board_id, switch_num)

Expand All @@ -403,18 +406,19 @@ def configure_switch(self, number: str, config: SwitchConfig, platform_config: d
"""
del platform_config
if not number:
raise AssertionError("Switch requires a number")
self.raise_config_error("Switch requires a number", 16)

if not self.controller_connection:
raise AssertionError("A request was made to configure a PKONE switch, but no "
"connection to PKONE controller is available")
self.raise_config_error("A request was made to configure a PKONE switch, but no "
"connection to PKONE controller is available", 17)

try:
switch_number = self._parse_switch_number(number)
except ValueError:
raise AssertionError("Could not parse switch number {}/{}. Seems "
"to be not a valid switch number for the"
"PKONE platform.".format(config.name, number))
self.raise_config_error("Could not parse switch number {}/{}. Seems "
"to be not a valid switch number for the"
"PKONE platform.".format(config.name, number), 18)
raise

self.debug_log("PKONE Switch: %s (%s)", number, config.name)
return PKONESwitch(config, switch_number, self)
Expand Down Expand Up @@ -528,13 +532,14 @@ async def _send_multiple_light_update(self, sequential_brightness_list: List[Tup
"".join("%03d" % (b[1] * 255) for b in sequential_brightness_list))
self.controller_connection.send(cmd)

# pylint: disable-msg=inconsistent-return-statements
def configure_light(self, number, subtype, config, platform_settings):
"""Configure light in platform."""
del platform_settings

if not self.controller_connection:
raise AssertionError("A request was made to configure a PKONE light, but no "
"connection to PKONE controller is available")
self.raise_config_error("A request was made to configure a PKONE light, but no "
"connection to PKONE controller is available", 19)

if subtype == "simple":
# simple LEDs use the format <board_address_id> - <led> (simple LEDs only have 1 channel)
Expand All @@ -550,7 +555,7 @@ def configure_light(self, number, subtype, config, platform_settings):
self._light_system.mark_dirty(led_channel)
return led_channel

raise AssertionError("Unknown subtype {}".format(subtype))
self.raise_config_error("Unknown subtype {}".format(subtype), 20)

def _led_is_hardware_aligned(self, led_name) -> bool:
"""Determine whether the specified LED is hardware aligned."""
Expand All @@ -572,11 +577,18 @@ def _initialize_led_hw_driver_alignment(self):
for channel in lightshow.get_all_channel_hw_drivers():
channel.set_hardware_aligned(self._led_is_hardware_aligned(channel.config.name))

def _assert_is_light_board(self, board_address_id):
"""Make sure that this id is connected to a light board."""
if int(board_address_id) not in self.pkone_lightshows:
self.raise_config_error("Board {} is not a lightboard.".format(board_address_id), 1)

# pylint: disable-msg=inconsistent-return-statements
def parse_light_number_to_channels(self, number: str, subtype: str):
"""Parse light channels from number string."""
if subtype == "simple":
# simple LEDs use the format <board_address_id> - <led> (simple LEDs only have 1 channel)
board_address_id, index = number.split('-')
self._assert_is_light_board(board_address_id)
return [
{
"number": "{}-{}".format(board_address_id, index)
Expand All @@ -587,6 +599,7 @@ def parse_light_number_to_channels(self, number: str, subtype: str):
# Normal LED number format: <board_address_id> - <group> - <led>
board_address_id, group, number_str = str(number).split('-')
index = int(number_str)
self._assert_is_light_board(board_address_id)

# Determine if there are 3 or 4 channels depending upon firmware on board
if self.pkone_lightshows[int(board_address_id)].rgbw_firmware:
Expand Down Expand Up @@ -619,4 +632,4 @@ def parse_light_number_to_channels(self, number: str, subtype: str):
},
]

raise AssertionError("Unknown light subtype {}".format(subtype))
self.raise_config_error("Unknown light subtype {}".format(subtype), 21)

0 comments on commit c2df1ba

Please sign in to comment.