Skip to content

Commit

Permalink
Move poller and test it
Browse files Browse the repository at this point in the history
  • Loading branch information
Wh1isper committed Aug 29, 2023
1 parent f0190b9 commit cb43123
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 49 deletions.
50 changes: 1 addition & 49 deletions duetector/monitors/base.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,14 @@
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional

from duetector.collectors.base import Collector
from duetector.config import Configuable
from duetector.filters.base import Filter
from duetector.log import logger
from duetector.tools.poller import Poller
from duetector.tracers.base import Tracer


class Poller(Configuable):
config_scope = "poller"
default_config = {
"interval_ms": 500,
}

def __init__(
self,
config: Optional[Dict[str, Any]] = None,
*args,
**kwargs,
):
super().__init__(config=config, *args, **kwargs)
self._thread: Optional[threading.Thread] = None
self.shutdown_event = threading.Event()

@property
def interval_ms(self):
return self.config.interval_ms

def start(self, func, *args, **kwargs):
if self._thread:
raise RuntimeError("Poller thread is already started, try shutdown and wait first.")

def _poll():
while not self.shutdown_event.is_set():
func(*args, **kwargs)
self.shutdown_event.wait(timeout=self.interval_ms / 1000)

self._thread = threading.Thread(target=_poll)
self.shutdown_event.clear()
self._thread.start()

def shutdown(self):
self.shutdown_event.set()

def wait(self, timeout_ms=None):
timeout = (timeout_ms or self.interval_ms * 3) / 1000
self._thread.join(timeout=timeout)
if self._thread.is_alive():
# FIXME: should we raise an exception here?
logger.warning("Poller thread is still alive after timeout")
self.shutdown()
else:
self._thread = None
self.shutdown_event.clear()


class Monitor(Configuable):
"""
A base class for all monitors
Expand Down
56 changes: 56 additions & 0 deletions duetector/tools/poller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import threading
from typing import Any, Dict, Optional

from duetector.config import Configuable
from duetector.log import logger


class Poller(Configuable):
config_scope = "poller"
default_config = {
"interval_ms": 500,
}

def __init__(
self,
config: Optional[Dict[str, Any]] = None,
*args,
**kwargs,
):
super().__init__(config=config, *args, **kwargs)
self._thread: Optional[threading.Thread] = None
self.shutdown_event = threading.Event()

@property
def interval_ms(self):
return self.config.interval_ms

def start(self, func, *args, **kwargs):
if self._thread:
raise RuntimeError("Poller thread is already started, try shutdown and wait first.")

def _poll():
while not self.shutdown_event.is_set():
func(*args, **kwargs)
self.shutdown_event.wait(timeout=self.interval_ms / 1000)

self._thread = threading.Thread(target=_poll)
self.shutdown_event.clear()
self._thread.start()

def shutdown(self):
self.shutdown_event.set()

def wait(self, timeout_ms=None):
if not self._thread:
return

timeout = (timeout_ms or self.interval_ms * 3) / 1000
self._thread.join(timeout=timeout)
if self._thread.is_alive():
# FIXME: should we raise an exception here?
logger.warning("Poller thread is still alive after timeout")
self.shutdown()
else:
self._thread = None
self.shutdown_event.clear()
32 changes: 32 additions & 0 deletions tests/test_poller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import time

import pytest

from duetector.tools.poller import Poller


@pytest.fixture
def config():
yield {
"poller": {
"interval_ms": 500,
}
}


@pytest.fixture
def poller(config):
return Poller(config)


def test_poller(poller: Poller, capsys):
poller.start(lambda: print("hello"))
time.sleep(poller.interval_ms / 1000 * 1.5)
poller.shutdown()
poller.wait()
captured = capsys.readouterr()
assert captured.out == "hello\nhello\n"


if __name__ == "__main__":
pytest.main(["-vv", "-s", __file__])

0 comments on commit cb43123

Please sign in to comment.