Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide alternative in-memory queue for publishing events #763

Open
wants to merge 79 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
79 commits
Select commit Hold shift + click to select a range
4918353
add in memory queue option
Dec 7, 2022
7558da7
cleanup
Dec 7, 2022
be54be0
Merge branch 'develop' into replace_posix_queue_event_publishing
sydjryan Dec 7, 2022
b71fac2
lint, cleanup
Dec 7, 2022
7916749
Merge branch 'replace_posix_queue_event_publishing' of github.com:syd…
Dec 7, 2022
84c50c9
linting
Dec 7, 2022
8cd02e9
linting
Dec 7, 2022
719749a
linting :sob:
Dec 7, 2022
7e84998
linting
Dec 8, 2022
057f295
wip
Dec 14, 2022
3740cc6
wip
Dec 20, 2022
1b1d5cf
thrift updates
Dec 23, 2022
5fd17f5
working
Jan 3, 2023
1d9510b
cleanup, wipg
Jan 3, 2023
6628c1c
linting
Jan 3, 2023
6834daa
linting
Jan 4, 2023
efffbcb
linting, remove docker-compose change
Jan 4, 2023
afcbd9f
dockerfile
Jan 4, 2023
de2822d
Merge branch 'develop' into replace_posix_queue_event_publishing
sydjryan Jan 4, 2023
05a10a7
typo
Jan 4, 2023
6632221
Merge branch 'replace_posix_queue_event_publishing' of github.com:syd…
Jan 4, 2023
257c0bb
wip
Jan 9, 2023
c31eb31
updates
Jan 9, 2023
7b93a00
cleanup exceptions
Jan 9, 2023
f522fce
linting
Jan 10, 2023
11e498e
linting
Jan 10, 2023
27d4171
linting
Jan 10, 2023
78c3102
cleanup
Jan 10, 2023
2e364d1
updates
Jan 19, 2023
4c4c938
EventQueue takes n optionala queue
Jan 24, 2023
95a225e
clarify comments
Feb 13, 2023
018134e
linting
Feb 13, 2023
3ba5648
linting
Feb 13, 2023
e1345b0
linting
Feb 13, 2023
276e768
linting :sob:
Feb 13, 2023
50226db
wip need to test metrics
Feb 23, 2023
e28ed06
move timer start
Feb 24, 2023
fc95a77
Merge branch 'develop' into replace_posix_queue_event_publishing
sydjryan Feb 24, 2023
7f6538b
import order
Feb 24, 2023
72f5c1d
Merge branch 'replace_posix_queue_event_publishing' of github.com:syd…
Feb 24, 2023
406161f
linting
Feb 24, 2023
5a459a9
wip
Feb 27, 2023
3eee3d9
x
Mar 1, 2023
2f92bb0
remove thrift get
Mar 2, 2023
da30d5e
remove get
Mar 2, 2023
bf367e4
linting
Mar 2, 2023
2d17807
linting
Mar 2, 2023
3a62e2d
linting
Mar 2, 2023
eb71f4c
cleanup
Mar 2, 2023
1ccd7e2
lint
Mar 2, 2023
48a7624
comment
Mar 2, 2023
53a4df0
x
Mar 2, 2023
210cbb9
reduce pool timeout for pool test
Mar 2, 2023
654ad09
newline
Mar 2, 2023
272f80d
remove pool test
Mar 2, 2023
ffaf766
import
Mar 3, 2023
19f26cf
cleanup
Mar 6, 2023
d49b2d8
black
Mar 6, 2023
9e13d34
linting
Mar 6, 2023
6d8d967
lint
Mar 6, 2023
cc9714f
linting
Mar 6, 2023
c77c67f
remove create_queue
Mar 6, 2023
b8cfff8
l
Mar 6, 2023
442c20d
mypy
Mar 7, 2023
5ec4da3
black
Mar 7, 2023
ed0d328
cleanup and monkeypatch gevent
Mar 9, 2023
f171e10
exception
Mar 9, 2023
38464cf
lint
Mar 15, 2023
a456dc8
catch gevent errors
Mar 16, 2023
18242c7
lint
Mar 16, 2023
855de0f
remove from trace publisher and sidecar init
Mar 17, 2023
097d4d0
Merge branch 'develop' into replace_posix_queue_event_publishing
Mar 18, 2023
b2c2ff0
lint
Mar 20, 2023
8da60eb
lint
Mar 20, 2023
608a446
fix
Mar 20, 2023
adea15c
add scripts, remove signal handling for remote queue
Mar 20, 2023
14ecb2e
lint
Mar 20, 2023
86f3eca
typos
Mar 21, 2023
9ba9170
x
Mar 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions baseplate/lib/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from baseplate import Span
from baseplate.clients import ContextFactory
from baseplate.lib import config
from baseplate.lib.message_queue import MessageQueue
from baseplate.lib.message_queue import InMemoryMessageQueue, PosixMessageQueue
from baseplate.lib.message_queue import TimedOutError


