From cb43123e1eb1c95000d55502cd26c0ccea479e8d Mon Sep 17 00:00:00 2001 From: wunder957 Date: Tue, 29 Aug 2023 14:40:16 +0800 Subject: [PATCH] Move poller and test it --- duetector/monitors/base.py | 50 +--------------------------------- duetector/tools/poller.py | 56 ++++++++++++++++++++++++++++++++++++++ tests/test_poller.py | 32 ++++++++++++++++++++++ 3 files changed, 89 insertions(+), 49 deletions(-) create mode 100644 duetector/tools/poller.py create mode 100644 tests/test_poller.py diff --git a/duetector/monitors/base.py b/duetector/monitors/base.py index 08e8793..767504b 100644 --- a/duetector/monitors/base.py +++ b/duetector/monitors/base.py @@ -1,4 +1,3 @@ -import threading from concurrent.futures import ThreadPoolExecutor from typing import Any, Dict, List, Optional @@ -6,57 +5,10 @@ from duetector.config import Configuable from duetector.filters.base import Filter from duetector.log import logger +from duetector.tools.poller import Poller from duetector.tracers.base import Tracer -class Poller(Configuable): - config_scope = "poller" - default_config = { - "interval_ms": 500, - } - - def __init__( - self, - config: Optional[Dict[str, Any]] = None, - *args, - **kwargs, - ): - super().__init__(config=config, *args, **kwargs) - self._thread: Optional[threading.Thread] = None - self.shutdown_event = threading.Event() - - @property - def interval_ms(self): - return self.config.interval_ms - - def start(self, func, *args, **kwargs): - if self._thread: - raise RuntimeError("Poller thread is already started, try shutdown and wait first.") - - def _poll(): - while not self.shutdown_event.is_set(): - func(*args, **kwargs) - self.shutdown_event.wait(timeout=self.interval_ms / 1000) - - self._thread = threading.Thread(target=_poll) - self.shutdown_event.clear() - self._thread.start() - - def shutdown(self): - self.shutdown_event.set() - - def wait(self, timeout_ms=None): - timeout = (timeout_ms or self.interval_ms * 3) / 1000 - self._thread.join(timeout=timeout) - if self._thread.is_alive(): - # FIXME: should we raise an exception here? - logger.warning("Poller thread is still alive after timeout") - self.shutdown() - else: - self._thread = None - self.shutdown_event.clear() - - class Monitor(Configuable): """ A base class for all monitors diff --git a/duetector/tools/poller.py b/duetector/tools/poller.py new file mode 100644 index 0000000..5e9478c --- /dev/null +++ b/duetector/tools/poller.py @@ -0,0 +1,56 @@ +import threading +from typing import Any, Dict, Optional + +from duetector.config import Configuable +from duetector.log import logger + + +class Poller(Configuable): + config_scope = "poller" + default_config = { + "interval_ms": 500, + } + + def __init__( + self, + config: Optional[Dict[str, Any]] = None, + *args, + **kwargs, + ): + super().__init__(config=config, *args, **kwargs) + self._thread: Optional[threading.Thread] = None + self.shutdown_event = threading.Event() + + @property + def interval_ms(self): + return self.config.interval_ms + + def start(self, func, *args, **kwargs): + if self._thread: + raise RuntimeError("Poller thread is already started, try shutdown and wait first.") + + def _poll(): + while not self.shutdown_event.is_set(): + func(*args, **kwargs) + self.shutdown_event.wait(timeout=self.interval_ms / 1000) + + self._thread = threading.Thread(target=_poll) + self.shutdown_event.clear() + self._thread.start() + + def shutdown(self): + self.shutdown_event.set() + + def wait(self, timeout_ms=None): + if not self._thread: + return + + timeout = (timeout_ms or self.interval_ms * 3) / 1000 + self._thread.join(timeout=timeout) + if self._thread.is_alive(): + # FIXME: should we raise an exception here? + logger.warning("Poller thread is still alive after timeout") + self.shutdown() + else: + self._thread = None + self.shutdown_event.clear() diff --git a/tests/test_poller.py b/tests/test_poller.py new file mode 100644 index 0000000..d4ece31 --- /dev/null +++ b/tests/test_poller.py @@ -0,0 +1,32 @@ +import time + +import pytest + +from duetector.tools.poller import Poller + + +@pytest.fixture +def config(): + yield { + "poller": { + "interval_ms": 500, + } + } + + +@pytest.fixture +def poller(config): + return Poller(config) + + +def test_poller(poller: Poller, capsys): + poller.start(lambda: print("hello")) + time.sleep(poller.interval_ms / 1000 * 1.5) + poller.shutdown() + poller.wait() + captured = capsys.readouterr() + assert captured.out == "hello\nhello\n" + + +if __name__ == "__main__": + pytest.main(["-vv", "-s", __file__])