Commit d31fc3c
revert sidecar changes to expose Prometheus metrics (#762)
* Revert "expose prometheus metrics for Zookeeper in live-data sidecar (#747)"

This reverts commit e49779e.

* Revert "add Prometheus to remaining sidecar types (#752)"

This reverts commit eb7d24b.
ghirsch-reddit authored Dec 5, 2022
1 parent 2b53adc commit d31fc3c
Showing 7 changed files with 55 additions and 195 deletions.
39 changes: 1 addition & 38 deletions baseplate/lib/live_data/zookeeper.py
@@ -2,44 +2,12 @@
 from typing import Optional
 
 from kazoo.client import KazooClient
-from kazoo.client import KazooState
 from kazoo.handlers.gevent import SequentialGeventHandler
-from prometheus_client import Counter
 
 from baseplate.lib import config
 from baseplate.lib.secrets import SecretsStore
 from baseplate.server.monkey import gevent_is_patched
 
-SESSION_LOST_TOTAL = Counter(
-    "live_data_fetcher_lost_sessions_total",
-    "The number of times a Zookeeper client has had a session be lost.",
-)
-
-SESSION_SUSPENDED_TOTAL = Counter(
-    "live_data_fetcher_suspended_sessions_total",
-    "The number of times a Zookeeper client has had a session be suspended.",
-)
-
-SESSION_SUCCESSFUL_TOTAL = Counter(
-    "live_data_fetcher_connected_sessions_total",
-    "The number of times a Zookeeper client has successfully established a session.",
-)
-
-
-class SessionStatsListener:
-    """A Kazoo listener that monitors changes in connection state.
-
-    Increments an event counter whenever connection state changes in a
-    Zookeeper connection.
-    """
-
-    def __call__(self, state: KazooState) -> None:
-        if state == KazooState.LOST:
-            SESSION_LOST_TOTAL.inc()
-        elif state == KazooState.SUSPENDED:
-            SESSION_SUSPENDED_TOTAL.inc()
-        else:
-            SESSION_SUCCESSFUL_TOTAL.inc()
 
 
 def zookeeper_client_from_config(
     secrets: SecretsStore, app_config: config.RawConfig, read_only: Optional[bool] = None
@@ -93,7 +61,7 @@ def zookeeper_client_from_config(
     else:
         handler = None
 
-    client = KazooClient(
+    return KazooClient(
         cfg.hosts,
         timeout=cfg.timeout.total_seconds(),
         auth_data=auth_data,
@@ -122,8 +90,3 @@ def zookeeper_client_from_config(
             max_delay=60,  # never wait longer than this
         ),
     )
-
-    listener = SessionStatsListener()
-    client.add_listener(listener)
-
-    return client
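
Note: the reverted SessionStatsListener relies on Kazoo's connection-state listener hook, which stays available after the revert. A minimal sketch of that pattern, with the Prometheus counters swapped for prints (the ZooKeeper address is an assumed value for illustration):

from kazoo.client import KazooClient
from kazoo.client import KazooState

def log_session_state(state: KazooState) -> None:
    # Kazoo invokes registered listeners on every connection-state
    # transition; LOST and SUSPENDED are the failure states.
    if state == KazooState.LOST:
        print("session lost")
    elif state == KazooState.SUSPENDED:
        print("session suspended")
    else:
        print("session established")

client = KazooClient(hosts="127.0.0.1:2181")  # assumed address, for illustration
client.add_listener(log_session_state)
client.start()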
59 changes: 16 additions & 43 deletions baseplate/sidecars/event_publisher.py
@@ -1,16 +1,3 @@
-# pylint: disable=wrong-import-position,wrong-import-order
-from gevent.monkey import patch_all
-
-from baseplate.server.monkey import patch_stdlib_queues
-
-# In order to allow Prometheus to scrape metrics, we need to concurrently
-# handle requests to '/metrics' along with the sidecar's execution.
-# Monkey patching is used to replace the stdlib sequential versions of functions
-# with concurrent versions. It must happen as soon as possible, before the
-# sequential versions are imported.
-patch_all()
-patch_stdlib_queues()
-
 import argparse
 import configparser
 import email.utils
@@ -23,13 +10,9 @@
 from typing import List
 from typing import Optional
 
-import gevent
 import requests
 
-from prometheus_client import Counter
-
-from baseplate import Baseplate
-from baseplate.clients.requests import ExternalRequestsClient
+from baseplate import __version__ as baseplate_version
 from baseplate.lib import config
 from baseplate.lib import metrics
 from baseplate.lib.events import MAX_EVENT_SIZE
@@ -39,7 +22,6 @@
 from baseplate.lib.metrics import metrics_client_from_config
 from baseplate.lib.retry import RetryPolicy
 from baseplate.server import EnvironmentInterpolation
-from baseplate.server.prometheus import start_prometheus_exporter_for_sidecar
 from baseplate.sidecars import Batch
 from baseplate.sidecars import BatchFull
 from baseplate.sidecars import SerializedBatch
@@ -61,8 +43,6 @@
 # maximum size (in bytes) of a batch of events
 MAX_BATCH_SIZE = 500 * 1024
 
-PUBLISHES_COUNT_TOTAL = Counter("eventv2_publishes_total", "total count of published events")
-
 
 class MaxRetriesError(Exception):
     pass
@@ -119,12 +99,15 @@ def serialize(self) -> SerializedBatch:
 
 
 class BatchPublisher:
-    def __init__(self, bp: Baseplate, metrics_client: metrics.Client, cfg: Any):
-        self.baseplate = bp
+    def __init__(self, metrics_client: metrics.Client, cfg: Any):
         self.metrics = metrics_client
         self.url = f"{cfg.collector.scheme}://{cfg.collector.hostname}/v{cfg.collector.version}"
         self.key_name = cfg.key.name
         self.key_secret = cfg.key.secret
+        self.session = requests.Session()
+        self.session.headers[
+            "User-Agent"
+        ] = f"baseplate.py-{self.__class__.__name__}/{baseplate_version}"
 
     def _sign_payload(self, payload: bytes) -> str:
         digest = hmac.new(self.key_secret, payload, hashlib.sha256).hexdigest()
@@ -146,14 +129,15 @@ def publish(self, payload: SerializedBatch) -> None:
 
         for _ in RetryPolicy.new(budget=MAX_RETRY_TIME, backoff=RETRY_BACKOFF):
             try:
-                with self.baseplate.server_context("post") as context:
-                    with self.metrics.timer("post"):
-                        response = context.http_client.post(
-                            self.url,
-                            headers=headers,
-                            data=compressed_payload,
-                            timeout=POST_TIMEOUT,
-                        )
+                with self.metrics.timer("post"):
+                    response = self.session.post(
+                        self.url,
+                        headers=headers,
+                        data=compressed_payload,
+                        timeout=POST_TIMEOUT,
+                        # http://docs.python-requests.org/en/latest/user/advanced/#keep-alive
+                        stream=False,
+                    )
                 response.raise_for_status()
             except requests.HTTPError as exc:
                 self.metrics.counter("error.http").increment()
@@ -171,7 +155,6 @@ def publish(self, payload: SerializedBatch) -> None:
                 self.metrics.counter("error.io").increment()
                 logger.exception("HTTP Request failed")
             else:
-                PUBLISHES_COUNT_TOTAL.inc(payload.item_count)
                 self.metrics.counter("sent").increment(payload.item_count)
                 return
 
@@ -226,21 +209,12 @@ def publish_events() -> None:
         max_message_size=MAX_EVENT_SIZE,
     )
 
-    bp = Baseplate()
-    bp.configure_context(
-        {
-            "http_client": ExternalRequestsClient("event_collector"),
-        }
-    )
-
     # pylint: disable=maybe-no-member
     serializer = SERIALIZER_BY_VERSION[cfg.collector.version]()
     batcher = TimeLimitedBatch(serializer, MAX_BATCH_AGE)
-    publisher = BatchPublisher(bp, metrics_client, cfg)
+    publisher = BatchPublisher(metrics_client, cfg)
 
     while True:
-        # allow other routines to execute (specifically handling requests to /metrics)
-        gevent.sleep(0)
         message: Optional[bytes]
 
         try:
Expand All @@ -264,5 +238,4 @@ def publish_events() -> None:


if __name__ == "__main__":
start_prometheus_exporter_for_sidecar()
publish_events()
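
Note: the publish loop above signs each gzipped batch via _sign_payload before POSTing it through the restored requests.Session. A standalone sketch of that HMAC-SHA256 scheme; the key=..., mac=... return format, key name, and secret here are assumptions for illustration:

import hashlib
import hmac

def sign_payload(key_name: str, key_secret: bytes, payload: bytes) -> str:
    # HMAC-SHA256 over the serialized batch, hex-encoded
    digest = hmac.new(key_secret, payload, hashlib.sha256).hexdigest()
    # header value format is an assumption for illustration
    return f"key={key_name}, mac={digest}"

signature = sign_payload("TestKey", b"hunter2", b'{"events": []}')
# the collector recomputes the digest with the shared secret and compares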
19 changes: 2 additions & 17 deletions baseplate/sidecars/live_data_watcher.py
@@ -1,23 +1,11 @@
 """Watch nodes in ZooKeeper and sync their contents to disk on change."""
-# pylint: disable=wrong-import-position,wrong-import-order
-from gevent.monkey import patch_all
-
-from baseplate.server.monkey import patch_stdlib_queues
-
-# In order to allow Prometheus to scrape metrics, we need to concurrently
-# handle requests to '/metrics' along with the sidecar's execution.
-# Monkey patching is used to replace the stdlib sequential versions of functions
-# with concurrent versions. It must happen as soon as possible, before the
-# sequential versions are imported.
-patch_all()
-patch_stdlib_queues()
-
 import argparse
 import configparser
 import json
 import logging
 import os
 import sys
+import time
 
 from enum import Enum
 from pathlib import Path
@@ -26,7 +14,6 @@
 from typing import Optional
 
 import boto3  # type: ignore
-import gevent
 
 from botocore import UNSIGNED  # type: ignore
 from botocore.client import ClientError  # type: ignore
@@ -39,7 +26,6 @@
 from baseplate.lib.live_data.zookeeper import zookeeper_client_from_config
 from baseplate.lib.secrets import secrets_store_from_config
 from baseplate.server import EnvironmentInterpolation
-from baseplate.server.prometheus import start_prometheus_exporter_for_sidecar
 
 
 logger = logging.getLogger(__name__)
@@ -201,7 +187,7 @@ def watch_zookeeper_nodes(zookeeper: KazooClient, nodes: Any) -> NoReturn:
     # all the interesting stuff is now happening in the Kazoo worker thread
     # and so we'll just spin and periodically heartbeat to prove we're alive.
     while True:
-        gevent.sleep(HEARTBEAT_INTERVAL)
+        time.sleep(HEARTBEAT_INTERVAL)
 
     # see the comment in baseplate.live_data.zookeeper for explanation of
     # how reconnects work with the background thread.
@@ -272,5 +258,4 @@ def main() -> NoReturn:
 
 
 if __name__ == "__main__":
-    start_prometheus_exporter_for_sidecar()
     main()
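
Note: as the restored comments in watch_zookeeper_nodes say, Kazoo delivers watch callbacks on its own worker thread, so the main thread only needs a plain time.sleep heartbeat. A minimal sketch of that structure; the address, node path, and interval are assumed values:

import time

from kazoo.client import KazooClient

HEARTBEAT_INTERVAL = 300  # seconds; illustrative value

client = KazooClient(hosts="127.0.0.1:2181")  # assumed address
client.start()

@client.DataWatch("/example/node")  # hypothetical node path
def on_change(data, stat):
    # runs on Kazoo's worker thread whenever the node's data changes
    if data is not None:
        print(f"node updated to version {stat.version}")

while True:
    # nothing to do here; the watch does the work in the background
    time.sleep(HEARTBEAT_INTERVAL)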
19 changes: 2 additions & 17 deletions baseplate/sidecars/secrets_fetcher.py
@@ -60,19 +60,6 @@
 write to a new file in whatever format needed, and restart other services if necessary.
 """
-# pylint: disable=wrong-import-position,wrong-import-order
-from gevent.monkey import patch_all
-
-from baseplate.server.monkey import patch_stdlib_queues
-
-# In order to allow Prometheus to scrape metrics, we need to concurrently
-# handle requests to '/metrics' along with the sidecar's execution.
-# Monkey patching is used to replace the stdlib sequential versions of functions
-# with concurrent versions. It must happen as soon as possible, before the
-# sequential versions are imported.
-patch_all()
-patch_stdlib_queues()
 
 import argparse
 import configparser
 import datetime
@@ -81,6 +68,7 @@
 import os
 import posixpath
 import subprocess
+import time
 import urllib.parse
 import uuid
 
@@ -90,13 +78,11 @@
 from typing import Optional
 from typing import Tuple
 
-import gevent
 import requests
 
 from baseplate import __version__ as baseplate_version
 from baseplate.lib import config
 from baseplate.server import EnvironmentInterpolation
-from baseplate.server.prometheus import start_prometheus_exporter_for_sidecar
 
 
 logger = logging.getLogger(__name__)
@@ -441,9 +427,8 @@ def main() -> None:
         last_proc = trigger_callback(cfg.callback, cfg.output.path, last_proc)
         time_til_expiration = soonest_expiration - datetime.datetime.utcnow()
         time_to_sleep = time_til_expiration - VAULT_TOKEN_PREFETCH_TIME
-        gevent.sleep(max(int(time_to_sleep.total_seconds()), 1))
+        time.sleep(max(int(time_to_sleep.total_seconds()), 1))
 
 
 if __name__ == "__main__":
-    start_prometheus_exporter_for_sidecar()
     main()
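
Note: the fetch loop above wakes a prefetch window before the soonest token expiration and never sleeps less than one second, so an already-expired token cannot busy-loop. A worked sketch of that timing arithmetic; the prefetch window and expiration are assumed values:

import datetime

VAULT_TOKEN_PREFETCH_TIME = datetime.timedelta(seconds=60)  # assumed window

def seconds_until_refetch(soonest_expiration: datetime.datetime) -> int:
    time_til_expiration = soonest_expiration - datetime.datetime.utcnow()
    time_to_sleep = time_til_expiration - VAULT_TOKEN_PREFETCH_TIME
    # clamp to at least 1 second so a past-due expiration cannot spin
    return max(int(time_to_sleep.total_seconds()), 1)

# a token expiring in 30 minutes sleeps roughly 29 minutes (about 1740 seconds)
print(seconds_until_refetch(datetime.datetime.utcnow() + datetime.timedelta(minutes=30)))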
(The remaining 3 changed files are not shown.)