Expand Down Expand Up @@ -93,10 +93,13 @@ class EventQueue(ContextFactory, config.Parser, Generic[T]):

"""

def __init__(self, name: str, event_serializer: Callable[[T], bytes]):
self.queue = MessageQueue(
"/events-" + name, max_messages=MAX_QUEUE_SIZE, max_message_size=MAX_EVENT_SIZE
)
def __init__(self, name: str, event_serializer: Callable[[T], bytes], use_in_memory_queue: bool = False):
if use_in_memory_queue:
self.queue = InMemoryMessageQueue("/events-" + name, max_messages=MAX_QUEUE_SIZE)
else:
sydjryan marked this conversation as resolved.
Show resolved Hide resolved
self.queue = PosixMessageQueue(
"/events-" + name, max_messages=MAX_QUEUE_SIZE, max_message_size=MAX_EVENT_SIZE
)
self.serialize_event = event_serializer

def put(self, event: T) -> None:
Expand Down
98 changes: 94 additions & 4 deletions baseplate/lib/message_queue.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
"""A Gevent-friendly POSIX message queue."""
sydjryan marked this conversation as resolved.
Show resolved Hide resolved
from abc import abstractmethod
import queue
import select

from typing import Optional
Expand Down Expand Up @@ -34,9 +36,37 @@ class MessageQueueOSError(OSError):
def __init__(self, inner: Exception):
super().__init__(f"{inner} (check `ulimit -q`?)")


class MessageQueue:
"""A Gevent-friendly (but not required) inter process message queue.
@abstractmethod
def __init__(self, name: str, max_messages: int, max_message_size: int):
pass

@abstractmethod
def get(self, timeout: Optional[float] = None) -> bytes:
"""Read a message from the queue.

:param timeout: If the queue is empty, the call will block up to
``timeout`` seconds or forever if ``None``.
:raises: :py:exc:`TimedOutError` The queue was empty for the allowed
duration of the call.

"""
sydjryan marked this conversation as resolved.
Show resolved Hide resolved
pass

@abstractmethod
def put(self, message: bytes, timeout: Optional[float] = None) -> None:
"""Add a message to the queue.

:param timeout: If the queue is full, the call will block up to
``timeout`` seconds or forever if ``None``.
:raises: :py:exc:`TimedOutError` The queue was full for the allowed
duration of the call.

"""
pass

class InMemoryMessageQueue(MessageQueue):
"""An in-memory inter process message queue.

``name`` should be a string of up to 255 characters consisting of an
initial slash, followed by one or more characters, none of which are
Expand All @@ -47,7 +77,61 @@ class MessageQueue:
support this.

"""
def __init__(self, name: str, max_messages: int):
self.queue = queue.Queue(max_messages)
self.max_messages = max_messages
self.name = name

def get(self, timeout: Optional[float] = None) -> bytes:
"""Read a message from the queue.

:param timeout: If the queue is empty, the call will block up to
``timeout`` seconds or forever if ``None``.
:raises: :py:exc:`TimedOutError` The queue was empty for the allowed
duration of the call.

"""
try:
message = self.queue.get(timeout=timeout)
self.queue.task_done()
return message
except queue.Empty:
raise TimedOutError

def put(self, message: bytes, timeout: Optional[float] = None) -> None:
"""Add a message to the queue.

:param timeout: If the queue is full, the call will block up to
``timeout`` seconds or forever if ``None``.
:raises: :py:exc:`TimedOutError` The queue was full for the allowed
duration of the call.

"""
try:
return self.queue.put(message, timeout=timeout)
except queue.Full:
raise TimedOutError

def unlink(self) -> None:
"""Not implemented for in-memory queue"""
pass

def close(self) -> None:
"""Not implemented for in-memory queue"""
pass

