Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

trs/wip/metrics-platform #246

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion lib/id3c/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""
import logging
from flask import Flask
from . import config
from . import config, metrics
from .routes import blueprints


Expand All @@ -17,6 +17,8 @@ def create_app():
for blueprint in blueprints:
app.register_blueprint(blueprint)

metrics.register_app(app)

LOG.debug(f"app root is {app.root_path}")
LOG.debug(f"app static directory is {app.static_folder}")

Expand Down
41 changes: 41 additions & 0 deletions lib/id3c/api/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""
Web API metrics.
"""
import os
from prometheus_client import CollectorRegistry, GCCollector, PlatformCollector, ProcessCollector
import prometheus_flask_exporter
import prometheus_flask_exporter.multiprocess

from ..metrics import MultiProcessWriter


if "prometheus_multiproc_dir" in os.environ:
FlaskMetrics = prometheus_flask_exporter.multiprocess.MultiprocessPrometheusMetrics
MULTIPROCESS = True

else:
FlaskMetrics = prometheus_flask_exporter.PrometheusMetrics
MULTIPROCESS = False


# This instance is used by both our routes and create_app().
XXX = FlaskMetrics(
app = None,
path = None,
defaults_prefix = prometheus_flask_exporter.NO_PREFIX,
default_latency_as_histogram = False)


def register_app(app):
XXX.init_app(app)

# XXX TODO FIXME needs to be postfork for a pre-forking server like uWSGI
if MULTIPROCESS:
registry = CollectorRegistry(auto_describe = True)

ProcessCollector(registry = registry)
PlatformCollector(registry = registry)
GCCollector(registry = registry)

writer = MultiProcessWriter(registry)
writer.start()
28 changes: 27 additions & 1 deletion lib/id3c/api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
import json
import logging
import pkg_resources
from flask import Blueprint, request, send_file
import prometheus_client
from flask import Blueprint, make_response, request, send_file

from ..metrics import DatabaseCollector
from . import datastore
from .metrics import XXX
from .utils.routes import authenticated_datastore_session_required, content_types_accepted, check_content_length


Expand All @@ -20,6 +24,28 @@
]


# Metrics exposition endpoint
@api_v1.route("/metrics", methods = ["GET"])
@XXX.do_not_track()
@authenticated_datastore_session_required
def expose_metrics(*, session):
"""
Exposes metrics for Prometheus.

Includes metrics collected from the Flask app, as well as the database.
"""
registry = prometheus_client.CollectorRegistry(auto_describe = True)

# Collect metrics from the server-wide registry, potentially from multiple
# server processes via files in prometheus_multiproc_dir.
registry.register(XXX.registry)

# Collect metrics from the database using the authenticated session.
registry.register(DatabaseCollector(session))

return make_response(prometheus_client.make_wsgi_app(registry))


@api_v1.route("/", methods = ['GET'])
@api_unversioned.route("/", methods = ['GET'])
def index():
Expand Down
95 changes: 95 additions & 0 deletions lib/id3c/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""
Metrics handling functions.
"""
import logging
import os
import threading
from prometheus_client import CollectorRegistry, REGISTRY as DEFAULT_REGISTRY
from prometheus_client.core import GaugeMetricFamily
from prometheus_client.values import ValueClass
from psycopg2.errors import InsufficientPrivilege
from time import sleep
from .db import DatabaseSession
from .utils import set_thread_name


LOG = logging.getLogger(__name__)


class DatabaseCollector:
"""
Collects metrics from the database, using an existing *session*.
"""
def __init__(self, session: DatabaseSession):
self.session = session


def collect(self):
with self.session:
yield from self.estimated_row_total()


def estimated_row_total(self):
family = GaugeMetricFamily(
"id3c_estimated_row_total",
"Estimated number of rows in an ID3C database table",
labels = ("schema", "table"))

try:
metrics = self.session.fetch_all(
"""
select
ns.nspname as schema,
c.relname as table,
c.reltuples::bigint as estimated_row_count
from
pg_catalog.pg_class c
join pg_catalog.pg_namespace ns on (c.relnamespace = ns.oid)
where
ns.nspname in ('receiving', 'warehouse') and
c.relkind = 'r'
order by
schema,
"table"
""")
except InsufficientPrivilege as error:
LOG.error(f"Permission denied when collecting id3c_estimated_row_total metrics: {error}")
return

for metric in metrics:
family.add_metric((metric.schema, metric.table), metric.estimated_row_count)

yield family


class MultiProcessWriter(threading.Thread):
def __init__(self, registry: CollectorRegistry = DEFAULT_REGISTRY, interval: int = 15):
super().__init__(name = "metrics writer", daemon = True)

self.registry = registry
self.interval = interval

def run(self):
set_thread_name(self)

while True:
for metric in self.registry.collect():
for sample in metric.samples:
if metric.type == "gauge":
# Metrics from GaugeMetricFamily will not have the
# attribute set, for example.
multiprocess_mode = getattr(metric, "_multiprocess_mode", "all")
else:
multiprocess_mode = ""

value = ValueClass(
metric.type,
metric.name,
sample.name,
sample.labels.keys(),
sample.labels.values(),
multiprocess_mode = multiprocess_mode)

value.set(sample.value)

sleep(self.interval)
20 changes: 20 additions & 0 deletions lib/id3c/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""
Utilities.
"""
import ctypes
import threading
from typing import Any, Sequence, Union


Expand Down Expand Up @@ -94,3 +96,21 @@ def shorten(text, length, placeholder):
return text[0:length - len(placeholder)] + placeholder
else:
return text


LIBCAP = None

def set_thread_name(thread: threading.Thread):
global LIBCAP

if LIBCAP is None:
try:
LIBCAP = ctypes.CDLL("libcap.so.2")
except:
LIBCAP = False

if not LIBCAP:
return

# From the prctl(2) manpage, PR_SET_NAME is 15.
LIBCAP.prctl(15, thread.name.encode())
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@
"more-itertools",
"oauth2client >2.0.0,<4.0.0",
"pandas >=1.0.1,<2",
"prometheus_client",
"prometheus_flask_exporter",
"psycopg2 >=2.8,<3",
"pyyaml",
"requests",
Expand Down