diff --git a/CHANGELOG.rst b/CHANGELOG.rst index a94babb48b..39122fb1c1 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -69,7 +69,7 @@ Added working on StackStorm, improve our security posture, and improve CI reliability thanks in part to pants' use of PEX lockfiles. This is not a user-facing addition. #6118 #6141 #6133 #6120 #6181 #6183 #6200 #6237 #6229 #6240 #6241 #6244 #6251 #6253 - #6254 #6258 #6259 #6260 #6269 #6275 #6279 #6278 + #6254 #6258 #6259 #6260 #6269 #6275 #6279 #6278 #6282 Contributed by @cognifloyd * Build of ST2 EL9 packages #6153 Contributed by @amanda11 @@ -93,6 +93,12 @@ Added If you experience any issues when using this experimental feature, please file an issue. #6277 Contributed by @cognifloyd +* Add new option `[messaging].prefix` to configure the prefix used in RabbitMQ exchanges and queues. + The default is `st2` (resulting in exchange names like `st2.execution` and `st2.sensor`). + This is primarily designed to support safely running tests in parallel where creating a vhost for + each parallel test run would be a maintenance burden. #6282 + Contributed by @cognifloyd + 3.8.1 - December 13, 2023 ------------------------- Fixed diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 5eed1ffd4c..3c9d005d0a 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -230,6 +230,8 @@ connection_retries = 10 connection_retry_wait = 10000 # Login method to use (AMQPLAIN, PLAIN, EXTERNAL, etc.). login_method = None +# Prefix for all exchange and queue names. +prefix = st2 # Use SSL / TLS to connect to the messaging server. Same as appending "?ssl=true" at the end of the connection URL string. ssl = False # ca_certs file contains a set of concatenated CA certificates, which are used to validate certificates passed from RabbitMQ. diff --git a/conf/st2.dev.conf b/conf/st2.dev.conf index cf2b5b6596..6755403cf6 100644 --- a/conf/st2.dev.conf +++ b/conf/st2.dev.conf @@ -103,6 +103,7 @@ ssh_key_file = /home/vagrant/.ssh/stanley_rsa [messaging] url = amqp://guest:guest@127.0.0.1:5672/ +prefix = st2dev # Uncomment to test SSL options #url = amqp://guest:guest@127.0.0.1:5671/ #ssl = True diff --git a/pants-plugins/uses_services/rabbitmq_rules.py b/pants-plugins/uses_services/rabbitmq_rules.py index 3d5fd13024..89df3976f9 100644 --- a/pants-plugins/uses_services/rabbitmq_rules.py +++ b/pants-plugins/uses_services/rabbitmq_rules.py @@ -15,6 +15,7 @@ from dataclasses import dataclass from textwrap import dedent +from typing import Tuple from pants.backend.python.goals.pytest_runner import ( PytestPluginSetupRequest, @@ -27,6 +28,8 @@ VenvPexProcess, rules as pex_rules, ) +from pants.core.goals.test import TestExtraEnv +from pants.engine.env_vars import EnvironmentVars from pants.engine.fs import CreateDigest, Digest, FileContent from pants.engine.rules import collect_rules, Get, MultiGet, rule from pants.engine.process import FallibleProcessResult, ProcessCacheScope @@ -54,13 +57,17 @@ class UsesRabbitMQRequest: # These config opts for integration tests are in: # conf/st2.tests*.conf st2tests/st2tests/fixtures/conf/st2.tests*.conf # (changed by setting ST2_CONFIG_PATH env var inside the tests) - # TODO: for unit tests: modify code to pull mq connect settings from env vars - # TODO: for int tests: modify st2.tests*.conf on the fly to set the per-pantsd-slot vhost - # and either add env vars for mq connect settings or modify conf files as well + # These can also be updated via the ST2_MESSAGING_* env vars (which oslo_config reads). + # Integration tests should pass these changes onto subprocesses via the same env vars. - # with our version of oslo.config (newer are slower) we can't directly override opts w/ environment variables. + mq_urls: Tuple[str] = ("amqp://guest:guest@127.0.0.1:5672//",) - mq_urls: tuple[str] = ("amqp://guest:guest@127.0.0.1:5672//",) + @classmethod + def from_env(cls, env: EnvironmentVars) -> UsesRabbitMQRequest: + default = cls() + url = env.get("ST2_MESSAGING__URL", None) + mq_urls = (url,) if url else default.mq_urls + return UsesRabbitMQRequest(mq_urls=mq_urls) @dataclass(frozen=True) @@ -83,9 +90,12 @@ def is_applicable(cls, target: Target) -> bool: ) async def rabbitmq_is_running_for_pytest( request: PytestUsesRabbitMQRequest, + test_extra_env: TestExtraEnv, ) -> PytestPluginSetup: # this will raise an error if rabbitmq is not running - _ = await Get(RabbitMQIsRunning, UsesRabbitMQRequest()) + _ = await Get( + RabbitMQIsRunning, UsesRabbitMQRequest.from_env(env=test_extra_env.env) + ) return PytestPluginSetup() @@ -167,6 +177,16 @@ async def rabbitmq_is_running( """ ), service_start_cmd_generic="systemctl start rabbitmq-server", + env_vars_hint=dedent( + """\ + You can also export the ST2_MESSAGING__URL env var to automatically use any + RabbitMQ host, local or remote, while running unit and integration tests. + If needed, you can also override the default exchange/queue name prefix + by exporting ST2_MESSAGING__PREFIX. Note that tests always add a numeric + suffix to the exchange/queue name prefix so that tests can safely run + in parallel. + """ + ), ), ) diff --git a/pants-plugins/uses_services/rabbitmq_rules_test.py b/pants-plugins/uses_services/rabbitmq_rules_test.py index 9eb4ae5055..c1945bda43 100644 --- a/pants-plugins/uses_services/rabbitmq_rules_test.py +++ b/pants-plugins/uses_services/rabbitmq_rules_test.py @@ -51,7 +51,14 @@ def run_rabbitmq_is_running( "--backend-packages=uses_services", *(extra_args or ()), ], - env_inherit={"PATH", "PYENV_ROOT", "HOME"}, + env_inherit={ + "PATH", + "PYENV_ROOT", + "HOME", + "ST2_MESSAGING__URL", + "ST2_MESSAGING__PREFIX", + "ST2TESTS_PARALLEL_SLOT", + }, ) result = rule_runner.request( RabbitMQIsRunning, @@ -62,7 +69,7 @@ def run_rabbitmq_is_running( # Warning this requires that rabbitmq be running def test_rabbitmq_is_running(rule_runner: RuleRunner) -> None: - request = UsesRabbitMQRequest() + request = UsesRabbitMQRequest.from_env(env=rule_runner.environment) mock_platform = platform(os="TestMock") # we are asserting that this does not raise an exception diff --git a/pants.toml b/pants.toml index 23a2c5f4a2..8d5568daab 100644 --- a/pants.toml +++ b/pants.toml @@ -247,6 +247,9 @@ extra_env_vars = [ "ST2_DATABASE__CONNECTION_TIMEOUT", "ST2_DATABASE__USERNAME", "ST2_DATABASE__PASSWORD", + # Use these to override RabbitMQ connection details + "ST2_MESSAGING__URL", + "ST2_MESSAGING__PREFIX", # Tests will modify this to be "{prefix}{ST2TESTS_PARALLEL_SLOT}" # Use these to override the redis host and port "ST2TESTS_REDIS_HOST", "ST2TESTS_REDIS_PORT", diff --git a/st2actions/tests/integration/test_actions_queue_consumer.py b/st2actions/tests/integration/test_actions_queue_consumer.py index 149e94b0f3..aa37b5e589 100644 --- a/st2actions/tests/integration/test_actions_queue_consumer.py +++ b/st2actions/tests/integration/test_actions_queue_consumer.py @@ -17,11 +17,10 @@ import random import eventlet -from kombu import Exchange -from kombu import Queue from unittest import TestCase from st2common.transport.consumers import ActionsQueueConsumer +from st2common.transport.kombu import Exchange, Queue from st2common.transport.publishers import PoolPublisher from st2common.transport import utils as transport_utils from st2common.models.db.liveaction import LiveActionDB @@ -35,7 +34,7 @@ class ActionsQueueConsumerTestCase(TestCase): def test_stop_consumption_on_shutdown(self): exchange = Exchange("st2.execution.test", type="topic") - queue_name = "test-" + str(random.randint(1, 10000)) + queue_name = f"st2.test-{random.randint(1, 10000)}" queue = Queue( name=queue_name, exchange=exchange, routing_key="#", auto_delete=True ) diff --git a/st2common/benchmarks/micro/test_publisher_compression.py b/st2common/benchmarks/micro/test_publisher_compression.py index 51d13fd8a4..0faabf86a1 100644 --- a/st2common/benchmarks/micro/test_publisher_compression.py +++ b/st2common/benchmarks/micro/test_publisher_compression.py @@ -16,7 +16,6 @@ monkey_patch() -from kombu import Exchange from kombu.serialization import pickle import os @@ -27,6 +26,7 @@ from st2common.models.db.liveaction import LiveActionDB from st2common.transport import publishers +from st2common.transport.kombu import Exchange from common import FIXTURES_DIR from common import PYTEST_FIXTURE_FILE_PARAM_DECORATOR diff --git a/st2common/st2common/config.py b/st2common/st2common/config.py index 15a4d5b32a..a9461951d1 100644 --- a/st2common/st2common/config.py +++ b/st2common/st2common/config.py @@ -404,6 +404,11 @@ def register_opts(ignore_errors=False): help="Compression algorithm to use for compressing the payloads which are sent over " "the message bus. Defaults to no compression.", ), + cfg.StrOpt( + "prefix", + default="st2", + help="Prefix for all exchange and queue names.", + ), ] do_register_opts(messaging_opts, "messaging", ignore_errors) diff --git a/st2common/st2common/stream/listener.py b/st2common/st2common/stream/listener.py index 759133f96f..5eb5dc8b8f 100644 --- a/st2common/st2common/stream/listener.py +++ b/st2common/st2common/stream/listener.py @@ -58,9 +58,15 @@ def get_consumers(self, consumer, channel): raise NotImplementedError("get_consumers() is not implemented") def processor(self, model=None): + exchange_prefix = cfg.CONF.messaging.prefix + def process(body, message): meta = message.delivery_info - event_name = "%s__%s" % (meta.get("exchange"), meta.get("routing_key")) + event_prefix = meta.get("exchange", "") + if exchange_prefix != "st2" and event_prefix.startswith(exchange_prefix): + # use well-known event names over configurable exchange names + event_prefix = event_prefix.replace(f"{exchange_prefix}.", "st2.", 1) + event_name = f"{event_prefix}__{meta.get('routing_key')}" try: if model: diff --git a/st2common/st2common/transport/actionalias.py b/st2common/st2common/transport/actionalias.py index 33fff1e92d..a083b8588b 100644 --- a/st2common/st2common/transport/actionalias.py +++ b/st2common/st2common/transport/actionalias.py @@ -15,8 +15,9 @@ # All Exchanges and Queues related to liveaction. from __future__ import absolute_import -from kombu import Exchange, Queue + from st2common.transport import publishers +from st2common.transport.kombu import Exchange, Queue __all__ = [ "ActionAliasPublisher", diff --git a/st2common/st2common/transport/actionexecutionstate.py b/st2common/st2common/transport/actionexecutionstate.py index 46fe095fbf..581fe0a5a5 100644 --- a/st2common/st2common/transport/actionexecutionstate.py +++ b/st2common/st2common/transport/actionexecutionstate.py @@ -17,9 +17,8 @@ from __future__ import absolute_import -from kombu import Exchange, Queue - from st2common.transport import publishers +from st2common.transport.kombu import Exchange, Queue __all__ = ["ActionExecutionStatePublisher"] diff --git a/st2common/st2common/transport/announcement.py b/st2common/st2common/transport/announcement.py index ff1c005a78..76f1ce8b34 100644 --- a/st2common/st2common/transport/announcement.py +++ b/st2common/st2common/transport/announcement.py @@ -15,12 +15,11 @@ from __future__ import absolute_import -from kombu import Exchange, Queue - from st2common import log as logging from st2common.constants.trace import TRACE_CONTEXT from st2common.models.api.trace import TraceContext from st2common.transport import publishers +from st2common.transport.kombu import Exchange, Queue __all__ = ["AnnouncementPublisher", "AnnouncementDispatcher", "get_queue"] diff --git a/st2common/st2common/transport/bootstrap_utils.py b/st2common/st2common/transport/bootstrap_utils.py index b2eb8c7c2d..b5a22b7eda 100644 --- a/st2common/st2common/transport/bootstrap_utils.py +++ b/st2common/st2common/transport/bootstrap_utils.py @@ -31,7 +31,7 @@ from st2common.transport.actionexecutionstate import ACTIONEXECUTIONSTATE_XCHG from st2common.transport.announcement import ANNOUNCEMENT_XCHG from st2common.transport.connection_retry_wrapper import ConnectionRetryWrapper -from st2common.transport.execution import EXECUTION_XCHG +from st2common.transport.execution import EXECUTION_XCHG, EXECUTION_OUTPUT_XCHG from st2common.transport.liveaction import LIVEACTION_XCHG, LIVEACTION_STATUS_MGMT_XCHG from st2common.transport.reactor import SENSOR_CUD_XCHG from st2common.transport.reactor import TRIGGER_CUD_XCHG, TRIGGER_INSTANCE_XCHG @@ -67,6 +67,7 @@ ACTIONEXECUTIONSTATE_XCHG, ANNOUNCEMENT_XCHG, EXECUTION_XCHG, + EXECUTION_OUTPUT_XCHG, LIVEACTION_XCHG, LIVEACTION_STATUS_MGMT_XCHG, TRIGGER_CUD_XCHG, diff --git a/st2common/st2common/transport/execution.py b/st2common/st2common/transport/execution.py index 5d2880fd6f..3ffeafe345 100644 --- a/st2common/st2common/transport/execution.py +++ b/st2common/st2common/transport/execution.py @@ -16,8 +16,9 @@ # All Exchanges and Queues related to liveaction. from __future__ import absolute_import -from kombu import Exchange, Queue + from st2common.transport import publishers +from st2common.transport.kombu import Exchange, Queue __all__ = [ "ActionExecutionPublisher", diff --git a/st2common/st2common/transport/kombu.py b/st2common/st2common/transport/kombu.py new file mode 100644 index 0000000000..5074399b0b --- /dev/null +++ b/st2common/st2common/transport/kombu.py @@ -0,0 +1,36 @@ +# Copyright 2024 The StackStorm Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import + +import kombu +from oslo_config import cfg + + +class Exchange(kombu.Exchange): + def __call__(self, *args, **kwargs): + # update exchange name with prefix just before binding (as late as possible). + prefix = cfg.CONF.messaging.prefix + if self.name and prefix != "st2": + self.name = self.name.replace("st2.", f"{prefix}.", 1) + return super().__call__(*args, **kwargs) + + +class Queue(kombu.Queue): + def __call__(self, *args, **kwargs): + # update queue name with prefix just before binding (as late as possible). + prefix = cfg.CONF.messaging.prefix + if self.name and prefix != "st2": + self.name = self.name.replace("st2.", f"{prefix}.", 1) + return super().__call__(*args, **kwargs) diff --git a/st2common/st2common/transport/liveaction.py b/st2common/st2common/transport/liveaction.py index 670c5ebb2e..e410c5d933 100644 --- a/st2common/st2common/transport/liveaction.py +++ b/st2common/st2common/transport/liveaction.py @@ -17,9 +17,8 @@ from __future__ import absolute_import -from kombu import Exchange, Queue - from st2common.transport import publishers +from st2common.transport.kombu import Exchange, Queue __all__ = ["LiveActionPublisher", "get_queue", "get_status_management_queue"] diff --git a/st2common/st2common/transport/publishers.py b/st2common/st2common/transport/publishers.py index 73596ed70e..62484d4c46 100644 --- a/st2common/st2common/transport/publishers.py +++ b/st2common/st2common/transport/publishers.py @@ -71,10 +71,11 @@ def do_publish(connection, channel): # completely invalidating this ConnectionPool. Also, a ConnectionPool for # producer does not really solve any problems for us so better to create a # Producer for each publish. - producer = Producer(channel) + # passing exchange to Producer __init__ allows auto_declare to declare + # anything that's missing (especially useful for tests). + producer = Producer(channel, exchange=exchange) kwargs = { "body": payload, - "exchange": exchange, "routing_key": routing_key, "serializer": "pickle", "compression": compression, diff --git a/st2common/st2common/transport/queues.py b/st2common/st2common/transport/queues.py index 33a3f26d03..eefe72b964 100644 --- a/st2common/st2common/transport/queues.py +++ b/st2common/st2common/transport/queues.py @@ -22,8 +22,6 @@ from __future__ import absolute_import -from kombu import Queue - from st2common.constants import action as action_constants from st2common.transport import actionalias from st2common.transport import actionexecutionstate @@ -33,6 +31,7 @@ from st2common.transport import publishers from st2common.transport import reactor from st2common.transport import workflow +from st2common.transport.kombu import Queue __all__ = [ "ACTIONSCHEDULER_REQUEST_QUEUE", diff --git a/st2common/st2common/transport/reactor.py b/st2common/st2common/transport/reactor.py index 3dba86344f..657a35c6b3 100644 --- a/st2common/st2common/transport/reactor.py +++ b/st2common/st2common/transport/reactor.py @@ -14,12 +14,12 @@ # limitations under the License. from __future__ import absolute_import -from kombu import Exchange, Queue from st2common import log as logging from st2common.constants.trace import TRACE_CONTEXT from st2common.models.api.trace import TraceContext from st2common.transport import publishers +from st2common.transport.kombu import Exchange, Queue __all__ = [ "TriggerCUDPublisher", diff --git a/st2common/st2common/transport/workflow.py b/st2common/st2common/transport/workflow.py index 0302611a36..f4cdc3fbba 100644 --- a/st2common/st2common/transport/workflow.py +++ b/st2common/st2common/transport/workflow.py @@ -17,16 +17,13 @@ from __future__ import absolute_import -import kombu - from st2common.transport import publishers +from st2common.transport.kombu import Exchange, Queue __all__ = ["WorkflowExecutionPublisher", "get_queue", "get_status_management_queue"] -WORKFLOW_EXECUTION_XCHG = kombu.Exchange("st2.workflow", type="topic") -WORKFLOW_EXECUTION_STATUS_MGMT_XCHG = kombu.Exchange( - "st2.workflow.status", type="topic" -) +WORKFLOW_EXECUTION_XCHG = Exchange("st2.workflow", type="topic") +WORKFLOW_EXECUTION_STATUS_MGMT_XCHG = Exchange("st2.workflow.status", type="topic") class WorkflowExecutionPublisher( @@ -40,10 +37,8 @@ def __init__(self): def get_queue(name, routing_key): - return kombu.Queue(name, WORKFLOW_EXECUTION_XCHG, routing_key=routing_key) + return Queue(name, WORKFLOW_EXECUTION_XCHG, routing_key=routing_key) def get_status_management_queue(name, routing_key): - return kombu.Queue( - name, WORKFLOW_EXECUTION_STATUS_MGMT_XCHG, routing_key=routing_key - ) + return Queue(name, WORKFLOW_EXECUTION_STATUS_MGMT_XCHG, routing_key=routing_key) diff --git a/st2common/st2common/util/queues.py b/st2common/st2common/util/queues.py index 9fce3b20a7..b8c7372fa3 100644 --- a/st2common/st2common/util/queues.py +++ b/st2common/st2common/util/queues.py @@ -47,7 +47,7 @@ def get_queue_name(queue_name_base, queue_name_suffix, add_random_uuid_to_suffix # might cause issues in RabbitMQ. u_hex = uuid.uuid4().hex uuid_suffix = uuid.uuid4().hex[len(u_hex) - 10 :] - queue_suffix = "%s-%s" % (queue_name_suffix, uuid_suffix) + queue_suffix = f"{queue_name_suffix}-{uuid_suffix}" - queue_name = "%s.%s" % (queue_name_base, queue_suffix) + queue_name = f"{queue_name_base}.{queue_suffix}" return queue_name diff --git a/st2common/tests/unit/test_queue_consumer.py b/st2common/tests/unit/test_queue_consumer.py index 463eb0def4..4461444255 100644 --- a/st2common/tests/unit/test_queue_consumer.py +++ b/st2common/tests/unit/test_queue_consumer.py @@ -23,6 +23,7 @@ from tests.unit.base import FakeModelDB +# AMQP connection is mocked, so these do not need messaging.prefix FAKE_XCHG = Exchange("st2.tests", type="topic") FAKE_WORK_Q = Queue("st2.tests.unit", FAKE_XCHG) diff --git a/st2common/tests/unit/test_state_publisher.py b/st2common/tests/unit/test_state_publisher.py index 86bb67c2cf..7961cb7dc3 100644 --- a/st2common/tests/unit/test_state_publisher.py +++ b/st2common/tests/unit/test_state_publisher.py @@ -31,6 +31,7 @@ from st2tests import DbTestCase +# PoolPublisher is mocked, so this does not need messaging.prefix FAKE_STATE_MGMT_XCHG = kombu.Exchange("st2.fake.state", type="topic") diff --git a/st2common/tests/unit/test_transport.py b/st2common/tests/unit/test_transport.py index ae12b1ea9d..71d18e6010 100644 --- a/st2common/tests/unit/test_transport.py +++ b/st2common/tests/unit/test_transport.py @@ -25,13 +25,12 @@ from bson.objectid import ObjectId from kombu.mixins import ConsumerMixin -from kombu import Exchange -from kombu import Queue from oslo_config import cfg from st2common.transport.publishers import PoolPublisher from st2common.transport.utils import _get_ssl_kwargs from st2common.transport import utils as transport_utils +from st2common.transport.kombu import Exchange, Queue from st2common.models.db.liveaction import LiveActionDB __all__ = ["TransportUtilsTestCase"] @@ -69,7 +68,7 @@ def test_publish_compression(self): live_action_db.result = {"foo": "bar"} exchange = Exchange("st2.execution.test", type="topic") - queue_name = "test-" + str(random.randint(1, 10000)) + queue_name = f"st2.test-{random.randint(1, 10000)}" queue = Queue( name=queue_name, exchange=exchange, routing_key="#", auto_delete=True ) diff --git a/st2tests/st2tests/config.py b/st2tests/st2tests/config.py index 6a2e13ea89..e95e5f0799 100644 --- a/st2tests/st2tests/config.py +++ b/st2tests/st2tests/config.py @@ -66,6 +66,7 @@ def _setup_config_opts(coordinator_noop=True): def _override_config_opts(coordinator_noop=False): _override_db_opts() + _override_mq_opts() _override_common_opts() _override_api_opts() _override_keyvalue_opts() @@ -107,8 +108,18 @@ def db_opts_as_env_vars() -> Dict[str, str]: return env +def _override_mq_opts(): + mq_prefix = CONF.messaging.prefix + mq_prefix = "st2test" if mq_prefix == "st2" else mq_prefix + mq_prefix = mq_prefix + os.environ.get("ST2TESTS_PARALLEL_SLOT", "") + CONF.set_override(name="prefix", override=mq_prefix, group="messaging") + + def mq_opts_as_env_vars() -> Dict[str, str]: - return {"ST2_MESSAGING__URL": CONF.messaging.url} + return { + "ST2_MESSAGING__URL": CONF.messaging.url, + "ST2_MESSAGING__PREFIX": CONF.messaging.prefix, + } def _override_common_opts(): @@ -269,6 +280,11 @@ def _register_api_opts(): default=None, help="Login method to use (AMQPLAIN, PLAIN, EXTERNAL, etc.).", ), + cfg.StrOpt( + "prefix", + default="st2", + help="Prefix for all exchange and queue names.", + ), ] _register_opts(messaging_opts, group="messaging")