diff --git a/mpf/config_spec.yaml b/mpf/config_spec.yaml index c25a69e69..912a80f3b 100644 --- a/mpf/config_spec.yaml +++ b/mpf/config_spec.yaml @@ -1105,6 +1105,19 @@ pkone: debug: single|bool|false console_log: single|enum(none,basic,full)|none file_log: single|enum(none,basic,full)|basic +pinotaur: + __valid_in__: machine + __type__: config + debug: single|bool|false + console_log: single|enum(none,basic,full)|none + file_log: single|enum(none,basic,full)|basic + poll_hz: single|int|100 + port: single|str| + baud: single|int|115200 + motor_port1: single|enum(servo,stepper,stepper_continious,dc)|servo + motor_port2: single|enum(servo,stepper,stepper_continious,dc)|servo + last_rgb_num_bank1: single|int|64 + last_rgb_num_bank2: single|int|128 pd_led_boards: use_ws281x_0: single|bool|false use_ws281x_1: single|bool|false diff --git a/mpf/mpfconfig.yaml b/mpf/mpfconfig.yaml index 3b36a5630..2dfe72216 100644 --- a/mpf/mpfconfig.yaml +++ b/mpf/mpfconfig.yaml @@ -131,6 +131,7 @@ mpf: light_segment_displays: mpf.platforms.light_segment_displays.LightSegmentDisplaysPlatform pin2dmd: mpf.platforms.pin2dmd.Pin2DmdHardwarePlatform visual_pinball_engine: mpf.platforms.visual_pinball_engine.visual_pinball_engine.VisualPinballEnginePlatform + pinotaur: mpf.platforms.pinotaur.pinotaur.PinotaurHardwarePlatform paths: scriptlets: scriptlets diff --git a/mpf/platforms/pinotaur/__init__.py b/mpf/platforms/pinotaur/__init__.py new file mode 100644 index 000000000..c247ef7e1 --- /dev/null +++ b/mpf/platforms/pinotaur/__init__.py @@ -0,0 +1 @@ +"""Pinotaur System.""" diff --git a/mpf/platforms/pinotaur/defines.py b/mpf/platforms/pinotaur/defines.py new file mode 100644 index 000000000..b144b1450 --- /dev/null +++ b/mpf/platforms/pinotaur/defines.py @@ -0,0 +1,117 @@ +"""Interface defines for Pinotaur.""" + + +class PinotaurDefines: + + """Pinotaur messages.""" + + GetConnectedHardware = 0x00 + GetFirmwareVersion = 0x01 + GetAPIVersion = 0x02 + GetSimpleLampCount = 0x03 + GetSolenoidCount = 0x04 + GetModernLightCount = 0x09 + + GiStatus = 0x0A + GiChannelOn = 0x0B + GiChannelOff = 0x0C + GiAll = 0x0D + ReportCoilCurrent = 0x0F + + SetCoilIdleCurrentMax = 0x10 + SetHoldTime = 0x11 + SetSolenoidPulsePWM = 0x12 + SetSolenoidHoldPWM = 0x13 + GetStatusSolenoid = 0x14 + DisableSolenoid = 0x16 + PulseSolenoid = 0x17 + SetSolenoidPulseTime = 0x18 + SetSolenoidRecycle = 0x19 + PulseSolenoidPWM = 0x1A + SolenoidFuturePulse = 0x1B + SetCurrent = 0x1C + + HardwareRuleSolenoid = 0x1E + + ProfileCoils = 0x1D + SetupFlipperButton = 0x1E + SetSolenoidNext = 0x1F + + SetRGBLight = 0x30 + SetRGBBlink = 0x31 + SetRGBPulse = 0x32 + SetRGBStrobe = 0x33 + + SetBlinkSpeed = 0x34 + SetStrobeSpeed = 0x35 + SetChaseChild = 0x36 + SetRGBFade = 0x37 + + SetBankLimits = 0x3A + UpdateFrequency = 0x3B + SetActualRGBCount = 0x3C + LightFrameDone = 0x3D + LoadLightShowFrame = 0x3E + LightShowControl = 0x3F + + SetSwitchRamp = 0x50 + SetSwitchDebounce = 0x51 + SetSwitchClear = 0x52 + SetReportType = 0x57 + GetSwitchStatus = 0x58 + GetChangedSwitches = 0x59 + GetAllSwitches = 0x5A + SetAutoAction = 0x5B + ClearAutoAction = 0x5C + SetUpEOSSwitch = 0x5E + FlushChanged = 0x5F + + RelayControl = 0x60 + FlipperEnable = 0x61 + FlipperBeastHold = 0x62 + InitReset = 0x64 + WatchDogFlag = 0x65 + ClearCoil911 = 0x66 + SetErrorTolerance48V = 0x67 + + StartLight = 0x6A + StartLightBlinkSpeed = 0x6B + LaunchLight = 0x6C + LaunchLightBlinkSpeed = 0x6D + + CheckBootFaultFlag = 0x6F + + ProfileAMotor = 0x40 + GetLastMoveTime = 0x41 + SetServoCurrentLimits = 0x42 + ReadServoCurrent = 0x43 + SetInrushGracePeriod = 0x44 + ActionOnServoStall = 0x45 + ReadLastSafePosition = 0x46 + ReadMotorFaultFlags = 0x47 + + MotorBankType = 0x70 + ReadMotorState = 0x71 + MotorGotoSwitch = 0x72 + StopMotor = 0x73 + MoveDCMotor = 0x74 + SetDCMotor = 0x75 + ReadDCMotor = 0x76 + + MoveStepperMotor = 0x77 + SetStepperMotorSpeed = 0x78 + ReadStepperMotor = 0x79 + + SetServoMotor = 0x7A + Move180ServoMotor = 0x7B + MoveContServoMotor = 0x7C + ReadServoMotor = 0x7D + DisableServoMotor = 0x7E + ConfigServoMotors = 0x7F + + GetBanksBCD = 0x20 + SetBanksBCD = 0x21 + EraseNVMRow = 0x25 + ReadNVMRow = 0x28 + SetNVMAddress = 0x2A + WriteNVMHalfPage = 0x2B diff --git a/mpf/platforms/pinotaur/pinotaur.py b/mpf/platforms/pinotaur/pinotaur.py new file mode 100644 index 000000000..54d79f4c4 --- /dev/null +++ b/mpf/platforms/pinotaur/pinotaur.py @@ -0,0 +1,642 @@ +"""Pinotaur platform.""" +import asyncio +from typing import Dict, Optional, List + +from mpf.core.platform_batch_light_system import PlatformBatchLight, PlatformBatchLightSystem +from mpf.platforms.interfaces.driver_platform_interface import DriverPlatformInterface, PulseSettings, HoldSettings +from mpf.platforms.interfaces.servo_platform_interface import ServoPlatformInterface +from mpf.platforms.interfaces.stepper_platform_interface import StepperPlatformInterface +from mpf.platforms.interfaces.switch_platform_interface import SwitchPlatformInterface + +from mpf.core.logging import LogMixin +from mpf.core.utility_functions import Util + +from mpf.platforms.pinotaur.defines import PinotaurDefines + +from mpf.platforms.interfaces.light_platform_interface import LightPlatformSoftwareFade, LightPlatformInterface + +from mpf.core.platform import SwitchPlatform, LightsPlatform, DriverPlatform, SwitchSettings, DriverSettings, \ + DriverConfig, SwitchConfig, RepulseSettings, ServoPlatform, StepperPlatform + + +class PinotaurSwitch(SwitchPlatformInterface): + + """A switch in the Pinotaur platform.""" + + __slots__ = ["index"] # type: List[str] + + def __init__(self, config, number): + """Initialise switch.""" + super().__init__(config, number) + self.index = int(number) + + def get_board_name(self): + """Return board name.""" + return "Pinotaur" + + +class PinotaurDriver(DriverPlatformInterface): + + """A driver in the Pinotaur platform.""" + + __slots__ = ["platform", "_pulse_ms", "_recycle_time", "index", "has_rule", "_pulse_power", + "_hold_settings"] # type: List[str] + + def __init__(self, config, number, platform): + """Initialise driver.""" + super().__init__(config, number) + self.platform = platform # type: PinotaurHardwarePlatform + self._pulse_ms = -1 + self._pulse_power = -1 + self._recycle_time = None + self.index = int(number) + self.has_rule = False + self._hold_settings = None + + def configure_recycle(self, recycle_time): + """Configure recycle time.""" + if recycle_time > 255: + recycle_time = 255 + elif recycle_time < 0: + recycle_time = 0 + + if self._recycle_time != recycle_time: + self._recycle_time = recycle_time + self.platform.send_command_background(PinotaurDefines.SetSolenoidRecycle, + bytes([self.index, recycle_time])) + + def _configure_pulse_ms(self, pulse_ms): + """Configure pulse ms for this driver if it changed.""" + # pulse_ms 0 is ok here (but not in the pulse command) + assert 0 <= pulse_ms <= 255 + if pulse_ms != self._pulse_ms: + self._pulse_ms = pulse_ms + self.platform.send_command_background(PinotaurDefines.SetSolenoidPulseTime, bytes( + [self.index, + pulse_ms + ])) + + def _configure_pulse_power(self, pulse_power): + """Configure pulse power for this driver if it changed.""" + if pulse_power != self._pulse_power: + self._pulse_power = pulse_power + on, off = Util.power_to_on_off(pulse_power, 20) + self.platform.send_command_background(PinotaurDefines.SetSolenoidPulsePWM, bytes( + [self.index, + int(on), + int(on + off) + ])) + + def _configure_hold_power(self, hold_settings: Optional[HoldSettings]): + """Configure pulse power for this driver if it changed.""" + if hold_settings == self._hold_settings: + return + if hold_settings: + self.platform.send_command_background(PinotaurDefines.SetHoldTime, bytes([self.index, 255, 255])) + self._hold_settings = hold_settings + on, off = Util.power_to_on_off(hold_settings.power, 20) + self.platform.send_command_background(PinotaurDefines.SetSolenoidHoldPWM, bytes( + [self.index, + int(on), + int(on + off), + 0, # not implemented in firmware + 0, # not implemented in firmware + 0, # not implemented in firmware + 0, # not implemented in firmware + ])) + else: + self.platform.send_command_background(PinotaurDefines.SetHoldTime, bytes([self.index, 0, 0])) + + def pulse(self, pulse_settings: PulseSettings): + """Pulse driver.""" + if self.has_rule: + # TODO: check if this is actually possible with rule -> do we need a timer? + raise AssertionError("It is currently not possible to pulse this coil while a rule is active.") + + # pulse_ms 0 has special meaning in Pinotaur firmware + assert 0 < pulse_settings.duration <= 255 + self._pulse_ms = pulse_settings.duration + self._configure_pulse_power(pulse_settings.power) + self._configure_hold_power(None) + self.platform.send_command_background(PinotaurDefines.PulseSolenoid, + bytes([self.index, pulse_settings.duration])) + # TODO: restore pulse_ms/pulse_power/hold_power in case we have a rule + + def enable(self, pulse_settings: PulseSettings, hold_settings: HoldSettings): + """Enable driver.""" + if self.has_rule: + # TODO: check if this is actually possible with rule -> do we need a timer? + raise AssertionError("It is currently not possible to enable this coil while a rule is active.") + self._configure_pulse_power(pulse_settings.power) + assert 0 <= pulse_settings.duration <= 255 + # pulse_ms 0 has special meaning in Pinotaur firmware + pulse_ms = pulse_settings.duration if pulse_settings.duration > 0 else 1 + self._pulse_ms = pulse_settings.duration + self._configure_hold_power(hold_settings) + # TODO: remove child coil here in case this is another one + self.platform.send_command_background(PinotaurDefines.PulseSolenoid, + bytes([self.index, pulse_ms])) + # TODO: restore pulse_ms/pulse_power/hold_power in case we have a rule + + def disable(self): + """Disable driver.""" + self.platform.send_command_background(PinotaurDefines.DisableSolenoid, bytes([self.index])) + + def get_board_name(self): + """Return board name.""" + return "Pinotaur" + + +# TODO: Pinotaur start/launch lights + +class PinotaurSimpleLamp(LightPlatformSoftwareFade): + + """A simple light in the Pinotaur platform which only supports on/off.""" + + __slots__ = ["platform", "_state"] + + def __init__(self, number, platform): + """Initialise Pinotaur Light.""" + super().__init__(number, platform.machine.clock.loop, 50) + self.platform = platform + self._state = None + + def set_brightness(self, brightness: float): + """Turn lamp on or off.""" + if brightness > 0 and self._state is not True: + self.platform.send_byte(PinotaurDefines.GiChannelOn, bytes([self.number])) + self._state = True + elif brightness <= 0 and self._state is not False: + self.platform.send_byte(PinotaurDefines.GiChannelOff, bytes([self.number])) + self._state = False + + def get_board_name(self): + """Return board name.""" + return "Pinotaur" + + def is_successor_of(self, other): + """Return true if the other light has the previous number.""" + return self.number == other.number + 1 + + def get_successor_number(self): + """Return next number.""" + return self.number + 1 + + def __lt__(self, other): + """Order lights by their order on the hardware.""" + return self.number < other.number + + +class PinotaurModernLight(PlatformBatchLight): + + """A modern light in Pinotaur.""" + + __slots__ = ["platform"] + + def __init__(self, number, platform, light_system): + """Initialise Pinotaur Light.""" + super().__init__(number, light_system) + self.platform = platform + + def get_max_fade_ms(self) -> int: + """Return max fade time.""" + return 65535 + + def get_board_name(self): + """Return board name.""" + return "Pinotaur" + + def is_successor_of(self, other): + """Return true if the other light has the previous number.""" + return self.number == other.number + 1 + + def get_successor_number(self): + """Return next number.""" + return self.number + 1 + + def __lt__(self, other): + """Order lights by their order on the hardware.""" + return self.number < other.number + + +# pylint: disable-msg=too-many-instance-attributes +class PinotaurHardwarePlatform(SwitchPlatform, LightsPlatform, DriverPlatform, ServoPlatform, StepperPlatform, + LogMixin): + + """Pinotaur platform.""" + + # TODO: add new MotorPlatform + + __slots__ = ["config", "_writer", "_reader", "_poll_task", "_watchdog_task", "_number_of_lamps", + "_number_of_solenoids", "_inputs", + "_bus_lock", "_number_of_modern_lights", + "_light_system", "_firmware_version", "_hardware_name"] # type: List[str] + + def __init__(self, machine) -> None: + """Initialise platform.""" + super().__init__(machine) + self._writer = None # type: Optional[asyncio.StreamWriter] + self._reader = None # type: Optional[asyncio.StreamReader] + self._poll_task = None + self._watchdog_task = None + self._bus_lock = asyncio.Lock() + self._number_of_lamps = None # type: Optional[int] + self._number_of_solenoids = None # type: Optional[int] + self._number_of_modern_lights = None # type: Optional[int] + self._inputs = dict() # type: Dict[str, bool] + self.features['max_pulse'] = 255 + self._firmware_version = None + self._hardware_name = None + + self.config = self.machine.config_validator.validate_config("pinotaur", + self.machine.config.get('pinotaur', {})) + self._configure_device_logging_and_debug("Pinotaur", self.config) + self._light_system = None + + def _clear_read_buffer(self): + """Clear read buffer.""" + # pylint: disable-msg=protected-access + if self.debug and self._reader._buffer: + # pylint: disable-msg=protected-access + self.debug_log("Flushed: %s%s", self._reader._buffer, "".join(" 0x%02x" % b for b in self._reader._buffer)) + if hasattr(self._writer.transport, "_serial"): + # pylint: disable-msg=protected-access + self._writer.transport._serial.reset_input_buffer() + # pylint: disable-msg=protected-access + self._reader._buffer = bytearray() + # pylint: disable-msg=protected-access + self._reader._maybe_resume_transport() + + # pylint: disable-msg=too-many-statements + # pylint: disable-msg=too-many-branches + async def initialize(self): + """Initialise platform.""" + await super().initialize() + + self.log.info("Connecting to %s", self.config['port']) + connector = self.machine.clock.open_serial_connection( + url=self.config['port'], baudrate=self.config['baud']) + + self._reader, self._writer = await connector + + # give the serial a few ms to read the first bytes + await asyncio.sleep(.1) + + while True: + # reset platform + self.debug_log("Sending reset.") + self._clear_read_buffer() + try: + # try init + response = await asyncio.wait_for( + self.send_command_and_read_response(PinotaurDefines.InitReset, None, 1), timeout=.5) + except asyncio.TimeoutError: + self.warning_log("Reset of Pinotaur failed. Did get a timeout. Will retry.") + continue + if response[0] != 0: + # reset failed + self.warning_log("Reset of Pinotaur failed. Got %s instead of 0. Will retry.", response[0]) + continue + + # get type + hardware_name = await self.send_command_and_read_response(PinotaurDefines.GetConnectedHardware, + None, None) + + firmware_version = await self.send_command_and_read_response(PinotaurDefines.GetFirmwareVersion, + None, 1) + + self.debug_log("Connected to %s hardware. Firmware version: %s.", hardware_name, firmware_version) + + if not firmware_version: + self.error_log("Failed to read pinotaur_version from Pinotaur. Got %s", firmware_version) + continue + + self._firmware_version = firmware_version.decode() + + # if we made it here reset succeeded + break + + self._hardware_name = hardware_name.decode() + + self.machine.variables.set_machine_var("pinotaur_hardware", self._hardware_name) + '''machine_var: pinotaur_hardware + + desc: Connected Pinotaur hardware. + ''' + + self.machine.variables.set_machine_var("pinotaur_version", self._firmware_version) + '''machine_var: pinotaur_version + + desc: Pinotaur version. + ''' + + # get number of lamps + self._number_of_lamps = (await self.send_command_and_read_response(PinotaurDefines.GetSimpleLampCount, + None, 1))[0] + + # get number of solenoids + self._number_of_solenoids = (await self.send_command_and_read_response(PinotaurDefines.GetSolenoidCount, + None, 1))[0] + + # get number of modern lights + self._number_of_modern_lights = (await self.send_command_and_read_response( + PinotaurDefines.GetModernLightCount, None, 1))[0] + + self._light_system = PlatformBatchLightSystem(self.machine.clock, + self._send_multiple_light_update, + self.machine.config['mpf'][ + 'default_light_hw_update_hz'], + 10) # TODO: figure out max batch size + + self.debug_log("Number of lamps: %s. Number of coils: %s. Number of modern lights: %s", + self._number_of_lamps, self._number_of_solenoids, self._number_of_modern_lights) + + # initially read all switches + self.debug_log("Reading all switches.") + # clear all changes since we will read all switches now + await self.send_command(PinotaurDefines.FlushChanged) + for number in range(128): + state = await self.send_command_and_read_response(PinotaurDefines.GetSwitchStatus, bytes([number]), 2) + if state[1] == 2: + continue + if state[1] > 2: + raise AssertionError("Invalid switch {}. Got response: {}".format(number, state)) + + self._inputs[str(number)] = state[1] == 1 + + self.debug_log("Init of Pinotaur done.") + + async def _send_multiple_light_update(self, sequential_brightness_list): + # TODO: figure out correct command - current fade speed is tricky + common_fade_ms = sequential_brightness_list[0][2] + if common_fade_ms < 0: + common_fade_ms = 0 + fade_time = int(common_fade_ms) + + data = bytearray([int(sequential_brightness_list[0][0].number / 256), + sequential_brightness_list[0][0].number % 256, + int(fade_time / 255), int(fade_time & 0xFF), + len(sequential_brightness_list)]) + for _, brightness, _ in sequential_brightness_list: + data.append(int(255 * brightness)) + + # TODO: use correct command + await self.send_command(PinotaurDefines.SetRGBLight) + + async def start(self): + """Start reading switch changes.""" + self._watchdog_task = self.machine.clock.loop.create_task(self._watchdog()) + self._watchdog_task.add_done_callback(Util.raise_exceptions) + self._poll_task = self.machine.clock.loop.create_task(self._poll()) + self._poll_task.add_done_callback(Util.raise_exceptions) + self._light_system.start() + + # turn on relay + await self.send_command(PinotaurDefines.RelayControl, bytes([1])) + + def stop(self): + """Stop platform.""" + super().stop() + if self._poll_task: + self._poll_task.cancel() + self._poll_task = None + + if self._watchdog_task: + self._watchdog_task.cancel() + self._watchdog_task = None + + if self._reader: + self._writer.close() + self._reader = None + self._writer = None + + async def _poll(self): + sleep_time = 1.0 / self.config['poll_hz'] + while True: + try: + status = await self.send_command_and_read_response(PinotaurDefines.GetChangedSwitches, None, 1) + except TimeoutError: + self.warning_log("Polling switches timed out.") + await asyncio.sleep(sleep_time) + continue + if status[0] == 0x7f: + # no changes. sleep according to poll_hz + await asyncio.sleep(sleep_time) + else: + # bit 7 is state + switch_state = 1 if status[0] & 0b10000000 else 0 + # bits 0-6 are the switch number + switch_num = status[0] & 0b01111111 + + # tell the switch controller about the new state + self.machine.switch_controller.process_switch_by_num(str(switch_num), switch_state, self) + + # store in dict as well + self._inputs[str(switch_num)] = bool(switch_state) + + async def _watchdog(self): + """Periodically send watchdog.""" + # TODO: handle any over-currents or faults here + while True: + # send watchdog + try: + response = await self.send_command_and_read_response(PinotaurDefines.WatchDogFlag, None, 1) + except TimeoutError: + self.warning_log("Watchdog response timed out.") + await asyncio.sleep(.1) + continue + if response[0] != 0: + self.warning_log("Watchdog returned %s instead 0", response[0]) + # sleep 500ms + await asyncio.sleep(.5) + + def set_pulse_on_hit_and_enable_and_release_rule(self, enable_switch: SwitchSettings, coil: DriverSettings): + """Set pulse on hit and enable and release rule on driver.""" + # TODO: implement + + def set_pulse_on_hit_and_release_and_disable_rule(self, enable_switch: SwitchSettings, + eos_switch: SwitchSettings, coil: DriverSettings, + repulse_settings: Optional[RepulseSettings]): + """Set pulse on hit and enable and release and disable rule on driver. + + Pulses a driver when a switch is hit. When the switch is released + the pulse is canceled and the driver gets disabled. When the eos_switch is hit the pulse is canceled + and the driver becomes disabled. Typically used on the main coil for dual-wound coil flippers with eos switch. + """ + # TODO: implement + + def set_pulse_on_hit_and_enable_and_release_and_disable_rule(self, enable_switch: SwitchSettings, + eos_switch: SwitchSettings, coil: DriverSettings, + repulse_settings: Optional[RepulseSettings]): + """Set pulse on hit and enable and release and disable rule on driver. + + Pulses a driver when a switch is hit. Then enables the driver (may be with pwm). When the switch is released + the pulse is canceled and the driver becomes disabled. When the eos_switch is hit the pulse is canceled + and the driver becomes enabled (likely with PWM). + Typically used on the coil for single-wound coil flippers with eos switch. + """ + # TODO: implement + + def set_pulse_on_hit_and_release_rule(self, enable_switch: SwitchSettings, coil: DriverSettings): + """Set pulse on hit and release rule to driver.""" + # TODO: implement + + def set_pulse_on_hit_rule(self, enable_switch: SwitchSettings, coil: DriverSettings): + """Set pulse on hit rule on driver.""" + # TODO: implement + + def clear_hw_rule(self, switch: SwitchSettings, coil: DriverSettings): + """Clear hw rule for driver.""" + # TODO: implement + + def configure_light(self, number: str, subtype: str, config, platform_settings: dict) -> LightPlatformInterface: + """Configure light on Pinotaur.""" + del platform_settings, config + assert self._number_of_lamps is not None + assert self._number_of_modern_lights is not None + + if subtype is None or subtype == "gi": + if 0 < int(number) >= self._number_of_lamps: + raise AssertionError("Pinotaur only has {} lamps. Cannot configure lamp {}.". + format(self._number_of_lamps, number)) + + return PinotaurSimpleLamp(int(number), self) + if subtype == "light": + if 0 < int(number) >= self._number_of_modern_lights: + raise AssertionError("Pinotaur only has {} modern lights. Cannot configure light {}.". + format(self._number_of_modern_lights, number)) + return PinotaurModernLight(int(number), self, self._light_system) + + raise self.raise_config_error("Invalid subtype {}".format(subtype), 1) + + def parse_light_number_to_channels(self, number: str, subtype: str): + """Return a single light.""" + # TODO handle subtypes + return [ + { + "number": number, + } + ] + + def configure_switch(self, number: str, config: SwitchConfig, platform_config: dict) -> SwitchPlatformInterface: + """Configure a switch.""" + if number not in self._inputs: + raise AssertionError("Invalid switch number {}. Platform reports the following switches as " + "valid: {}".format(number, list(self._inputs.keys()))) + + return PinotaurSwitch(config=config, number=number) + + async def get_hw_switch_states(self): + """Return current switch states.""" + return self._inputs + + def configure_driver(self, config: DriverConfig, number: str, platform_settings: dict) -> DriverPlatformInterface: + """Configure a driver.""" + assert self._number_of_solenoids is not None + assert self._number_of_lamps is not None + + if 0 < int(number) > self._number_of_solenoids: + raise AssertionError("Pinotaur only has {} drivers. Cannot configure driver {}.". + format(self._number_of_solenoids, number)) + + driver = PinotaurDriver(config=config, number=number, platform=self) + recycle_time = config.default_pulse_ms * 2 if config.default_recycle else config.default_pulse_ms + if recycle_time > 255: + recycle_time = 255 + driver.configure_recycle(recycle_time) + return driver + + def send_command_background(self, cmd: int, payload: Optional[bytes] = None): + """Send command in the background.""" + future = asyncio.ensure_future(self.send_command_and_read_response(cmd, payload, 0)) + future.add_done_callback(Util.raise_exceptions) + + async def send_command(self, cmd: int, payload: Optional[bytes] = None): + """Send command to bus.""" + assert self._reader is not None + async with self._bus_lock: + self._send_command(cmd, payload) + + async def send_command_and_read_response(self, cmd: int, payload: Optional[bytes], + response_size: Optional[int]) -> bytes: + """Send command and wait for response.""" + assert self._reader is not None + async with self._bus_lock: + self._send_command(cmd, payload) + if response_size is not None: + return await self._read_response(cmd, response_size) + + return await self._read_string(cmd) + + def _send_command(self, cmd: int, payload: Optional[bytes] = None): + """Send command to bus without bus lock.""" + msg = bytearray() + msg.append(ord('<')) + msg.append(cmd) + msg.append((len(payload) << 1) | 0x81 if payload else 0x81) + if payload: + msg.extend(payload) + self._clear_read_buffer() + self._writer.write(bytes(msg)) + + async def _read_response(self, cmd: int, response_size: int) -> bytes: + """Read response from bus without bus lock.""" + response = await asyncio.wait_for(self._reader.readexactly(2 + response_size), timeout=0.1) + if self._debug: + self.debug_log("Received Response %s (%s)", bytes(response), "".join(" 0x%02x" % b for b in response)) + if response[0] != ord('>') or response[1] != cmd: + # TODO: handle this + raise AssertionError("Incorrect response: {} ({})".format( + response, "".join(" 0x%02x" % b for b in response))) + return response[2:] + + # pylint: disable-msg=inconsistent-return-statements + async def _read_until(self, separator, min_chars: int = 0): + """Read until separator. + + Args: + ---- + separator: Read until this separator byte. + min_chars: Minimum message length before separator + """ + assert self._reader is not None + + # asyncio StreamReader only supports this from python 3.5.2 on + buffer = b'' + while True: + char = await self._reader.readexactly(1) + buffer += char + if char == separator and len(buffer) > min_chars: + return buffer + + async def _read_string(self, cmd) -> bytes: + """Read zero terminated string.""" + response = await asyncio.wait_for(self._reader.readexactly(2), timeout=0.1) + if response[0] != ord('>') or response[1] != cmd: + # TODO: handle this + raise AssertionError("Incorrect response: {}".format(response)) + data = await asyncio.wait_for(self._read_until(b'\x00'), timeout=0.1) + # remove terminator + data = data[:-1] + self.debug_log("Received String %s", data) + return data + + def get_info_string(self): + """Dump info about Pinotaur platform.""" + info = "" + info += "Pinotaur connected via serial on {}\n".format(self.config['port']) + info += "Hardware: {} Firmware Version: {}\n".format( + self._hardware_name, self._firmware_version) + info += "Input map: {}\n".format(sorted(list(self._inputs.keys()), key=int)) + info += "Coil count: {}\n".format(self._number_of_solenoids) + info += "Modern lights count: {}\n".format(self._number_of_modern_lights) + info += "Traditional lights count: {}\n".format(self._number_of_lamps) + return info + + async def configure_servo(self, number: str) -> ServoPlatformInterface: + """Configure a servo.""" + # TODO: implement + + async def configure_stepper(self, number: str, config: dict) -> StepperPlatformInterface: + """Configure a stepper.""" + # TODO: implement diff --git a/mpf/tests/machine_files/pinotaur/config/config.yaml b/mpf/tests/machine_files/pinotaur/config/config.yaml new file mode 100644 index 000000000..1a3175084 --- /dev/null +++ b/mpf/tests/machine_files/pinotaur/config/config.yaml @@ -0,0 +1,66 @@ +#config_version=5 + +hardware: + platform: pinotaur + +pinotaur: + port: com1 + baud: 115200 + debug: true + +switches: + s_test00: + number: 0 + s_flipper: + number: 1 + s_flipper_eos: + number: 2 + s_slingshot: + number: 3 + s_test4: + number: 4 + s_test60_nc: + number: 60 + type: 'NC' + +coils: + c_test: + number: 0 + c_test_allow_enable: + number: 1 + default_hold_power: 1.0 + c_test_long_pulse: + number: 2 + default_pulse_ms: 2000 + c_flipper_main: + number: 5 + default_pulse_ms: 30 + c_flipper_hold: + number: 6 + allow_enable: True + c_slingshot: + number: 7 + +lights: + test_light0: + start_channel: 0 + type: rgb + subtype: light + test_light1: + previous: test_light0 + type: rgbw + subtype: light + +flippers: + f_test_hold_eos: + debug: true + main_coil: c_flipper_main + hold_coil: c_flipper_hold + activation_switch: s_flipper + eos_switch: s_flipper_eos + use_eos: true + +autofire_coils: + ac_slingshot: + coil: c_slingshot + switch: s_slingshot diff --git a/mpf/tests/test_Pinotaur.py b/mpf/tests/test_Pinotaur.py new file mode 100644 index 000000000..f03e9c00d --- /dev/null +++ b/mpf/tests/test_Pinotaur.py @@ -0,0 +1,344 @@ +import time + +from mpf.tests.MpfTestCase import MpfTestCase +from mpf.tests.loop import MockSerial, MockSocket + + +class MockPinotaurSocket(MockSocket, MockSerial): + + def read(self, length): + del length + if not self.queue: + return b"" + msg = self.queue.pop() + return msg + + def read_ready(self): + return bool(self.queue) + + def write_ready(self): + return True + + def write(self, msg): + """Write message.""" + # print("Serial received: " + "".join("\\x%02x" % b for b in msg) + " len: " + str(len(msg))) + total_msg_len = len(msg) + while msg: + if len(msg) < 3: + raise AssertionError("Msg needs to be at least 3 chars.") + command = msg[0] + if msg[0] != ord(b'<'): + raise AssertionError("Msg should start with <. Msg: {}".format("".join("\\x%02x" % b for b in msg))) + cmd = msg[1] + payload_length = (msg[2] ^ 0x81) >> 1 + + if len(msg) < 3 + payload_length: + raise AssertionError("Msg is too short. Length: {} Payload length: {} Msg: {}", + len(msg), payload_length, "".join("\\x%02x" % b for b in msg)) + + self._handle_msg(msg[0:3 + payload_length]) + msg = msg[3 + payload_length:] + + return total_msg_len + + def _handle_msg(self, msg): + if msg in self.permanent_commands and msg not in self.expected_commands: + if self.permanent_commands[msg] is not None: + self.queue.append(self.permanent_commands[msg]) + return len(msg) + + if msg not in self.expected_commands: + self.crashed = True + print("Unexpected command: " + "".join("\\x%02x" % b for b in msg) + " len: " + str(len(msg))) + raise AssertionError("Unexpected command: " + "".join("\\x%02x" % b for b in msg) + + " len: " + str(len(msg))) + + if self.expected_commands[msg] is not None: + self.queue.append(self.expected_commands[msg]) + + del self.expected_commands[msg] + return len(msg) + + def send(self, data): + return self.write(data) + + def recv(self, size): + return self.read(size) + + def __init__(self, api_version): + super().__init__() + self.name = "SerialMock" + self.expected_commands = {} + self.queue = [] + self.permanent_commands = {} + self.crashed = False + self.api_version = api_version + + +class TestPinotaur(MpfTestCase): + + def get_config_file(self): + return 'config.yaml' + + def get_machine_path(self): + return 'tests/machine_files/pinotaur/' + + def _mock_loop(self): + self.clock.mock_serial("com1", self.serialMock) + + def tearDown(self): + self.assertFalse(self.serialMock.crashed) + super().tearDown() + + def get_platform(self): + return False + + def _wait_for_processing(self): + start = time.time() + while self.serialMock.expected_commands and not self.serialMock.crashed and time.time() < start + 1: + self.advance_time_and_run(.01) + + def setUp(self): + self.expected_duration = 1.5 + self.serialMock = MockPinotaurSocket(api_version=8) + + self.serialMock.permanent_commands = { + b'<\x59\x81': b'>\x59\x7f', # changed switches? -> no + b'<\x65\x81': b'>\x65\x00' # watchdog + } + + self.serialMock.expected_commands = { + b'<\x64\x81': b'>\x64\x00', # reset + b'<\x00\x81': b'>\x00Pinotaur\00', # hw + b'<\x01\x81': b'>\x014.01\00', # version + # b'<\x02\x81': b'>\x020.08\00', # api version + b'<\x03\x81': b'>\x03\x28', # get number of lamps -> 40 + b'<\x04\x81': b'>\x04\x09', # get number of solenoids -> 9 + b'<\x09\x81': b'>\x09\x58', # get number of switches -> 88 + b'<\x5f\x81': False, # flush changes + b'<\x60\x83\x01': b'>\x60\x00', # enable relay + b'<\x19\x85\x00\x0a': b'>\x19\x00', # set recycle time + b'<\x19\x85\x01\x0a': b'>\x19\x00', # set recycle time + b'<\x19\x85\x02\xff': b'>\x19\x00', # set recycle time + b'<\x19\x85\x05\x1e': b'>\x19\x00', # set recycle time + b'<\x19\x85\x06\x0a': b'>\x19\x00', # set recycle time + b'<\x19\x85\x07\x0a': b'>\x19\x00', # set recycle time + } + + for number in range(128): + if number == 4: + self.serialMock.expected_commands[bytes([ord(b'<'), 0x58, 0x83, number])] = b'>\x58\x00\x01' + else: + self.serialMock.expected_commands[bytes([ord(b'<'), 0x58, 0x83, number])] = b'>\x58\x00\x00' + + super().setUp() + + self._wait_for_processing() + self.assertFalse(self.serialMock.expected_commands) + + def test_platform(self): + self.maxDiff = None + infos = """Pinotaur connected via serial on com1 +Hardware: Pinotaur Firmware Version: 4 +Input map: ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '80', '81', '82', '83', '84', '85', '86', '87', '88', '89', '90', '91', '92', '93', '94', '95', '96', '97', '98', '99', '100', '101', '102', '103', '104', '105', '106', '107', '108', '109', '110', '111', '112', '113', '114', '115', '116', '117', '118', '119', '120', '121', '122', '123', '124', '125', '126', '127'] +Coil count: 9 +Modern lights count: 88 +Traditional lights count: 40 +""" + + self.assertEqual(self.machine.default_platform.get_info_string(), infos) + + # wait for watchdog + self.serialMock.expected_commands = { + b'<\x65\x81': b'>\x65\x00' # watchdog + } + self._wait_for_processing() + self.assertFalse(self.serialMock.expected_commands) + + # test initial switch state + self.assertSwitchState("s_test00", False) + self.assertSwitchState("s_test4", True) + self.assertSwitchState("s_test60_nc", True) + + self.serialMock.expected_commands = { + b'<\x59\x81': b'>\x59\x04', # changed switches? -> 4 to off + } + self.advance_time_and_run(.1) + # turns inactive + self.assertSwitchState("s_test4", False) + + self.serialMock.expected_commands = { + b'<\x59\x81': b'>\x59\x84', # changed switches? -> 4 to on + } + self.advance_time_and_run(.1) + # turns active + self.assertSwitchState("s_test4", True) + + self.serialMock.expected_commands = { + b'<\x59\x81': b'>\x59\xBC', # changed switches? -> 60 to on + } + self.advance_time_and_run(.1) + # turns inactive (because of NC) + self.assertSwitchState("s_test60_nc", False) + self.assertFalse(self.serialMock.expected_commands) + + # pulse coil + self.serialMock.expected_commands = { + b'<\x12\x87\x00\x01\x01': b'>\x12\x00', # set pulse_pwm to full on + b'<\x17\x85\x00\x0a': b'>\x17\x00' # pulse for 10ms + } + self.machine.coils["c_test"].pulse() + self._wait_for_processing() + self.assertFalse(self.serialMock.expected_commands) + + # pulse trough eject. enable and disable in software + self.serialMock.expected_commands = { + b'<\x12\x87\x02\x01\x01': b'>\x12\x00', # set pulse_pwm to full on + b'<\x11\x87\x02\xff\xff': b'>\x11\x00', # hold time = inf + b'<\x13\x8f\x02\x01\x01\x00\x00\x00\x00': b'>\x13\x00', # set hold pwm + b'<\x17\x85\x02\x01': b'>\x17\x00' # enable (looks like a pulse but this is actually enable) + } + self.machine.coils["c_test_long_pulse"].pulse() + self._wait_for_processing() + self.assertFalse(self.serialMock.expected_commands) + + self.advance_time_and_run(1.5) + self.serialMock.expected_commands = { + b'<\x16\x83\x02': b'>\x16\x00' # disable + } + self.advance_time_and_run() + self._wait_for_processing() + self.assertFalse(self.serialMock.expected_commands) + + # enable coil + self.serialMock.expected_commands = { + b'<\x12\x87\x01\x01\x01': b'>\x12\x00', # set pulse_pwm to full on + b'<\x11\x87\x01\xff\xff': b'>\x11\x00', # hold time = inf + b'<\x13\x8f\x01\x01\x01\x00\x00\x00\x00': b'>\x13\x00', # set hold pwm + b'<\x17\x85\x01\x0a': b'>\x17\x00' # enable (looks like a pulse but this is actually enable) + } + self.machine.coils["c_test_allow_enable"].enable() + self._wait_for_processing() + self.assertFalse(self.serialMock.expected_commands) + + # disable coil + self.serialMock.expected_commands = { + b'<\x16\x83\x01': b'>\x16\x00' # disable + } + self.machine.coils["c_test_allow_enable"].disable() + self._wait_for_processing() + self.assertFalse(self.serialMock.expected_commands) + + # TODO: implement lights + # # test light enable (using light 3) + # self.serialMock.expected_commands = { + # b'\x0b\x03': None + # } + # self.machine.lights["test_light0"].on(key="test") + # self._wait_for_processing() + # self.assertFalse(self.serialMock.expected_commands) + # + # # disable light (using light 3) + # self.serialMock.expected_commands = { + # b'\x0c\x03': None + # } + # self.machine.lights["test_light0"].remove_from_stack_by_key("test") + # self._wait_for_processing() + # self.assertFalse(self.serialMock.expected_commands) + + def test_rules(self): + """Test HW Rules.""" + self.skipTest("Not implemented yet.") + return + # wait for watchdog + self.serialMock.expected_commands = { + b'\x65': b'\x00' # watchdog + } + self._wait_for_processing() + + self.serialMock.expected_commands = { + b'<\x3c>\x05\x01\x02\x00\x1e\xff\x00\x03\x02\x00': None, # create rule for main + b'<\x06\x01\x00\x00\x0a\xff\xff\x03\x00\x00': None, # create rule for hold + } + self.machine.flippers["f_test_hold_eos"].enable() + self.advance_time_and_run(.2) + self._wait_for_processing() + self.assertFalse(self.serialMock.expected_commands) + + self.serialMock.expected_commands = { + b'<\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00': None, # remove rule for main + b'<\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00': None, # remove rule for hold + } + self.machine.flippers["f_test_hold_eos"].disable() + self.advance_time_and_run(.2) + self._wait_for_processing() + self.assertFalse(self.serialMock.expected_commands) + + self.serialMock.expected_commands = { + b'\x3c\x07\x03\x00\x00\x0a\xff\x00\x01\x00\x00': None, # add rule for slingshot + b'\x19\x07\x14': None + } + self.machine.autofire_coils["ac_slingshot"].enable() + self.advance_time_and_run(.2) + self._wait_for_processing() + self.assertFalse(self.serialMock.expected_commands) + + self.serialMock.expected_commands = { + b'\x3c\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00': None, # remove rule for slingshot + } + self.machine.autofire_coils["ac_slingshot"].disable() + self.advance_time_and_run(.2) + self._wait_for_processing() + self.assertFalse(self.serialMock.expected_commands) + + # test recycle + # pulse coil + self.serialMock.expected_commands = { + b'\x18\x00\x0a': None, # set pulse_ms to 10ms + b'\x17\x00': None + } + self.machine.coils["c_test"].pulse() + self._wait_for_processing() + self.assertFalse(self.serialMock.expected_commands) + + def test_lights(self): + """Test lights.""" + self.skipTest("Not implemented yet.") + return + # set color to one light without fade + self.serialMock.expected_commands = { + b'\x0d\x00\x00\x00\x00\x03\x11\x22\x33': None, # fade with 0ms fade time + } + self.machine.lights["test_light0"].color([0x11, 0x22, 0x33], key="test") + self._wait_for_processing() + self.assertFalse(self.serialMock.expected_commands) + + # set color again (should do nothing) + self.machine.lights["test_light0"].remove_from_stack_by_key("test") + self.machine.lights["test_light0"].color([0x11, 0x22, 0x33], fade_ms=100) + self._wait_for_processing() + self.assertFalse(self.serialMock.expected_commands) + + # set color to the second light without fade + self.serialMock.expected_commands = { + b'\x0d\x00\x03\x00\x00\x04\x11\x22\x33\x11': None, # fade with 0ms fade time starting at channel 3 + # 4 channels because this is a RGBW light + } + self.machine.lights["test_light1"].color([0x11, 0x22, 0x33]) + self._wait_for_processing() + self.assertFalse(self.serialMock.expected_commands) + + # fade both lights together (fade depending on serial timing) + self.serialMock.expected_commands = { + b'\x0d\x00\x00\x01\x18\x07\xaa\xbb\xcc\xdd\xee\xff\xdd': None, # fade with 300ms fade time + b'\x0d\x00\x00\x01\x19\x07\xaa\xbb\xcc\xdd\xee\xff\xdd': None, # fade with 300ms fade time + b'\x0d\x00\x00\x01\x20\x07\xaa\xbb\xcc\xdd\xee\xff\xdd': None, # fade with 300ms fade time + b'\x0d\x00\x00\x01\x21\x07\xaa\xbb\xcc\xdd\xee\xff\xdd': None, # fade with 300ms fade time + b'\x0d\x00\x00\x01\x22\x07\xaa\xbb\xcc\xdd\xee\xff\xdd': None, # fade with 300ms fade time + } + self.machine.lights["test_light0"].color([0xaa, 0xbb, 0xcc], fade_ms=300) + self.machine.lights["test_light1"].color([0xdd, 0xee, 0xff], fade_ms=300) + start = time.time() + while len(self.serialMock.expected_commands) > 4 and not self.serialMock.crashed and time.time() < start + 10: + self.advance_time_and_run(.01) + self.assertEqual(4, len(self.serialMock.expected_commands))