Provide alternative in-memory queue for publishing events #763

Open

sydjryan wants to merge 79 commits into develop from replace_posix_queue_event_publishing

Changes shown are from 10 of the 79 commits.

Commits
4918353
add in memory queue option
Dec 7, 2022
7558da7
cleanup
Dec 7, 2022
be54be0
Merge branch 'develop' into replace_posix_queue_event_publishing
sydjryan Dec 7, 2022
b71fac2
lint, cleanup
Dec 7, 2022
7916749
Merge branch 'replace_posix_queue_event_publishing' of github.com:syd…
Dec 7, 2022
84c50c9
linting
Dec 7, 2022
8cd02e9
linting
Dec 7, 2022
719749a
linting :sob:
Dec 7, 2022
7e84998
linting
Dec 8, 2022
057f295
wip
Dec 14, 2022
3740cc6
wip
Dec 20, 2022
1b1d5cf
thrift updates
Dec 23, 2022
5fd17f5
working
Jan 3, 2023
1d9510b
cleanup, wipg
Jan 3, 2023
6628c1c
linting
Jan 3, 2023
6834daa
linting
Jan 4, 2023
efffbcb
linting, remove docker-compose change
Jan 4, 2023
afcbd9f
dockerfile
Jan 4, 2023
de2822d
Merge branch 'develop' into replace_posix_queue_event_publishing
sydjryan Jan 4, 2023
05a10a7
typo
Jan 4, 2023
6632221
Merge branch 'replace_posix_queue_event_publishing' of github.com:syd…
Jan 4, 2023
257c0bb
wip
Jan 9, 2023
c31eb31
updates
Jan 9, 2023
7b93a00
cleanup exceptions
Jan 9, 2023
f522fce
linting
Jan 10, 2023
11e498e
linting
Jan 10, 2023
27d4171
linting
Jan 10, 2023
78c3102
cleanup
Jan 10, 2023
2e364d1
updates
Jan 19, 2023
4c4c938
EventQueue takes n optionala queue
Jan 24, 2023
95a225e
clarify comments
Feb 13, 2023
018134e
linting
Feb 13, 2023
3ba5648
linting
Feb 13, 2023
e1345b0
linting
Feb 13, 2023
276e768
linting :sob:
Feb 13, 2023
50226db
wip need to test metrics
Feb 23, 2023
e28ed06
move timer start
Feb 24, 2023
fc95a77
Merge branch 'develop' into replace_posix_queue_event_publishing
sydjryan Feb 24, 2023
7f6538b
import order
Feb 24, 2023
72f5c1d
Merge branch 'replace_posix_queue_event_publishing' of github.com:syd…
Feb 24, 2023
406161f
linting
Feb 24, 2023
5a459a9
wip
Feb 27, 2023
3eee3d9
x
Mar 1, 2023
2f92bb0
remove thrift get
Mar 2, 2023
da30d5e
remove get
Mar 2, 2023
bf367e4
linting
Mar 2, 2023
2d17807
linting
Mar 2, 2023
3a62e2d
linting
Mar 2, 2023
eb71f4c
cleanup
Mar 2, 2023
1ccd7e2
lint
Mar 2, 2023
48a7624
comment
Mar 2, 2023
53a4df0
x
Mar 2, 2023
210cbb9
reduce pool timeout for pool test
Mar 2, 2023
654ad09
newline
Mar 2, 2023
272f80d
remove pool test
Mar 2, 2023
ffaf766
import
Mar 3, 2023
19f26cf
cleanup
Mar 6, 2023
d49b2d8
black
Mar 6, 2023
9e13d34
linting
Mar 6, 2023
6d8d967
lint
Mar 6, 2023
cc9714f
linting
Mar 6, 2023
c77c67f
remove create_queue
Mar 6, 2023
b8cfff8
l
Mar 6, 2023
442c20d
mypy
Mar 7, 2023
5ec4da3
black
Mar 7, 2023
ed0d328
cleanup and monkeypatch gevent
Mar 9, 2023
f171e10
exception
Mar 9, 2023
38464cf
lint
Mar 15, 2023
a456dc8
catch gevent errors
Mar 16, 2023
18242c7
lint
Mar 16, 2023
855de0f
remove from trace publisher and sidecar init
Mar 17, 2023
097d4d0
Merge branch 'develop' into replace_posix_queue_event_publishing
Mar 18, 2023
b2c2ff0
lint
Mar 20, 2023
8da60eb
lint
Mar 20, 2023
608a446
fix
Mar 20, 2023
adea15c
add scripts, remove signal handling for remote queue
Mar 20, 2023
14ecb2e
lint
Mar 20, 2023
86f3eca
typos
Mar 21, 2023
9ba9170
x
Mar 21, 2023
16 changes: 11 additions & 5 deletions baseplate/lib/events.py
@@ -22,7 +22,8 @@
from baseplate import Span
from baseplate.clients import ContextFactory
from baseplate.lib import config
from baseplate.lib.message_queue import MessageQueue
from baseplate.lib.message_queue import RemoteMessageQueue
from baseplate.lib.message_queue import PosixMessageQueue
from baseplate.lib.message_queue import TimedOutError


@@ -93,10 +94,15 @@ class EventQueue(ContextFactory, config.Parser, Generic[T]):

"""

def __init__(self, name: str, event_serializer: Callable[[T], bytes]):
self.queue = MessageQueue(
"/events-" + name, max_messages=MAX_QUEUE_SIZE, max_message_size=MAX_EVENT_SIZE
)
def __init__(
self, name: str, event_serializer: Callable[[T], bytes], use_in_memory_queue: bool = False
):
if use_in_memory_queue:
self.queue = RemoteMessageQueue("/events-" + name, max_messages=MAX_QUEUE_SIZE)
else:
self.queue = PosixMessageQueue( # type: ignore
"/events-" + name, max_messages=MAX_QUEUE_SIZE, max_message_size=MAX_EVENT_SIZE
)
self.serialize_event = event_serializer

def put(self, event: T) -> None:
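For reviewers, a rough usage sketch of the new constructor flag. The event shape and serializer below are hypothetical, and the in-memory path assumes the event-publisher sidecar is already running its Thrift server, since puts are forwarded to a RemoteMessageQueue:

import json

from baseplate.lib.events import EventQueue


def serialize_event(event: dict) -> bytes:
    # Hypothetical serializer for illustration; real callers pass their own.
    return json.dumps(event).encode("utf-8")


# Default behaviour is unchanged: events land in the POSIX queue at /events-main.
posix_backed = EventQueue("main", event_serializer=serialize_event)

# New behaviour: events are handed to the sidecar's in-memory queue over Thrift.
sidecar_backed = EventQueue("main", event_serializer=serialize_event, use_in_memory_queue=True)
sidecar_backed.put({"source": "example", "action": "click"})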
144 changes: 116 additions & 28 deletions baseplate/lib/message_queue.py
@@ -1,12 +1,19 @@
"""A Gevent-friendly POSIX message queue."""
import abc
import queue as q
import select

from typing import Optional
from baseplate.thrift import RemoteMessageQueueService

import posix_ipc

from baseplate.lib.retry import RetryPolicy

from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket
from thrift.transport import TTransport


class MessageQueueError(Exception):
"""Base exception for message queue related errors."""
@@ -35,7 +42,51 @@ def __init__(self, inner: Exception):
super().__init__(f"{inner} (check `ulimit -q`?)")


class MessageQueue:
class MessageQueue(abc.ABC):
"""Abstract class for an inter-process message queue."""

@abc.abstractmethod
def get(self, timeout: Optional[float] = None) -> bytes:
"""Read a message from the queue.

:param timeout: If the queue is empty, the call will block up to
``timeout`` seconds or forever if ``None``.
:raises: :py:exc:`TimedOutError` The queue was empty for the allowed
duration of the call.

"""

@abc.abstractmethod
def put(self, message: bytes, timeout: Optional[float] = None) -> None:
"""Add a message to the queue.

:param timeout: If the queue is full, the call will block up to
``timeout`` seconds or forever if ``None``.
:raises: :py:exc:`TimedOutError` The queue was full for the allowed
duration of the call.

"""

@abc.abstractmethod
def unlink(self) -> None:
"""Remove the queue from the system.

The queue will not leave until the last active user closes it.

"""

@abc.abstractmethod
def close(self) -> None:
"""Close the queue, freeing related resources.

This must be called explicitly if queues are created/destroyed on the
fly. It is not automatically called when the object is reclaimed by
Python.

"""


class PosixMessageQueue(MessageQueue):
"""A Gevent-friendly (but not required) inter process message queue.

``name`` should be a string of up to 255 characters consisting of an
@@ -62,16 +113,9 @@ def __init__(self, name: str, max_messages: int, max_message_size: int):
except OSError as exc:
raise MessageQueueOSError(exc)
self.queue.block = False
self.name = name

def get(self, timeout: Optional[float] = None) -> bytes:
"""Read a message from the queue.

:param timeout: If the queue is empty, the call will block up to
``timeout`` seconds or forever if ``None``.
:raises: :py:exc:`TimedOutError` The queue was empty for the allowed
duration of the call.

"""
for time_remaining in RetryPolicy.new(budget=timeout):
try:
message, _ = self.queue.receive()
@@ -84,14 +128,6 @@ def get(self, timeout: Optional[float] = None) -> bytes:
raise TimedOutError

def put(self, message: bytes, timeout: Optional[float] = None) -> None:
"""Add a message to the queue.

:param timeout: If the queue is full, the call will block up to
``timeout`` seconds or forever if ``None``.
:raises: :py:exc:`TimedOutError` The queue was full for the allowed
duration of the call.

"""
for time_remaining in RetryPolicy.new(budget=timeout):
try:
return self.queue.send(message=message)
@@ -103,22 +139,64 @@ def put(self, message: bytes, timeout: Optional[float] = None) -> None:
raise TimedOutError

def unlink(self) -> None:
    self.queue.unlink()

def close(self) -> None:
    self.queue.close()

class InMemoryMessageQueue(MessageQueue):
"""An in-memory inter process message queue.""" # Used by the sidecar

def __init__(self, name: str, max_messages: int):
self.queue: q.Queue = q.Queue(max_messages)
self.max_messages = max_messages
self.name = name

def get(self, timeout: Optional[float] = None) -> bytes:
try:
message = self.queue.get(timeout=timeout)
self.queue.task_done()
return message
except q.Empty:
raise TimedOutError

def put(self, message: bytes, timeout: Optional[float] = None) -> None:
try:
self.queue.put(message, timeout=timeout)
except q.Full:
raise TimedOutError

def unlink(self) -> None:
"""Not implemented for in-memory queue"""

def close(self) -> None:
    """Not implemented for in-memory queue"""
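A quick behavioural sketch of the in-memory backend (this is the object the sidecar holds on to); it raises the same TimedOutError as the POSIX implementation when a bounded get or put times out:

from baseplate.lib.message_queue import InMemoryMessageQueue
from baseplate.lib.message_queue import TimedOutError

queue = InMemoryMessageQueue("/events-example", max_messages=10)
queue.put(b"hello", timeout=0)
assert queue.get(timeout=0) == b"hello"

try:
    queue.get(timeout=0.1)  # queue is empty again, so this times out
except TimedOutError:
    print("empty, as expected")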
class RemoteMessageQueue(MessageQueue):
def __init__(self, name: str, max_messages: int):
    # Connect to the sidecar and instantiate a remote queue
    self.name = name
    self.max_messages = max_messages
    # todo: how do I connect to the sidecar? I don't think this is right
    socket = TSocket.TSocket("localhost", 9090)
    self.transport = TTransport.TBufferedTransport(socket)
    protocol = TBinaryProtocol.TBinaryProtocol(self.transport)

    self.client = RemoteMessageQueueService.Client(protocol)
    # todo: create queue?

    # Connect to the server
    self.transport.open()

def get(self, timeout: Optional[float] = None) -> bytes:
return self.client.get(timeout)

def put(self, message: bytes, timeout: Optional[float] = None) -> None:
return self.client.put(message, timeout)

def unlink(self) -> None:
"""Not implemented for remote queue"""

def close(self) -> None:
self.transport.close()


def queue_tool() -> None:
@@ -140,6 +218,11 @@ def queue_tool() -> None:
help="if creating the queue, what to set the maximum message size to",
)
parser.add_argument("queue_name", help="the name of the queue to consume")
parser.add_argument(
"use_in_memory_queue",
default=False,
help="whether to use an in-memory queue or a posix queue",
)
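A note on this argument: because it is positional, default=False never applies and the parsed value is always a string, so even "false" would be truthy when checked below. If the intent is to mirror the publisher sidecar's --use-in-memory-queue flag, an optional store-true argument is probably what queue_tool wants; a standalone sketch:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--use-in-memory-queue",
    action="store_true",
    default=False,
    help="whether to use an in-memory queue or a posix queue",
)

args = parser.parse_args(["--use-in-memory-queue"])
assert args.use_in_memory_queue is True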

group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
@@ -166,7 +249,12 @@ def queue_tool() -> None:

args = parser.parse_args()

queue = MessageQueue(args.queue_name, args.max_messages, args.max_message_size)
if args.use_in_memory_queue:
queue = RemoteMessageQueue(args.queue_name, args.max_messages)
else:
queue = PosixMessageQueue( # type: ignore
args.queue_name, args.max_messages, args.max_message_size
)

if args.mode == "read":
while True:
29 changes: 17 additions & 12 deletions baseplate/observers/tracing.py
@@ -29,7 +29,8 @@
from baseplate import SpanObserver
from baseplate.lib import config
from baseplate.lib import warn_deprecated
from baseplate.lib.message_queue import MessageQueue
from baseplate.lib.message_queue import RemoteMessageQueue
from baseplate.lib.message_queue import PosixMessageQueue
from baseplate.lib.message_queue import TimedOutError
from baseplate.observers.timeout import ServerTimeout

@@ -539,18 +540,24 @@ class TraceQueueFullError(Exception):


class SidecarRecorder(Recorder):
"""Interface for recording spans to a POSIX message queue.
"""Interface for recording spans to a message queue.

The SidecarRecorder serializes spans to a string representation before
adding them to the queue.
"""

def __init__(self, queue_name: str):
self.queue = MessageQueue(
"/traces-" + queue_name,
max_messages=MAX_QUEUE_SIZE,
max_message_size=MAX_SPAN_SIZE,
)
def __init__(self, queue_name: str, use_in_memory_queue: bool = False):
if use_in_memory_queue:
self.queue = RemoteMessageQueue(
"/traces-" + queue_name,
max_messages=MAX_QUEUE_SIZE,
)
else:
self.queue = PosixMessageQueue( # type: ignore
"/traces-" + queue_name,
max_messages=MAX_QUEUE_SIZE,
max_message_size=MAX_SPAN_SIZE,
)

def send(self, span: TraceSpanObserver) -> None:
# Don't raise exceptions from here. This is called in the
@@ -561,17 +568,15 @@ def send(self, span: TraceSpanObserver) -> None:
"Trace too big. Traces published to %s are not allowed to be larger "
"than %d bytes. Received trace is %d bytes. This can be caused by "
"an excess amount of tags or a large amount of child spans.",
self.queue.queue.name,
self.queue.name,
MAX_SPAN_SIZE,
len(serialized_str),
)
return
try:
self.queue.put(serialized_str, timeout=0)
except TimedOutError:
logger.warning(
"Trace queue %s is full. Is trace sidecar healthy?", self.queue.queue.name
)
logger.warning("Trace queue %s is full. Is trace sidecar healthy?", self.queue.name)


def tracing_client_from_config(
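For completeness, the same flag threaded through the tracing recorder. The queue name here is hypothetical, and the in-memory variant assumes the trace sidecar is serving the Thrift queue, since the recorder connects on construction:

from baseplate.observers.tracing import SidecarRecorder

# Unchanged default: spans are written to the POSIX queue at /traces-main.
posix_recorder = SidecarRecorder("main")

# New option: spans are handed to the sidecar's in-memory queue over Thrift.
remote_recorder = SidecarRecorder("main", use_in_memory_queue=True)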
56 changes: 49 additions & 7 deletions baseplate/sidecars/event_publisher.py
@@ -12,21 +12,28 @@

import requests

from baseplate import __version__ as baseplate_version
from baseplate import RequestContext, __version__ as baseplate_version
from baseplate.lib import config
from baseplate.lib import metrics
from baseplate.lib.events import MAX_EVENT_SIZE
from baseplate.lib.events import MAX_QUEUE_SIZE
from baseplate.lib.message_queue import MessageQueue
from baseplate.lib.message_queue import InMemoryMessageQueue
from baseplate.lib.message_queue import PosixMessageQueue
from baseplate.lib.message_queue import TimedOutError
from baseplate.lib.metrics import metrics_client_from_config
from baseplate.lib.retry import RetryPolicy
from baseplate.server import EnvironmentInterpolation
from baseplate.thrift import RemoteMessageQueueService
from baseplate.sidecars import Batch
from baseplate.sidecars import BatchFull
from baseplate.sidecars import SerializedBatch
from baseplate.sidecars import TimeLimitedBatch

from thrift.protocol import TBinaryProtocol
from thrift.server import TServer
from thrift.transport import TSocket
from thrift.transport import TTransport


logger = logging.getLogger(__name__)

@@ -161,6 +168,21 @@ def publish(self, payload: SerializedBatch) -> None:
raise MaxRetriesError("could not sent batch")



class RemoteMessageQueueHandler: # From the sidecar, create the queue and define get/put using the InMemoryQueue implementation
def is_healthy(self, context: RequestContext) -> bool:
return True

def __init__(self, name: str, max_messages: int):
self.queue = InMemoryMessageQueue(name, max_messages)

def get(self, timeout: Optional[float] = None) -> bytes:
return self.queue.get(timeout)

def put(self, message: bytes, timeout: Optional[float] = None) -> None:
self.queue.put(message, timeout)


SERIALIZER_BY_VERSION = {"2": V2Batch, "2j": V2JBatch}


@@ -174,6 +196,11 @@ def publish_events() -> None:
default="main",
help="name of event queue / publisher config (default: main)",
)
arg_parser.add_argument(
"--use-in-memory-queue",
default=False,
help="use an in memory queue instead of a posix queue",
)
arg_parser.add_argument(
"--debug", default=False, action="store_true", help="enable debug logging"
)
@@ -203,11 +230,26 @@

metrics_client = metrics_client_from_config(raw_config)

event_queue = MessageQueue(
"/events-" + args.queue_name,
max_messages=cfg.max_queue_size,
max_message_size=MAX_EVENT_SIZE,
)
if args.use_in_memory_queue:
# start a thrift server that will be used to communicate with the main baseplate app
# this should not happen here, where are sidecars running?
processor = RemoteMessageQueueService.Processor(
    RemoteMessageQueueHandler("/events-" + args.queue_name, cfg.max_queue_size)
)
transport = TSocket.TServerSocket(host='127.0.0.1', port=9090)
A contributor review comment on the line above: This port number should probably use a constant that is shared with the client code, to remove the chance of them using different values.

tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()

server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
print('Starting the Message Queue server...')
server.serve()
print('done.')


else:
event_queue = PosixMessageQueue( # type: ignore
"/events-" + args.queue_name,
max_messages=cfg.max_queue_size,
max_message_size=MAX_EVENT_SIZE,
)
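Following up on the review note above about the hard-coded port: one way to keep the sidecar and RemoteMessageQueue in agreement is a pair of module-level constants that both sides import, for example from baseplate.lib.message_queue. The names below are suggestions, not part of this diff:

from thrift.transport import TSocket

# Suggested shared constants, e.g. defined once in baseplate/lib/message_queue.py.
REMOTE_QUEUE_HOST = "127.0.0.1"
REMOTE_QUEUE_PORT = 9090

# Sidecar side: bind the Thrift server socket to the shared values.
server_transport = TSocket.TServerSocket(host=REMOTE_QUEUE_HOST, port=REMOTE_QUEUE_PORT)

# Client side (RemoteMessageQueue.__init__): connect to the same values.
client_socket = TSocket.TSocket(REMOTE_QUEUE_HOST, REMOTE_QUEUE_PORT)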

# pylint: disable=maybe-no-member
serializer = SERIALIZER_BY_VERSION[cfg.collector.version]()