Add new option [messaging].prefix to configure prefix of RabbitMQ exchange/queue names #6282

Open: wants to merge 6 commits into master

cognifloyd
Member

@cognifloyd cognifloyd commented Nov 23, 2024

This PR's commits were extracted from #6273 where I'm working on getting pants+pytest to run integration tests.

Overview

This adds a new option [messaging].prefix to configure the prefix used in RabbitMQ exchange and queue names.

Examples of how this affects our exchange/queue names:

| prefix=st2 (default) | prefix=st2dev (launchdev.sh) | prefix=foobar | Type |
| --- | --- | --- | --- |
| st2.execution | st2dev.execution | foobar.execution | Exchange |
| st2.sensor | st2dev.sensor | foobar.sensor | Exchange |
| st2.workflow.work | st2dev.workflow.work | foobar.workflow.work | Queue |
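
The mapping in the table amounts to a plain string rewrite. As a minimal sketch (`apply_prefix` is a hypothetical helper for illustration, not a function in st2), assuming every well-known name starts with `st2.`:

```python
def apply_prefix(name: str, prefix: str = "st2") -> str:
    """Hypothetical helper: swap the leading "st2." for the configured prefix."""
    if not name or prefix == "st2":
        # Default prefix (or empty name): keep the well-known name untouched.
        return name
    # Replace only the first occurrence, mirroring str.replace(..., 1) in the PR.
    return name.replace("st2.", f"{prefix}.", 1)


names = ("st2.execution", "st2.sensor", "st2.workflow.work")
for prefix in ("st2", "st2dev", "foobar"):
    print(prefix, [apply_prefix(name, prefix) for name in names])
```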

I recommend reviewing each commit separately.

Purpose

This is primarily designed to support safely running tests in parallel.

Implementation

This is where the [messaging].prefix option is defined with a default of st2 to keep the current behavior (There is an analogous definition in st2tests/st2tests/config.py):

    cfg.StrOpt(
        "prefix",
        default="st2",
        help="Prefix for all exchange and queue names.",
    ),
]
do_register_opts(messaging_opts, "messaging", ignore_errors)

Until a kombu.Exchange or a kombu.Queue is declared, the exchange/queue objects just hold the config (including the name) required to declare them. When kombu declares an exchange/queue, it calls the instance, something like this:

# in st2 code
FOOBAR_XCHG = Exchange("foobar")

# in kombu
FOOBAR_XCHG(*args, **kwargs)

ST2 exchanges/queues are mostly defined as module-level vars, which is fine since they merely collect the name and other declaration details. So, this is where we create a subclass that applies the prefix just before an exchange or queue gets declared:

class Exchange(kombu.Exchange):
    def __call__(self, *args, **kwargs):
        # update exchange name with prefix just before binding (as late as possible).
        prefix = cfg.CONF.messaging.prefix
        if self.name and prefix != "st2":
            self.name = self.name.replace("st2.", f"{prefix}.", 1)
        return super().__call__(*args, **kwargs)


class Queue(kombu.Queue):
    def __call__(self, *args, **kwargs):
        # update queue name with prefix just before binding (as late as possible).
        prefix = cfg.CONF.messaging.prefix
        if self.name and prefix != "st2":
            self.name = self.name.replace("st2.", f"{prefix}.", 1)
        return super().__call__(*args, **kwargs)
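
To see the late-binding behavior without a broker, here is a rough sketch that runs on its own. `FakeKombuEntity` and `CONFIGURED_PREFIX` are stand-ins invented for this example (assumptions, not kombu or oslo_config APIs); only the body of `__call__` mirrors the subclasses above.

```python
class FakeKombuEntity:
    """Stand-in for kombu.Exchange / kombu.Queue (NOT the real kombu API)."""

    def __init__(self, name: str):
        self.name = name

    def __call__(self, channel):
        # The real classes return a channel-bound copy; this stand-in just
        # reports which name would be used on the given channel.
        return (self.name, channel)


# Hypothetical stand-in for cfg.CONF.messaging.prefix.
CONFIGURED_PREFIX = {"value": "st2"}


class PrefixedEntity(FakeKombuEntity):
    def __call__(self, channel):
        # Apply the prefix as late as possible, right before binding.
        prefix = CONFIGURED_PREFIX["value"]
        if self.name and prefix != "st2":
            self.name = self.name.replace("st2.", f"{prefix}.", 1)
        return super().__call__(channel)


# Module-level definition, created before any config has been loaded.
EXECUTION_XCHG = PrefixedEntity("st2.execution")

# The config is loaded some time later ...
CONFIGURED_PREFIX["value"] = "st2test3"

# ... and the prefix is only applied when the entity is called.
first = EXECUTION_XCHG("channel-1")
second = EXECUTION_XCHG("channel-2")
print(first, second)
```

In this sketch the second call leaves the name alone because `st2test3.execution` no longer contains `st2.`. A prefix that itself ends in `st2` (for example `foost2`) would be rewritten again on each call, so that may be worth guarding against.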

Then, I just had to swap the imports from kombu.Exchange and kombu.Queue to the new subclasses.

This is where the tests use the ST2TESTS_PARALLEL_SLOT env var (provided by pants when running tests) to modify the default prefix used in tests:

def _override_mq_opts():
    mq_prefix = CONF.messaging.prefix
    mq_prefix = "st2test" if mq_prefix == "st2" else mq_prefix
    mq_prefix = mq_prefix + os.environ.get("ST2TESTS_PARALLEL_SLOT", "")
    CONF.set_override(name="prefix", override=mq_prefix, group="messaging")
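
The resulting prefixes are easiest to see with the logic pulled out into a pure function. This is only a sketch: `compute_test_prefix` is a hypothetical name, and the environment is passed in as a mapping so the example runs without oslo_config or pants.

```python
import os
from typing import Mapping, Optional


def compute_test_prefix(
    configured: str, environ: Optional[Mapping[str, str]] = None
) -> str:
    """Hypothetical pure-function version of the override logic above."""
    environ = os.environ if environ is None else environ
    # Tests never use the production default; "st2" becomes "st2test".
    base = "st2test" if configured == "st2" else configured
    # Append the parallel slot so concurrent test processes do not collide.
    return base + environ.get("ST2TESTS_PARALLEL_SLOT", "")


print(compute_test_prefix("st2", {"ST2TESTS_PARALLEL_SLOT": "3"}))
print(compute_test_prefix("st2", {}))
print(compute_test_prefix("foobar", {"ST2TESTS_PARALLEL_SLOT": "12"}))
```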

This method is what passes that config to integration test subprocesses using the new oslo_config env var support (similar to how the database test logic added in #6278 uses the oslo_config env var support added in #6277).

def mq_opts_as_env_vars() -> Dict[str, str]:
    return {
        "ST2_MESSAGING__URL": CONF.messaging.url,
        "ST2_MESSAGING__PREFIX": CONF.messaging.prefix,
    }
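
As a rough illustration of how such a dict can reach a subprocess, the sketch below merges it into the parent environment and hands it to `subprocess.run`. The `child_env` helper and the example URL and prefix values are assumptions made for this example; the actual integration test fixtures may wire this up differently.

```python
import os
import subprocess
import sys
from typing import Dict


def child_env(mq_vars: Dict[str, str]) -> Dict[str, str]:
    """Hypothetical helper: parent environment plus the messaging overrides."""
    env = dict(os.environ)
    env.update(mq_vars)
    return env


# Example values only; in the tests these come from CONF.messaging.
mq_vars = {
    "ST2_MESSAGING__URL": "amqp://guest:guest@127.0.0.1:5672/",
    "ST2_MESSAGING__PREFIX": "st2test3",
}

# The child process simply echoes the prefix it received.
result = subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ['ST2_MESSAGING__PREFIX'])"],
    env=child_env(mq_vars),
    capture_output=True,
    text=True,
    check=True,
)
print(result.stdout.strip())
```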

Similar to #6278, we also configure pants to pass these vars to tests, updating pants-plugins/uses_services to advertise this in the error message that shows when attempting to run tests without a running rabbitmq-server.

st2/pants.toml

Lines 250 to 252 in b6d23c9

# Use these to override RabbitMQ connection details
"ST2_MESSAGING__URL",
"ST2_MESSAGING__PREFIX", # Tests will modify this to be "{prefix}{ST2TESTS_PARALLEL_SLOT}"

Finally, the kombu exchange names are also used in event streaming. Event streaming clients should use the well-known exchange names and not be aware of any configured prefix. So, we revert the prefix to st2. for event streaming here:

def processor(self, model=None):
    exchange_prefix = cfg.CONF.messaging.prefix

    def process(body, message):
        meta = message.delivery_info
        event_prefix = meta.get("exchange", "")
        if exchange_prefix != "st2" and event_prefix.startswith(exchange_prefix):
            # use well-known event names over configurable exchange names
            event_prefix = event_prefix.replace(f"{exchange_prefix}.", "st2.", 1)
        event_name = f"{event_prefix}__{meta.get('routing_key')}"
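
The effect on event names can be checked in isolation. In this sketch, `event_name` is a hypothetical standalone function that takes the exchange, routing key, and configured prefix as plain arguments instead of reading them from the message and `cfg.CONF`; the routing key `create` is just an example value.

```python
def event_name(exchange: str, routing_key: str, exchange_prefix: str = "st2") -> str:
    """Hypothetical standalone version of the event name normalization."""
    event_prefix = exchange
    if exchange_prefix != "st2" and event_prefix.startswith(exchange_prefix):
        # Map the configured prefix back to the well-known "st2." prefix.
        event_prefix = event_prefix.replace(f"{exchange_prefix}.", "st2.", 1)
    return f"{event_prefix}__{routing_key}"


# Clients see the same event name whatever prefix the server was configured with.
print(event_name("st2.execution", "create"))
print(event_name("st2test3.execution", "create", exchange_prefix="st2test3"))
```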

Alternatives

To prevent parallel tests from interacting with each other, I tried using RabbitMQ vhosts before settling on a configurable exchange/queue name prefix. Creating a vhost, however, requires out-of-band administration, because vhost creation is not part of AMQP; instead, it is part of RabbitMQ's CLI and Management APIs. So, before running tests, the dev would have to run some kind of script (which we would have to maintain) that calculates how many tests might run in parallel and creates a vhost for each parallel slot. At first glance, this would seem to be limited by the number of CPU cores on the dev machine, but pants also supports remote execution to offload test runs to Remote Executors, which defaults to 128 slots. Creating 12 + 128 vhosts seems like quite a maintenance burden just to run tests.

Once I determined that vhost was not a great option, I tried several different ways to make the exchange/queue name prefix configurable. But, each of them left some tests failing because the exchanges/queues were not pre-declared when the test ran.

  1. First, I tried to just load the config from the global oslo_config.cfg.CONF, but most of our exchanges/queues are defined as module-level variables meaning they get imported before the oslo_config has been configured with options and before reading the conf file(s).
  2. Then, I tried to apply the prefix just before the exchanges/queues get declared in the service startup code. But, sensors and actionrunners use subprocesses that do not explicitly pre-declare the exchange/queue they need (the parent process would have already done that, so why should they?).
  3. Finally, I landed on letting kombu auto-declare exchanges/queues as needed and creating subclasses of kombu.Exchange and kombu.Queue that handle applying the configured prefix just before it's declared.
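
The import-order problem from the first attempt, and why resolving the name at call time avoids it, can be sketched with plain Python. Here `CONF` is a hypothetical dict standing in for the global oslo_config object, and `LazyName` is an illustrative class, not part of st2 or kombu.

```python
# Hypothetical stand-in for oslo_config's global CONF before it is set up.
CONF = {"prefix": None}


def eager_name(base: str) -> str:
    # Reads the config immediately, as a module-level variable would on import.
    return f"{CONF['prefix']}.{base}"


class LazyName:
    """Holds only the base name and reads the config when asked to resolve."""

    def __init__(self, base: str):
        self.base = base

    def resolve(self) -> str:
        return f"{CONF['prefix']}.{self.base}"


# "Import time": both module-level vars are created before config is loaded.
EAGER = eager_name("execution")
LAZY = LazyName("execution")

# Later, service startup loads the config.
CONF["prefix"] = "st2test3"

print(EAGER)           # still reflects the unconfigured state
print(LAZY.resolve())  # picks up the configured prefix
```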

This adds a new messaging.prefix option that gets modified
to include the test slot as needed.

Exchanges are defined as module-level vars initialized on import.
But, oslo_config is not set up yet at import time, so delay applying the
new prefix setting until just before they get created in RabbitMQ.

Luckily, Exchange() objects are lightweight objects that just
hold the exchange name and type. They could do more, but we
don't bind them to a channel. Because the exchange objects
merely hold strings, and because they are basically singletons
(module-level vars), we can safely update the exchange name
just before declaring it in RabbitMQ. From that point on,
everything that uses a singleton exchange object will get
the updated name.

Now that the exchange name prefix is configurable, we have
to undo the prefix to maintain a backwards compatible API.
Also, having the event names vary based on config would be
a very bad UX, so we don't want to go there anyway.

Connection has a cache of which entities (exchange/queue)
have been declared, so this shouldn't have too much of a
performance impact. This does, however, make tests much more
reliable.
@pull-request-size pull-request-size bot added the size/L PR that changes 100-499 lines. Requires some effort to review. label Nov 23, 2024
@cognifloyd cognifloyd self-assigned this Nov 23, 2024
@cognifloyd cognifloyd added this to the pants milestone Nov 23, 2024
@cognifloyd cognifloyd requested a review from a team November 24, 2024 15:58
Labels
pantsbuild rabbitmq size/L PR that changes 100-499 lines. Requires some effort to review. tests