Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a telemetry interface (including Prometheus) #406

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
107 changes: 70 additions & 37 deletions aiomonitor/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
from types import TracebackType
from typing import (
Any,
AsyncIterator,
Awaitable,
Callable,
Coroutine,
Dict,
Final,
Expand All @@ -35,6 +37,7 @@

from .exceptions import MissingTask
from .task import TracedTask, persistent_coro
from .telemetry.app import init_telemetry
from .termui.commands import interact
from .types import (
CancellationChain,
Expand Down Expand Up @@ -68,6 +71,7 @@
MONITOR_TERMUI_PORT: Final = 20101
MONITOR_WEBUI_PORT: Final = 20102
CONSOLE_PORT: Final = 20103
TELEMETRY_PORT: Final = 20104

T = TypeVar("T")
T_co = TypeVar("T_co", covariant=True)
Expand Down Expand Up @@ -120,6 +124,8 @@ def __init__(
webui_port: int = MONITOR_WEBUI_PORT,
console_port: int = CONSOLE_PORT,
console_enabled: bool = True,
telemetry_port: int = TELEMETRY_PORT,
telemetry_enabled: bool = True,
hook_task_factory: bool = False,
max_termination_history: int = 1000,
locals: Optional[Dict[str, Any]] = None,
Expand All @@ -134,16 +140,27 @@ def __init__(
self.console_locals = {"__name__": "__console__", "__doc__": None}
else:
self.console_locals = locals
self._telemetry_port = telemetry_port
self._telemetry_enabled = telemetry_enabled

self.prompt = "monitor >>> "
log.info(
"Starting aiomonitor at telnet://%(host)s:%(tport)d and http://%(host)s:%(wport)d",
{
"host": host,
"tport": termui_port,
"wport": webui_port,
},
)
if console_enabled:
log.info(
"Starting aiomonitor console at telnet://%(host)s:%(tport)d and http://%(host)s:%(wport)d",
{
"host": host,
"tport": termui_port,
"wport": webui_port,
},
)
if telemetry_enabled:
log.info(
"Starting aiomonitor telemetry service at http://%(host)s:%(port)d",
{
"host": host,
"port": telemetry_port,
},
)

self._closed = False
self._started = False
Expand Down Expand Up @@ -545,6 +562,26 @@ def _create_task(
def _ui_main(self) -> None:
asyncio.run(self._ui_main_async())

@contextlib.asynccontextmanager
async def _webapp_ctx(
self,
app_factory: Callable[[], Awaitable[web.Application]],
port: int,
) -> AsyncIterator[None]:
runner = web.AppRunner(await app_factory())
await runner.setup()
site = web.TCPSite(
runner,
str(self._host),
port,
reuse_port=True,
)
await site.start()
try:
yield
finally:
await runner.cleanup()

async def _ui_main_async(self) -> None:
loop = asyncio.get_running_loop()
self._termination_info_queue = janus.Queue()
Expand All @@ -562,36 +599,28 @@ async def _ui_main_async(self) -> None:
host=self._host,
port=self._termui_port,
)
webui_app = await init_webui(self)
webui_runner = web.AppRunner(webui_app)
await webui_runner.setup()
webui_site = web.TCPSite(
webui_runner,
str(self._host),
self._webui_port,
reuse_port=True,
)
await webui_site.start()
telnet_server.start()
await asyncio.sleep(0)
self._ui_started.set()
try:
await self._ui_forever_future
except asyncio.CancelledError:
pass
finally:
termui_tasks = {*self._termui_tasks}
for termui_task in termui_tasks:
termui_task.cancel()
await asyncio.gather(*termui_tasks, return_exceptions=True)
self._ui_termination_handler_task.cancel()
self._ui_cancellation_handler_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._ui_termination_handler_task
with contextlib.suppress(asyncio.CancelledError):
await self._ui_cancellation_handler_task
await telnet_server.stop()
await webui_runner.cleanup()
async with self._webapp_ctx(
lambda: init_webui(self), self._webui_port
), self._webapp_ctx(lambda: init_telemetry(self), self._telemetry_port):
await asyncio.sleep(0)
self._ui_started.set()
try:
await self._ui_forever_future
except asyncio.CancelledError:
pass
finally:
termui_tasks = {*self._termui_tasks}
for termui_task in termui_tasks:
termui_task.cancel()
await asyncio.gather(*termui_tasks, return_exceptions=True)
self._ui_termination_handler_task.cancel()
self._ui_cancellation_handler_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._ui_termination_handler_task
with contextlib.suppress(asyncio.CancelledError):
await self._ui_cancellation_handler_task
await telnet_server.stop()

async def _ui_handle_termination_updates(self) -> None:
while True:
Expand Down Expand Up @@ -633,7 +662,9 @@ def start_monitor(
port: int = MONITOR_TERMUI_PORT, # kept the name for backward compatibility
console_port: int = CONSOLE_PORT,
webui_port: int = MONITOR_WEBUI_PORT,
telemetry_port: int = TELEMETRY_PORT,
console_enabled: bool = True,
telemetry_enabled: bool = True,
hook_task_factory: bool = False,
max_termination_history: Optional[int] = None,
locals: Optional[Dict[str, Any]] = None,
Expand All @@ -659,6 +690,8 @@ def start_monitor(
webui_port=webui_port,
console_port=console_port,
console_enabled=console_enabled,
telemetry_port=telemetry_port,
telemetry_enabled=telemetry_enabled,
hook_task_factory=hook_task_factory,
max_termination_history=(
max_termination_history
Expand Down
Empty file.
25 changes: 25 additions & 0 deletions aiomonitor/telemetry/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from __future__ import annotations

import dataclasses
from typing import TYPE_CHECKING

from .prometheus import prometheus_exporter

if TYPE_CHECKING:
from ..monitor import Monitor

from aiohttp import web


@dataclasses.dataclass
class TelemetryContext:
monitor: Monitor


async def init_telemetry(monitor: Monitor) -> web.Application:
app = web.Application()
app["ctx"] = TelemetryContext(
monitor=monitor,
)
app.router.add_route("GET", "/prometheus", prometheus_exporter)
return app
26 changes: 26 additions & 0 deletions aiomonitor/telemetry/prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from __future__ import annotations

import asyncio
import io
import os
import time
from typing import TYPE_CHECKING

from aiohttp import web

if TYPE_CHECKING:
from .app import TelemetryContext


async def prometheus_exporter(request: web.Request) -> web.Response:
ctx: TelemetryContext = request.app["ctx"]
out = io.StringIO()
print("# TYPE asyncio_tasks gauge", file=out)
now = time.time_ns() // 1000 # unix timestamp in msec
pid = os.getpid() # we use threads to run the aiomonitor UI thread, so the pid is same to the monitored program.
all_task_count = len(asyncio.all_tasks(ctx.monitor._monitored_loop))
print(
'asyncio_running_tasks{pid="%d"} %d %d' % (pid, all_task_count, now), file=out
)
# TODO: count per name of explicitly named tasks (using labels)
return web.Response(body=out.getvalue(), content_type="text/plain")
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ known-first-party = ["aiomonitor"]
split-on-trailing-comma = true

[tool.ruff.format]
preview = true
preview = true # enable the black's preview style

[tool.mypy]
ignore_missing_imports = true
Expand Down
Loading