Skip to content

Commit

Permalink
Revert usage of long running REQ channel (bsc#1213960, bsc#1213630, b…
Browse files Browse the repository at this point in the history
…sc#1213257)

* Revert usage of long running REQ channel (bsc#1213960, bsc#1213630, bsc#1213257)

This reverts commits:
  saltstack/salt@a99ffb5
  saltstack/salt@80ae518
  saltstack/salt@3c7e1ec
  saltstack/salt@171926c

From this PR: saltstack/salt#61468

See: saltstack/salt#62959 (comment)

* Revert "Fix linter"

This reverts commit d09d2d3.

* Revert "add a regression test"

This reverts commit b2c32be.

* Fix failing tests after reverting commits
  • Loading branch information
meaksh authored Sep 1, 2023
1 parent 7051f86 commit 3cc2aee
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 127 deletions.
8 changes: 2 additions & 6 deletions doc/topics/development/architecture.rst
Original file line number Diff line number Diff line change
Expand Up @@ -220,15 +220,11 @@ the received message.
4) The new minion thread is created. The _thread_return() function starts up
and actually calls out to the requested function contained in the job.
5) The requested function runs and returns a result. [Still in thread.]
6) The result of the function that's run is published on the minion's local event bus with event
tag "__master_req_channel_payload" [Still in thread.]
6) The result of the function that's run is encrypted and returned to the
master's ReqServer (TCP 4506 on master). [Still in thread.]
7) Thread exits. Because the main thread was only blocked for the time that it
took to initialize the worker thread, many other requests could have been
received and processed during this time.
8) Minion event handler gets the event with tag "__master_req_channel_payload"
and sends the payload to master's ReqServer (TCP 4506 on master), via the long-running async request channel
that was opened when minion first started up.



