diff --git a/tests/test_subshells.py b/tests/test_subshells.py index 42082276..f119a787 100644 --- a/tests/test_subshells.py +++ b/tests/test_subshells.py @@ -40,19 +40,26 @@ def list_subshell_helper(kc: BlockingKernelClient): return reply["content"] -def execute_request_subshell_id(kc: BlockingKernelClient, code: str, subshell_id: str | None): +def execute_request_subshell_id( + kc: BlockingKernelClient, code: str, subshell_id: str | None, terminator: str = "\n" +): msg = kc.session.msg("execute_request", {"code": code}) msg["header"]["subshell_id"] = subshell_id msg_id = msg["msg_id"] kc.shell_channel.send(msg) + stdout = "" while True: msg = kc.get_iopub_msg() - # Get the stream message corresponding to msg_id - if msg["msg_type"] == "stream" and msg["parent_header"]["msg_id"] == msg_id: - content = msg["content"] - # assert content["name"] == "stdout" - break - return content["text"].strip() + # Get the stream messages corresponding to msg_id + if ( + msg["msg_type"] == "stream" + and msg["parent_header"]["msg_id"] == msg_id + and msg["content"]["name"] == "stdout" + ): + stdout += msg["content"]["text"] + if stdout.endswith(terminator): + break + return stdout.strip() def execute_thread_count(kc: BlockingKernelClient) -> int: @@ -118,15 +125,53 @@ def test_thread_ids(): delete_subshell_helper(kc, subshell_id) +@pytest.mark.parametrize("are_subshells", [(False, True), (True, False), (True, True)]) +@pytest.mark.parametrize("overlap", [True, False]) +def test_run_concurrently_sequence(are_subshells, overlap): + with kernel() as kc: + subshell_ids = [ + create_subshell_helper(kc)["subshell_id"] if is_subshell else None + for is_subshell in are_subshells + ] + if overlap: + codes = [ + "import time; start0=True; end0=False; time.sleep(0.2); end0=True", + "assert start0; assert not end0; time.sleep(0.2); assert end0", + ] + else: + codes = [ + "import time; start0=True; end0=False; time.sleep(0.2); assert end1", + "assert start0; assert not end0; end1=True", + ] + + msgs = [] + for subshell_id, code in zip(subshell_ids, codes): + msg = kc.session.msg("execute_request", {"code": code}) + msg["header"]["subshell_id"] = subshell_id + kc.shell_channel.send(msg) + msgs.append(msg) + if len(msgs) == 1: + time.sleep(0.1) # Wait for first execute_request to start. + + replies = get_replies(kc, [msg["msg_id"] for msg in msgs]) + + for subshell_id in subshell_ids: + if subshell_id: + delete_subshell_helper(kc, subshell_id) + + for reply in replies: + assert reply["content"]["status"] == "ok" + + @pytest.mark.parametrize("include_main_shell", [True, False]) -def test_run_concurrently(include_main_shell): +def test_run_concurrently_timing(include_main_shell): with kernel() as kc: subshell_ids = [ None if include_main_shell else create_subshell_helper(kc)["subshell_id"], create_subshell_helper(kc)["subshell_id"], ] - times = (0.05, 0.05) + times = (0.2, 0.2) # Prepare messages, times are sleep times in seconds. # Identical times for both subshells is a harder test as preparing and sending # the execute_reply messages may overlap. @@ -145,14 +190,18 @@ def test_run_concurrently(include_main_shell): _ = get_replies(kc, [msg["msg_id"] for msg in msgs]) end = datetime.now() - duration = end - start - assert duration >= timedelta(seconds=max(times)) - assert duration < timedelta(seconds=sum(times)) - for subshell_id in subshell_ids: if subshell_id: delete_subshell_helper(kc, subshell_id) + duration = end - start + assert duration >= timedelta(seconds=max(times)) + # Care is needed with this test as runtime conditions such as gathering + # coverage can slow it down causing the following assert to fail. + # The sleep time of 0.2 is empirically determined to run OK in CI, but + # consider increasing it if the following fails. + assert duration < timedelta(seconds=sum(times)) + def test_execution_count(): with kernel() as kc: @@ -173,12 +222,12 @@ def test_execution_count(): # Wait for replies, may be in any order. replies = get_replies(kc, [msg["msg_id"] for msg in msgs]) + delete_subshell_helper(kc, subshell_id) + execution_counts = [r["content"]["execution_count"] for r in replies] ec = execution_counts[0] assert execution_counts == [ec, ec - 1, ec + 2, ec + 1] - delete_subshell_helper(kc, subshell_id) - def test_create_while_execute(): with kernel() as kc: diff --git a/tests/utils.py b/tests/utils.py index c7acbae9..b20e8fcb 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -75,15 +75,13 @@ def get_replies(kc, msg_ids: list[str], timeout=TIMEOUT, channel="shell"): t0 = time() count = 0 replies = [None] * len(msg_ids) - while True: + while count < len(msg_ids): get_msg = getattr(kc, f"get_{channel}_msg") reply = get_msg(timeout=timeout) try: msg_id = reply["parent_header"]["msg_id"] replies[msg_ids.index(msg_id)] = reply count += 1 - if count == len(msg_ids): - break except ValueError: # Allow debugging ignored replies print(f"Ignoring reply not to any of {msg_ids}: {reply}")