class PosixMessageQueue(MessageQueue):
"""A Gevent-friendly (but not required) inter process message queue.

``name`` should be a string of up to 255 characters consisting of an
initial slash, followed by one or more characters, none of which are
slashes.

Note: This relies on POSIX message queues being available and
select(2)-able like other file descriptors. Not all operating systems
support this.

"""
def __init__(self, name: str, max_messages: int, max_message_size: int):
try:
self.queue = posix_ipc.MessageQueue(
Expand All @@ -62,6 +146,7 @@ def __init__(self, name: str, max_messages: int, max_message_size: int):
except OSError as exc:
raise MessageQueueOSError(exc)
self.queue.block = False
self.name = name
sydjryan marked this conversation as resolved.
Show resolved Hide resolved

def get(self, timeout: Optional[float] = None) -> bytes:
"""Read a message from the queue.
Expand Down Expand Up @@ -101,7 +186,7 @@ def put(self, message: bytes, timeout: Optional[float] = None) -> None:
select.select([], [self.queue.mqd], [], time_remaining)

raise TimedOutError

def unlink(self) -> None:
"""Remove the queue from the system.

Expand All @@ -119,6 +204,7 @@ def close(self) -> None:

"""
self.queue.close()



def queue_tool() -> None:
Expand All @@ -140,6 +226,7 @@ def queue_tool() -> None:
help="if creating the queue, what to set the maximum message size to",
)
parser.add_argument("queue_name", help="the name of the queue to consume")
parser.add_argument("use_in_memory_queue", default=False, help="whether to use an in-memory queue or a posix queue")

group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
Expand All @@ -166,7 +253,10 @@ def queue_tool() -> None:

args = parser.parse_args()

queue = MessageQueue(args.queue_name, args.max_messages, args.max_message_size)
if args.use_in_memory_queue:
sydjryan marked this conversation as resolved.
Show resolved Hide resolved
queue = InMemoryMessageQueue(args.queue_name, args.max_messages)
else:
queue = PosixMessageQueue(args.queue_name, args.max_messages, args.max_message_size)

if args.mode == "read":
while True:
Expand Down
26 changes: 16 additions & 10 deletions baseplate/observers/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from baseplate import SpanObserver
from baseplate.lib import config
from baseplate.lib import warn_deprecated
from baseplate.lib.message_queue import MessageQueue
from baseplate.lib.message_queue import InMemoryMessageQueue, PosixMessageQueue
from baseplate.lib.message_queue import TimedOutError
from baseplate.observers.timeout import ServerTimeout

Expand Down Expand Up @@ -539,18 +539,24 @@ class TraceQueueFullError(Exception):


class SidecarRecorder(Recorder):
"""Interface for recording spans to a POSIX message queue.
"""Interface for recording spans to a message queue.

The SidecarRecorder serializes spans to a string representation before
adding them to the queue.
"""

def __init__(self, queue_name: str):
self.queue = MessageQueue(
"/traces-" + queue_name,
max_messages=MAX_QUEUE_SIZE,
max_message_size=MAX_SPAN_SIZE,
)
def __init__(self, queue_name: str, use_in_memory_queue: bool = False):
if use_in_memory_queue:
sydjryan marked this conversation as resolved.
Show resolved Hide resolved
self.queue = InMemoryMessageQueue(
sydjryan marked this conversation as resolved.
Show resolved Hide resolved
"/traces-" + queue_name,
max_messages=MAX_QUEUE_SIZE,
)
else:
sydjryan marked this conversation as resolved.
Show resolved Hide resolved
self.queue = PosixMessageQueue(
"/traces-" + queue_name,
max_messages=MAX_QUEUE_SIZE,
max_message_size=MAX_SPAN_SIZE,
)

def send(self, span: TraceSpanObserver) -> None:
# Don't raise exceptions from here. This is called in the
Expand All @@ -561,7 +567,7 @@ def send(self, span: TraceSpanObserver) -> None:
"Trace too big. Traces published to %s are not allowed to be larger "
"than %d bytes. Received trace is %d bytes. This can be caused by "
"an excess amount of tags or a large amount of child spans.",
self.queue.queue.name,
self.queue.name,
MAX_SPAN_SIZE,
len(serialized_str),
)
Expand All @@ -570,7 +576,7 @@ def send(self, span: TraceSpanObserver) -> None:
self.queue.put(serialized_str, timeout=0)
except TimedOutError:
logger.warning(
"Trace queue %s is full. Is trace sidecar healthy?", self.queue.queue.name
"Trace queue %s is full. Is trace sidecar healthy?", self.queue.name
)


Expand Down
23 changes: 17 additions & 6 deletions baseplate/sidecars/event_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from baseplate.lib import metrics
from baseplate.lib.events import MAX_EVENT_SIZE
from baseplate.lib.events import MAX_QUEUE_SIZE
from baseplate.lib.message_queue import MessageQueue
from baseplate.lib.message_queue import InMemoryMessageQueue, PosixMessageQueue
from baseplate.lib.message_queue import TimedOutError
from baseplate.lib.metrics import metrics_client_from_config
from baseplate.lib.retry import RetryPolicy
Expand Down Expand Up @@ -174,6 +174,11 @@ def publish_events() -> None:
default="main",
help="name of event queue / publisher config (default: main)",
)
arg_parser.add_argument(
"--use-in-memory-queue",
default=False,
help="use an in memory queue instead of a posix queue",
)
arg_parser.add_argument(
"--debug", default=False, action="store_true", help="enable debug logging"
)
Expand Down Expand Up @@ -203,11 +208,17 @@ def publish_events() -> None:

metrics_client = metrics_client_from_config(raw_config)

event_queue = MessageQueue(
"/events-" + args.queue_name,
max_messages=cfg.max_queue_size,
max_message_size=MAX_EVENT_SIZE,
)
if args.use_in_memory_queue:
event_queue = InMemoryMessageQueue(
"/events-" + args.queue_name,
max_messages=cfg.max_queue_size,
)
else:
event_queue = PosixMessageQueue(
"/events-" + args.queue_name,
max_messages=cfg.max_queue_size,
max_message_size=MAX_EVENT_SIZE,
)

# pylint: disable=maybe-no-member
serializer = SERIALIZER_BY_VERSION[cfg.collector.version]()
Expand Down
23 changes: 17 additions & 6 deletions baseplate/sidecars/trace_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from baseplate import __version__ as baseplate_version
from baseplate.lib import config
from baseplate.lib import metrics
from baseplate.lib.message_queue import MessageQueue
from baseplate.lib.message_queue import InMemoryMessageQueue, PosixMessageQueue
from baseplate.lib.message_queue import TimedOutError
from baseplate.lib.metrics import metrics_client_from_config
from baseplate.lib.retry import RetryPolicy
Expand Down Expand Up @@ -128,6 +128,11 @@ def publish_traces() -> None:
default="main",
help="name of trace queue / publisher config (default: main)",
)
arg_parser.add_argument(
"--use-in-memory-queue",
default=False,
help="use in memory queue instead of a posix queue",
)
arg_parser.add_argument(
"--debug", default=False, action="store_true", help="enable debug logging"
)
Expand Down Expand Up @@ -160,11 +165,17 @@ def publish_traces() -> None:
},
)

trace_queue = MessageQueue(
"/traces-" + args.queue_name,
max_messages=publisher_cfg.max_queue_size,
max_message_size=MAX_SPAN_SIZE,
)
if args.use_in_memory_queue:
sydjryan marked this conversation as resolved.
Show resolved Hide resolved
trace_queue = InMemoryMessageQueue(
"/traces-" + args.queue_name,
max_messages=publisher_cfg.max_queue_size,
)
else:
trace_queue = PosixMessageQueue(
"/traces-" + args.queue_name,
max_messages=publisher_cfg.max_queue_size,
max_message_size=MAX_SPAN_SIZE,
)

# pylint: disable=maybe-no-member
inner_batch = TraceBatch(max_size=publisher_cfg.max_batch_size)
Expand Down
7 changes: 7 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ services:
- "redis"
- "zookeeper"
- "redis-cluster-node"
test-baseplate:
build:
context: "."
dockerfile: "Dockerfile"
command: "pytest -v tests/"
volumes:
- ".:/src"
sydjryan marked this conversation as resolved.
Show resolved Hide resolved
cassandra:
image: "cassandra:3.11"
environment:
Expand Down
17 changes: 10 additions & 7 deletions docs/api/baseplate/lib/message_queue.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ process pair (run the producer then the consumer):
.. testcode::

# producer.py
from baseplate.lib.message_queue import MessageQueue
from baseplate.lib.message_queue import PosixMessageQueue

# If the queue doesn't already exist, we'll create it.
mq = MessageQueue(
mq = PosixMessageQueue(
"/baseplate-testing", max_messages=1, max_message_size=1)
message = "1"
mq.put(message)
Expand All @@ -48,9 +48,9 @@ POSIX message queue. Next up, run the consumer:
.. testcode::

# consumer.py
from baseplate.lib.message_queue import MessageQueue
from baseplate.lib.message_queue import PosixMessageQueue

mq = MessageQueue(
mq = PosixMessageQueue(
"/baseplate-testing", max_messages=1, max_message_size=1)
# Unless a `timeout` kwarg is passed, this will block until
# we can pop a message from the queue.
Expand Down Expand Up @@ -97,8 +97,8 @@ up seeing a vague ``ValueError`` exception. Here's an example:

.. code-block:: pycon

>>> from baseplate.lib.message_queue import MessageQueue
>>> mq = MessageQueue(
>>> from baseplate.lib.message_queue import PosixMessageQueue
>>> mq = PosixMessageQueue(
"/over-the-limit", max_messages=11, max_message_size=8096)
Traceback (most recent call last):
File "<input>", line 2, in <module>
Expand Down Expand Up @@ -138,7 +138,10 @@ See ``--help`` for more info.

.. automodule:: baseplate.lib.message_queue

.. autoclass:: MessageQueue
.. autoclass:: PosixMessageQueue
:members:

.. autoclass:: InMemoryMessageQueue
:members:
sydjryan marked this conversation as resolved.
Show resolved Hide resolved


Expand Down
Loading