diff --git a/abr-testing/abr_testing/tools/module_control.py b/abr-testing/abr_testing/tools/module_control.py new file mode 100644 index 00000000000..5bc1f5cfb1d --- /dev/null +++ b/abr-testing/abr_testing/tools/module_control.py @@ -0,0 +1,138 @@ +"""Interface with opentrons modules!""" +from serial import Serial # type: ignore[import-untyped] +import asyncio +import subprocess +from typing import Any + +# Generic +_READ_ALL = "readall" +_READ_LINE = "read" +_DONE = "done" + +# TC commands +_MOVE_SEAL = "ms" +_MOVE_LID = "ml" +tc_gcode_shortcuts = { + "status": "M119", + _MOVE_SEAL: "M241.D", # move seal motor + _MOVE_LID: "M240.D", # move lid stepper motor + "ol": "M126", # open lid + "cl": "M127", # close lid + "sw": "M901.D", # status of all switches + "lt": "M141.D", # get lid temperature + "pt": "M105.D", # get plate temperature +} + +# HS Commands +hs_gcode_shortcuts = { + "srpm": "M3 S{rpm}", # Set RPM + "grpm": "M123", # Get RPM + "home": "G28", # Home + "deactivate": "M106", # Deactivate +} + +gcode_shortcuts = tc_gcode_shortcuts | hs_gcode_shortcuts + + +async def message_read(dev: Serial) -> Any: + """Read message.""" + response = dev.readline().decode() + while not response: + await asyncio.sleep(1) + response = dev.readline().decode() + return response + + +async def message_return(dev: Serial) -> Any: + """Wait until message becomes available.""" + try: + response = await asyncio.wait_for(message_read(dev), timeout=30) + return response + except asyncio.exceptions.TimeoutError: + print("response timed out.") + return "" + + +async def handle_module_gcode_shortcut( + dev: Serial, command: str, in_commands: bool, output: str = "" +) -> None: + """Handle debugging commands that require followup.""" + if in_commands: + if command == _MOVE_SEAL: + distance = input("enter distance in steps => ") + dev.write( + f"{gcode_shortcuts[command]} {distance}\n".encode() + ) # (+) -> retract, (-) -> engage + # print(await message_return(dev)) + elif command == _MOVE_LID: + distance = input( + "enter angular distance in degrees => " + ) # (+) -> open, (-) -> close + dev.write(f"{gcode_shortcuts[command]} {distance}\n".encode()) + # print(await message_return(dev)) + # everything else + else: + dev.write(f"{gcode_shortcuts[command]}\n".encode()) + else: + dev.write(f"{command}\n".encode()) + try: + mr = await message_return(dev) + print(mr) + except TypeError: + print("Invalid input") + return + + if output: + try: + with open(output, "a") as result_file: + if "OK" in mr: + status = command + ": SUCCESS" + else: + status = command + ": FAILURE" + result_file.write(status) + result_file.write(f" {mr}") + result_file.close() + except FileNotFoundError: + print(f"cannot open file: {output}") + + +async def comms_loop(dev: Serial, commands: list, output: str = "") -> bool: + """Loop for commands.""" + _exit = False + try: + command = commands.pop(0) + except IndexError: + command = input("\n>>> ") + if command == _READ_ALL: + print(dev.readlines()) + elif command == _READ_LINE: + print(dev.readline()) + elif command == _DONE: + _exit = True + elif command in gcode_shortcuts: + await handle_module_gcode_shortcut(dev, command, True, output) + else: + await handle_module_gcode_shortcut(dev, command, False, output) + return _exit + + +async def _main(module: str, commands: list = [], output: str = "") -> bool: + """Main process.""" + module_name = ( + subprocess.check_output(["find", "/dev/", "-name", f"*{module}*"]) + .decode() + .strip() + ) + if not module_name: + print(f"{module} not found. Exiting.") + return False + dev = Serial(f"{module_name}", 9600, timeout=2) + _exit = False + while not _exit: + _exit = await comms_loop(dev, commands, output) + dev.close() + return True + + +if __name__ == "__main__": + asyncio.run(_main("heatershaker")) diff --git a/abr-testing/abr_testing/tools/test_modules.py b/abr-testing/abr_testing/tools/test_modules.py new file mode 100644 index 00000000000..8c372fbff53 --- /dev/null +++ b/abr-testing/abr_testing/tools/test_modules.py @@ -0,0 +1,155 @@ +"""Modules Tests Script!""" +import asyncio +import time +from datetime import datetime +import os +import module_control # type: ignore +from typing import Any, Tuple, Dict +import traceback + +# To run: +# SSH into robot +# cd /opt/opentrons-robot-server/abr-testing/tools +# python3 test_modules.py + + +async def tc_test_1(module: str, path_to_file: str) -> None: + """Thermocycler Test 1 Open and Close Lid.""" + duration = int(input("How long to run this test for? (in seconds): ")) + start = time.time() + while time.time() - start < duration: + try: + await (tc_open_lid(module, path_to_file)) + except asyncio.TimeoutError: + return + time.sleep(5) + try: + await (tc_close_lid(module, path_to_file)) + except asyncio.TimeoutError: + return + time.sleep(5) + + +async def hs_test_1(module: str, path_to_file: str) -> None: + """Heater Shaker Test 1. (Home and Shake).""" + duration = int(input("How long to run this test for? (in seconds): ")) + rpm = input("Target RPM (200-3000): ") + start = time.time() + while time.time() - start < duration: + try: + await (hs_test_home(module, path_to_file)) + except asyncio.TimeoutError: + return + time.sleep(5) + try: + await (hs_test_set_shake(module, rpm, path_to_file)) + except asyncio.TimeoutError: + return + time.sleep(10) + try: + await (hs_test_set_shake(module, "0", path_to_file)) + except asyncio.TimeoutError: + return + time.sleep(10) + + +async def input_codes(module: str, path_to_file: str) -> None: + """Opens serial for manual code input.""" + await module_control._main(module, output=path_to_file) + + +hs_tests: Dict[str, Tuple[Any, str]] = { + "Test 1": (hs_test_1, "Repeatedly home heater shaker then set shake speed"), + "Input GCodes": (input_codes, "Input g codes"), +} + +tc_tests: Dict[str, Tuple[Any, str]] = { + "Test 1": (tc_test_1, "Repeatedly open and close TC lid"), + "Input GCodes": (input_codes, "Input g codes"), +} + +global modules + +modules = { + "heatershaker": hs_tests, + "thermocycler": tc_tests, +} + + +async def main(module: str) -> None: + """Select test to be run.""" + # Select test to run + # Set directory for tests + BASE_DIRECTORY = "/userfs/data/testing_data/" + if not os.path.exists(BASE_DIRECTORY): + os.makedirs(BASE_DIRECTORY) + tests = modules[module] + for i, test in enumerate(tests.keys()): + function, description = tests[test] + print(f"{i}) {test} : {description}") + selected_test = int(input("Please select a test: ")) + try: + function, description = tests[list(tests.keys())[selected_test]] + test_dir = BASE_DIRECTORY + f"{module}/test/{list(tests.keys())[selected_test]}" + print(f"{i}, {description}") + print(f"TEST DIR: {test_dir}") + date = datetime.now() + filename = f"results_{datetime.strftime(date, '%Y-%m-%d_%H:%M:%S')}.txt" + output_file = os.path.join(test_dir, filename) + try: + if not os.path.exists(test_dir): + os.makedirs(test_dir) + open(output_file, "a").close() + except Exception: + traceback.print_exc() + print(f"PATH: {output_file} ") + await (function(module, output_file)) + except Exception: + print("Failed to run test") + traceback.print_exc() + + +# HS Test Functions +async def hs_test_home(module: str, path_to_file: str) -> None: + """Home heater shaker.""" + hs_gcodes = module_control.hs_gcode_shortcuts + home_gcode = hs_gcodes["home"] + await (module_control._main(module, [home_gcode, "done"], path_to_file)) + + +async def hs_test_set_shake(module: str, rpm: str, path_to_file: str) -> None: + """Shake heater shaker at specified speed.""" + hs_gcodes = module_control.hs_gcode_shortcuts + set_shake_gcode = hs_gcodes["srpm"].format(rpm=rpm) + await (module_control._main(module, [set_shake_gcode, "done"], path_to_file)) + + +async def hs_deactivate(module: str, path_to_file: str) -> None: + """Deactivate Heater Shaker.""" + hs_gcodes = module_control.hs_gcode_shortcuts + deactivate_gcode = hs_gcodes["deactivate"] + await (module_control._main(module, [deactivate_gcode, "done"], path_to_file)) + + +# TC Test Functions +async def tc_open_lid(module: str, path_to_file: str) -> None: + """Open thermocycler lid.""" + tc_gcodes = module_control.tc_gcode_shortcuts + open_lid_gcode = tc_gcodes["ol"] + await (module_control._main(module, [open_lid_gcode, "done"], path_to_file)) + + +async def tc_close_lid(module: str, path_to_file: str) -> None: + """Open thermocycler lid.""" + tc_gcodes = module_control.tc_gcode_shortcuts + close_lid_gcode = tc_gcodes["cl"] + await (module_control._main(module, [close_lid_gcode, "done"], path_to_file)) + + +if __name__ == "__main__": + print("Modules:") + for i, module in enumerate(modules): + print(f"{i}) {module}") + module_int = int(input("Please select a module: ")) + module = list(modules.keys())[module_int] + asyncio.run(main(module))