Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new option [messaging].prefix to configure prefix of RabbitMQ exchange/queue names #6282

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ Added
working on StackStorm, improve our security posture, and improve CI reliability thanks in part
to pants' use of PEX lockfiles. This is not a user-facing addition.
#6118 #6141 #6133 #6120 #6181 #6183 #6200 #6237 #6229 #6240 #6241 #6244 #6251 #6253
#6254 #6258 #6259 #6260 #6269 #6275 #6279 #6278
#6254 #6258 #6259 #6260 #6269 #6275 #6279 #6278 #6282
Contributed by @cognifloyd
* Build of ST2 EL9 packages #6153
Contributed by @amanda11
Expand All @@ -93,6 +93,12 @@ Added
If you experience any issues when using this experimental feature, please file an issue. #6277
Contributed by @cognifloyd

* Add new option `[messaging].prefix` to configure the prefix used in RabbitMQ exchanges and queues.
The default is `st2` (resulting in exchange names like `st2.execution` and `st2.sensor`).
This is primarily designed to support safely running tests in parallel where creating a vhost for
each parallel test run would be a maintenance burden. #6282
Contributed by @cognifloyd

3.8.1 - December 13, 2023
-------------------------
Fixed
Expand Down
2 changes: 2 additions & 0 deletions conf/st2.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ connection_retries = 10
connection_retry_wait = 10000
# Login method to use (AMQPLAIN, PLAIN, EXTERNAL, etc.).
login_method = None
# Prefix for all exchange and queue names.
prefix = st2
# Use SSL / TLS to connect to the messaging server. Same as appending "?ssl=true" at the end of the connection URL string.
ssl = False
# ca_certs file contains a set of concatenated CA certificates, which are used to validate certificates passed from RabbitMQ.
Expand Down
1 change: 1 addition & 0 deletions conf/st2.dev.conf
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ ssh_key_file = /home/vagrant/.ssh/stanley_rsa

[messaging]
url = amqp://guest:[email protected]:5672/
prefix = st2dev
# Uncomment to test SSL options
#url = amqp://guest:[email protected]:5671/
#ssl = True
Expand Down
32 changes: 26 additions & 6 deletions pants-plugins/uses_services/rabbitmq_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from dataclasses import dataclass
from textwrap import dedent
from typing import Tuple

from pants.backend.python.goals.pytest_runner import (
PytestPluginSetupRequest,
Expand All @@ -27,6 +28,8 @@
VenvPexProcess,
rules as pex_rules,
)
from pants.core.goals.test import TestExtraEnv
from pants.engine.env_vars import EnvironmentVars
from pants.engine.fs import CreateDigest, Digest, FileContent
from pants.engine.rules import collect_rules, Get, MultiGet, rule
from pants.engine.process import FallibleProcessResult, ProcessCacheScope
Expand Down Expand Up @@ -54,13 +57,17 @@ class UsesRabbitMQRequest:
# These config opts for integration tests are in:
# conf/st2.tests*.conf st2tests/st2tests/fixtures/conf/st2.tests*.conf
# (changed by setting ST2_CONFIG_PATH env var inside the tests)
# TODO: for unit tests: modify code to pull mq connect settings from env vars
# TODO: for int tests: modify st2.tests*.conf on the fly to set the per-pantsd-slot vhost
# and either add env vars for mq connect settings or modify conf files as well
# These can also be updated via the ST2_MESSAGING_* env vars (which oslo_config reads).
# Integration tests should pass these changes onto subprocesses via the same env vars.

# with our version of oslo.config (newer are slower) we can't directly override opts w/ environment variables.
mq_urls: Tuple[str] = ("amqp://guest:[email protected]:5672//",)

mq_urls: tuple[str] = ("amqp://guest:[email protected]:5672//",)
@classmethod
def from_env(cls, env: EnvironmentVars) -> UsesRabbitMQRequest:
default = cls()
url = env.get("ST2_MESSAGING__URL", None)
mq_urls = (url,) if url else default.mq_urls
return UsesRabbitMQRequest(mq_urls=mq_urls)


@dataclass(frozen=True)
Expand All @@ -83,9 +90,12 @@ def is_applicable(cls, target: Target) -> bool:
)
async def rabbitmq_is_running_for_pytest(
request: PytestUsesRabbitMQRequest,
test_extra_env: TestExtraEnv,
) -> PytestPluginSetup:
# this will raise an error if rabbitmq is not running
_ = await Get(RabbitMQIsRunning, UsesRabbitMQRequest())
_ = await Get(
RabbitMQIsRunning, UsesRabbitMQRequest.from_env(env=test_extra_env.env)
)

return PytestPluginSetup()

