diff --git a/tests/__init__.py b/tests/__init__.py index 8415dab..5c4459f 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,12 +1,53 @@ # Copyright (c) 2020 OpenCyphal # This software is distributed under the terms of the MIT License. # Author: Pavel Kirienko +# pylint: disable=wrong-import-position -import pathlib +from pathlib import Path +from typing import Callable, TypeVar, Any # Please maintain these carefully if you're changing the project's directory structure. -TEST_DIR = pathlib.Path(__file__).resolve().parent +TEST_DIR = Path(__file__).resolve().parent ROOT_DIR = TEST_DIR.parent DEPS_DIR = TEST_DIR / "deps" assert DEPS_DIR.is_dir() + + +T = TypeVar("T") + + +def timeout(seconds: int) -> Callable[[Callable[..., T]], Callable[..., T]]: + """ + This is a decorator that makes the function raise :class:`TimeoutError` if it takes more than ``seconds`` + seconds to complete. + + >>> import time + >>> @timeout(3) + ... def runas(delay: float) -> None: + ... time.sleep(delay) + >>> runas(1) + >>> runas(10) + Traceback (most recent call last): + ... + TimeoutError: ... + """ + import signal + from functools import wraps + + def decorator(fun: Callable[..., T]) -> Callable[..., T]: + @wraps(fun) + def wrapper(*args: Any, **kwargs: Any) -> T: + def signal_handler(_signum: int, _frame: Any) -> None: + raise TimeoutError(f"Function {fun} took more than {seconds:.0f}s to complete") + + signal.signal(signal.SIGALRM, signal_handler) + signal.alarm(seconds) + try: + return fun(*args, **kwargs) + finally: + signal.alarm(0) + + return wrapper + + return decorator diff --git a/tests/cmd/monitor.py b/tests/cmd/monitor.py index 29314b3..4611a61 100755 --- a/tests/cmd/monitor.py +++ b/tests/cmd/monitor.py @@ -18,10 +18,12 @@ from pycyphal.transport.udp import UDPTransport from tests.subprocess import Subprocess from tests.dsdl import OUTPUT_DIR +from tests import timeout import yakut # noinspection SpellCheckingInspection +@timeout(300) @pytest.mark.asyncio async def _unittest_monitor_nodes(compiled_dsdl: Any) -> None: _ = compiled_dsdl @@ -110,6 +112,7 @@ async def _unittest_monitor_nodes(compiled_dsdl: Any) -> None: # noinspection SpellCheckingInspection +@timeout(60) @pytest.mark.asyncio async def _unittest_monitor_errors(compiled_dsdl: Any) -> None: _ = compiled_dsdl