diff --git a/aiomonitor/monitor.py b/aiomonitor/monitor.py index 5a2ad70..fc465b2 100644 --- a/aiomonitor/monitor.py +++ b/aiomonitor/monitor.py @@ -16,7 +16,9 @@ from types import TracebackType from typing import ( Any, + AsyncIterator, Awaitable, + Callable, Coroutine, Dict, Final, @@ -35,6 +37,7 @@ from .exceptions import MissingTask from .task import TracedTask, persistent_coro +from .telemetry.app import init_telemetry from .termui.commands import interact from .types import ( CancellationChain, @@ -68,6 +71,7 @@ MONITOR_TERMUI_PORT: Final = 20101 MONITOR_WEBUI_PORT: Final = 20102 CONSOLE_PORT: Final = 20103 +TELEMETRY_PORT: Final = 20104 T = TypeVar("T") T_co = TypeVar("T_co", covariant=True) @@ -120,6 +124,8 @@ def __init__( webui_port: int = MONITOR_WEBUI_PORT, console_port: int = CONSOLE_PORT, console_enabled: bool = True, + telemetry_port: int = TELEMETRY_PORT, + telemetry_enabled: bool = True, hook_task_factory: bool = False, max_termination_history: int = 1000, locals: Optional[Dict[str, Any]] = None, @@ -134,16 +140,27 @@ def __init__( self.console_locals = {"__name__": "__console__", "__doc__": None} else: self.console_locals = locals + self._telemetry_port = telemetry_port + self._telemetry_enabled = telemetry_enabled self.prompt = "monitor >>> " - log.info( - "Starting aiomonitor at telnet://%(host)s:%(tport)d and http://%(host)s:%(wport)d", - { - "host": host, - "tport": termui_port, - "wport": webui_port, - }, - ) + if console_enabled: + log.info( + "Starting aiomonitor console at telnet://%(host)s:%(tport)d and http://%(host)s:%(wport)d", + { + "host": host, + "tport": termui_port, + "wport": webui_port, + }, + ) + if telemetry_enabled: + log.info( + "Starting aiomonitor telemetry service at http://%(host)s:%(port)d", + { + "host": host, + "port": telemetry_port, + }, + ) self._closed = False self._started = False @@ -545,6 +562,26 @@ def _create_task( def _ui_main(self) -> None: asyncio.run(self._ui_main_async()) + @contextlib.asynccontextmanager + async def _webapp_ctx( + self, + app_factory: Callable[[], Awaitable[web.Application]], + port: int, + ) -> AsyncIterator[None]: + runner = web.AppRunner(await app_factory()) + await runner.setup() + site = web.TCPSite( + runner, + str(self._host), + port, + reuse_port=True, + ) + await site.start() + try: + yield + finally: + await runner.cleanup() + async def _ui_main_async(self) -> None: loop = asyncio.get_running_loop() self._termination_info_queue = janus.Queue() @@ -562,36 +599,28 @@ async def _ui_main_async(self) -> None: host=self._host, port=self._termui_port, ) - webui_app = await init_webui(self) - webui_runner = web.AppRunner(webui_app) - await webui_runner.setup() - webui_site = web.TCPSite( - webui_runner, - str(self._host), - self._webui_port, - reuse_port=True, - ) - await webui_site.start() telnet_server.start() - await asyncio.sleep(0) - self._ui_started.set() - try: - await self._ui_forever_future - except asyncio.CancelledError: - pass - finally: - termui_tasks = {*self._termui_tasks} - for termui_task in termui_tasks: - termui_task.cancel() - await asyncio.gather(*termui_tasks, return_exceptions=True) - self._ui_termination_handler_task.cancel() - self._ui_cancellation_handler_task.cancel() - with contextlib.suppress(asyncio.CancelledError): - await self._ui_termination_handler_task - with contextlib.suppress(asyncio.CancelledError): - await self._ui_cancellation_handler_task - await telnet_server.stop() - await webui_runner.cleanup() + async with self._webapp_ctx( + lambda: init_webui(self), self._webui_port + ), self._webapp_ctx(lambda: init_telemetry(self), self._telemetry_port): + await asyncio.sleep(0) + self._ui_started.set() + try: + await self._ui_forever_future + except asyncio.CancelledError: + pass + finally: + termui_tasks = {*self._termui_tasks} + for termui_task in termui_tasks: + termui_task.cancel() + await asyncio.gather(*termui_tasks, return_exceptions=True) + self._ui_termination_handler_task.cancel() + self._ui_cancellation_handler_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._ui_termination_handler_task + with contextlib.suppress(asyncio.CancelledError): + await self._ui_cancellation_handler_task + await telnet_server.stop() async def _ui_handle_termination_updates(self) -> None: while True: @@ -633,7 +662,9 @@ def start_monitor( port: int = MONITOR_TERMUI_PORT, # kept the name for backward compatibility console_port: int = CONSOLE_PORT, webui_port: int = MONITOR_WEBUI_PORT, + telemetry_port: int = TELEMETRY_PORT, console_enabled: bool = True, + telemetry_enabled: bool = True, hook_task_factory: bool = False, max_termination_history: Optional[int] = None, locals: Optional[Dict[str, Any]] = None, @@ -659,6 +690,8 @@ def start_monitor( webui_port=webui_port, console_port=console_port, console_enabled=console_enabled, + telemetry_port=telemetry_port, + telemetry_enabled=telemetry_enabled, hook_task_factory=hook_task_factory, max_termination_history=( max_termination_history diff --git a/aiomonitor/telemetry/__init__.py b/aiomonitor/telemetry/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/aiomonitor/telemetry/app.py b/aiomonitor/telemetry/app.py new file mode 100644 index 0000000..d6c96a0 --- /dev/null +++ b/aiomonitor/telemetry/app.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +import dataclasses +from typing import TYPE_CHECKING + +from .prometheus import prometheus_exporter + +if TYPE_CHECKING: + from ..monitor import Monitor + +from aiohttp import web + + +@dataclasses.dataclass +class TelemetryContext: + monitor: Monitor + + +async def init_telemetry(monitor: Monitor) -> web.Application: + app = web.Application() + app["ctx"] = TelemetryContext( + monitor=monitor, + ) + app.router.add_route("GET", "/prometheus", prometheus_exporter) + return app diff --git a/aiomonitor/telemetry/prometheus.py b/aiomonitor/telemetry/prometheus.py new file mode 100644 index 0000000..de432a5 --- /dev/null +++ b/aiomonitor/telemetry/prometheus.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +import asyncio +import io +import os +import time +from typing import TYPE_CHECKING + +from aiohttp import web + +if TYPE_CHECKING: + from .app import TelemetryContext + + +async def prometheus_exporter(request: web.Request) -> web.Response: + ctx: TelemetryContext = request.app["ctx"] + out = io.StringIO() + print("# TYPE asyncio_tasks gauge", file=out) + now = time.time_ns() // 1000 # unix timestamp in msec + pid = os.getpid() # we use threads to run the aiomonitor UI thread, so the pid is same to the monitored program. + all_task_count = len(asyncio.all_tasks(ctx.monitor._monitored_loop)) + print( + 'asyncio_running_tasks{pid="%d"} %d %d' % (pid, all_task_count, now), file=out + ) + # TODO: count per name of explicitly named tasks (using labels) + return web.Response(body=out.getvalue(), content_type="text/plain") diff --git a/pyproject.toml b/pyproject.toml index c90e699..7131b4b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,7 +23,7 @@ known-first-party = ["aiomonitor"] split-on-trailing-comma = true [tool.ruff.format] -preview = true +preview = true # enable the black's preview style [tool.mypy] ignore_missing_imports = true