A Note on ClearFuncs vs. AESFuncs
Expand Down
9 changes: 2 additions & 7 deletions salt/channel/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ class AsyncReqChannel:
"_crypted_transfer",
"_uncrypted_transfer",
"send",
"connect",
]
close_methods = [
"close",
Expand Down Expand Up @@ -128,7 +127,7 @@ def factory(cls, opts, **kwargs):
else:
auth = None

transport = salt.transport.request_client(opts, io_loop=io_loop)
transport = salt.transport.request_client(opts, io_loop)
return cls(opts, transport, auth)

def __init__(self, opts, transport, auth, **kwargs):
Expand Down Expand Up @@ -271,10 +270,6 @@ def _uncrypted_transfer(self, load, timeout=60):

raise salt.ext.tornado.gen.Return(ret)

@salt.ext.tornado.gen.coroutine
def connect(self):
yield self.transport.connect()

@salt.ext.tornado.gen.coroutine
def send(self, load, tries=3, timeout=60, raw=False):
"""
Expand All @@ -295,7 +290,7 @@ def send(self, load, tries=3, timeout=60, raw=False):
ret = yield self._crypted_transfer(load, timeout=timeout, raw=raw)
break
except Exception as exc: # pylint: disable=broad-except
log.trace("Failed to send msg %r", exc)
log.error("Failed to send msg %r", exc)
if _try >= tries:
raise
else:
Expand Down
47 changes: 9 additions & 38 deletions salt/minion.py
Original file line number Diff line number Diff line change
Expand Up @@ -1361,30 +1361,11 @@ def connect_master(self, failed=False):
"""
Return a future which will complete when you are connected to a master
"""
# Consider refactoring so that eval_master does not have a subtle side-effect on the contents of the opts array
master, self.pub_channel = yield self.eval_master(
self.opts, self.timeout, self.safe, failed
)

# a long-running req channel
self.req_channel = salt.transport.client.AsyncReqChannel.factory(
self.opts, io_loop=self.io_loop
)

if hasattr(
self.req_channel, "connect"
): # TODO: consider generalizing this for all channels
log.debug("Connecting minion's long-running req channel")
yield self.req_channel.connect()

yield self._post_master_init(master)

@salt.ext.tornado.gen.coroutine
def handle_payload(self, payload, reply_func):
self.payloads.append(payload)
yield reply_func(payload)
self.payload_ack.notify()

# TODO: better name...
@salt.ext.tornado.gen.coroutine
def _post_master_init(self, master):
Expand Down Expand Up @@ -1599,6 +1580,7 @@ def _load_modules(
return functions, returners, errors, executors

def _send_req_sync(self, load, timeout):

if self.opts["minion_sign_messages"]:
log.trace("Signing event to be published onto the bus.")
minion_privkey_path = os.path.join(self.opts["pki_dir"], "minion.pem")
Expand All @@ -1607,11 +1589,9 @@ def _send_req_sync(self, load, timeout):
)
load["sig"] = sig

with salt.utils.event.get_event(
"minion", opts=self.opts, listen=False
) as event:
return event.fire_event(
load, "__master_req_channel_payload", timeout=timeout
with salt.channel.client.ReqChannel.factory(self.opts) as channel:
return channel.send(
load, timeout=timeout, tries=self.opts["return_retry_tries"]
)

@salt.ext.tornado.gen.coroutine
Expand All @@ -1624,11 +1604,9 @@ def _send_req_async(self, load, timeout):
)
load["sig"] = sig

with salt.utils.event.get_event(
"minion", opts=self.opts, listen=False
) as event:
ret = yield event.fire_event_async(
load, "__master_req_channel_payload", timeout=timeout
with salt.channel.client.AsyncReqChannel.factory(self.opts) as channel:
ret = yield channel.send(
load, timeout=timeout, tries=self.opts["return_retry_tries"]
)
raise salt.ext.tornado.gen.Return(ret)

Expand Down Expand Up @@ -2055,7 +2033,7 @@ def _thread_return(cls, minion_instance, opts, data):
minion_instance._return_pub(ret)

# Add default returners from minion config
# Should have been converted to comma-delimited string already
# Should have been coverted to comma-delimited string already
if isinstance(opts.get("return"), str):
if data["ret"]:
data["ret"] = ",".join((data["ret"], opts["return"]))
Expand Down Expand Up @@ -2662,7 +2640,6 @@ def _mine_send(self, tag, data):
"""
Send mine data to the master
"""
# Consider using a long-running req channel to send mine data
with salt.channel.client.ReqChannel.factory(self.opts) as channel:
data["tok"] = self.tok
try:
Expand Down Expand Up @@ -2699,12 +2676,6 @@ def handle_event(self, package):
force_refresh=data.get("force_refresh", False),
notify=data.get("notify", False),
)
elif tag.startswith("__master_req_channel_payload"):
yield _minion.req_channel.send(
data,
timeout=_minion._return_retry_timer(),
tries=_minion.opts["return_retry_tries"],
)
elif tag.startswith("pillar_refresh"):
yield _minion.pillar_refresh(
force_refresh=data.get("force_refresh", False),
Expand Down Expand Up @@ -3175,7 +3146,7 @@ def _handle_payload(self, payload):
if self._target_load(payload["load"]):
self._handle_decoded_payload(payload["load"])
elif self.opts["zmq_filtering"]:
# In the filtering enabled case, we'd like to know when minion sees something it shouldn't
# In the filtering enabled case, we'd like to know when minion sees something it shouldnt
log.trace(
"Broadcast message received not for this minion, Load: %s",
payload["load"],
Expand Down
5 changes: 1 addition & 4 deletions salt/transport/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,7 @@ def return_message(msg):
log.error("Exception occurred while handling stream: %s", exc)

def handle_connection(self, connection, address):
log.trace(
"IPCServer: Handling connection to address: %s",
address if address else connection,
)
log.trace("IPCServer: Handling connection to address: %s", address)
try:
with salt.utils.asynchronous.current_ioloop(self.io_loop):
stream = IOStream(
Expand Down
2 changes: 1 addition & 1 deletion salt/utils/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class SyncWrapper:
This is uses as a simple wrapper, for example:
asynchronous = AsyncClass()
# this method would regularly return a future
# this method would reguarly return a future
future = asynchronous.async_method()
sync = SyncWrapper(async_factory_method, (arg1, arg2), {'kwarg1': 'val'})
Expand Down
16 changes: 8 additions & 8 deletions tests/pytests/functional/transport/server/test_req_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def req_channel_crypt(request):


@pytest.fixture
def push_channel(req_server_channel, salt_minion, req_channel_crypt):
def req_channel(req_server_channel, salt_minion, req_channel_crypt):
with salt.channel.client.ReqChannel.factory(
salt_minion.config, crypt=req_channel_crypt
) as _req_channel:
Expand All @@ -135,7 +135,7 @@ def push_channel(req_server_channel, salt_minion, req_channel_crypt):
_req_channel.obj._refcount = 0


def test_basic(push_channel):
def test_basic(req_channel):
"""
Test a variety of messages, make sure we get the expected responses
"""
Expand All @@ -145,11 +145,11 @@ def test_basic(push_channel):
{"baz": "qux", "list": [1, 2, 3]},
]
for msg in msgs:
ret = push_channel.send(dict(msg), timeout=5, tries=1)
ret = req_channel.send(dict(msg), timeout=5, tries=1)
assert ret["load"] == msg


def test_normalization(push_channel):
def test_normalization(req_channel):
"""
Since we use msgpack, we need to test that list types are converted to lists
"""
Expand All @@ -160,21 +160,21 @@ def test_normalization(push_channel):
{"list": tuple([1, 2, 3])},
]
for msg in msgs:
ret = push_channel.send(msg, timeout=5, tries=1)
ret = req_channel.send(msg, timeout=5, tries=1)
for key, value in ret["load"].items():
assert types[key] == type(value)


def test_badload(push_channel, req_channel_crypt):
def test_badload(req_channel, req_channel_crypt):
"""
Test a variety of bad requests, make sure that we get some sort of error
"""
msgs = ["", [], tuple()]
if req_channel_crypt == "clear":
for msg in msgs:
ret = push_channel.send(msg, timeout=5, tries=1)
ret = req_channel.send(msg, timeout=5, tries=1)
assert ret == "payload and load must be a dict"
else:
for msg in msgs:
with pytest.raises(salt.exceptions.AuthenticationError):
push_channel.send(msg, timeout=5, tries=1)
req_channel.send(msg, timeout=5, tries=1)
79 changes: 16 additions & 63 deletions tests/pytests/unit/test_minion.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,27 +55,26 @@ def test_minion_load_grains_default():


@pytest.mark.parametrize(
"event",
"req_channel",
[
(
"fire_event",
lambda data, tag, cb=None, timeout=60: True,
"salt.channel.client.AsyncReqChannel.factory",
lambda load, timeout, tries: salt.ext.tornado.gen.maybe_future(tries),
),
(
"fire_event_async",
lambda data, tag, cb=None, timeout=60: salt.ext.tornado.gen.maybe_future(
True
),
"salt.channel.client.ReqChannel.factory",
lambda load, timeout, tries: tries,
),
],
)
def test_send_req_fires_completion_event(event, minion_opts):
event_enter = MagicMock()
event_enter.send.side_effect = event[1]
event = MagicMock()
event.__enter__.return_value = event_enter
def test_send_req_tries(req_channel, minion_opts):
channel_enter = MagicMock()
channel_enter.send.side_effect = req_channel[1]
channel = MagicMock()
channel.__enter__.return_value = channel_enter

with patch("salt.utils.event.get_event", return_value=event):
with patch(req_channel[0], return_value=channel):
minion_opts = salt.config.DEFAULT_MINION_OPTS.copy()
minion_opts["random_startup_delay"] = 0
minion_opts["return_retry_tries"] = 30
minion_opts["grains"] = {}
Expand All @@ -85,62 +84,16 @@ def test_send_req_fires_completion_event(event, minion_opts):
load = {"load": "value"}
timeout = 60

# XXX This is buggy because "async" in event[0] will never evaluate
# to True and if it *did* evaluate to true the test would fail
# because you Mock isn't a co-routine.
if "async" in event[0]:
if "Async" in req_channel[0]:
rtn = minion._send_req_async(load, timeout).result()
else:
rtn = minion._send_req_sync(load, timeout)

# get the
for idx, call in enumerate(event.mock_calls, 1):
if "fire_event" in call[0]:
condition_event_tag = (
len(call.args) > 1
and call.args[1] == "__master_req_channel_payload"
)
condition_event_tag_error = "{} != {}; Call(number={}): {}".format(
idx, call, call.args[1], "__master_req_channel_payload"
)
condition_timeout = (
len(call.kwargs) == 1 and call.kwargs["timeout"] == timeout
)
condition_timeout_error = "{} != {}; Call(number={}): {}".format(
idx, call, call.kwargs["timeout"], timeout
)

fire_event_called = True
assert condition_event_tag, condition_event_tag_error
assert condition_timeout, condition_timeout_error

assert fire_event_called
assert rtn


async def test_send_req_async_regression_62453(minion_opts):
event_enter = MagicMock()
event_enter.send.side_effect = (
lambda data, tag, cb=None, timeout=60: salt.ext.tornado.gen.maybe_future(True)
)
event = MagicMock()
event.__enter__.return_value = event_enter

minion_opts["random_startup_delay"] = 0
minion_opts["return_retry_tries"] = 30
minion_opts["grains"] = {}
with patch("salt.loader.grains"):
minion = salt.minion.Minion(minion_opts)

load = {"load": "value"}
timeout = 60

# We are just validating no exception is raised
rtn = await minion._send_req_async(load, timeout)
assert rtn is False
assert rtn == 30


def test_mine_send_tries():
@patch("salt.channel.client.ReqChannel.factory")
def test_mine_send_tries(req_channel_factory):
channel_enter = MagicMock()
channel_enter.send.side_effect = lambda load, timeout, tries: tries
channel = MagicMock()
Expand Down

0 comments on commit 3cc2aee

Please sign in to comment.