Skip to content

Commit

Permalink
Event-Publisher Flush Queue on Shutdown (#767)
Browse files Browse the repository at this point in the history
vent-Publisher sidecar is not flushing the entire POSIX queue when receiving a SIGINT or SIGTERM. This PR creates a signal handler which will flush the queue whenever a SIGINT or SIGTERM is received. If this is not addressed, some applications that use the Event Publisher sidecar may experience dropped messages if their Event Publisher sidecar is terminated and the queue still has messages (ex: during a deployment).
  • Loading branch information
atai92 authored and KTAtkinson committed Mar 16, 2023
1 parent 86280f3 commit fcc0c45
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 12 deletions.
4 changes: 4 additions & 0 deletions baseplate/sidecars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ def age(self) -> float:
return 0
return time.time() - self.batch_start

@property
def is_ready(self) -> bool:
return self.age >= self.max_age

def add(self, item: Optional[bytes]) -> None:
if self.age >= self.max_age:
raise BatchFull
Expand Down
49 changes: 37 additions & 12 deletions baseplate/sidecars/event_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
import hashlib
import hmac
import logging
import signal
import sys

from types import FrameType
from typing import Any
from typing import List
from typing import Optional
Expand Down Expand Up @@ -164,6 +167,16 @@ def publish(self, payload: SerializedBatch) -> None:
SERIALIZER_BY_VERSION = {"2": V2Batch, "2j": V2JBatch}


def serialize_and_publish_batch(publisher: BatchPublisher, batcher: TimeLimitedBatch) -> None:
"""Serializes batch, publishes it using the publisher, and then resets the batch for more messages."""
serialized_batch = batcher.serialize()
try:
publisher.publish(serialized_batch)
except Exception:
logger.exception("Events publishing failed.")
batcher.reset()


def publish_events() -> None:
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument(
Expand Down Expand Up @@ -214,6 +227,28 @@ def publish_events() -> None:
batcher = TimeLimitedBatch(serializer, MAX_BATCH_AGE)
publisher = BatchPublisher(metrics_client, cfg)

def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None:
"""Signal handler for flushing messages from the queue and publishing them."""
message: Optional[bytes]
logger.info("Shutdown signal received. Flushing events...")

while True:
try:
message = event_queue.get(timeout=0.2)
except TimedOutError:
if len(batcher.serialize()) > 0:
serialize_and_publish_batch(publisher, batcher)
break

if batcher.is_ready:
serialize_and_publish_batch(publisher, batcher)
batcher.add(message)
sys.exit(0)

for sig in (signal.SIGINT, signal.SIGTERM):
signal.signal(sig, flush_queue_signal_handler)
signal.siginterrupt(sig, False)

while True:
message: Optional[bytes]

Expand All @@ -222,18 +257,8 @@ def publish_events() -> None:
except TimedOutError:
message = None

try:
batcher.add(message)
continue
except BatchFull:
pass

serialized = batcher.serialize()
try:
publisher.publish(serialized)
except Exception:
logger.exception("Events publishing failed.")
batcher.reset()
if batcher.is_ready:
serialize_and_publish_batch(publisher, batcher)
batcher.add(message)


Expand Down

0 comments on commit fcc0c45

Please sign in to comment.