diff --git a/depthai_helpers/app_manager.py b/depthai_helpers/app_manager.py new file mode 100644 index 000000000..cb3f22f43 --- /dev/null +++ b/depthai_helpers/app_manager.py @@ -0,0 +1,84 @@ +import os +import shutil +import signal +import subprocess +import sys +import time +from pathlib import Path + +initEnv = os.environ.copy() +if "PYTHONPATH" in initEnv: + initEnv["PYTHONPATH"] += ":" + str(Path(__file__).parent.parent.absolute()) +else: + initEnv["PYTHONPATH"] = str(Path(__file__).parent.parent.absolute()) + + + +def quoted(val): + return '"' + str(val) + '"' + +class App: + def __init__(self, appName, appPath=None, appRequirements=None, appEntrypoint=None): + self.appName = appName + self.appPath = appPath or Path(__file__).parent.parent / "apps" / self.appName + self.venvPath = self.appPath / "venv" + self.appPip = str(self.venvPath / "bin" / "pip") if os.name != 'nt' else (self.venvPath / "Scripts" / "pip.exe") + self.appInterpreter = str(self.venvPath / "bin" / "python") if os.name != 'nt' else (self.venvPath / "Scripts" / "python.exe") + self.appRequirements = appRequirements or self.appPath / "requirements.txt" + self.appEntrypoint = appEntrypoint or self.appPath / "main.py" + + def createVenv(self, force=False): + try: + subprocess.check_call(' '.join([quoted(sys.executable), '-m', 'venv', '-h']), env=initEnv, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + except: + print(f"Error accessing \"venv\" module! Please try to install \"python3.{sys.version_info[1]}-venv\" or see oficial docs here - https://docs.python.org/3/library/venv.html", file=sys.stderr) + sys.exit(1) + try: + subprocess.check_call(' '.join([quoted(sys.executable), '-m', 'pip', '-h']), env=initEnv, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + except: + print("Error accessing \"pip\" module! Please try to install \"python3-pip\" or see oficial docs here - https://pip.pypa.io/en/stable/installation/", file=sys.stderr) + sys.exit(1) + + if not force and Path(self.appInterpreter).exists() and Path(self.appPip).exists(): + print("Existing venv found.") + else: + if self.venvPath.exists(): + print("Recreating venv...") + shutil.rmtree(self.venvPath) + else: + print("Creating venv...") + try: + subprocess.check_call(' '.join([quoted(sys.executable), '-m', 'venv', quoted(str(self.venvPath.absolute()))]), shell=True, env=initEnv, cwd=self.appPath) + except: + print(f"Error creating a new virtual environment using \"venv\" module! Please try to install \"python3.{sys.version_info[1]}-venv\" again", file=sys.stderr) + sys.exit(1) + print("Installing requirements...") + subprocess.check_call(' '.join([quoted(self.appInterpreter), '-m', 'pip', 'install', '-U', 'pip']), env=initEnv, shell=True, cwd=self.appPath) + subprocess.check_call(' '.join([quoted(self.appInterpreter), '-m', 'pip', 'install', '--prefer-binary', '-r', quoted(str(self.appRequirements))]), env=initEnv, shell=True, cwd=self.appPath) + + def runApp(self, shouldRun = lambda: True): + # Passthrough args to the app + args = [quoted(arg) for arg in sys.argv[1:]] + args.insert(0, quoted(str(self.appEntrypoint))) + args.insert(0, quoted(self.appInterpreter)) + if os.name == 'nt': + pro = subprocess.Popen(' '.join(args), env=initEnv, shell=True, cwd=self.appPath) + else: + pro = subprocess.Popen(' '.join(args), env=initEnv, shell=True, cwd=self.appPath, preexec_fn=os.setsid) + while shouldRun() and pro.poll() is None: + try: + time.sleep(1) + except KeyboardInterrupt: + break + + # if pro.poll() is not None: + try: + if os.name == 'nt': + subprocess.call(['taskkill', '/F', '/T', '/PID', str(pro.pid)]) + else: + os.killpg(os.getpgid(pro.pid), signal.SIGTERM) + + except ProcessLookupError: + pass + + diff --git a/depthai_helpers/cli_utils.py b/depthai_helpers/cli_utils.py new file mode 100644 index 000000000..1122ff31c --- /dev/null +++ b/depthai_helpers/cli_utils.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 + +from types import SimpleNamespace + + +class RangeFloat(object): + def __init__(self, start, end): + self.start = start + self.end = end + + def __eq__(self, other): + return self.start <= other <= self.end + + def __contains__(self, item): + return self.__eq__(item) + + def __iter__(self): + yield self + + def __str__(self): + return '[{0},{1}]'.format(self.start, self.end) + + +PrintColors = SimpleNamespace( + HEADER="\033[95m", + BLUE="\033[94m", + GREEN="\033[92m", + RED="\033[91m", + WARNING="\033[1;5;31m", + FAIL="\033[91m", + ENDC="\033[0m", + BOLD="\033[1m", + UNDERLINE="\033[4m", + BLACK_BG_RED="\033[1;31;40m", + BLACK_BG_GREEN="\033[1;32;40m", + BLACK_BG_BLUE="\033[1;34;40m", +) + + +def cliPrint(msg, print_color): + print("{0}{1}{2}".format(print_color, msg, PrintColors.ENDC)) diff --git a/depthai_helpers/config_manager.py b/depthai_helpers/config_manager.py new file mode 100644 index 000000000..26d5d3987 --- /dev/null +++ b/depthai_helpers/config_manager.py @@ -0,0 +1,289 @@ +import os +import platform +import subprocess +from pathlib import Path +import cv2 +import depthai as dai +import numpy as np + +from depthai_helpers.cli_utils import cliPrint, PrintColors +from depthai_sdk.previews import Previews + + +DEPTHAI_ZOO = Path(__file__).parent.parent / Path(f"resources/nn/") +DEPTHAI_VIDEOS = Path(__file__).parent.parent / Path(f"videos/") +DEPTHAI_VIDEOS.mkdir(exist_ok=True) + + +class ConfigManager: + labels = "" + customFwCommit = '' + + def __init__(self, args): + self.args = args + + # Get resolution width as it's required by some functions + self.rgbResWidth = self.rgbResolutionWidth(self.args.rgbResolution) + + self.args.encode = dict(self.args.encode) + self.args.cameraOrientation = dict(self.args.cameraOrientation) + if (Previews.left.name in self.args.cameraOrientation or Previews.right.name in self.args.cameraOrientation) and self.useDepth: + print("[WARNING] Changing mono cameras orientation may result in incorrect depth/disparity maps") + + def rgbResolutionWidth(self, res: dai.ColorCameraProperties.SensorResolution) -> int: + if res == dai.ColorCameraProperties.SensorResolution.THE_720_P: return 720 + elif res == dai.ColorCameraProperties.SensorResolution.THE_800_P: return 800 + elif res == dai.ColorCameraProperties.SensorResolution.THE_1080_P: return 1080 + elif res == dai.ColorCameraProperties.SensorResolution.THE_4_K: return 2160 + elif res == dai.ColorCameraProperties.SensorResolution.THE_12_MP: return 3040 + elif res == dai.ColorCameraProperties.SensorResolution.THE_13_MP: return 3120 + else: raise Exception('Resolution not supported!') + + # Not needed, but might be useful for SDK in the future + # def _monoResWidth(self, res: dai.MonoCameraProperties.SensorResolution) -> int: + # if res == dai.MonoCameraProperties.SensorResolution.THE_400_P: return 400 + # elif res == dai.MonoCameraProperties.SensorResolution.THE_480_P: return 480 + # elif res == dai.MonoCameraProperties.SensorResolution.THE_720_P: return 720 + # elif res == dai.MonoCameraProperties.SensorResolution.THE_800_P: return 800 + # else: raise Exception('Resolution not supported!') + + @property + def debug(self): + return not self.args.noDebug + + @property + def useCamera(self): + return not self.args.video + + @property + def useNN(self): + return not self.args.disableNeuralNetwork + + @property + def useDepth(self): + return not self.args.disableDepth and self.useCamera + + @property + def maxDisparity(self): + maxDisparity = 95 + if (self.args.extendedDisparity): + maxDisparity *= 2 + if (self.args.subpixel): + maxDisparity *= 32 + + return maxDisparity + + def getModelSource(self): + if not self.useCamera: + return "host" + if self.args.camera == "left": + if self.useDepth: + return "rectifiedLeft" + return "left" + if self.args.camera == "right": + if self.useDepth: + return "rectifiedRight" + return "right" + if self.args.camera == "color": + return "color" + + def irEnabled(self, device): + try: + drivers = device.getIrDrivers() + return len(drivers) > 0 + except RuntimeError: + return False + + def getModelName(self): + if self.args.cnnModel: + return self.args.cnnModel + modelDir = self.getModelDir() + if modelDir is not None: + return Path(modelDir).stem + + def getModelDir(self): + if self.args.cnnPath: + return self.args.cnnPath + if self.args.cnnModel is not None and (DEPTHAI_ZOO / self.args.cnnModel).exists(): + return DEPTHAI_ZOO / self.args.cnnModel + + def getAvailableZooModels(self): + def verify(path: Path): + return path.parent.name == path.stem + + def convert(path: Path): + return path.stem + + return list(map(convert, filter(verify, DEPTHAI_ZOO.rglob("**/*.json")))) + + def getColorMap(self): + cvColorMap = cv2.applyColorMap(np.arange(256, dtype=np.uint8), getattr(cv2, "COLORMAP_{}".format(self.args.colorMap))) + cvColorMap[0] = [0, 0, 0] + return cvColorMap + + def getUsb2Mode(self): + if self.args['forceUsb2']: + cliPrint("FORCE USB2 MODE", PrintColors.WARNING) + usb2Mode = True + else: + usb2Mode = False + return usb2Mode + + def adjustPreviewToOptions(self): + if len(self.args.show) != 0: + depthPreviews = [Previews.rectifiedRight.name, Previews.rectifiedLeft.name, Previews.depth.name, + Previews.depthRaw.name, Previews.disparity.name, Previews.disparityColor.name] + + if len([preview for preview in self.args.show if preview in depthPreviews]) == 0 and not self.useNN: + print("No depth-related previews chosen, disabling depth...") + self.args.disableDepth = True + return + + self.args.show.append(Previews.color.name) + if self.useDepth: + self.args.show.append(Previews.disparityColor.name) + + if self.args.guiType == "qt": + if self.useNN: + self.args.show.append(Previews.nnInput.name) + + if self.useDepth: + if self.lowBandwidth: + self.args.show.append(Previews.disparityColor.name) + else: + self.args.show.append(Previews.depthRaw.name) + self.args.show.append(Previews.rectifiedLeft.name) + self.args.show.append(Previews.rectifiedRight.name) + else: + self.args.show.append(Previews.left.name) + self.args.show.append(Previews.right.name) + + def adjustParamsToDevice(self, device): + deviceInfo = device.getDeviceInfo() + cams = device.getConnectedCameras() + depthEnabled = dai.CameraBoardSocket.LEFT in cams and dai.CameraBoardSocket.RIGHT in cams + + sensorNames = device.getCameraSensorNames() + if dai.CameraBoardSocket.RGB in cams: + name = sensorNames[dai.CameraBoardSocket.RGB] + if name == 'OV9782': + if self.rgbResWidth not in [720, 800]: + self.args.rgbResolution = dai.ColorCameraProperties.SensorResolution.THE_800_P + cliPrint(f'{name} requires 720 or 800 resolution, defaulting to {self.args.rgbResolution}', + PrintColors.RED) + else: + if self.rgbResWidth in [720, 800]: + self.args.rgbResolution = dai.ColorCameraProperties.SensorResolution.THE_1080_P + cliPrint(f'{name} doesn\'t support 720 / 800 resolutions, defaulting to {self.args.rgbResolution}', + PrintColors.RED) + + if not depthEnabled: + if not self.args.disableDepth: + print("Disabling depth...") + self.args.disableDepth = True + if self.args.spatialBoundingBox: + print("Disabling spatial bounding boxes...") + self.args.spatialBoundingBox = False + if self.args.camera != 'color': + print("Switching source to RGB camera...") + self.args.camera = 'color' + updatedShowArg = [] + for name in self.args.show: + if name in ("nnInput", "color"): + updatedShowArg.append(name) + else: + print("Disabling {} preview...".format(name)) + if len(updatedShowArg) == 0: + print("No previews available, adding defaults...") + updatedShowArg.append("color") + if self.useNN: + updatedShowArg.append("nnInput") + self.args.show = updatedShowArg + + if self.args.bandwidth == "auto": + if deviceInfo.protocol != dai.XLinkProtocol.X_LINK_USB_VSC: + print("Enabling low-bandwidth mode due to connection mode... (protocol: {})".format(deviceInfo.protocol)) + self.args.bandwidth = "low" + print("Setting PoE video quality to 50 to reduce latency...") + self.args.poeQuality = 50 + elif device.getUsbSpeed() not in [dai.UsbSpeed.SUPER, dai.UsbSpeed.SUPER_PLUS]: + print("Enabling low-bandwidth mode due to low USB speed... (speed: {})".format(device.getUsbSpeed())) + self.args.bandwidth = "low" + else: + self.args.bandwidth = "high" + + def linuxCheckApplyUsbRules(self): + if platform.system() == 'Linux': + ret = subprocess.call(['grep', '-irn', 'ATTRS{idVendor}=="03e7"', '/etc/udev/rules.d'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + if(ret != 0): + cliPrint("WARNING: Usb rules not found", PrintColors.WARNING) + cliPrint(""" +Run the following commands to set USB rules: + +$ echo 'SUBSYSTEM=="usb", ATTRS{idVendor}=="03e7", MODE="0666"' | sudo tee /etc/udev/rules.d/80-movidius.rules +$ sudo udevadm control --reload-rules && sudo udevadm trigger + +After executing these commands, disconnect and reconnect USB cable to your OAK device""", PrintColors.RED) + os._exit(1) + + def getCountLabel(self, nnetManager): + if self.args.countLabel is None: + return None + + if self.args.countLabel.isdigit(): + obj = nnetManager.getLabelText(int(self.args.countLabel)).lower() + print(f"Counting number of {obj} in the frame") + return obj + else: return self.args.countLabel.lower() + + @property + def leftCameraEnabled(self): + return (self.args.camera == Previews.left.name and self.useNN) or \ + Previews.left.name in self.args.show or \ + Previews.rectifiedLeft.name in self.args.show or \ + self.useDepth + + @property + def rightCameraEnabled(self): + return (self.args.camera == Previews.right.name and self.useNN) or \ + Previews.right.name in self.args.show or \ + Previews.rectifiedRight.name in self.args.show or \ + self.useDepth + + @property + def rgbCameraEnabled(self): + return (self.args.camera == Previews.color.name and self.useNN) or \ + Previews.color.name in self.args.show + + @property + def inputSize(self): + return tuple(map(int, self.args.cnnInputSize.split('x'))) if self.args.cnnInputSize else None + + @property + def previewSize(self): + return (576, 320) + + @property + def lowBandwidth(self): + return self.args.bandwidth == "low" + + @property + def lowCapabilities(self): + return platform.machine().startswith("arm") or platform.machine().startswith("aarch") + + @property + def shaves(self): + if self.args.shaves is not None: + return self.args.shaves + if not self.useCamera: + return 8 + if self.rgbResWidth > 1080: + return 5 + return 6 + + @property + def dispMultiplier(self): + val = 255 / self.maxDisparity + return val + + diff --git a/depthai_helpers/supervisor.py b/depthai_helpers/supervisor.py new file mode 100644 index 000000000..97388e882 --- /dev/null +++ b/depthai_helpers/supervisor.py @@ -0,0 +1,75 @@ +import atexit +import importlib.util +import os +import signal +import subprocess +import sys +import time +from pathlib import Path + + +def createNewArgs(args): + def removeArg(name, withValue=True): + if name in sys.argv: + idx = sys.argv.index(name) + if withValue: + del sys.argv[idx + 1] + del sys.argv[idx] + + removeArg("-gt") + removeArg("--guiType") + removeArg("--noSupervisor") + return sys.argv[1:] + ["--noSupervisor", "--guiType", args.guiType] + + +class Supervisor: + child = None + + def __init__(self): + signal.signal(signal.SIGINT, self.cleanup) + signal.signal(signal.SIGTERM, self.cleanup) + atexit.register(self.cleanup) + + def runDemo(self, args): + repo_root = Path(__file__).parent.parent + args.noSupervisor = True + new_args = createNewArgs(args) + env = os.environ.copy() + + if args.guiType == "qt": + new_env = env.copy() + new_env["QT_QUICK_BACKEND"] = "software" + new_env["LD_LIBRARY_PATH"] = str(Path(importlib.util.find_spec("PyQt5").origin).parent / "Qt5/lib") + new_env["DEPTHAI_INSTALL_SIGNAL_HANDLER"] = "0" + try: + cmd = ' '.join([f'"{sys.executable}"', "depthai_demo.py"] + new_args) + self.child = subprocess.Popen(cmd, shell=True, env=new_env, cwd=str(repo_root.resolve())) + self.child.communicate() + if self.child.returncode != 0: + raise subprocess.CalledProcessError(self.child.returncode, cmd) + except subprocess.CalledProcessError as ex: + print("Error while running demo script... {}".format(ex)) + print("Waiting 5s for the device to be discoverable again...") + time.sleep(5) + args.guiType = "cv" + if args.guiType == "cv": + new_env = env.copy() + new_env["DEPTHAI_INSTALL_SIGNAL_HANDLER"] = "0" + new_args = createNewArgs(args) + cmd = ' '.join([f'"{sys.executable}"', "depthai_demo.py"] + new_args) + self.child = subprocess.Popen(cmd, shell=True, env=new_env, cwd=str(repo_root.resolve())) + self.child.communicate() + + def checkQtAvailability(self): + return importlib.util.find_spec("PyQt5") is not None + + def cleanup(self, *args, **kwargs): + if self.child is not None and self.child.poll() is None: + self.child.terminate() + try: + self.child.wait(1) + except subprocess.TimeoutExpired: + pass + + + diff --git a/depthai_helpers/version_check.py b/depthai_helpers/version_check.py new file mode 100644 index 000000000..932fbfb42 --- /dev/null +++ b/depthai_helpers/version_check.py @@ -0,0 +1,68 @@ +import sys + +import depthai +from pathlib import Path + + +def getVersionFromRequirements(package_name, req_path): + with req_path.resolve().open() as f: + for line in f.readlines(): + if package_name in line: + #not commented out and has version indicator (==) + if not line.startswith('#') and '==' in line: + try: + split = line.split('==') + name = split[0] + if name != package_name: + continue + version = split[1] + version = version.split(';')[0] + #remove any whitespace + version = version.strip() + except: + version = None + return version + + +def getVersion(module_name): + try: + import importlib + module = importlib.import_module(module_name) + if hasattr(module, '__version__'): + return module.__version__ + if hasattr(module, 'version'): + return module.version + except: + pass + try: + import pkg_resources + return pkg_resources.get_distribution(module_name).version + except: + pass + + try: + from importlib.metadata import version + return version(module_name) + except: + pass + + return None + +def checkRequirementsVersion(): + daiVersionRequired = getVersionFromRequirements('depthai', Path(__file__).parent / Path('../requirements.txt')) + if daiVersionRequired is not None: + if daiVersionRequired != getVersion('depthai'): + print(f"\033[1;5;31mVersion mismatch\033[0m\033[91m between installed depthai lib and the required one by the script.\033[0m \n\ + Required: {daiVersionRequired}\n\ + Installed: {getVersion('depthai')}\n\ + \033[91mRun: python3 install_requirements.py \033[0m") + sys.exit(42) + + daiSdkVersionRequired = getVersionFromRequirements('depthai-sdk', Path(__file__).parent / Path('../requirements.txt')) + if daiSdkVersionRequired is not None: + if daiSdkVersionRequired != getVersion('depthai-sdk'): + print(f"\033[1;5;31mVersion mismatch\033[0m\033[91m between installed depthai-sdk lib and the required one by the script.\033[0m \n\ + Required: {daiSdkVersionRequired}\n\ + Installed: {getVersion('depthai_sdk')}\n\ + \033[91mRun: python3 install_requirements.py \033[0m") + sys.exit(42)