From 3cc2aee2290bd9a4fba9e0cebe3b65370aa76af6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Su=C3=A1rez=20Hern=C3=A1ndez?= Date: Fri, 1 Sep 2023 08:22:44 +0100 Subject: [PATCH] Revert usage of long running REQ channel (bsc#1213960, bsc#1213630, bsc#1213257) * Revert usage of long running REQ channel (bsc#1213960, bsc#1213630, bsc#1213257) This reverts commits: https://github.com/saltstack/salt/commit/a99ffb557b8a1142225d4925aba4a5e493923d2f https://github.com/saltstack/salt/commit/80ae5188807550e7592fa12d8661ee83c4313ec8 https://github.com/saltstack/salt/commit/3c7e1ec1f08abd7cd1ba78ad7880acb6ba6fdce7 https://github.com/saltstack/salt/commit/171926cc57618b51bf3fdc042b62212e681180fc From this PR: https://github.com/saltstack/salt/pull/61468 See: https://github.com/saltstack/salt/issues/62959#issuecomment-1658335432 * Revert "Fix linter" This reverts commit d09d2d3f31e06c554b4ed869b7dc4f8b8bce5dc9. * Revert "add a regression test" This reverts commit b2c32be0a80c92585a9063409c42895357bb0dbe. * Fix failing tests after reverting commits --- doc/topics/development/architecture.rst | 8 +- salt/channel/client.py | 9 +-- salt/minion.py | 47 +++-------- salt/transport/ipc.py | 5 +- salt/utils/asynchronous.py | 2 +- .../transport/server/test_req_channel.py | 16 ++-- tests/pytests/unit/test_minion.py | 79 ++++--------------- 7 files changed, 39 insertions(+), 127 deletions(-) diff --git a/doc/topics/development/architecture.rst b/doc/topics/development/architecture.rst index 17400db0017..1c717092f83 100644 --- a/doc/topics/development/architecture.rst +++ b/doc/topics/development/architecture.rst @@ -220,15 +220,11 @@ the received message. 4) The new minion thread is created. The _thread_return() function starts up and actually calls out to the requested function contained in the job. 5) The requested function runs and returns a result. [Still in thread.] -6) The result of the function that's run is published on the minion's local event bus with event -tag "__master_req_channel_payload" [Still in thread.] +6) The result of the function that's run is encrypted and returned to the +master's ReqServer (TCP 4506 on master). [Still in thread.] 7) Thread exits. Because the main thread was only blocked for the time that it took to initialize the worker thread, many other requests could have been received and processed during this time. -8) Minion event handler gets the event with tag "__master_req_channel_payload" -and sends the payload to master's ReqServer (TCP 4506 on master), via the long-running async request channel -that was opened when minion first started up. - A Note on ClearFuncs vs. AESFuncs diff --git a/salt/channel/client.py b/salt/channel/client.py index e5b073ccdba..76d7a8e5b93 100644 --- a/salt/channel/client.py +++ b/salt/channel/client.py @@ -98,7 +98,6 @@ class AsyncReqChannel: "_crypted_transfer", "_uncrypted_transfer", "send", - "connect", ] close_methods = [ "close", @@ -128,7 +127,7 @@ def factory(cls, opts, **kwargs): else: auth = None - transport = salt.transport.request_client(opts, io_loop=io_loop) + transport = salt.transport.request_client(opts, io_loop) return cls(opts, transport, auth) def __init__(self, opts, transport, auth, **kwargs): @@ -271,10 +270,6 @@ def _uncrypted_transfer(self, load, timeout=60): raise salt.ext.tornado.gen.Return(ret) - @salt.ext.tornado.gen.coroutine - def connect(self): - yield self.transport.connect() - @salt.ext.tornado.gen.coroutine def send(self, load, tries=3, timeout=60, raw=False): """ @@ -295,7 +290,7 @@ def send(self, load, tries=3, timeout=60, raw=False): ret = yield self._crypted_transfer(load, timeout=timeout, raw=raw) break except Exception as exc: # pylint: disable=broad-except - log.trace("Failed to send msg %r", exc) + log.error("Failed to send msg %r", exc) if _try >= tries: raise else: diff --git a/salt/minion.py b/salt/minion.py index c3b65f16c3d..9597d6e63a0 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -1361,30 +1361,11 @@ def connect_master(self, failed=False): """ Return a future which will complete when you are connected to a master """ - # Consider refactoring so that eval_master does not have a subtle side-effect on the contents of the opts array master, self.pub_channel = yield self.eval_master( self.opts, self.timeout, self.safe, failed ) - - # a long-running req channel - self.req_channel = salt.transport.client.AsyncReqChannel.factory( - self.opts, io_loop=self.io_loop - ) - - if hasattr( - self.req_channel, "connect" - ): # TODO: consider generalizing this for all channels - log.debug("Connecting minion's long-running req channel") - yield self.req_channel.connect() - yield self._post_master_init(master) - @salt.ext.tornado.gen.coroutine - def handle_payload(self, payload, reply_func): - self.payloads.append(payload) - yield reply_func(payload) - self.payload_ack.notify() - # TODO: better name... @salt.ext.tornado.gen.coroutine def _post_master_init(self, master): @@ -1599,6 +1580,7 @@ def _load_modules( return functions, returners, errors, executors def _send_req_sync(self, load, timeout): + if self.opts["minion_sign_messages"]: log.trace("Signing event to be published onto the bus.") minion_privkey_path = os.path.join(self.opts["pki_dir"], "minion.pem") @@ -1607,11 +1589,9 @@ def _send_req_sync(self, load, timeout): ) load["sig"] = sig - with salt.utils.event.get_event( - "minion", opts=self.opts, listen=False - ) as event: - return event.fire_event( - load, "__master_req_channel_payload", timeout=timeout + with salt.channel.client.ReqChannel.factory(self.opts) as channel: + return channel.send( + load, timeout=timeout, tries=self.opts["return_retry_tries"] ) @salt.ext.tornado.gen.coroutine @@ -1624,11 +1604,9 @@ def _send_req_async(self, load, timeout): ) load["sig"] = sig - with salt.utils.event.get_event( - "minion", opts=self.opts, listen=False - ) as event: - ret = yield event.fire_event_async( - load, "__master_req_channel_payload", timeout=timeout + with salt.channel.client.AsyncReqChannel.factory(self.opts) as channel: + ret = yield channel.send( + load, timeout=timeout, tries=self.opts["return_retry_tries"] ) raise salt.ext.tornado.gen.Return(ret) @@ -2055,7 +2033,7 @@ def _thread_return(cls, minion_instance, opts, data): minion_instance._return_pub(ret) # Add default returners from minion config - # Should have been converted to comma-delimited string already + # Should have been coverted to comma-delimited string already if isinstance(opts.get("return"), str): if data["ret"]: data["ret"] = ",".join((data["ret"], opts["return"])) @@ -2662,7 +2640,6 @@ def _mine_send(self, tag, data): """ Send mine data to the master """ - # Consider using a long-running req channel to send mine data with salt.channel.client.ReqChannel.factory(self.opts) as channel: data["tok"] = self.tok try: @@ -2699,12 +2676,6 @@ def handle_event(self, package): force_refresh=data.get("force_refresh", False), notify=data.get("notify", False), ) - elif tag.startswith("__master_req_channel_payload"): - yield _minion.req_channel.send( - data, - timeout=_minion._return_retry_timer(), - tries=_minion.opts["return_retry_tries"], - ) elif tag.startswith("pillar_refresh"): yield _minion.pillar_refresh( force_refresh=data.get("force_refresh", False), @@ -3175,7 +3146,7 @@ def _handle_payload(self, payload): if self._target_load(payload["load"]): self._handle_decoded_payload(payload["load"]) elif self.opts["zmq_filtering"]: - # In the filtering enabled case, we'd like to know when minion sees something it shouldn't + # In the filtering enabled case, we'd like to know when minion sees something it shouldnt log.trace( "Broadcast message received not for this minion, Load: %s", payload["load"], diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index 3a3f0c7a5f2..cee100b086e 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -208,10 +208,7 @@ def return_message(msg): log.error("Exception occurred while handling stream: %s", exc) def handle_connection(self, connection, address): - log.trace( - "IPCServer: Handling connection to address: %s", - address if address else connection, - ) + log.trace("IPCServer: Handling connection to address: %s", address) try: with salt.utils.asynchronous.current_ioloop(self.io_loop): stream = IOStream( diff --git a/salt/utils/asynchronous.py b/salt/utils/asynchronous.py index 0c645bbc3bb..88596a4a20b 100644 --- a/salt/utils/asynchronous.py +++ b/salt/utils/asynchronous.py @@ -34,7 +34,7 @@ class SyncWrapper: This is uses as a simple wrapper, for example: asynchronous = AsyncClass() - # this method would regularly return a future + # this method would reguarly return a future future = asynchronous.async_method() sync = SyncWrapper(async_factory_method, (arg1, arg2), {'kwarg1': 'val'}) diff --git a/tests/pytests/functional/transport/server/test_req_channel.py b/tests/pytests/functional/transport/server/test_req_channel.py index 4a74802a0d0..555c040c1ca 100644 --- a/tests/pytests/functional/transport/server/test_req_channel.py +++ b/tests/pytests/functional/transport/server/test_req_channel.py @@ -124,7 +124,7 @@ def req_channel_crypt(request): @pytest.fixture -def push_channel(req_server_channel, salt_minion, req_channel_crypt): +def req_channel(req_server_channel, salt_minion, req_channel_crypt): with salt.channel.client.ReqChannel.factory( salt_minion.config, crypt=req_channel_crypt ) as _req_channel: @@ -135,7 +135,7 @@ def push_channel(req_server_channel, salt_minion, req_channel_crypt): _req_channel.obj._refcount = 0 -def test_basic(push_channel): +def test_basic(req_channel): """ Test a variety of messages, make sure we get the expected responses """ @@ -145,11 +145,11 @@ def test_basic(push_channel): {"baz": "qux", "list": [1, 2, 3]}, ] for msg in msgs: - ret = push_channel.send(dict(msg), timeout=5, tries=1) + ret = req_channel.send(dict(msg), timeout=5, tries=1) assert ret["load"] == msg -def test_normalization(push_channel): +def test_normalization(req_channel): """ Since we use msgpack, we need to test that list types are converted to lists """ @@ -160,21 +160,21 @@ def test_normalization(push_channel): {"list": tuple([1, 2, 3])}, ] for msg in msgs: - ret = push_channel.send(msg, timeout=5, tries=1) + ret = req_channel.send(msg, timeout=5, tries=1) for key, value in ret["load"].items(): assert types[key] == type(value) -def test_badload(push_channel, req_channel_crypt): +def test_badload(req_channel, req_channel_crypt): """ Test a variety of bad requests, make sure that we get some sort of error """ msgs = ["", [], tuple()] if req_channel_crypt == "clear": for msg in msgs: - ret = push_channel.send(msg, timeout=5, tries=1) + ret = req_channel.send(msg, timeout=5, tries=1) assert ret == "payload and load must be a dict" else: for msg in msgs: with pytest.raises(salt.exceptions.AuthenticationError): - push_channel.send(msg, timeout=5, tries=1) + req_channel.send(msg, timeout=5, tries=1) diff --git a/tests/pytests/unit/test_minion.py b/tests/pytests/unit/test_minion.py index 1cee025a485..4508eaee956 100644 --- a/tests/pytests/unit/test_minion.py +++ b/tests/pytests/unit/test_minion.py @@ -55,27 +55,26 @@ def test_minion_load_grains_default(): @pytest.mark.parametrize( - "event", + "req_channel", [ ( - "fire_event", - lambda data, tag, cb=None, timeout=60: True, + "salt.channel.client.AsyncReqChannel.factory", + lambda load, timeout, tries: salt.ext.tornado.gen.maybe_future(tries), ), ( - "fire_event_async", - lambda data, tag, cb=None, timeout=60: salt.ext.tornado.gen.maybe_future( - True - ), + "salt.channel.client.ReqChannel.factory", + lambda load, timeout, tries: tries, ), ], ) -def test_send_req_fires_completion_event(event, minion_opts): - event_enter = MagicMock() - event_enter.send.side_effect = event[1] - event = MagicMock() - event.__enter__.return_value = event_enter +def test_send_req_tries(req_channel, minion_opts): + channel_enter = MagicMock() + channel_enter.send.side_effect = req_channel[1] + channel = MagicMock() + channel.__enter__.return_value = channel_enter - with patch("salt.utils.event.get_event", return_value=event): + with patch(req_channel[0], return_value=channel): + minion_opts = salt.config.DEFAULT_MINION_OPTS.copy() minion_opts["random_startup_delay"] = 0 minion_opts["return_retry_tries"] = 30 minion_opts["grains"] = {} @@ -85,62 +84,16 @@ def test_send_req_fires_completion_event(event, minion_opts): load = {"load": "value"} timeout = 60 - # XXX This is buggy because "async" in event[0] will never evaluate - # to True and if it *did* evaluate to true the test would fail - # because you Mock isn't a co-routine. - if "async" in event[0]: + if "Async" in req_channel[0]: rtn = minion._send_req_async(load, timeout).result() else: rtn = minion._send_req_sync(load, timeout) - # get the - for idx, call in enumerate(event.mock_calls, 1): - if "fire_event" in call[0]: - condition_event_tag = ( - len(call.args) > 1 - and call.args[1] == "__master_req_channel_payload" - ) - condition_event_tag_error = "{} != {}; Call(number={}): {}".format( - idx, call, call.args[1], "__master_req_channel_payload" - ) - condition_timeout = ( - len(call.kwargs) == 1 and call.kwargs["timeout"] == timeout - ) - condition_timeout_error = "{} != {}; Call(number={}): {}".format( - idx, call, call.kwargs["timeout"], timeout - ) - - fire_event_called = True - assert condition_event_tag, condition_event_tag_error - assert condition_timeout, condition_timeout_error - - assert fire_event_called - assert rtn - - -async def test_send_req_async_regression_62453(minion_opts): - event_enter = MagicMock() - event_enter.send.side_effect = ( - lambda data, tag, cb=None, timeout=60: salt.ext.tornado.gen.maybe_future(True) - ) - event = MagicMock() - event.__enter__.return_value = event_enter - - minion_opts["random_startup_delay"] = 0 - minion_opts["return_retry_tries"] = 30 - minion_opts["grains"] = {} - with patch("salt.loader.grains"): - minion = salt.minion.Minion(minion_opts) - - load = {"load": "value"} - timeout = 60 - - # We are just validating no exception is raised - rtn = await minion._send_req_async(load, timeout) - assert rtn is False + assert rtn == 30 -def test_mine_send_tries(): +@patch("salt.channel.client.ReqChannel.factory") +def test_mine_send_tries(req_channel_factory): channel_enter = MagicMock() channel_enter.send.side_effect = lambda load, timeout, tries: tries channel = MagicMock()