Add queue flushing signal handler #792

Open · wants to merge 9 commits into base: develop
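For orientation before the diff: a minimal, self-contained sketch of the flush-on-signal pattern this change introduces, using a stdlib queue.Queue and a print stub in place of baseplate's POSIX event queue and BatchPublisher. The names pending, publish_batch, and flush_and_exit below are illustrative only and are not part of baseplate.

import queue
import signal
import sys

from types import FrameType
from typing import List, Optional

# Illustrative stand-ins for the sidecar's event queue and publisher.
pending: "queue.Queue[bytes]" = queue.Queue()


def publish_batch(messages: List[bytes]) -> None:
    # Placeholder for BatchPublisher.publish(); the real sidecar POSTs the batch.
    print(f"publishing {len(messages)} buffered events")


def flush_and_exit(_signo: int, _frame: Optional[FrameType]) -> None:
    """Drain whatever is still queued, publish it, then exit cleanly."""
    drained: List[bytes] = []
    while True:
        try:
            drained.append(pending.get_nowait())
        except queue.Empty:
            break
    if drained:
        publish_batch(drained)
    sys.exit(0)


for sig in (signal.SIGINT, signal.SIGTERM):
    signal.signal(sig, flush_and_exit)
    # Let blocking reads in the main loop be restarted rather than interrupted.
    signal.siginterrupt(sig, False)

The actual change below does the same drain inside publish_events(), reusing TimeLimitedBatch and BatchPublisher and publishing whatever is left before calling sys.exit(0).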
47 changes: 41 additions & 6 deletions baseplate/sidecars/event_publisher.py
@@ -5,7 +5,10 @@
 import hashlib
 import hmac
 import logging
+import signal
+import sys
 
+from types import FrameType
 from typing import Any
 from typing import List
 from typing import Optional
@@ -164,6 +167,16 @@ def publish(self, payload: SerializedBatch) -> None:
 SERIALIZER_BY_VERSION = {"2": V2Batch, "2j": V2JBatch}
 
 
+def serialize_and_publish_batch(publisher: BatchPublisher, batcher: TimeLimitedBatch) -> None:
+    """Serializes batch, publishes it using the publisher, and then resets the batch for more messages."""
+    serialized_batch = batcher.serialize()
+    try:
+        publisher.publish(serialized_batch)
+    except Exception:
+        logger.exception("Events publishing failed.")
+    batcher.reset()
+
+
 def publish_events() -> None:
     arg_parser = argparse.ArgumentParser()
     arg_parser.add_argument(
@@ -214,6 +227,33 @@ def publish_events() -> None:
     batcher = TimeLimitedBatch(serializer, MAX_BATCH_AGE)
     publisher = BatchPublisher(metrics_client, cfg)
 
+    def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None:
+        """Signal handler for flushing messages from the queue and publishing them."""
+        message: Optional[bytes]
+        logger.info("Shutdown signal received. Flushing events...")
+
+        while True:
+            try:
+                message = event_queue.get(timeout=0.2)
+            except TimedOutError:
+                if len(batcher.serialize()) > 0:
Contributor comment on the check above: The length here will always be greater than 0 because we add a header. We might want to add an items_count function to the batcher to count the number of items currently in the batch. It could use the length of batcher._items.
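A minimal sketch of that suggestion, assuming TimeLimitedBatch keeps its pending messages in a self._items list as the comment implies; the class below is an illustrative stand-in, not baseplate's actual implementation.

from typing import List


class TimeLimitedBatchSketch:
    """Illustrative stand-in showing only the proposed items_count addition."""

    def __init__(self) -> None:
        self._items: List[bytes] = []

    def add(self, item: bytes) -> None:
        self._items.append(item)

    @property
    def items_count(self) -> int:
        # Counts buffered items directly, so an empty batch reports 0 even
        # though serialize() would still emit a non-empty header.
        return len(self._items)

The handler's check could then read if batcher.items_count > 0: instead of inspecting the length of the serialized payload.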

+                    serialize_and_publish_batch(publisher, batcher)
+                break
+
+            try:
+                batcher.add(message)
+                continue
+            except BatchFull:
+                pass
+
+            serialize_and_publish_batch(publisher, batcher)
+            batcher.add(message)
+        sys.exit(0)
+
+    for sig in (signal.SIGINT, signal.SIGTERM):
+        signal.signal(sig, flush_queue_signal_handler)
+        signal.siginterrupt(sig, False)
+
     while True:
         message: Optional[bytes]

@@ -228,12 +268,7 @@ def publish_events() -> None:
         except BatchFull:
             pass
 
-        serialized = batcher.serialize()
-        try:
-            publisher.publish(serialized)
-        except Exception:
-            logger.exception("Events publishing failed.")
-        batcher.reset()
+        serialize_and_publish_batch(publisher, batcher)
         batcher.add(message)

