From d31fc3c27d6b6c7e6b36fa6897c1231cf60a5cb5 Mon Sep 17 00:00:00 2001
From: ghirsch-reddit <110993165+ghirsch-reddit@users.noreply.github.com>
Date: Mon, 5 Dec 2022 15:02:15 -0800
Subject: [PATCH] revert sidecar changes to expose Prometheus metrics (#762)

* Revert "expose prometheus metrics for Zookeeper in live-data sidecar (#747)"

This reverts commit e49779e0fbea1b2e0787b064920bccb28c6e6247.

* Revert "add Prometheus to remaining sidecar types (#752)"

This reverts commit eb7d24b22ef9488005dd65df1113deb94887ce3e.
---
 baseplate/lib/live_data/zookeeper.py          | 39 +--------
 baseplate/sidecars/event_publisher.py         | 59 ++++----------
 baseplate/sidecars/live_data_watcher.py       | 19 +----
 baseplate/sidecars/secrets_fetcher.py         | 19 +----
 baseplate/sidecars/trace_publisher.py         | 79 ++++++-------------
 tests/unit/lib/events/publisher_tests.py      | 16 ++--
 .../unit/observers/tracing/publisher_tests.py | 19 ++---
 7 files changed, 55 insertions(+), 195 deletions(-)

diff --git a/baseplate/lib/live_data/zookeeper.py b/baseplate/lib/live_data/zookeeper.py
index 76ba92559..98f07a1c0 100644
--- a/baseplate/lib/live_data/zookeeper.py
+++ b/baseplate/lib/live_data/zookeeper.py
@@ -2,44 +2,12 @@
 from typing import Optional
 
 from kazoo.client import KazooClient
-from kazoo.client import KazooState
 from kazoo.handlers.gevent import SequentialGeventHandler
-from prometheus_client import Counter
 
 from baseplate.lib import config
 from baseplate.lib.secrets import SecretsStore
 from baseplate.server.monkey import gevent_is_patched
 
-SESSION_LOST_TOTAL = Counter(
-    "live_data_fetcher_lost_sessions_total",
-    "The number of times a Zookeeper client has had a session be lost.",
-)
-
-SESSION_SUSPENDED_TOTAL = Counter(
-    "live_data_fetcher_suspended_sessions_total",
-    "The number of times a Zookeeper client has had a session be suspended.",
-)
-
-SESSION_SUCCESSFUL_TOTAL = Counter(
-    "live_data_fetcher_connected_sessions_total",
-    "The number of times a Zookeeper client has successfully established a session.",
-)
-
-
-class SessionStatsListener:
-    """A Kazoo listener that monitors changes in connection state.
-
-    Increments an event counter whenever connection state changes in a
-    Zookeeper connection.
-    """
-
-    def __call__(self, state: KazooState) -> None:
-        if state == KazooState.LOST:
-            SESSION_LOST_TOTAL.inc()
-        elif state == KazooState.SUSPENDED:
-            SESSION_SUSPENDED_TOTAL.inc()
-        else:
-            SESSION_SUCCESSFUL_TOTAL.inc()
 
 
 def zookeeper_client_from_config(
     secrets: SecretsStore, app_config: config.RawConfig, read_only: Optional[bool] = None
@@ -93,7 +61,7 @@ def zookeeper_client_from_config(
     else:
         handler = None
 
-    client = KazooClient(
+    return KazooClient(
         cfg.hosts,
         timeout=cfg.timeout.total_seconds(),
         auth_data=auth_data,
@@ -122,8 +90,3 @@ def zookeeper_client_from_config(
             max_delay=60,  # never wait longer than this
         ),
     )
-
-    listener = SessionStatsListener()
-    client.add_listener(listener)
-
-    return client
diff --git a/baseplate/sidecars/event_publisher.py b/baseplate/sidecars/event_publisher.py
index 5d51e5995..9c3dcb2eb 100644
--- a/baseplate/sidecars/event_publisher.py
+++ b/baseplate/sidecars/event_publisher.py
@@ -1,16 +1,3 @@
-# pylint: disable=wrong-import-position,wrong-import-order
-from gevent.monkey import patch_all
-
-from baseplate.server.monkey import patch_stdlib_queues
-
-# In order to allow Prometheus to scrape metrics, we need to concurrently
-# handle requests to '/metrics' along with the sidecar's execution.
-# Monkey patching is used to replace the stdlib sequential versions of functions
-# with concurrent versions. It must happen as soon as possible, before the
-# sequential versions are imported.
-patch_all()
-patch_stdlib_queues()
-
 import argparse
 import configparser
 import email.utils
@@ -23,13 +10,9 @@
 from typing import List
 from typing import Optional
 
-import gevent
 import requests
-from prometheus_client import Counter
 
-from baseplate import Baseplate
-from baseplate.clients.requests import ExternalRequestsClient
+from baseplate import __version__ as baseplate_version
 from baseplate.lib import config
 from baseplate.lib import metrics
 from baseplate.lib.events import MAX_EVENT_SIZE
@@ -39,7 +22,6 @@
 from baseplate.lib.metrics import metrics_client_from_config
 from baseplate.lib.retry import RetryPolicy
 from baseplate.server import EnvironmentInterpolation
-from baseplate.server.prometheus import start_prometheus_exporter_for_sidecar
 from baseplate.sidecars import Batch
 from baseplate.sidecars import BatchFull
 from baseplate.sidecars import SerializedBatch
@@ -61,8 +43,6 @@
 # maximum size (in bytes) of a batch of events
 MAX_BATCH_SIZE = 500 * 1024
 
-PUBLISHES_COUNT_TOTAL = Counter("eventv2_publishes_total", "total count of published events")
-
 
 class MaxRetriesError(Exception):
     pass
@@ -119,12 +99,15 @@ def serialize(self) -> SerializedBatch:
 
 
 class BatchPublisher:
-    def __init__(self, bp: Baseplate, metrics_client: metrics.Client, cfg: Any):
-        self.baseplate = bp
+    def __init__(self, metrics_client: metrics.Client, cfg: Any):
         self.metrics = metrics_client
         self.url = f"{cfg.collector.scheme}://{cfg.collector.hostname}/v{cfg.collector.version}"
         self.key_name = cfg.key.name
         self.key_secret = cfg.key.secret
+        self.session = requests.Session()
+        self.session.headers[
+            "User-Agent"
+        ] = f"baseplate.py-{self.__class__.__name__}/{baseplate_version}"
 
     def _sign_payload(self, payload: bytes) -> str:
         digest = hmac.new(self.key_secret, payload, hashlib.sha256).hexdigest()
@@ -146,14 +129,15 @@ def publish(self, payload: SerializedBatch) -> None:
 
         for _ in RetryPolicy.new(budget=MAX_RETRY_TIME, backoff=RETRY_BACKOFF):
             try:
-                with self.baseplate.server_context("post") as context:
-                    with self.metrics.timer("post"):
-                        response = context.http_client.post(
-                            self.url,
-                            headers=headers,
-                            data=compressed_payload,
-                            timeout=POST_TIMEOUT,
-                        )
+                with self.metrics.timer("post"):
+                    response = self.session.post(
+                        self.url,
+                        headers=headers,
+                        data=compressed_payload,
+                        timeout=POST_TIMEOUT,
+                        # http://docs.python-requests.org/en/latest/user/advanced/#keep-alive
+                        stream=False,
+                    )
                 response.raise_for_status()
             except requests.HTTPError as exc:
                 self.metrics.counter("error.http").increment()
@@ -171,7 +155,6 @@ def publish(self, payload: SerializedBatch) -> None:
                 self.metrics.counter("error.io").increment()
                 logger.exception("HTTP Request failed")
             else:
-                PUBLISHES_COUNT_TOTAL.inc(payload.item_count)
                 self.metrics.counter("sent").increment(payload.item_count)
                 return
 
@@ -226,21 +209,12 @@ def publish_events() -> None:
         max_message_size=MAX_EVENT_SIZE,
     )
 
-    bp = Baseplate()
-    bp.configure_context(
-        {
-            "http_client": ExternalRequestsClient("event_collector"),
-        }
-    )
-
     # pylint: disable=maybe-no-member
     serializer = SERIALIZER_BY_VERSION[cfg.collector.version]()
     batcher = TimeLimitedBatch(serializer, MAX_BATCH_AGE)
-    publisher = BatchPublisher(bp, metrics_client, cfg)
+    publisher = BatchPublisher(metrics_client, cfg)
 
     while True:
-        # allow other routines to execute (specifically handling requests to /metrics)
-        gevent.sleep(0)
         message: Optional[bytes]
 
         try:
@@ -264,5 +238,4 @@ def publish_events() -> None:
 
 
 if __name__ == "__main__":
-    start_prometheus_exporter_for_sidecar()
     publish_events()
diff --git a/baseplate/sidecars/live_data_watcher.py b/baseplate/sidecars/live_data_watcher.py
index 2e6c9a8b4..781f3dfeb 100644
--- a/baseplate/sidecars/live_data_watcher.py
+++ b/baseplate/sidecars/live_data_watcher.py
@@ -1,23 +1,11 @@
 """Watch nodes in ZooKeeper and sync their contents to disk on change."""
-# pylint: disable=wrong-import-position,wrong-import-order
-from gevent.monkey import patch_all
-
-from baseplate.server.monkey import patch_stdlib_queues
-
-# In order to allow Prometheus to scrape metrics, we need to concurrently
-# handle requests to '/metrics' along with the sidecar's execution.
-# Monkey patching is used to replace the stdlib sequential versions of functions
-# with concurrent versions. It must happen as soon as possible, before the
-# sequential versions are imported.
-patch_all()
-patch_stdlib_queues()
-
 import argparse
 import configparser
 import json
 import logging
 import os
 import sys
+import time
 from enum import Enum
 from pathlib import Path
@@ -26,7 +14,6 @@
 from typing import Optional
 
 import boto3  # type: ignore
-import gevent
 from botocore import UNSIGNED  # type: ignore
 from botocore.client import ClientError  # type: ignore
 
@@ -39,7 +26,6 @@
 from baseplate.lib.live_data.zookeeper import zookeeper_client_from_config
 from baseplate.lib.secrets import secrets_store_from_config
 from baseplate.server import EnvironmentInterpolation
-from baseplate.server.prometheus import start_prometheus_exporter_for_sidecar
 
 logger = logging.getLogger(__name__)
 
@@ -201,7 +187,7 @@ def watch_zookeeper_nodes(zookeeper: KazooClient, nodes: Any) -> NoReturn:
     # all the interesting stuff is now happening in the Kazoo worker thread
     # and so we'll just spin and periodically heartbeat to prove we're alive.
     while True:
-        gevent.sleep(HEARTBEAT_INTERVAL)
+        time.sleep(HEARTBEAT_INTERVAL)
 
         # see the comment in baseplate.live_data.zookeeper for explanation of
         # how reconnects work with the background thread.
@@ -272,5 +258,4 @@ def main() -> NoReturn:
 
 
 if __name__ == "__main__":
-    start_prometheus_exporter_for_sidecar()
     main()
diff --git a/baseplate/sidecars/secrets_fetcher.py b/baseplate/sidecars/secrets_fetcher.py
index b3ba331f3..2eae5d546 100644
--- a/baseplate/sidecars/secrets_fetcher.py
+++ b/baseplate/sidecars/secrets_fetcher.py
@@ -60,19 +60,6 @@
 write to a new file in whatever format needed, and restart other services if
 necessary.
 """
-# pylint: disable=wrong-import-position,wrong-import-order
-from gevent.monkey import patch_all
-
-from baseplate.server.monkey import patch_stdlib_queues
-
-# In order to allow Prometheus to scrape metrics, we need to concurrently
-# handle requests to '/metrics' along with the sidecar's execution.
-# Monkey patching is used to replace the stdlib sequential versions of functions
-# with concurrent versions. It must happen as soon as possible, before the
-# sequential versions are imported.
-patch_all()
-patch_stdlib_queues()
-
 import argparse
 import configparser
 import datetime
@@ -81,6 +68,7 @@
 import os
 import posixpath
 import subprocess
+import time
 import urllib.parse
 import uuid
 
@@ -90,13 +78,11 @@
 from typing import Optional
 from typing import Tuple
 
-import gevent
 import requests
 
 from baseplate import __version__ as baseplate_version
 from baseplate.lib import config
 from baseplate.server import EnvironmentInterpolation
-from baseplate.server.prometheus import start_prometheus_exporter_for_sidecar
 
 logger = logging.getLogger(__name__)
 
@@ -441,9 +427,8 @@ def main() -> None:
         last_proc = trigger_callback(cfg.callback, cfg.output.path, last_proc)
         time_til_expiration = soonest_expiration - datetime.datetime.utcnow()
         time_to_sleep = time_til_expiration - VAULT_TOKEN_PREFETCH_TIME
-        gevent.sleep(max(int(time_to_sleep.total_seconds()), 1))
+        time.sleep(max(int(time_to_sleep.total_seconds()), 1))
 
 
 if __name__ == "__main__":
-    start_prometheus_exporter_for_sidecar()
     main()
diff --git a/baseplate/sidecars/trace_publisher.py b/baseplate/sidecars/trace_publisher.py
index b6d426d29..b7d91859c 100644
--- a/baseplate/sidecars/trace_publisher.py
+++ b/baseplate/sidecars/trace_publisher.py
@@ -1,29 +1,13 @@
-# pylint: disable=wrong-import-position,wrong-import-order
-from gevent.monkey import patch_all
-
-from baseplate.server.monkey import patch_stdlib_queues
-
-# In order to allow Prometheus to scrape metrics, we need to concurrently
-# handle requests to '/metrics' along with the sidecar's execution.
-# Monkey patching is used to replace the stdlib sequential versions of functions
-# with concurrent versions. It must happen as soon as possible, before the
-# sequential versions are imported.
-patch_all()
-patch_stdlib_queues()
-
 import argparse
 import configparser
 import logging
+import urllib.parse
 
 from typing import Optional
 
-import gevent
 import requests
-from prometheus_client import Counter
 
-from baseplate import Baseplate
-from baseplate.clients.requests import InternalRequestsClient
+from baseplate import __version__ as baseplate_version
 from baseplate.lib import config
 from baseplate.lib import metrics
 from baseplate.lib.message_queue import MessageQueue
@@ -33,7 +17,6 @@
 from baseplate.observers.tracing import MAX_QUEUE_SIZE
 from baseplate.observers.tracing import MAX_SPAN_SIZE
 from baseplate.server import EnvironmentInterpolation
-from baseplate.server.prometheus import start_prometheus_exporter_for_sidecar
 from baseplate.sidecars import BatchFull
 from baseplate.sidecars import RawJSONBatch
 from baseplate.sidecars import SerializedBatch
@@ -51,14 +34,9 @@
 # messages to batch
 MAX_BATCH_AGE = 1
 
-# max number of connections to Zipkin server
-MAX_NUM_CONNS = 5
-
 # maximum number of retries when publishing traces
 RETRY_LIMIT_DEFAULT = 10
 
-PUBLISHES_COUNT_TOTAL = Counter("zipkin_trace_publishes_total", "total count of published traces")
-
 
 class MaxRetriesError(Exception):
     pass
@@ -74,13 +52,20 @@ class ZipkinPublisher:
 
     def __init__(
         self,
-        bp: Baseplate,
         zipkin_api_url: str,
         metrics_client: metrics.Client,
         post_timeout: int = POST_TIMEOUT_DEFAULT,
         retry_limit: int = RETRY_LIMIT_DEFAULT,
+        num_conns: int = 5,
     ):
-        self.baseplate = bp
+
+        adapter = requests.adapters.HTTPAdapter(pool_connections=num_conns, pool_maxsize=num_conns)
+        parsed_url = urllib.parse.urlparse(zipkin_api_url)
+        self.session = requests.Session()
+        self.session.headers[
+            "User-Agent"
+        ] = f"baseplate.py-{self.__class__.__name__}/{baseplate_version}"
+        self.session.mount(f"{parsed_url.scheme}://", adapter)
         self.endpoint = f"{zipkin_api_url}/spans"
         self.metrics = metrics_client
         self.post_timeout = post_timeout
@@ -101,14 +86,14 @@ def publish(self, payload: SerializedBatch) -> None:
         }
         for _ in RetryPolicy.new(attempts=self.retry_limit):
             try:
-                with self.baseplate.server_context("post") as context:
-                    with self.metrics.timer("post"):
-                        response = context.http_client.post(
-                            self.endpoint,
-                            data=payload.serialized,
-                            headers=headers,
-                            timeout=self.post_timeout,
-                        )
+                with self.metrics.timer("post"):
+                    response = self.session.post(
+                        self.endpoint,
+                        data=payload.serialized,
+                        headers=headers,
+                        timeout=self.post_timeout,
+                        stream=False,
+                    )
                 response.raise_for_status()
             except requests.HTTPError as exc:
                 self.metrics.counter("error.http").increment()
@@ -125,7 +110,6 @@ def publish(self, payload: SerializedBatch) -> None:
                 self.metrics.counter("error.io").increment()
                 logger.exception("HTTP Request failed")
             else:
-                PUBLISHES_COUNT_TOTAL.inc(payload.item_count)
                 self.metrics.counter("sent").increment(payload.item_count)
                 return
 
@@ -182,29 +166,17 @@ def publish_traces() -> None:
         max_message_size=MAX_SPAN_SIZE,
     )
 
-    bp = Baseplate()
-    bp.configure_context(
-        {
-            "http_client": InternalRequestsClient(
-                "trace_collector", pool_connections=MAX_NUM_CONNS, pool_maxsize=MAX_NUM_CONNS
-            ),
-        }
-    )
-
     # pylint: disable=maybe-no-member
    inner_batch = TraceBatch(max_size=publisher_cfg.max_batch_size)
     batcher = TimeLimitedBatch(inner_batch, MAX_BATCH_AGE)
     metrics_client = metrics_client_from_config(publisher_raw_cfg)
     publisher = ZipkinPublisher(
-        bp,
         publisher_cfg.zipkin_api_url.address,
         metrics_client,
         post_timeout=publisher_cfg.post_timeout,
    )
 
     while True:
-        # allow other routines to execute (specifically handling requests to /metrics)
-        gevent.sleep(0)
         message: Optional[bytes]
 
         try:
@@ -214,19 +186,12 @@ def publish_traces() -> None:
 
         try:
             batcher.add(message)
-            continue
         except BatchFull:
-            pass
-
-        serialized = batcher.serialize()
-        try:
+            serialized = batcher.serialize()
             publisher.publish(serialized)
-        except Exception:
-            logger.exception("Traces publishing failed.")
-        batcher.reset()
-        batcher.add(message)
+            batcher.reset()
+            batcher.add(message)
 
 
 if __name__ == "__main__":
-    start_prometheus_exporter_for_sidecar()
     publish_traces()
diff --git a/tests/unit/lib/events/publisher_tests.py b/tests/unit/lib/events/publisher_tests.py
index 28f225cf5..fc1d84f8b 100644
--- a/tests/unit/lib/events/publisher_tests.py
+++ b/tests/unit/lib/events/publisher_tests.py
@@ -76,10 +76,8 @@ def test_v2j(self):
 
 
 class PublisherTests(unittest.TestCase):
-    @mock.patch("baseplate.clients.requests.BaseplateSession", autospec=True)
-    @mock.patch("baseplate.RequestContext", autospec=True)
-    @mock.patch("baseplate.Baseplate", autospec=True)
-    def setUp(self, bp, context, session):
+    @mock.patch("requests.Session", autospec=True)
+    def setUp(self, Session):
         self.config = config.ConfigNamespace()
         self.config.collector = config.ConfigNamespace()
         self.config.collector.hostname = "test.local"
@@ -89,14 +87,12 @@ def setUp(self, bp, context, session):
         self.config.key.name = "TestKey"
         self.config.key.secret = b"hunter2"
 
-        bp.server_context.return_value.__enter__.return_value = context
-        context.http_client = session
-        self.baseplate = bp
-        self.session = session
+        self.session = Session.return_value
+        self.session.headers = {}
 
         self.metrics_client = mock.MagicMock(autospec=metrics.Client)
 
-        self.publisher = event_publisher.BatchPublisher(bp, self.metrics_client, self.config)
+        self.publisher = event_publisher.BatchPublisher(self.metrics_client, self.config)
 
     def test_empty_batch(self):
         self.publisher.publish(SerializedBatch(item_count=0, serialized=b""))
@@ -118,7 +114,7 @@ def test_publish_success(self):
             "key=TestKey, mac=7c46d56b99cd4cb05e08238c1d4c10a2f330795e9d7327f17cc66fd206bf1179",
         )
 
-    @mock.patch("gevent.sleep")
+    @mock.patch("time.sleep")
     def test_fail_on_client_error(self, mock_sleep):
         self.session.post.side_effect = [
             requests.HTTPError(400, response=mock.Mock(status_code=400))
diff --git a/tests/unit/observers/tracing/publisher_tests.py b/tests/unit/observers/tracing/publisher_tests.py
index 41142b5d4..f2f53bedc 100644
--- a/tests/unit/observers/tracing/publisher_tests.py
+++ b/tests/unit/observers/tracing/publisher_tests.py
@@ -10,24 +10,17 @@
 
 
 class ZipkinPublisherTest(unittest.TestCase):
-    @mock.patch("baseplate.clients.requests.BaseplateSession", autospec=True)
-    @mock.patch("baseplate.RequestContext", autospec=True)
-    @mock.patch("baseplate.Baseplate", autospec=True)
-    def setUp(self, bp, context, session):
+    @mock.patch("requests.Session", autospec=True)
+    def setUp(self, mock_Session):
+        self.session = mock_Session.return_value
+        self.session.headers = {}
         self.metrics_client = mock.MagicMock(autospec=metrics.Client)
         self.zipkin_api_url = "http://test.local/api/v2"
-
-        bp.server_context.return_value.__enter__.return_value = context
-        context.http_client = session
-        self.baseplate = bp
-        self.session = session
-
-        self.publisher = trace_publisher.ZipkinPublisher(
-            bp, self.zipkin_api_url, self.metrics_client
-        )
+        self.publisher = trace_publisher.ZipkinPublisher(self.zipkin_api_url, self.metrics_client)
 
     def test_initialization(self):
         self.assertEqual(self.publisher.endpoint, f"{self.zipkin_api_url}/spans")
+        self.publisher.session.mount.assert_called_with("http://", mock.ANY)
 
     def test_empty_batch(self):
         self.publisher.publish(SerializedBatch(item_count=0, serialized=b""))