Skip to content

Commit

Permalink
feat(abr-testing): add module tests (#17127)
Browse files Browse the repository at this point in the history
# Overview
Module testing script for running predefined tests on modules.
(Currently only has heater shaker test)

---------

Co-authored-by: rclarke0 <[email protected]>
  • Loading branch information
AnthonyNASC20 and rclarke0 authored Dec 20, 2024
1 parent 04dea6c commit e458d06
Show file tree
Hide file tree
Showing 2 changed files with 293 additions and 0 deletions.
138 changes: 138 additions & 0 deletions abr-testing/abr_testing/tools/module_control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""Interface with opentrons modules!"""
from serial import Serial # type: ignore[import-untyped]
import asyncio
import subprocess
from typing import Any

# Generic
_READ_ALL = "readall"
_READ_LINE = "read"
_DONE = "done"

# TC commands
_MOVE_SEAL = "ms"
_MOVE_LID = "ml"
tc_gcode_shortcuts = {
"status": "M119",
_MOVE_SEAL: "M241.D", # move seal motor
_MOVE_LID: "M240.D", # move lid stepper motor
"ol": "M126", # open lid
"cl": "M127", # close lid
"sw": "M901.D", # status of all switches
"lt": "M141.D", # get lid temperature
"pt": "M105.D", # get plate temperature
}

# HS Commands
hs_gcode_shortcuts = {
"srpm": "M3 S{rpm}", # Set RPM
"grpm": "M123", # Get RPM
"home": "G28", # Home
"deactivate": "M106", # Deactivate
}

gcode_shortcuts = tc_gcode_shortcuts | hs_gcode_shortcuts


async def message_read(dev: Serial) -> Any:
"""Read message."""
response = dev.readline().decode()
while not response:
await asyncio.sleep(1)
response = dev.readline().decode()
return response


async def message_return(dev: Serial) -> Any:
"""Wait until message becomes available."""
try:
response = await asyncio.wait_for(message_read(dev), timeout=30)
return response
except asyncio.exceptions.TimeoutError:
print("response timed out.")
return ""


async def handle_module_gcode_shortcut(
dev: Serial, command: str, in_commands: bool, output: str = ""
) -> None:
"""Handle debugging commands that require followup."""
if in_commands:
if command == _MOVE_SEAL:
distance = input("enter distance in steps => ")
dev.write(
f"{gcode_shortcuts[command]} {distance}\n".encode()
) # (+) -> retract, (-) -> engage
# print(await message_return(dev))
elif command == _MOVE_LID:
distance = input(
"enter angular distance in degrees => "
) # (+) -> open, (-) -> close
dev.write(f"{gcode_shortcuts[command]} {distance}\n".encode())
# print(await message_return(dev))
# everything else
else:
dev.write(f"{gcode_shortcuts[command]}\n".encode())
else:
dev.write(f"{command}\n".encode())
try:
mr = await message_return(dev)
print(mr)
except TypeError:
print("Invalid input")
return

if output:
try:
with open(output, "a") as result_file:
if "OK" in mr:
status = command + ": SUCCESS"
else:
status = command + ": FAILURE"
result_file.write(status)
result_file.write(f" {mr}")
result_file.close()
except FileNotFoundError:
print(f"cannot open file: {output}")


async def comms_loop(dev: Serial, commands: list, output: str = "") -> bool:
"""Loop for commands."""
_exit = False
try:
command = commands.pop(0)
except IndexError:
command = input("\n>>> ")
if command == _READ_ALL:
print(dev.readlines())
elif command == _READ_LINE:
print(dev.readline())
elif command == _DONE:
_exit = True
elif command in gcode_shortcuts:
await handle_module_gcode_shortcut(dev, command, True, output)
else:
await handle_module_gcode_shortcut(dev, command, False, output)
return _exit


async def _main(module: str, commands: list = [], output: str = "") -> bool:
"""Main process."""
module_name = (
subprocess.check_output(["find", "/dev/", "-name", f"*{module}*"])
.decode()
.strip()
)
if not module_name:
print(f"{module} not found. Exiting.")
return False
dev = Serial(f"{module_name}", 9600, timeout=2)
_exit = False
while not _exit:
_exit = await comms_loop(dev, commands, output)
dev.close()
return True


if __name__ == "__main__":
asyncio.run(_main("heatershaker"))
155 changes: 155 additions & 0 deletions abr-testing/abr_testing/tools/test_modules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
"""Modules Tests Script!"""
import asyncio
import time
from datetime import datetime
import os
import module_control # type: ignore
from typing import Any, Tuple, Dict
import traceback

# To run:
# SSH into robot
# cd /opt/opentrons-robot-server/abr-testing/tools
# python3 test_modules.py


async def tc_test_1(module: str, path_to_file: str) -> None:
"""Thermocycler Test 1 Open and Close Lid."""
duration = int(input("How long to run this test for? (in seconds): "))
start = time.time()
while time.time() - start < duration:
try:
await (tc_open_lid(module, path_to_file))
except asyncio.TimeoutError:
return
time.sleep(5)
try:
await (tc_close_lid(module, path_to_file))
except asyncio.TimeoutError:
return
time.sleep(5)


async def hs_test_1(module: str, path_to_file: str) -> None:
"""Heater Shaker Test 1. (Home and Shake)."""
duration = int(input("How long to run this test for? (in seconds): "))
rpm = input("Target RPM (200-3000): ")
start = time.time()
while time.time() - start < duration:
try:
await (hs_test_home(module, path_to_file))
except asyncio.TimeoutError:
return
time.sleep(5)
try:
await (hs_test_set_shake(module, rpm, path_to_file))
except asyncio.TimeoutError:
return
time.sleep(10)
try:
await (hs_test_set_shake(module, "0", path_to_file))
except asyncio.TimeoutError:
return
time.sleep(10)


async def input_codes(module: str, path_to_file: str) -> None:
"""Opens serial for manual code input."""
await module_control._main(module, output=path_to_file)


hs_tests: Dict[str, Tuple[Any, str]] = {
"Test 1": (hs_test_1, "Repeatedly home heater shaker then set shake speed"),
"Input GCodes": (input_codes, "Input g codes"),
}

tc_tests: Dict[str, Tuple[Any, str]] = {
"Test 1": (tc_test_1, "Repeatedly open and close TC lid"),
"Input GCodes": (input_codes, "Input g codes"),
}

global modules

modules = {
"heatershaker": hs_tests,
"thermocycler": tc_tests,
}


async def main(module: str) -> None:
"""Select test to be run."""
# Select test to run
# Set directory for tests
BASE_DIRECTORY = "/userfs/data/testing_data/"
if not os.path.exists(BASE_DIRECTORY):
os.makedirs(BASE_DIRECTORY)
tests = modules[module]
for i, test in enumerate(tests.keys()):
function, description = tests[test]
print(f"{i}) {test} : {description}")
selected_test = int(input("Please select a test: "))
try:
function, description = tests[list(tests.keys())[selected_test]]
test_dir = BASE_DIRECTORY + f"{module}/test/{list(tests.keys())[selected_test]}"
print(f"{i}, {description}")
print(f"TEST DIR: {test_dir}")
date = datetime.now()
filename = f"results_{datetime.strftime(date, '%Y-%m-%d_%H:%M:%S')}.txt"
output_file = os.path.join(test_dir, filename)
try:
if not os.path.exists(test_dir):
os.makedirs(test_dir)
open(output_file, "a").close()
except Exception:
traceback.print_exc()
print(f"PATH: {output_file} ")
await (function(module, output_file))
except Exception:
print("Failed to run test")
traceback.print_exc()


# HS Test Functions
async def hs_test_home(module: str, path_to_file: str) -> None:
"""Home heater shaker."""
hs_gcodes = module_control.hs_gcode_shortcuts
home_gcode = hs_gcodes["home"]
await (module_control._main(module, [home_gcode, "done"], path_to_file))


async def hs_test_set_shake(module: str, rpm: str, path_to_file: str) -> None:
"""Shake heater shaker at specified speed."""
hs_gcodes = module_control.hs_gcode_shortcuts
set_shake_gcode = hs_gcodes["srpm"].format(rpm=rpm)
await (module_control._main(module, [set_shake_gcode, "done"], path_to_file))


async def hs_deactivate(module: str, path_to_file: str) -> None:
"""Deactivate Heater Shaker."""
hs_gcodes = module_control.hs_gcode_shortcuts
deactivate_gcode = hs_gcodes["deactivate"]
await (module_control._main(module, [deactivate_gcode, "done"], path_to_file))


# TC Test Functions
async def tc_open_lid(module: str, path_to_file: str) -> None:
"""Open thermocycler lid."""
tc_gcodes = module_control.tc_gcode_shortcuts
open_lid_gcode = tc_gcodes["ol"]
await (module_control._main(module, [open_lid_gcode, "done"], path_to_file))


async def tc_close_lid(module: str, path_to_file: str) -> None:
"""Open thermocycler lid."""
tc_gcodes = module_control.tc_gcode_shortcuts
close_lid_gcode = tc_gcodes["cl"]
await (module_control._main(module, [close_lid_gcode, "done"], path_to_file))


if __name__ == "__main__":
print("Modules:")
for i, module in enumerate(modules):
print(f"{i}) {module}")
module_int = int(input("Please select a module: "))
module = list(modules.keys())[module_int]
asyncio.run(main(module))

0 comments on commit e458d06

Please sign in to comment.