diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 4797aaf139a4..fd634f890053 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -45,7 +45,7 @@ def do_allocate_channel( Args: readers: The actor handles of the readers. - typ: The output type hint for the channel. + buffer_size_bytes: The maximum size of messages in the channel. Returns: The allocated channel. @@ -131,7 +131,7 @@ def _exec_task(self, task: "ExecutableTask", idx: int) -> bool: True if we are done executing all tasks of this actor, False otherwise. """ # TODO: for cases where output is passed as input to a task on - # the same actor, introduce a "IntraProcessChannel" to avoid the overhead + # the same actor, introduce a "LocalChannel" to avoid the overhead # of serialization/deserialization and synchronization. method = getattr(self, task.method_name) input_reader = self._input_readers[idx] @@ -683,12 +683,12 @@ def _get_or_compile( # `readers` is the nodes that are ordered after the current one (`task`) # in the DAG. readers = [self.idx_to_task[idx] for idx in task.downstream_node_idxs] + assert len(readers) == 1 def _get_node_id(self): return ray.get_runtime_context().get_node_id() if isinstance(readers[0].dag_node, MultiOutputNode): - assert len(readers) == 1 # This node is a multi-output node, which means that it will only be # read by the driver, not an actor. Thus, we handle this case by # setting `reader_handles` to `[self._driver_actor]`. diff --git a/python/ray/dag/tests/experimental/test_accelerated_dag.py b/python/ray/dag/tests/experimental/test_accelerated_dag.py index a289a65a9766..1923deaafb56 100644 --- a/python/ray/dag/tests/experimental/test_accelerated_dag.py +++ b/python/ray/dag/tests/experimental/test_accelerated_dag.py @@ -646,119 +646,6 @@ async def main(): compiled_dag.teardown() -class TestCompositeChannel: - def test_composite_channel_one_actor(self, ray_start_regular_shared): - """ - In this test, there are three 'inc' tasks on the same Ray actor, chained - together. Therefore, the DAG will look like this: - - Driver -> a.inc -> a.inc -> a.inc -> Driver - - All communication between the driver and the actor will be done through remote - channels, i.e., shared memory channels. All communication between the actor - tasks will be conducted through local channels, i.e., IntraProcessChannel in - this case. - - To elaborate, all output channels of the actor DAG nodes will be - CompositeChannel, and the first two will have a local channel, while the last - one will have a remote channel. - """ - a = Actor.remote(0) - with InputNode() as inp: - dag = a.inc.bind(inp) - dag = a.inc.bind(dag) - dag = a.inc.bind(dag) - - compiled_dag = dag.experimental_compile() - output_channel = compiled_dag.execute(1) - result = output_channel.begin_read() - assert result == 4 - output_channel.end_read() - - output_channel = compiled_dag.execute(2) - result = output_channel.begin_read() - assert result == 24 - output_channel.end_read() - - output_channel = compiled_dag.execute(3) - result = output_channel.begin_read() - assert result == 108 - output_channel.end_read() - - compiled_dag.teardown() - - def test_composite_channel_two_actors(self, ray_start_regular_shared): - """ - In this test, there are three 'inc' tasks on the two Ray actors, chained - together. Therefore, the DAG will look like this: - - Driver -> a.inc -> b.inc -> a.inc -> Driver - - All communication between the driver and actors will be done through remote - channels. Also, all communication between the actor tasks will be conducted - through remote channels, i.e., shared memory channel in this case because no - consecutive tasks are on the same actor. - """ - a = Actor.remote(0) - b = Actor.remote(100) - with InputNode() as inp: - dag = a.inc.bind(inp) - dag = b.inc.bind(dag) - dag = a.inc.bind(dag) - - # a: 0+1 -> b: 100+1 -> a: 1+101 - compiled_dag = dag.experimental_compile() - output_channel = compiled_dag.execute(1) - result = output_channel.begin_read() - assert result == 102 - output_channel.end_read() - - # a: 102+2 -> b: 101+104 -> a: 104+205 - output_channel = compiled_dag.execute(2) - result = output_channel.begin_read() - assert result == 309 - output_channel.end_read() - - # a: 309+3 -> b: 205+312 -> a: 312+517 - output_channel = compiled_dag.execute(3) - result = output_channel.begin_read() - assert result == 829 - output_channel.end_read() - - compiled_dag.teardown() - - def test_composite_channel_multi_output(self, ray_start_regular_shared): - """ - Driver -> a.inc -> a.inc ---> Driver - | | - -> b.inc - - - All communication in this DAG will be done through CompositeChannel. - Under the hood, the communication between two `a.inc` tasks will - be done through a local channel, i.e., IntraProcessChannel in this - case, while the communication between `a.inc` and `b.inc` will be - done through a shared memory channel. - """ - a = Actor.remote(0) - b = Actor.remote(100) - with InputNode() as inp: - dag = a.inc.bind(inp) - dag = MultiOutputNode([a.inc.bind(dag), b.inc.bind(dag)]) - - compiled_dag = dag.experimental_compile() - output_channel = compiled_dag.execute(1) - result = output_channel.begin_read() - assert result == [2, 101] - output_channel.end_read() - - output_channel = compiled_dag.execute(3) - result = output_channel.begin_read() - assert result == [10, 106] - output_channel.end_read() - - compiled_dag.teardown() - - if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) diff --git a/python/ray/experimental/channel/__init__.py b/python/ray/experimental/channel/__init__.py index 76d75c938026..6347fa79275e 100644 --- a/python/ray/experimental/channel/__init__.py +++ b/python/ray/experimental/channel/__init__.py @@ -9,8 +9,7 @@ SynchronousWriter, WriterInterface, ) -from ray.experimental.channel.intra_process_channel import IntraProcessChannel -from ray.experimental.channel.shared_memory_channel import Channel, CompositeChannel +from ray.experimental.channel.shared_memory_channel import Channel from ray.experimental.channel.torch_tensor_nccl_channel import TorchTensorNcclChannel __all__ = [ @@ -23,6 +22,4 @@ "WriterInterface", "ChannelContext", "TorchTensorNcclChannel", - "IntraProcessChannel", - "CompositeChannel", ] diff --git a/python/ray/experimental/channel/common.py b/python/ray/experimental/channel/common.py index ac5039c9cb93..ecf70287cee6 100644 --- a/python/ray/experimental/channel/common.py +++ b/python/ray/experimental/channel/common.py @@ -175,15 +175,9 @@ def __init__( pass def ensure_registered_as_writer(self): - """ - Check whether the process is a valid writer. This method must be idempotent. - """ raise NotImplementedError def ensure_registered_as_reader(self): - """ - Check whether the process is a valid reader. This method must be idempotent. - """ raise NotImplementedError def write(self, value: Any) -> None: diff --git a/python/ray/experimental/channel/intra_process_channel.py b/python/ray/experimental/channel/intra_process_channel.py deleted file mode 100644 index cf361d336b9f..000000000000 --- a/python/ray/experimental/channel/intra_process_channel.py +++ /dev/null @@ -1,65 +0,0 @@ -import uuid -from typing import Any, Optional - -import ray -from ray.experimental.channel import ChannelContext -from ray.experimental.channel.common import ChannelInterface -from ray.util.annotations import PublicAPI - - -@PublicAPI(stability="alpha") -class IntraProcessChannel(ChannelInterface): - """ - IntraProcessChannel is a channel for communication between two tasks in the same - worker process. It writes data directly to the worker's _SerializationContext - and reads data from the _SerializationContext to avoid the serialization - overhead and the need for reading/writing from shared memory. - - Args: - actor_handle: The actor handle of the worker process. - """ - - def __init__( - self, - actor_handle: ray.actor.ActorHandle, - _channel_id: Optional[str] = None, - ): - # TODO (kevin85421): Currently, if we don't pass `actor_handle` to - # `IntraProcessChannel`, the actor will die due to the reference count of - # `actor_handle` is 0. We should fix this issue in the future. - self._actor_handle = actor_handle - # Generate a unique ID for the channel. The writer and reader will use - # this ID to store and retrieve data from the _SerializationContext. - self._channel_id = _channel_id - if self._channel_id is None: - self._channel_id = str(uuid.uuid4()) - - def ensure_registered_as_writer(self) -> None: - pass - - def ensure_registered_as_reader(self) -> None: - pass - - def __reduce__(self): - return IntraProcessChannel, ( - self._actor_handle, - self._channel_id, - ) - - def write(self, value: Any): - # Because both the reader and writer are in the same worker process, - # we can directly store the data in the context instead of storing - # it in the channel object. This removes the serialization overhead of `value`. - ctx = ChannelContext.get_current().serialization_context - ctx.set_data(self._channel_id, value) - - def begin_read(self) -> Any: - ctx = ChannelContext.get_current().serialization_context - return ctx.get_data(self._channel_id) - - def end_read(self): - pass - - def close(self) -> None: - ctx = ChannelContext.get_current().serialization_context - ctx.reset_data(self._channel_id) diff --git a/python/ray/experimental/channel/serialization_context.py b/python/ray/experimental/channel/serialization_context.py index ef30320c3893..0d1de546cddc 100644 --- a/python/ray/experimental/channel/serialization_context.py +++ b/python/ray/experimental/channel/serialization_context.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Any, Dict, List, Union +from typing import TYPE_CHECKING, List, Union if TYPE_CHECKING: import numpy as np @@ -9,31 +9,10 @@ class _SerializationContext: def __init__(self): self.use_external_transport: bool = False self.tensors: List["torch.Tensor"] = [] - # Buffer for transferring data between tasks in the same worker process. - # The key is the channel ID, and the value is the data. We don't use a - # lock when reading/writing the buffer because a DAG node actor will only - # execute one task at a time in `do_exec_tasks`. It will not execute multiple - # Ray tasks on a single actor simultaneously. - self.intra_process_channel_buffers: Dict[str, Any] = {} def set_use_external_transport(self, use_external_transport: bool) -> None: self.use_external_transport = use_external_transport - def set_data(self, channel_id: str, value: Any) -> None: - assert ( - channel_id not in self.intra_process_channel_buffers - ), f"Channel {channel_id} already exists in the buffer." - self.intra_process_channel_buffers[channel_id] = value - - def get_data(self, channel_id: str) -> Any: - assert ( - channel_id in self.intra_process_channel_buffers - ), f"Channel {channel_id} does not exist in the buffer." - return self.intra_process_channel_buffers.pop(channel_id) - - def reset_data(self, channel_id: str) -> None: - self.intra_process_channel_buffers.pop(channel_id, None) - def reset_tensors(self, tensors: List["torch.Tensor"]) -> List["torch.Tensor"]: prev_tensors = self.tensors self.tensors = tensors diff --git a/python/ray/experimental/channel/shared_memory_channel.py b/python/ray/experimental/channel/shared_memory_channel.py index e120b7ce5209..b5ffdb5e03c3 100644 --- a/python/ray/experimental/channel/shared_memory_channel.py +++ b/python/ray/experimental/channel/shared_memory_channel.py @@ -1,11 +1,10 @@ import io import logging -from typing import Any, Dict, List, Optional, Set, Union +from typing import Any, List, Optional, Union import ray from ray._raylet import SerializedObject from ray.experimental.channel.common import ChannelInterface, ChannelOutputType -from ray.experimental.channel.intra_process_channel import IntraProcessChannel from ray.experimental.channel.torch_tensor_type import TorchTensorType from ray.util.annotations import PublicAPI @@ -84,17 +83,6 @@ def _create_channel_ref( return object_ref -def _get_self_actor() -> Optional["ray.actor.ActorHandle"]: - """ - Get the current actor handle in this worker. - If this is called in a driver process, it will return None. - """ - try: - return ray.get_runtime_context().current_actor - except RuntimeError: - return None - - class _ResizeChannel: """ When a channel must be resized, the channel backing store must be resized on both @@ -155,7 +143,7 @@ def create_channel( cpu_data_typ=cpu_data_typ, ) - return CompositeChannel(writer, readers) + return Channel(writer, readers) def set_nccl_group_id(self, group_id: str) -> None: assert self.requires_nccl() @@ -235,7 +223,12 @@ def __init__( # actor, so we shouldn't need to include `writer` in the # constructor args. Either support Channels being constructed by # someone other than the writer or remove it from the args. - self_actor = _get_self_actor() + self_actor = None + try: + self_actor = ray.get_runtime_context().current_actor + except RuntimeError: + # This is the driver so there is no current actor handle. + pass assert writer == self_actor self._writer_node_id = ( @@ -447,108 +440,3 @@ def close(self) -> None: if self.is_local_node(self._reader_node_id): self.ensure_registered_as_reader() self._worker.core_worker.experimental_channel_set_error(self._reader_ref) - - -@PublicAPI(stability="alpha") -class CompositeChannel(ChannelInterface): - """ - Can be used to send data to different readers via different channels. - For example, if the reader is in the same worker process as the writer, - the data can be sent via IntraProcessChannel. If the reader is in a different - worker process, the data can be sent via shared memory channel. - - Args: - writer: The actor that may write to the channel. None signifies the driver. - readers: The actors that may read from the channel. None signifies - the driver. - """ - - def __init__( - self, - writer: Optional[ray.actor.ActorHandle], - readers: List[Optional[ray.actor.ActorHandle]], - _channel_dict: Optional[Dict[ray.ActorID, ChannelInterface]] = None, - _channels: Optional[Set[ChannelInterface]] = None, - ): - self._writer = writer - self._readers = readers - self._writer_registered = False - self._reader_registered = False - # A dictionary that maps the actor ID to the channel object. - self._channel_dict = _channel_dict or {} - # The set of channels is a deduplicated version of the _channel_dict values. - self._channels = _channels or set() - if self._channels: - # This CompositeChannel object is created by deserialization. - # We don't need to create channels again. - return - - remote_readers = [] - for reader in self._readers: - if self._writer != reader: - remote_readers.append(reader) - # There are some local readers which are the same worker process as the writer. - # Create a local channel for the writer and the local readers. - num_local_readers = len(self._readers) - len(remote_readers) - if num_local_readers > 0: - assert ( - num_local_readers == 1 - ), "Only support one reader on the same actor for now." - local_channel = IntraProcessChannel(self._writer) - self._channels.add(local_channel) - actor_id = self._get_actor_id(self._writer) - self._channel_dict[actor_id] = local_channel - # There are some remote readers which are not the same Ray actor as the writer. - # Create a shared memory channel for the writer and the remote readers. - if len(remote_readers) != 0: - remote_channel = Channel(self._writer, remote_readers) - self._channels.add(remote_channel) - for reader in remote_readers: - actor_id = self._get_actor_id(reader) - self._channel_dict[actor_id] = remote_channel - - def _get_actor_id(self, reader: Optional[ray.actor.ActorHandle]) -> str: - if reader is None: - return None - return reader._actor_id.hex() - - def ensure_registered_as_writer(self) -> None: - if self._writer_registered: - return - for channel in self._channels: - channel.ensure_registered_as_writer() - self._writer_registered = True - - def ensure_registered_as_reader(self) -> None: - if self._reader_registered: - return - for channel in self._channels: - channel.ensure_registered_as_reader() - self._reader_registered = True - - def __reduce__(self): - return CompositeChannel, ( - self._writer, - self._readers, - self._channel_dict, - self._channels, - ) - - def write(self, value: Any): - self.ensure_registered_as_writer() - for channel in self._channels: - channel.write(value) - - def begin_read(self) -> Any: - self.ensure_registered_as_reader() - actor_id = ray.get_runtime_context().get_actor_id() - return self._channel_dict[actor_id].begin_read() - - def end_read(self): - self.ensure_registered_as_reader() - actor_id = ray.get_runtime_context().get_actor_id() - return self._channel_dict[actor_id].end_read() - - def close(self) -> None: - for channel in self._channels: - channel.close() diff --git a/python/ray/tests/test_channel.py b/python/ray/tests/test_channel.py index 785369783cad..7836213dfb2b 100644 --- a/python/ray/tests/test_channel.py +++ b/python/ray/tests/test_channel.py @@ -514,7 +514,7 @@ def read(self, num_reads): # All readers have received the channel. ray.get([reader.pass_channel.remote(channel) for reader in readers]) - for _ in range(num_iterations): + for j in range(num_iterations): work = [reader.read.remote(num_writes) for reader in readers] start = time.perf_counter() for i in range(num_writes): @@ -591,195 +591,6 @@ def close(self): ray.get(reads) -@pytest.mark.skipif( - sys.platform != "linux" and sys.platform != "darwin", - reason="Requires Linux or Mac.", -) -def test_intra_process_channel(ray_start_cluster): - """ - (1) Test whether an actor can read/write from an IntraProcessChannel. - (2) Test whether the _SerializationContext cleans up the - data after all readers have read it. - (3) Test whether the actor can write again after reading 1 time. - """ - # This node is for both the driver and the Reader actors. - cluster = ray_start_cluster - cluster.add_node(num_cpus=1) - ray.init(address=cluster.address) - - @ray.remote(num_cpus=1) - class Actor: - def __init__(self): - pass - - def pass_channel(self, channel): - self._chan = channel - - def read(self): - return self._chan.begin_read() - - def write(self, value): - self._chan.write(value) - - def get_ctx_buffer_size(self): - ctx = ray_channel.ChannelContext.get_current().serialization_context - return len(ctx.intra_process_channel_buffers) - - actor = Actor.remote() - channel = ray_channel.IntraProcessChannel(actor) - ray.get(actor.pass_channel.remote(channel)) - - ray.get(actor.write.remote("hello")) - assert ray.get(actor.read.remote()) == "hello" - - # The _SerializationContext should clean up the data after a read. - assert ray.get(actor.get_ctx_buffer_size.remote()) == 0 - - # Write again after reading num_readers times. - ray.get(actor.write.remote("world")) - assert ray.get(actor.read.remote()) == "world" - - # The _SerializationContext should clean up the data after a read. - assert ray.get(actor.get_ctx_buffer_size.remote()) == 0 - - -@pytest.mark.skipif( - sys.platform != "linux" and sys.platform != "darwin", - reason="Requires Linux or Mac.", -) -def test_composite_channel_single_reader(ray_start_cluster): - """ - (1) The driver can write data to CompositeChannel and an actor can read it. - (2) An actor can write data to CompositeChannel and the actor itself can read it. - (3) An actor can write data to CompositeChannel and another actor can read it. - (4) An actor can write data to CompositeChannel and the driver can read it. - """ - # This node is for both the driver and the Reader actors. - cluster = ray_start_cluster - cluster.add_node(num_cpus=2) - ray.init(address=cluster.address) - - @ray.remote(num_cpus=1) - class Actor: - def __init__(self): - pass - - def pass_channel(self, channel): - self._chan = channel - - def create_multi_channel(self, writer, readers): - self._chan = ray_channel.CompositeChannel(writer, readers) - return self._chan - - def read(self): - return self._chan.begin_read() - - def write(self, value): - self._chan.write(value) - - actor1 = Actor.remote() - actor2 = Actor.remote() - - # Create a channel to communicate between driver process and actor1. - driver_to_actor1_channel = ray_channel.CompositeChannel(None, [actor1]) - ray.get(actor1.pass_channel.remote(driver_to_actor1_channel)) - driver_to_actor1_channel.write("hello") - assert ray.get(actor1.read.remote()) == "hello" - - # Create a channel to communicate between two tasks in actor1. - ray.get(actor1.create_multi_channel.remote(actor1, [actor1])) - ray.get(actor1.write.remote("world")) - assert ray.get(actor1.read.remote()) == "world" - - # Create a channel to communicate between actor1 and actor2. - actor1_to_actor2_channel = ray.get( - actor1.create_multi_channel.remote(actor1, [actor2]) - ) - ray.get(actor2.pass_channel.remote(actor1_to_actor2_channel)) - ray.get(actor1.write.remote("hello world")) - assert ray.get(actor2.read.remote()) == "hello world" - - # Create a channel to communicate between actor2 and driver process. - actor2_to_driver_channel = ray.get( - actor2.create_multi_channel.remote(actor2, [None]) - ) - ray.get(actor2.write.remote("world hello")) - assert actor2_to_driver_channel.begin_read() == "world hello" - - -@pytest.mark.skipif( - sys.platform != "linux" and sys.platform != "darwin", - reason="Requires Linux or Mac.", -) -def test_composite_channel_multiple_readers(ray_start_cluster): - """ - Test the behavior of CompositeChannel when there are multiple readers. - - (1) The driver can write data to CompositeChannel and two actors can read it. - (2) An actor can write data to CompositeChannel and another actor, as well as - itself, can read it. - (3) An actor writes data to CompositeChannel and two Ray tasks on the same - actor read it. This is not supported and should raise an exception. - """ - # This node is for both the driver and the Reader actors. - cluster = ray_start_cluster - cluster.add_node(num_cpus=2) - ray.init(address=cluster.address) - - @ray.remote(num_cpus=1) - class Actor: - def __init__(self): - pass - - def pass_channel(self, channel): - self._chan = channel - - def create_multi_channel(self, writer, readers): - self._chan = ray_channel.CompositeChannel(writer, readers) - return self._chan - - def read(self): - return self._chan.begin_read() - - def end_read(self): - return self._chan.end_read() - - def write(self, value): - self._chan.write(value) - - actor1 = Actor.remote() - actor2 = Actor.remote() - - # The driver writes data to CompositeChannel and actor1 and actor2 read it. - driver_output_channel = ray_channel.CompositeChannel(None, [actor1, actor2]) - ray.get(actor1.pass_channel.remote(driver_output_channel)) - ray.get(actor2.pass_channel.remote(driver_output_channel)) - driver_output_channel.write("hello") - assert ray.get([actor1.read.remote(), actor2.read.remote()]) == ["hello"] * 2 - - # actor1 writes data to CompositeChannel and actor1 and actor2 read it. - actor1_output_channel = ray.get( - actor1.create_multi_channel.remote(actor1, [actor1, actor2]) - ) - ray.get(actor2.pass_channel.remote(actor1_output_channel)) - ray.get(actor1.write.remote("world")) - assert ray.get([actor1.read.remote(), actor2.read.remote()]) == ["world"] * 2 - - with pytest.raises(ray.exceptions.RayTaskError): - # actor1 writes data to CompositeChannel and two Ray tasks on actor1 read it. - # This is not supported and should raise an exception. - actor1_output_channel = ray.get( - actor1.create_multi_channel.remote(actor1, [actor1, actor1]) - ) - - """ - TODO (kevin85421): Add tests for the following cases: - (1) actor1 writes data to CompositeChannel and two Ray tasks on actor2 read it. - (2) actor1 writes data to CompositeChannel and actor2 and the driver reads it. - Currently, (1) is not supported, and (2) is blocked by the reference count issue. - """ - - if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))