diff --git a/python/ray/_private/ray_experimental_perf.py b/python/ray/_private/ray_experimental_perf.py index b46408c2abe1..2b07b71793ef 100644 --- a/python/ray/_private/ray_experimental_perf.py +++ b/python/ray/_private/ray_experimental_perf.py @@ -180,6 +180,7 @@ async def _exec_async(): results += timeit( "[unstable] compiled single-actor DAG calls", lambda: _exec(compiled_dag) ) + compiled_dag.teardown() del a # Single-actor asyncio DAG calls @@ -193,6 +194,10 @@ async def _exec_async(): "[unstable] compiled single-actor asyncio DAG calls", ) ) + # TODO: Need to explicitly tear down DAGs with enable_asyncio=True because + # these DAGs create a background thread that can segfault if the CoreWorker + # is torn down first. + compiled_dag.teardown() del a # Scatter-gather DAG calls @@ -210,6 +215,7 @@ async def _exec_async(): f"[unstable] compiled scatter-gather DAG calls, n={n_cpu} actors", lambda: _exec(compiled_dag), ) + compiled_dag.teardown() # Scatter-gather asyncio DAG calls @@ -222,6 +228,10 @@ async def _exec_async(): f"[unstable] compiled scatter-gather asyncio DAG calls, n={n_cpu} actors", ) ) + # TODO: Need to explicitly tear down DAGs with enable_asyncio=True because + # these DAGs create a background thread that can segfault if the CoreWorker + # is torn down first. + compiled_dag.teardown() # Chain DAG calls @@ -239,6 +249,7 @@ async def _exec_async(): f"[unstable] compiled chain DAG calls, n={n_cpu} actors", lambda: _exec(compiled_dag), ) + compiled_dag.teardown() # Chain asyncio DAG calls @@ -251,6 +262,10 @@ async def _exec_async(): results += loop.run_until_complete( exec_async(f"[unstable] compiled chain asyncio DAG calls, n={n_cpu} actors") ) + # TODO: Need to explicitly tear down DAGs with enable_asyncio=True because + # these DAGs create a background thread that can segfault if the CoreWorker + # is torn down first. + compiled_dag.teardown() # Multiple args with small payloads @@ -273,6 +288,7 @@ async def _exec_async(): f"n={n_actors} actors", lambda: _exec(compiled_dag, num_args=n_actors, payload_size=payload_size), ) + compiled_dag.teardown() # Multiple args with medium payloads @@ -290,6 +306,7 @@ async def _exec_async(): f"n={n_actors} actors", lambda: _exec(compiled_dag, num_args=n_actors, payload_size=payload_size), ) + compiled_dag.teardown() # Multiple args with large payloads @@ -307,6 +324,7 @@ async def _exec_async(): f"n={n_actors} actors", lambda: _exec(compiled_dag, num_args=n_actors, payload_size=payload_size), ) + compiled_dag.teardown() # Worst case for multiple arguments: a single actor takes all the arguments # with small payloads. @@ -327,6 +345,7 @@ async def _exec_async(): "n=1 actors", lambda: _exec(compiled_dag, num_args=n_args, payload_size=payload_size), ) + compiled_dag.teardown() ray.shutdown() diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index b7a85221b391..0abfb5757692 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -1874,11 +1874,6 @@ def shutdown(_exiting_interpreter: bool = False): and false otherwise. If we are exiting the interpreter, we will wait a little while to print any extra error messages. """ - # Make sure to clean up compiled dag node if exists. - from ray.dag.compiled_dag_node import _shutdown_all_compiled_dags - - _shutdown_all_compiled_dags() - if _exiting_interpreter and global_worker.mode == SCRIPT_MODE: # This is a duration to sleep before shutting down everything in order # to make sure that log messages finish printing. 
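With the atexit-driven _shutdown_all_compiled_dags() removed from ray.shutdown() above, the driver now owns teardown of every compiled DAG it creates. A minimal sketch of the resulting driver-side pattern, reusing the experimental_compile()/execute()/teardown() API exercised in this diff (the Echo actor and the ray.init()/ray.shutdown() framing are illustrative, not part of the change):

import ray
from ray.dag import InputNode

@ray.remote
class Echo:
    def echo(self, x):
        return x

ray.init()
a = Echo.remote()
with InputNode() as inp:
    dag = a.echo.bind(inp)

compiled_dag = dag.experimental_compile()
try:
    assert ray.get(compiled_dag.execute(1)) == 1
finally:
    # Teardown is now explicit; it no longer happens automatically at ray.shutdown().
    compiled_dag.teardown()
ray.shutdown()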
diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 8049d22dfb3e..692a155bc114 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -1,4 +1,3 @@ -import weakref import asyncio from collections import defaultdict from dataclasses import dataclass, asdict @@ -9,7 +8,6 @@ import uuid import traceback -import ray.exceptions from ray.experimental.channel.cached_channel import CachedChannel from ray.experimental.channel.gpu_communicator import GPUCommunicator import ray @@ -54,21 +52,6 @@ logger = logging.getLogger(__name__) -# Keep tracking of every compiled dag created during the lifetime of -# this process. It tracks them as weakref meaning when the compiled dag -# is GC'ed, it is automatically removed from here. It is used to teardown -# compiled dags at interpret shutdown time. -_compiled_dags = weakref.WeakValueDictionary() - - -# Relying on __del__ doesn't work well upon shutdown because -# the destructor order is not guaranteed. We call this function -# upon `ray.worker.shutdown` which is registered to atexit handler -# so that teardown is properly called before objects are destructed. -def _shutdown_all_compiled_dags(): - for _, compiled_dag in _compiled_dags.items(): - compiled_dag.teardown() - @DeveloperAPI def do_allocate_channel( @@ -1665,7 +1648,7 @@ def _is_same_actor(idx1: int, idx2: int) -> bool: return False def _monitor_failures(self): - outer = weakref.proxy(self) + outer = self class Monitor(threading.Thread): def __init__(self): @@ -1674,8 +1657,6 @@ def __init__(self): # Lock to make sure that we only perform teardown for this DAG # once. self.in_teardown_lock = threading.Lock() - self.name = "CompiledGraphMonitorThread" - self._teardown_done = False def wait_teardown(self): for actor, ref in outer.worker_task_refs.items(): @@ -1705,9 +1686,6 @@ def wait_teardown(self): def teardown(self, wait: bool): do_teardown = False with self.in_teardown_lock: - if self._teardown_done: - return - if not self.in_teardown: do_teardown = True self.in_teardown = True @@ -1731,11 +1709,9 @@ def teardown(self, wait: bool): ] for cancel_ref in cancel_refs: try: + # TODO(swang): Suppress exceptions from actors trying to + # read closed channels when DAG is being torn down. ray.get(cancel_ref, timeout=30) - except ray.exceptions.RayChannelError: - # Channel error happens when a channel is closed - # or timed out. In this case, do not log. - pass except Exception: logger.exception("Error cancelling worker task") pass @@ -1748,9 +1724,6 @@ def teardown(self, wait: bool): self.wait_teardown() logger.info("Teardown complete") - with self.in_teardown_lock: - self._teardown_done = True - def run(self): try: ray.get(list(outer.worker_task_refs.values())) @@ -2167,7 +2140,11 @@ def teardown(self): def __del__(self): monitor = getattr(self, "_monitor", None) if monitor is not None: - monitor.teardown(wait=True) + # Teardown asynchronously. + # NOTE(swang): Somehow, this can get called after the CoreWorker + # has already been destructed, so it is not safe to block in + # ray.get. 
+ monitor.teardown(wait=False) @DeveloperAPI @@ -2196,6 +2173,4 @@ def _build_compiled_dag(node): root = dag._find_root() root.traverse_and_apply(_build_compiled_dag) compiled_dag._get_or_compile() - global _compiled_dags - _compiled_dags[compiled_dag.get_id()] = compiled_dag return compiled_dag diff --git a/python/ray/dag/tests/experimental/test_accelerated_dag.py b/python/ray/dag/tests/experimental/test_accelerated_dag.py index 38661e1a73ad..7970f0dbf541 100644 --- a/python/ray/dag/tests/experimental/test_accelerated_dag.py +++ b/python/ray/dag/tests/experimental/test_accelerated_dag.py @@ -13,8 +13,6 @@ import pytest - -from ray._private.test_utils import run_string_as_driver from ray.exceptions import RayChannelError, RayChannelTimeoutError import ray import ray._private @@ -173,6 +171,10 @@ def test_basic(ray_start_regular): # Delete the buffer so that the next DAG output can be written. del result + # Note: must teardown before starting a new Ray session, otherwise you'll get + # a segfault from the dangling monitor thread upon the new Ray init. + compiled_dag.teardown() + def test_two_returns_first(ray_start_regular): a = Actor.remote(0) @@ -185,6 +187,8 @@ def test_two_returns_first(ray_start_regular): res = ray.get(compiled_dag.execute(1)) assert res == 1 + compiled_dag.teardown() + def test_two_returns_second(ray_start_regular): a = Actor.remote(0) @@ -197,6 +201,8 @@ def test_two_returns_second(ray_start_regular): res = ray.get(compiled_dag.execute(1)) assert res == 2 + compiled_dag.teardown() + @pytest.mark.parametrize("single_fetch", [True, False]) def test_two_returns_one_reader(ray_start_regular, single_fetch): @@ -219,6 +225,8 @@ def test_two_returns_one_reader(ray_start_regular, single_fetch): res = ray.get(refs) assert res == [1, 2] + compiled_dag.teardown() + @pytest.mark.parametrize("single_fetch", [True, False]) def test_two_returns_two_readers(ray_start_regular, single_fetch): @@ -242,6 +250,8 @@ def test_two_returns_two_readers(ray_start_regular, single_fetch): res = ray.get(refs) assert res == [1, 2] + compiled_dag.teardown() + @pytest.mark.parametrize("single_fetch", [True, False]) def test_inc_two_returns(ray_start_regular, single_fetch): @@ -261,6 +271,8 @@ def test_inc_two_returns(ray_start_regular, single_fetch): res = ray.get(refs) assert res == [i + 1, i + 2] + compiled_dag.teardown() + def test_two_as_one_return(ray_start_regular): a = Actor.remote(0) @@ -273,6 +285,8 @@ def test_two_as_one_return(ray_start_regular): res = ray.get(compiled_dag.execute(1)) assert res == (1, 2) + compiled_dag.teardown() + def test_multi_output_get_exception(ray_start_regular): a = Actor.remote(0) @@ -294,6 +308,8 @@ def test_multi_output_get_exception(ray_start_regular): ): ray.get(refs) + compiled_dag.teardown() + # TODO(wxdeng): Fix segfault. If this test is run, the following tests # will segfault. 
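The NOTE in __del__ above (teardown(wait=False) because the CoreWorker may already be destructed) and the TODOs in the perf script explain why asyncio-enabled DAGs in particular must be torn down explicitly: they start a background reader thread that can segfault if it outlives the CoreWorker. A sketch of the pattern the tests below follow; the Doubler actor and the simplified event-loop handling are assumptions of this example, not part of the diff:

import asyncio
import ray
from ray.dag import InputNode

@ray.remote
class Doubler:
    def double(self, x):
        return 2 * x

d = Doubler.remote()
with InputNode() as inp:
    dag = d.double.bind(inp)

# Compile outside the running loop, as the asyncio tests in this file do.
compiled_dag = dag.experimental_compile(enable_asyncio=True)

async def main():
    ref = await compiled_dag.execute_async(3)
    assert await ref == 6

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    # Explicit teardown before the driver exits or a new Ray session starts;
    # otherwise the DAG's background thread can crash during shutdown.
    compiled_dag.teardown()
ray.shutdown()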
@@ -350,6 +366,8 @@ def test_kwargs_not_supported(ray_start_regular): compiled_dag = dag.experimental_compile() assert ray.get(compiled_dag.execute(2)) == 3 + compiled_dag.teardown() + def test_out_of_order_get(ray_start_regular): c = Collector.remote() @@ -366,6 +384,8 @@ def test_out_of_order_get(ray_start_regular): result_a = ray.get(ref_a) assert result_a == ["a"] + compiled_dag.teardown() + def test_actor_multi_methods(ray_start_regular): a = Actor.remote(0) @@ -378,6 +398,8 @@ def test_actor_multi_methods(ray_start_regular): result = ray.get(ref) assert result == 1 + compiled_dag.teardown() + @pytest.mark.parametrize("single_fetch", [True, False]) def test_actor_methods_execution_order(ray_start_regular, single_fetch): @@ -399,6 +421,8 @@ def test_actor_methods_execution_order(ray_start_regular, single_fetch): else: assert ray.get(refs) == [4, 1] + compiled_dag.teardown() + def test_actor_method_multi_binds(ray_start_regular): a = Actor.remote(0) @@ -411,6 +435,8 @@ def test_actor_method_multi_binds(ray_start_regular): result = ray.get(ref) assert result == 2 + compiled_dag.teardown() + def test_actor_method_bind_same_constant(ray_start_regular): a = Actor.remote(0) @@ -425,6 +451,8 @@ def test_actor_method_bind_same_constant(ray_start_regular): result = ray.get(ref) assert result == 5 + compiled_dag.teardown() + def test_actor_method_bind_same_input(ray_start_regular): actor = Actor.remote(0) @@ -441,6 +469,7 @@ def test_actor_method_bind_same_input(ray_start_regular): ref = compiled_dag.execute(i) result = ray.get(ref) assert result == expected[i] + compiled_dag.teardown() def test_actor_method_bind_same_input_attr(ray_start_regular): @@ -458,6 +487,7 @@ def test_actor_method_bind_same_input_attr(ray_start_regular): ref = compiled_dag.execute(i) result = ray.get(ref) assert result == expected[i] + compiled_dag.teardown() def test_actor_method_bind_diff_input_attr_1(ray_start_regular): @@ -479,6 +509,8 @@ def test_actor_method_bind_diff_input_attr_1(ray_start_regular): ref = compiled_dag.execute(2, 3) assert ray.get(ref) == [0, 1, 2, 4, 6, 9] + compiled_dag.teardown() + def test_actor_method_bind_diff_input_attr_2(ray_start_regular): actor = Actor.remote(0) @@ -501,6 +533,8 @@ def test_actor_method_bind_diff_input_attr_2(ray_start_regular): ref = compiled_dag.execute(2, 3) assert ray.get(ref) == [0, 0, 1, 2, 3, 5, 7, 9, 12] + compiled_dag.teardown() + def test_actor_method_bind_diff_input_attr_3(ray_start_regular): actor = Actor.remote(0) @@ -518,6 +552,8 @@ def test_actor_method_bind_diff_input_attr_3(ray_start_regular): ref = compiled_dag.execute(2, 3) assert ray.get(ref) == 9 + compiled_dag.teardown() + def test_actor_method_bind_diff_input_attr_4(ray_start_regular): actor = Actor.remote(0) @@ -536,6 +572,8 @@ def test_actor_method_bind_diff_input_attr_4(ray_start_regular): ref = compiled_dag.execute(2, 3, 4) assert ray.get(ref) == [1, 3, 6, 9, 14, 18] + compiled_dag.teardown() + def test_actor_method_bind_diff_input_attr_5(ray_start_regular): actor = Actor.remote(0) @@ -554,6 +592,8 @@ def test_actor_method_bind_diff_input_attr_5(ray_start_regular): ref = compiled_dag.execute(2, 3, 4) assert ray.get(ref) == [1, 3, 6, 10, 15, 21] + compiled_dag.teardown() + def test_actor_method_bind_diff_kwargs_input_attr(ray_start_regular): actor = Actor.remote(0) @@ -574,6 +614,8 @@ def test_actor_method_bind_diff_kwargs_input_attr(ray_start_regular): ref = compiled_dag.execute(x=2, y=3) assert ray.get(ref) == [0, 1, 2, 4, 6, 9] + compiled_dag.teardown() + def 
test_actor_method_bind_same_arg(ray_start_regular): a1 = Actor.remote(0) @@ -592,6 +634,7 @@ def test_actor_method_bind_same_arg(ray_start_regular): ref = compiled_dag.execute(i) result = ray.get(ref) assert result == expected[i] + compiled_dag.teardown() def test_mixed_bind_same_input(ray_start_regular): @@ -611,6 +654,7 @@ def test_mixed_bind_same_input(ray_start_regular): ref = compiled_dag.execute(i) result = ray.get(ref) assert result == expected[i] + compiled_dag.teardown() def test_regular_args(ray_start_regular): @@ -626,6 +670,8 @@ def test_regular_args(ray_start_regular): result = ray.get(ref) assert result == (i + 1) * 3 + compiled_dag.teardown() + class TestMultiArgs: def test_multi_args_basic(self, ray_start_regular): @@ -643,6 +689,8 @@ def test_multi_args_basic(self, ray_start_regular): result = ray.get(ref) assert result == [3, 2] + compiled_dag.teardown() + def test_multi_args_single_actor(self, ray_start_regular): c = Collector.remote() with InputNode() as i: @@ -677,6 +725,8 @@ def test_multi_args_single_actor(self, ray_start_regular): ): compiled_dag.execute(args=(2, 3)) + compiled_dag.teardown() + def test_multi_args_branch(self, ray_start_regular): a = Actor.remote(0) c = Collector.remote() @@ -690,6 +740,8 @@ def test_multi_args_branch(self, ray_start_regular): result = ray.get(ref) assert result == [2, 3] + compiled_dag.teardown() + def test_kwargs_basic(self, ray_start_regular): a1 = Actor.remote(0) a2 = Actor.remote(0) @@ -705,6 +757,8 @@ def test_kwargs_basic(self, ray_start_regular): result = ray.get(ref) assert result == [3, 2] + compiled_dag.teardown() + def test_kwargs_single_actor(self, ray_start_regular): c = Collector.remote() with InputNode() as i: @@ -737,6 +791,8 @@ def test_kwargs_single_actor(self, ray_start_regular): ): compiled_dag.execute(x=3) + compiled_dag.teardown() + def test_kwargs_branch(self, ray_start_regular): a = Actor.remote(0) c = Collector.remote() @@ -750,6 +806,8 @@ def test_kwargs_branch(self, ray_start_regular): result = ray.get(ref) assert result == [3, 2] + compiled_dag.teardown() + def test_multi_args_and_kwargs(self, ray_start_regular): a1 = Actor.remote(0) a2 = Actor.remote(0) @@ -765,6 +823,8 @@ def test_multi_args_and_kwargs(self, ray_start_regular): result = ray.get(ref) assert result == [3, 4, 2] + compiled_dag.teardown() + def test_multi_args_and_torch_type(self, ray_start_regular): a1 = Actor.remote(0) a2 = Actor.remote(0) @@ -788,6 +848,8 @@ def test_multi_args_and_torch_type(self, ray_start_regular): assert torch.equal(tensors[0], cpu_tensors[1]) assert torch.equal(tensors[1], cpu_tensors[0]) + compiled_dag.teardown() + def test_mix_entire_input_and_args(self, ray_start_regular): """ It is not allowed to consume both the entire input and a partial @@ -821,6 +883,8 @@ def test_multi_args_same_actor(self, ray_start_regular): result = ray.get(ref) assert result == [1, 3] + compiled_dag.teardown() + def test_multi_args_basic_asyncio(self, ray_start_regular): a1 = Actor.remote(0) a2 = Actor.remote(0) @@ -838,6 +902,7 @@ async def main(): loop = get_or_create_event_loop() loop.run_until_complete(asyncio.gather(main())) + compiled_dag.teardown() def test_multi_args_branch_asyncio(self, ray_start_regular): a = Actor.remote(0) @@ -855,6 +920,7 @@ async def main(): loop = get_or_create_event_loop() loop.run_until_complete(asyncio.gather(main())) + compiled_dag.teardown() def test_kwargs_basic_asyncio(self, ray_start_regular): a1 = Actor.remote(0) @@ -874,6 +940,7 @@ async def main(): loop = get_or_create_event_loop() 
loop.run_until_complete(asyncio.gather(main())) + compiled_dag.teardown() def test_kwargs_branch_asyncio(self, ray_start_regular): a = Actor.remote(0) @@ -891,6 +958,7 @@ async def main(): loop = get_or_create_event_loop() loop.run_until_complete(asyncio.gather(main())) + compiled_dag.teardown() def test_multi_args_and_kwargs_asyncio(self, ray_start_regular): a1 = Actor.remote(0) @@ -910,6 +978,7 @@ async def main(): loop = get_or_create_event_loop() loop.run_until_complete(asyncio.gather(main())) + compiled_dag.teardown() @pytest.mark.parametrize("num_actors", [1, 4]) @@ -933,6 +1002,8 @@ def test_scatter_gather_dag(ray_start_regular, num_actors, single_fetch): results = ray.get(refs) assert results == [i + 1] * num_actors + compiled_dag.teardown() + @pytest.mark.parametrize("num_actors", [1, 4]) def test_chain_dag(ray_start_regular, num_actors): @@ -949,6 +1020,8 @@ def test_chain_dag(ray_start_regular, num_actors): result = ray.get(ref) assert result == list(range(num_actors)) + compiled_dag.teardown() + def test_get_timeout(ray_start_regular): a = Actor.remote(0) @@ -970,6 +1043,8 @@ def test_get_timeout(ray_start_regular): timed_out = True assert timed_out + compiled_dag.teardown() + def test_buffered_get_timeout(ray_start_regular): a = Actor.remote(0) @@ -990,6 +1065,8 @@ def test_buffered_get_timeout(ray_start_regular): # be raised. ray.get(refs[-1], timeout=3.5) + compiled_dag.teardown() + def test_get_with_zero_timeout(ray_start_regular): a = Actor.remote(0) @@ -1004,6 +1081,8 @@ def test_get_with_zero_timeout(ray_start_regular): result = ray.get(ref, timeout=0) assert result == 1 + compiled_dag.teardown() + def test_dag_exception_basic(ray_start_regular, capsys): # Test application throwing exceptions with a single task. @@ -1029,6 +1108,8 @@ def test_dag_exception_basic(ray_start_regular, capsys): # Can use the DAG after exceptions are thrown. assert ray.get(compiled_dag.execute(1)) == 1 + compiled_dag.teardown() + def test_dag_exception_chained(ray_start_regular, capsys): # Test application throwing exceptions with a task that depends on another @@ -1056,6 +1137,8 @@ def test_dag_exception_chained(ray_start_regular, capsys): # Can use the DAG after exceptions are thrown. assert ray.get(compiled_dag.execute(1)) == 2 + compiled_dag.teardown() + @pytest.mark.parametrize("single_fetch", [True, False]) def test_dag_exception_multi_output(ray_start_regular, single_fetch, capsys): @@ -1103,6 +1186,8 @@ def test_dag_exception_multi_output(ray_start_regular, single_fetch, capsys): else: assert ray.get(refs) == [1, 1] + compiled_dag.teardown() + def test_dag_errors(ray_start_regular): a = Actor.remote(0) @@ -1196,6 +1281,7 @@ def f(x): ), ): ray.get(ref) + compiled_dag.teardown() class TestDAGExceptionCompileMultipleTimes: @@ -1254,7 +1340,8 @@ def test_compile_twice_with_multioutputnode_without_teardown( "object multiple times no matter whether `teardown` is called or not. 
" "Please reuse the existing compiled DAG or create a new one.", ): - compiled_dag = dag.experimental_compile() # noqa + compiled_dag = dag.experimental_compile() + compiled_dag.teardown() def test_compile_twice_with_different_nodes(self, ray_start_regular): a = Actor.remote(0) @@ -1299,6 +1386,7 @@ def test_exceed_max_buffered_results(ray_start_regular): ray.get(ref) del refs + compiled_dag.teardown() @pytest.mark.parametrize("single_fetch", [True, False]) @@ -1337,6 +1425,7 @@ def test_exceed_max_buffered_results_multi_output(ray_start_regular, single_fetc ray.get(ref) del refs + compiled_dag.teardown() def test_compiled_dag_ref_del(ray_start_regular): @@ -1352,6 +1441,8 @@ def test_compiled_dag_ref_del(ray_start_regular): ref = compiled_dag.execute(1) del ref + compiled_dag.teardown() + def test_dag_fault_tolerance_chain(ray_start_regular): actors = [ @@ -1393,6 +1484,8 @@ def test_dag_fault_tolerance_chain(ray_start_regular): results = ray.get(ref) assert results == i + compiled_dag.teardown() + @pytest.mark.parametrize("single_fetch", [True, False]) def test_dag_fault_tolerance(ray_start_regular, single_fetch): @@ -1443,6 +1536,8 @@ def test_dag_fault_tolerance(ray_start_regular, single_fetch): else: ray.get(refs) + compiled_dag.teardown() + @pytest.mark.parametrize("single_fetch", [True, False]) def test_dag_fault_tolerance_sys_exit(ray_start_regular, single_fetch): @@ -1493,6 +1588,8 @@ def test_dag_fault_tolerance_sys_exit(ray_start_regular, single_fetch): else: ray.get(refs) + compiled_dag.teardown() + def test_dag_teardown_while_running(ray_start_regular): a = Actor.remote(0) @@ -1517,6 +1614,8 @@ def test_dag_teardown_while_running(ray_start_regular): result = ray.get(ref) assert result == 0.1 + compiled_dag.teardown() + @pytest.mark.parametrize("max_queue_size", [None, 2]) def test_asyncio(ray_start_regular, max_queue_size): @@ -1539,6 +1638,9 @@ async def main(i): assert (result == val).all() loop.run_until_complete(asyncio.gather(*[main(i) for i in range(10)])) + # Note: must teardown before starting a new Ray session, otherwise you'll get + # a segfault from the dangling monitor thread upon the new Ray init. + compiled_dag.teardown() @pytest.mark.parametrize("max_queue_size", [None, 2]) @@ -1562,6 +1664,7 @@ async def main(): assert result_a == ["a"] loop.run_until_complete(main()) + compiled_dag.teardown() @pytest.mark.parametrize("max_queue_size", [None, 2]) @@ -1596,6 +1699,9 @@ async def main(i): assert (result == val).all() loop.run_until_complete(asyncio.gather(*[main(i) for i in range(10)])) + # Note: must teardown before starting a new Ray session, otherwise you'll get + # a segfault from the dangling monitor thread upon the new Ray init. + compiled_dag.teardown() @pytest.mark.parametrize("max_queue_size", [None, 2]) @@ -1633,6 +1739,9 @@ async def main(): assert result == 2 loop.run_until_complete(main()) + # Note: must teardown before starting a new Ray session, otherwise you'll get + # a segfault from the dangling monitor thread upon the new Ray init. 
+ compiled_dag.teardown() class TestCompositeChannel: @@ -1668,6 +1777,8 @@ def test_composite_channel_one_actor(self, ray_start_regular): ref = compiled_dag.execute(3) assert ray.get(ref) == 108 + compiled_dag.teardown() + def test_composite_channel_two_actors(self, ray_start_regular): """ In this test, there are three 'inc' tasks on the two Ray actors, chained @@ -1700,6 +1811,8 @@ def test_composite_channel_two_actors(self, ray_start_regular): ref = compiled_dag.execute(3) assert ray.get(ref) == 829 + compiled_dag.teardown() + @pytest.mark.parametrize("single_fetch", [True, False]) def test_composite_channel_multi_output(self, ray_start_regular, single_fetch): """ @@ -1734,6 +1847,8 @@ def test_composite_channel_multi_output(self, ray_start_regular, single_fetch): else: assert ray.get(refs) == [10, 106] + compiled_dag.teardown() + @pytest.mark.parametrize("single_fetch", [True, False]) def test_intra_process_channel_with_multi_readers( self, ray_start_regular, single_fetch @@ -1780,6 +1895,8 @@ def test_intra_process_channel_with_multi_readers( else: assert ray.get(refs) == [3, 3] + compiled_dag.teardown() + class TestLeafNode: def test_leaf_node_one_actor(self, ray_start_regular): @@ -1803,6 +1920,7 @@ def test_leaf_node_one_actor(self, ray_start_regular): ref = compiled_dag.execute(10) assert ray.get(ref) == [20] + compiled_dag.teardown() def test_leaf_node_two_actors(self, ray_start_regular): """ @@ -1825,6 +1943,7 @@ def test_leaf_node_two_actors(self, ray_start_regular): ref = compiled_dag.execute(10) assert ray.get(ref) == [120, 220] + compiled_dag.teardown() def test_output_node(ray_start_regular): @@ -1878,6 +1997,7 @@ def echo(self, data): ref = compiled_dag.execute(x=1, y=2) assert ray.get(ref) == [1, 2, 1] + compiled_dag.teardown() @pytest.mark.parametrize("single_fetch", [True, False]) @@ -1959,6 +2079,7 @@ def read_input(self, input): "BWD rank-1, batch-1", "BWD rank-1, batch-2", ] + output_dag.teardown() def test_channel_read_after_close(ray_start_regular): @@ -2110,6 +2231,7 @@ async def main(): loop = get_or_create_event_loop() loop.run_until_complete(main()) + async_dag.teardown() def test_event_profiling(ray_start_regular, monkeypatch): @@ -2139,6 +2261,8 @@ def test_event_profiling(ray_start_regular, monkeypatch): assert event.method_name == "inc" assert event.operation in ["READ", "COMPUTE", "WRITE"] + adag.teardown() + @ray.remote class TestWorker: @@ -2341,43 +2465,6 @@ def call(self, value): assert torch.equal(ray.get(ref), torch.tensor([5, 5, 5, 5, 5])) -def test_async_shutdown(shutdown_only): - """Verify that when async API is used, shutdown doesn't hang - because of threads joining at exit. 
- """ - - script = """ -import asyncio -import ray -from ray.dag import InputNode, MultiOutputNode - -async def main(): - @ray.remote - class A: - def f(self, i): - return i - - a = A.remote() - b = A.remote() - - with InputNode() as inp: - x = a.f.bind(inp) - y = b.f.bind(inp) - dag = MultiOutputNode([x, y]) - - adag = dag.experimental_compile(enable_asyncio=True) - refs = await adag.execute_async(1) - outputs = [] - for ref in refs: - outputs.append(await ref) - print(outputs) - -asyncio.run(main()) - """ - - print(run_string_as_driver(script)) - - def test_multi_arg_exception(shutdown_only): a = Actor.remote(0) with InputNode() as i: @@ -2392,6 +2479,8 @@ def test_multi_arg_exception(shutdown_only): with pytest.raises(RuntimeError): ray.get(y) + compiled_dag.teardown() + def test_multi_arg_exception_async(shutdown_only): a = Actor.remote(0) @@ -2412,6 +2501,8 @@ async def main(): loop = get_or_create_event_loop() loop.run_until_complete(main()) + compiled_dag.teardown() + class TestVisualization: diff --git a/python/ray/dag/tests/experimental/test_detect_deadlock_dag.py b/python/ray/dag/tests/experimental/test_detect_deadlock_dag.py index 42ac5a2dc672..bedfb2701ba5 100644 --- a/python/ray/dag/tests/experimental/test_detect_deadlock_dag.py +++ b/python/ray/dag/tests/experimental/test_detect_deadlock_dag.py @@ -64,7 +64,8 @@ def test_invalid_graph_1_actor(ray_start_regular, tensor_transport): dag = a.no_op.bind(dag) if tensor_transport == TorchTensorType.AUTO: - dag.experimental_compile() + compiled_graph = dag.experimental_compile() + compiled_graph.teardown() elif tensor_transport == TorchTensorType.NCCL: with pytest.raises(ValueError, match=INVALID_GRAPH): dag.experimental_compile() @@ -139,7 +140,8 @@ def test_valid_graph_2_actors_1(ray_start_regular, tensor_transport): ] ) - dag.experimental_compile() + compiled_graph = dag.experimental_compile() + compiled_graph.teardown() @pytest.mark.parametrize("ray_start_regular", [{"num_gpus": 2}], indirect=True) @@ -167,7 +169,8 @@ def test_valid_graph_2_actors_2(ray_start_regular): dag.with_type_hint(TorchTensorType(transport="nccl")) dag = b.no_op.bind(dag) - dag.experimental_compile() + compiled_dag = dag.experimental_compile() + compiled_dag.teardown() @pytest.mark.parametrize("ray_start_regular", [{"num_gpus": 2}], indirect=True) @@ -204,7 +207,8 @@ def test_invalid_graph_2_actors_1(ray_start_regular, tensor_transport): ) if tensor_transport == TorchTensorType.AUTO: - dag.experimental_compile() + compiled_graph = dag.experimental_compile() + compiled_graph.teardown() elif tensor_transport == TorchTensorType.NCCL: with pytest.raises(ValueError, match=INVALID_GRAPH): dag.experimental_compile() @@ -241,7 +245,8 @@ def test_invalid_graph_2_actors_2(ray_start_regular, tensor_transport): ) if tensor_transport == TorchTensorType.AUTO: - dag.experimental_compile() + compiled_graph = dag.experimental_compile() + compiled_graph.teardown() elif tensor_transport == TorchTensorType.NCCL: with pytest.raises(ValueError, match=INVALID_GRAPH): dag.experimental_compile() @@ -273,7 +278,8 @@ def test_valid_graph_3_actors_1(ray_start_regular, tensor_transport): ] ) - dag.experimental_compile() + compiled_graph = dag.experimental_compile() + compiled_graph.teardown() @pytest.mark.parametrize("ray_start_regular", [{"num_gpus": 3}], indirect=True) @@ -298,7 +304,8 @@ def test_valid_graph_3_actors_2(ray_start_regular): branch2.with_type_hint(TorchTensorType(transport="nccl")) dag = a.no_op_two.bind(branch1, branch2) - dag.experimental_compile() + compiled_dag = 
dag.experimental_compile() + compiled_dag.teardown() if __name__ == "__main__": diff --git a/python/ray/dag/tests/experimental/test_execution_schedule_gpu.py b/python/ray/dag/tests/experimental/test_execution_schedule_gpu.py index bdb9dafbdd62..7bfd84502901 100644 --- a/python/ray/dag/tests/experimental/test_execution_schedule_gpu.py +++ b/python/ray/dag/tests/experimental/test_execution_schedule_gpu.py @@ -194,6 +194,8 @@ def test_simulate_pp_2workers_2batches_1f1b( for tensor in tensors: assert torch.equal(tensor, tensor_cpu) + compiled_dag.teardown() + @pytest.mark.parametrize("ray_start_regular", [{"num_gpus": 4}], indirect=True) def test_simulate_pp_4workers_8batches_1f1b(ray_start_regular, monkeypatch): @@ -216,6 +218,7 @@ def test_simulate_pp_4workers_8batches_1f1b(ray_start_regular, monkeypatch): assert len(tensors) == num_microbatches for t in tensors: assert torch.equal(t, tensor_cpu) + compiled_dag.teardown() @pytest.mark.parametrize("ray_start_regular", [{"num_gpus": 3}], indirect=True) @@ -283,6 +286,8 @@ def test_three_actors_with_nccl_1(ray_start_regular): for t in tensors: assert torch.equal(t, tensor_cpu) + compiled_dag.teardown() + @pytest.mark.parametrize("ray_start_regular", [{"num_gpus": 3}], indirect=True) @pytest.mark.parametrize("single_fetch", [True, False]) @@ -364,6 +369,8 @@ def test_three_actors_with_nccl_2(ray_start_regular, single_fetch, monkeypatch): for tensor in tensors: assert torch.equal(tensor, tensor_cpu) + compiled_dag.teardown() + if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): diff --git a/python/ray/dag/tests/experimental/test_multi_args_gpu.py b/python/ray/dag/tests/experimental/test_multi_args_gpu.py index d484132f4998..4d9dcf8e11a8 100644 --- a/python/ray/dag/tests/experimental/test_multi_args_gpu.py +++ b/python/ray/dag/tests/experimental/test_multi_args_gpu.py @@ -68,6 +68,8 @@ def backward(self, data): assert torch.equal(tensors[2], tensor_cpu_list[2]) assert torch.equal(tensors[3], tensor_cpu_list[2]) + compiled_dag.teardown() + if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): diff --git a/python/ray/dag/tests/experimental/test_multi_node_dag.py b/python/ray/dag/tests/experimental/test_multi_node_dag.py index 822b0b17cfd9..b8cb128eab27 100644 --- a/python/ray/dag/tests/experimental/test_multi_node_dag.py +++ b/python/ray/dag/tests/experimental/test_multi_node_dag.py @@ -102,6 +102,8 @@ def _get_node_id(self) -> "ray.NodeID": for i in range(1, 10): assert ray.get(adag.execute(1)) == [i, i, i] + adag.teardown() + def test_bunch_readers_on_different_nodes(ray_start_cluster): cluster = ray_start_cluster @@ -141,6 +143,8 @@ def _get_node_id(self) -> "ray.NodeID": i for _ in range(ACTORS_PER_NODE * (NUM_REMOTE_NODES + 1)) ] + adag.teardown() + @pytest.mark.parametrize("single_fetch", [True, False]) def test_pp(ray_start_cluster, single_fetch): @@ -186,6 +190,8 @@ def execute_model(self, val): # So that raylets' error messages are printed to the driver time.sleep(2) + compiled_dag.teardown() + def test_payload_large(ray_start_cluster, monkeypatch): GRPC_MAX_SIZE = 1024 * 1024 * 5 @@ -235,6 +241,10 @@ def get_node_id(self): result = ray.get(ref) assert result == val + # Note: must teardown before starting a new Ray session, otherwise you'll get + # a segfault from the dangling monitor thread upon the new Ray init. 
+ compiled_dag.teardown() + @pytest.mark.parametrize("num_actors", [1, 4]) @pytest.mark.parametrize("num_nodes", [1, 4]) @@ -285,6 +295,8 @@ def _get_node_id(self) -> "ray.NodeID": result = ray.get(ref) assert result == [val for _ in range(ACTORS_PER_NODE * (NUM_REMOTE_NODES + 1))] + compiled_dag.teardown() + def test_multi_node_dag_from_actor(ray_start_cluster): cluster = ray_start_cluster diff --git a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py index 3cd8bc8765d1..70e9a14296b9 100644 --- a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py +++ b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py @@ -1,9 +1,9 @@ # coding: utf-8 import logging -import time import os import socket import sys +import time from typing import List, Optional, Tuple import pytest @@ -19,7 +19,6 @@ TorchTensorAllocator, ) from ray.experimental.channel.nccl_group import _NcclGroup - from ray.experimental.channel.torch_tensor_type import TorchTensorType from ray.tests.conftest import * # noqa from ray.experimental.util.types import ReduceOp @@ -104,74 +103,6 @@ def forward(self, inp): return torch.randn(10, 10) -class TestNcclGroup(GPUCommunicator): - """ - A custom NCCL group for testing. This is a simple wrapper around `_NcclGroup`. - """ - - def __init__(self, world_size, comm_id, actor_handles): - self._world_size = world_size - self._comm_id = comm_id - self._actor_handles = actor_handles - self._inner = None - - def initialize(self, rank: int) -> None: - self._inner = _NcclGroup( - self._world_size, - self._comm_id, - rank, - self._actor_handles, - torch.cuda.current_stream().cuda_stream, - ) - - def get_rank(self, actor: ray.actor.ActorHandle) -> int: - # Implement this without forwarding to `_inner` to allow the method - # to be called before initialization. - actor_ids = [a._ray_actor_id for a in self._actor_handles] - try: - rank = actor_ids.index(actor._ray_actor_id) - except ValueError: - raise ValueError("Actor is not in the NCCL group.") - return rank - - def get_world_size(self) -> int: - # Implement this without forwarding to `_inner` to allow the method - # to be called before initialization. 
- return self._world_size - - def get_self_rank(self) -> Optional[int]: - if self._inner is None: - return None - return self._inner.get_self_rank() - - def get_actor_handles(self) -> List["ray.actor.ActorHandle"]: - return self._actor_handles - - def send(self, value: "torch.Tensor", peer_rank: int) -> None: - return self._inner.send(value, peer_rank) - - def recv( - self, - shape: Tuple[int], - dtype: "torch.dtype", - peer_rank: int, - allocator: Optional[TorchTensorAllocator] = None, - ) -> "torch.Tensor": - return self._inner.recv(shape, dtype, peer_rank, allocator=allocator) - - def allreduce( - self, - send_buf: "torch.Tensor", - recv_buf: "torch.Tensor", - op: ReduceOp = ReduceOp.SUM, - ) -> None: - self._inner.allreduce(send_buf, recv_buf, op) - recv_buf += 1 - - def destroy(self) -> None: - return self._inner.destroy() - - @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) def test_torch_tensor_p2p(ray_start_regular): if USE_GPU: @@ -234,6 +165,7 @@ def test_torch_tensor_p2p(ray_start_regular): ref = compiled_dag.execute((shape, dtype, 1)) ray.get(ref) + compiled_dag.teardown() @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) @@ -271,6 +203,8 @@ def test_torch_tensor_as_dag_input(ray_start_regular): result = ray.get(ref) assert result == (i, (20,), dtype) + compiled_dag.teardown() + @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) def test_torch_tensor_nccl(ray_start_regular): @@ -332,6 +266,7 @@ def test_torch_tensor_nccl(ray_start_regular): ref = compiled_dag.execute(i) result = ray.get(ref) assert result == (i, shape, dtype) + compiled_dag.teardown() # TODO(swang): Check that actors are still alive. Currently this fails due # to a ref counting assertion error. @@ -370,6 +305,8 @@ def test_torch_tensor_nccl_dynamic(ray_start_regular): result = ray.get(ref) assert result == (i, shape, dtype) + compiled_dag.teardown() + @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) def test_torch_tensor_custom_comm(ray_start_regular): @@ -387,6 +324,77 @@ def test_torch_tensor_custom_comm(ray_start_regular): from cupy.cuda import nccl + class TestNcclGroup(GPUCommunicator): + """ + A custom NCCL group for testing. This is a simple wrapper around `_NcclGroup`. + """ + + def __init__(self, world_size, comm_id, actor_handles): + self._world_size = world_size + self._comm_id = comm_id + self._actor_handles = actor_handles + self._inner = None + + def initialize(self, rank: int) -> None: + print(f"initializing rank {rank}") + try: + self._inner = _NcclGroup( + self._world_size, + self._comm_id, + rank, + self._actor_handles, + torch.cuda.current_stream().cuda_stream, + ) + except Exception as e: + print(f"Got {e}") + + def get_rank(self, actor: ray.actor.ActorHandle) -> int: + # Implement this without forwarding to `_inner` to allow the method + # to be called before initialization. + actor_ids = [a._ray_actor_id for a in self._actor_handles] + try: + rank = actor_ids.index(actor._ray_actor_id) + except ValueError: + raise ValueError("Actor is not in the NCCL group.") + return rank + + def get_world_size(self) -> int: + # Implement this without forwarding to `_inner` to allow the method + # to be called before initialization. 
+ return self._world_size + + def get_self_rank(self) -> Optional[int]: + if self._inner is None: + return None + return self._inner.get_self_rank() + + def get_actor_handles(self) -> List["ray.actor.ActorHandle"]: + return self._actor_handles + + def send(self, value: "torch.Tensor", peer_rank: int) -> None: + return self._inner.send(value, peer_rank) + + def recv( + self, + shape: Tuple[int], + dtype: "torch.dtype", + peer_rank: int, + allocator: Optional[TorchTensorAllocator] = None, + ) -> "torch.Tensor": + return self._inner.recv(shape, dtype, peer_rank, allocator=allocator) + + def allreduce( + self, + send_buf: "torch.Tensor", + recv_buf: "torch.Tensor", + op: ReduceOp = ReduceOp.SUM, + ) -> None: + self._inner.allreduce(send_buf, recv_buf, op) + recv_buf += 1 + + def destroy(self) -> None: + return self._inner.destroy() + comm_id = nccl.get_unique_id() nccl_group = TestNcclGroup(2, comm_id, [sender, receiver]) with InputNode() as inp: @@ -404,6 +412,8 @@ def test_torch_tensor_custom_comm(ray_start_regular): result = ray.get(ref) assert result == (i, shape, dtype) + compiled_dag.teardown() + @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) def test_torch_tensor_custom_comm_invalid(ray_start_regular): @@ -635,6 +645,8 @@ def destroy(self) -> None: result = ray.get(ref) assert result == (i, shape, dtype) + compiled_dag.teardown() + @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) def test_torch_tensor_nccl_wrong_shape(ray_start_regular): @@ -681,6 +693,8 @@ def test_torch_tensor_nccl_wrong_shape(ray_start_regular): with pytest.raises(RayChannelError): ref = compiled_dag.execute(shape=(20,), dtype=dtype, value=1) + compiled_dag.teardown() + # TODO(swang): This currently requires time.sleep to avoid some issue with # following tests. time.sleep(3) @@ -724,6 +738,8 @@ def test_torch_tensor_nccl_nested(ray_start_regular): expected_result = {0: (0, shape, dtype)} assert result == expected_result + compiled_dag.teardown() + @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) def test_torch_tensor_nccl_nested_dynamic(ray_start_regular): @@ -762,6 +778,8 @@ def test_torch_tensor_nccl_nested_dynamic(ray_start_regular): expected_result = {j: (j, shape, dtype) for j in range(i)} assert result == expected_result + compiled_dag.teardown() + @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) def test_torch_tensor_nccl_direct_return_error(ray_start_regular): @@ -808,6 +826,12 @@ def test_torch_tensor_nccl_direct_return_error(ray_start_regular): with pytest.raises(RayChannelError): ref = compiled_dag.execute(shape=shape, dtype=dtype, value=1, send_tensor=True) + compiled_dag.teardown() + + # TODO(swang): This currently requires time.sleep to avoid some issue with + # following tests. + time.sleep(3) + @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) def test_torch_tensor_exceptions(ray_start_regular): @@ -872,6 +896,8 @@ def test_torch_tensor_exceptions(ray_start_regular): result = ray.get(ref) assert result == (i, shape, dtype) + compiled_dag.teardown() + @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) def test_torch_tensor_nccl_all_reduce(ray_start_regular): @@ -1036,6 +1062,77 @@ def test_torch_tensor_nccl_all_reduce_custom_comm(ray_start_regular): num_workers = 2 workers = [actor_cls.remote() for _ in range(num_workers)] + class TestNcclGroup(GPUCommunicator): + """ + A custom NCCL group for testing. 
This is a simple wrapper around `_NcclGroup`. + """ + + def __init__(self, world_size, comm_id, actor_handles): + self._world_size = world_size + self._comm_id = comm_id + self._actor_handles = actor_handles + self._inner = None + + def initialize(self, rank: int) -> None: + print(f"initializing rank {rank}") + try: + self._inner = _NcclGroup( + self._world_size, + self._comm_id, + rank, + self._actor_handles, + torch.cuda.current_stream().cuda_stream, + ) + except Exception as e: + print(f"Got {e}") + + def get_rank(self, actor: ray.actor.ActorHandle) -> int: + # Implement this without forwarding to `_inner` to allow the method + # to be called before initialization. + actor_ids = [a._ray_actor_id for a in self._actor_handles] + try: + rank = actor_ids.index(actor._ray_actor_id) + except ValueError: + raise ValueError("Actor is not in the NCCL group.") + return rank + + def get_world_size(self) -> int: + # Implement this without forwarding to `_inner` to allow the method + # to be called before initialization. + return self._world_size + + def get_self_rank(self) -> Optional[int]: + if self._inner is None: + return None + return self._inner.get_self_rank() + + def get_actor_handles(self) -> List["ray.actor.ActorHandle"]: + return self._actor_handles + + def send(self, value: "torch.Tensor", peer_rank: int) -> None: + return self._inner.send(value, peer_rank) + + def recv( + self, + shape: Tuple[int], + dtype: "torch.dtype", + peer_rank: int, + allocator: Optional[TorchTensorAllocator] = None, + ) -> "torch.Tensor": + return self._inner.recv(shape, dtype, peer_rank, allocator=allocator) + + def allreduce( + self, + send_buf: "torch.Tensor", + recv_buf: "torch.Tensor", + op: ReduceOp = ReduceOp.SUM, + ) -> None: + self._inner.allreduce(send_buf, recv_buf, op) + recv_buf += 1 + + def destroy(self) -> None: + return self._inner.destroy() + from cupy.cuda import nccl comm_id = nccl.get_unique_id() diff --git a/python/ray/experimental/channel/common.py b/python/ray/experimental/channel/common.py index 0abdd0deb670..84d5d5a6c111 100644 --- a/python/ray/experimental/channel/common.py +++ b/python/ray/experimental/channel/common.py @@ -1,14 +1,12 @@ import asyncio import concurrent import copy -import sys import threading import time from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, Tuple, Union import ray -import ray.exceptions from ray.experimental.channel.gpu_communicator import GPUCommunicator from ray.experimental.channel.serialization_context import _SerializationContext from ray.util.annotations import DeveloperAPI, PublicAPI @@ -21,29 +19,6 @@ import torch -def retry_and_check_interpreter_exit(f) -> bool: - """This function is only useful when f contains channel read/write. - - Keep retrying channel read/write inside `f` and check if interpreter exits. - It is important in case the read/write happens in a separate thread pool. - See https://github.com/ray-project/ray/pull/47702 - """ - exiting = False - while True: - try: - # results.append(c.read(timeout=1)) - f() - break - except ray.exceptions.RayChannelTimeoutError: - if sys.is_finalizing(): - # Interpreter exits. We should ignore the error and - # stop reading so that the thread can join. - exiting = True - break - - return exiting - - # Holds the input arguments for an accelerated DAG node. 
@PublicAPI(stability="alpha") class RayDAGArgs(NamedTuple): @@ -381,15 +356,7 @@ def start(self): self._background_task = asyncio.ensure_future(self.run()) def _run(self): - results = [] - for c in self._input_channels: - exiting = retry_and_check_interpreter_exit( - lambda: results.append(c.read(timeout=1)) - ) - if exiting: - break - - return results + return [c.read() for c in self._input_channels] async def run(self): loop = asyncio.get_running_loop() @@ -411,7 +378,6 @@ async def run(self): def close(self): super().close() self._background_task_executor.shutdown(cancel_futures=True) - self._background_task.cancel() @DeveloperAPI @@ -574,11 +540,7 @@ def _run(self, res): for i, channel in enumerate(self._output_channels): idx = self._output_idxs[i] res_i = _adapt(res, idx, self._is_input) - exiting = retry_and_check_interpreter_exit( - lambda: channel.write(res_i, timeout=1) - ) - if exiting: - break + channel.write(res_i) async def run(self): loop = asyncio.get_event_loop()
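Most of the teardown() calls added to the tests in this diff follow the same shape: compile, exercise the DAG, tear down at the end of the test. A small helper like the following (purely a sketch, not part of this change) could centralize that and also cover the early-exit case where an assertion fails before the explicit teardown() line is reached:

import contextlib

@contextlib.contextmanager
def compiled(dag, **compile_kwargs):
    # Compile the DAG and guarantee teardown even if the test body raises.
    compiled_dag = dag.experimental_compile(**compile_kwargs)
    try:
        yield compiled_dag
    finally:
        compiled_dag.teardown()

# Hypothetical usage inside a test:
#
#     with InputNode() as inp:
#         dag = a.echo.bind(inp)
#     with compiled(dag) as compiled_dag:
#         assert ray.get(compiled_dag.execute(1)) == 1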