Expand Down Expand Up @@ -167,6 +177,16 @@ async def rabbitmq_is_running(
"""
),
service_start_cmd_generic="systemctl start rabbitmq-server",
env_vars_hint=dedent(
"""\
You can also export the ST2_MESSAGING__URL env var to automatically use any
RabbitMQ host, local or remote, while running unit and integration tests.
If needed, you can also override the default exchange/queue name prefix
by exporting ST2_MESSAGING__PREFIX. Note that tests always add a numeric
suffix to the exchange/queue name prefix so that tests can safely run
in parallel.
"""
),
),
)

Expand Down
11 changes: 9 additions & 2 deletions pants-plugins/uses_services/rabbitmq_rules_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,14 @@ def run_rabbitmq_is_running(
"--backend-packages=uses_services",
*(extra_args or ()),
],
env_inherit={"PATH", "PYENV_ROOT", "HOME"},
env_inherit={
"PATH",
"PYENV_ROOT",
"HOME",
"ST2_MESSAGING__URL",
"ST2_MESSAGING__PREFIX",
"ST2TESTS_PARALLEL_SLOT",
},
)
result = rule_runner.request(
RabbitMQIsRunning,
Expand All @@ -62,7 +69,7 @@ def run_rabbitmq_is_running(

# Warning this requires that rabbitmq be running
def test_rabbitmq_is_running(rule_runner: RuleRunner) -> None:
request = UsesRabbitMQRequest()
request = UsesRabbitMQRequest.from_env(env=rule_runner.environment)
mock_platform = platform(os="TestMock")

# we are asserting that this does not raise an exception
Expand Down
3 changes: 3 additions & 0 deletions pants.toml
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,9 @@ extra_env_vars = [
"ST2_DATABASE__CONNECTION_TIMEOUT",
"ST2_DATABASE__USERNAME",
"ST2_DATABASE__PASSWORD",
# Use these to override RabbitMQ connection details
"ST2_MESSAGING__URL",
"ST2_MESSAGING__PREFIX", # Tests will modify this to be "{prefix}{ST2TESTS_PARALLEL_SLOT}"
# Use these to override the redis host and port
"ST2TESTS_REDIS_HOST",
"ST2TESTS_REDIS_PORT",
Expand Down
5 changes: 2 additions & 3 deletions st2actions/tests/integration/test_actions_queue_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
import random
import eventlet

from kombu import Exchange
from kombu import Queue
from unittest import TestCase

from st2common.transport.consumers import ActionsQueueConsumer
from st2common.transport.kombu import Exchange, Queue
from st2common.transport.publishers import PoolPublisher
from st2common.transport import utils as transport_utils
from st2common.models.db.liveaction import LiveActionDB
Expand All @@ -35,7 +34,7 @@ class ActionsQueueConsumerTestCase(TestCase):

def test_stop_consumption_on_shutdown(self):
exchange = Exchange("st2.execution.test", type="topic")
queue_name = "test-" + str(random.randint(1, 10000))
queue_name = f"st2.test-{random.randint(1, 10000)}"
queue = Queue(
name=queue_name, exchange=exchange, routing_key="#", auto_delete=True
)
Expand Down
2 changes: 1 addition & 1 deletion st2common/benchmarks/micro/test_publisher_compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

monkey_patch()

from kombu import Exchange
from kombu.serialization import pickle

import os
Expand All @@ -27,6 +26,7 @@

from st2common.models.db.liveaction import LiveActionDB
from st2common.transport import publishers
from st2common.transport.kombu import Exchange

from common import FIXTURES_DIR
from common import PYTEST_FIXTURE_FILE_PARAM_DECORATOR
Expand Down
5 changes: 5 additions & 0 deletions st2common/st2common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,11 @@ def register_opts(ignore_errors=False):
help="Compression algorithm to use for compressing the payloads which are sent over "
"the message bus. Defaults to no compression.",
),
cfg.StrOpt(
"prefix",
default="st2",
help="Prefix for all exchange and queue names.",
),
]

do_register_opts(messaging_opts, "messaging", ignore_errors)
Expand Down
8 changes: 7 additions & 1 deletion st2common/st2common/stream/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,15 @@ def get_consumers(self, consumer, channel):
raise NotImplementedError("get_consumers() is not implemented")

def processor(self, model=None):
exchange_prefix = cfg.CONF.messaging.prefix

def process(body, message):
meta = message.delivery_info
event_name = "%s__%s" % (meta.get("exchange"), meta.get("routing_key"))
event_prefix = meta.get("exchange", "")
if exchange_prefix != "st2" and event_prefix.startswith(exchange_prefix):
# use well-known event names over configurable exchange names
event_prefix = event_prefix.replace(f"{exchange_prefix}.", "st2.", 1)
event_name = f"{event_prefix}__{meta.get('routing_key')}"

try:
if model:
Expand Down
3 changes: 2 additions & 1 deletion st2common/st2common/transport/actionalias.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
# All Exchanges and Queues related to liveaction.

from __future__ import absolute_import
from kombu import Exchange, Queue

from st2common.transport import publishers
from st2common.transport.kombu import Exchange, Queue

__all__ = [
"ActionAliasPublisher",
Expand Down
3 changes: 1 addition & 2 deletions st2common/st2common/transport/actionexecutionstate.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@

from __future__ import absolute_import

from kombu import Exchange, Queue

from st2common.transport import publishers
from st2common.transport.kombu import Exchange, Queue

__all__ = ["ActionExecutionStatePublisher"]

Expand Down
3 changes: 1 addition & 2 deletions st2common/st2common/transport/announcement.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@

from __future__ import absolute_import

from kombu import Exchange, Queue

from st2common import log as logging
from st2common.constants.trace import TRACE_CONTEXT
from st2common.models.api.trace import TraceContext
from st2common.transport import publishers
from st2common.transport.kombu import Exchange, Queue

__all__ = ["AnnouncementPublisher", "AnnouncementDispatcher", "get_queue"]

Expand Down
3 changes: 2 additions & 1 deletion st2common/st2common/transport/bootstrap_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from st2common.transport.actionexecutionstate import ACTIONEXECUTIONSTATE_XCHG
from st2common.transport.announcement import ANNOUNCEMENT_XCHG
from st2common.transport.connection_retry_wrapper import ConnectionRetryWrapper
from st2common.transport.execution import EXECUTION_XCHG
from st2common.transport.execution import EXECUTION_XCHG, EXECUTION_OUTPUT_XCHG
from st2common.transport.liveaction import LIVEACTION_XCHG, LIVEACTION_STATUS_MGMT_XCHG
from st2common.transport.reactor import SENSOR_CUD_XCHG
from st2common.transport.reactor import TRIGGER_CUD_XCHG, TRIGGER_INSTANCE_XCHG
Expand Down Expand Up @@ -67,6 +67,7 @@
ACTIONEXECUTIONSTATE_XCHG,
ANNOUNCEMENT_XCHG,
EXECUTION_XCHG,
EXECUTION_OUTPUT_XCHG,
LIVEACTION_XCHG,
LIVEACTION_STATUS_MGMT_XCHG,
TRIGGER_CUD_XCHG,
Expand Down
3 changes: 2 additions & 1 deletion st2common/st2common/transport/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
# All Exchanges and Queues related to liveaction.

from __future__ import absolute_import
from kombu import Exchange, Queue

from st2common.transport import publishers
from st2common.transport.kombu import Exchange, Queue

__all__ = [
"ActionExecutionPublisher",
Expand Down
36 changes: 36 additions & 0 deletions st2common/st2common/transport/kombu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright 2024 The StackStorm Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import kombu
from oslo_config import cfg


class Exchange(kombu.Exchange):
def __call__(self, *args, **kwargs):
# update exchange name with prefix just before binding (as late as possible).
prefix = cfg.CONF.messaging.prefix
if self.name and prefix != "st2":
self.name = self.name.replace("st2.", f"{prefix}.", 1)
return super().__call__(*args, **kwargs)


class Queue(kombu.Queue):
def __call__(self, *args, **kwargs):
# update queue name with prefix just before binding (as late as possible).
prefix = cfg.CONF.messaging.prefix
if self.name and prefix != "st2":
self.name = self.name.replace("st2.", f"{prefix}.", 1)
return super().__call__(*args, **kwargs)
3 changes: 1 addition & 2 deletions st2common/st2common/transport/liveaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@

from __future__ import absolute_import

from kombu import Exchange, Queue

from st2common.transport import publishers
from st2common.transport.kombu import Exchange, Queue

__all__ = ["LiveActionPublisher", "get_queue", "get_status_management_queue"]

Expand Down
5 changes: 3 additions & 2 deletions st2common/st2common/transport/publishers.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ def do_publish(connection, channel):
# completely invalidating this ConnectionPool. Also, a ConnectionPool for
# producer does not really solve any problems for us so better to create a
# Producer for each publish.
producer = Producer(channel)
# passing exchange to Producer __init__ allows auto_declare to declare
# anything that's missing (especially useful for tests).
producer = Producer(channel, exchange=exchange)
kwargs = {
"body": payload,
"exchange": exchange,
"routing_key": routing_key,
"serializer": "pickle",
"compression": compression,
Expand Down
3 changes: 1 addition & 2 deletions st2common/st2common/transport/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@

from __future__ import absolute_import

from kombu import Queue

from st2common.constants import action as action_constants
from st2common.transport import actionalias
from st2common.transport import actionexecutionstate
Expand All @@ -33,6 +31,7 @@
from st2common.transport import publishers
from st2common.transport import reactor
from st2common.transport import workflow
from st2common.transport.kombu import Queue

__all__ = [
"ACTIONSCHEDULER_REQUEST_QUEUE",
Expand Down
2 changes: 1 addition & 1 deletion st2common/st2common/transport/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
# limitations under the License.

from __future__ import absolute_import
from kombu import Exchange, Queue

from st2common import log as logging
from st2common.constants.trace import TRACE_CONTEXT
from st2common.models.api.trace import TraceContext
from st2common.transport import publishers
from st2common.transport.kombu import Exchange, Queue

__all__ = [
"TriggerCUDPublisher",
Expand Down
Loading
Loading