Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "[core][experimental] Avoid serialization for data passed between two tasks on the same actor" #45925

Merged
merged 1 commit into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions python/ray/dag/compiled_dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def do_allocate_channel(

Args:
readers: The actor handles of the readers.
typ: The output type hint for the channel.
buffer_size_bytes: The maximum size of messages in the channel.

Returns:
The allocated channel.
Expand Down Expand Up @@ -131,7 +131,7 @@ def _exec_task(self, task: "ExecutableTask", idx: int) -> bool:
True if we are done executing all tasks of this actor, False otherwise.
"""
# TODO: for cases where output is passed as input to a task on
# the same actor, introduce a "IntraProcessChannel" to avoid the overhead
# the same actor, introduce a "LocalChannel" to avoid the overhead
# of serialization/deserialization and synchronization.
method = getattr(self, task.method_name)
input_reader = self._input_readers[idx]
Expand Down Expand Up @@ -683,12 +683,12 @@ def _get_or_compile(
# `readers` is the nodes that are ordered after the current one (`task`)
# in the DAG.
readers = [self.idx_to_task[idx] for idx in task.downstream_node_idxs]
assert len(readers) == 1

def _get_node_id(self):
return ray.get_runtime_context().get_node_id()

if isinstance(readers[0].dag_node, MultiOutputNode):
assert len(readers) == 1
# This node is a multi-output node, which means that it will only be
# read by the driver, not an actor. Thus, we handle this case by
# setting `reader_handles` to `[self._driver_actor]`.
Expand Down
113 changes: 0 additions & 113 deletions python/ray/dag/tests/experimental/test_accelerated_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,119 +646,6 @@ async def main():
compiled_dag.teardown()


class TestCompositeChannel:
def test_composite_channel_one_actor(self, ray_start_regular_shared):
"""
In this test, there are three 'inc' tasks on the same Ray actor, chained
together. Therefore, the DAG will look like this:

Driver -> a.inc -> a.inc -> a.inc -> Driver

All communication between the driver and the actor will be done through remote
channels, i.e., shared memory channels. All communication between the actor
tasks will be conducted through local channels, i.e., IntraProcessChannel in
this case.

To elaborate, all output channels of the actor DAG nodes will be
CompositeChannel, and the first two will have a local channel, while the last
one will have a remote channel.
"""
a = Actor.remote(0)
with InputNode() as inp:
dag = a.inc.bind(inp)
dag = a.inc.bind(dag)
dag = a.inc.bind(dag)

compiled_dag = dag.experimental_compile()
output_channel = compiled_dag.execute(1)
result = output_channel.begin_read()
assert result == 4
output_channel.end_read()

output_channel = compiled_dag.execute(2)
result = output_channel.begin_read()
assert result == 24
output_channel.end_read()

output_channel = compiled_dag.execute(3)
result = output_channel.begin_read()
assert result == 108
output_channel.end_read()

compiled_dag.teardown()

def test_composite_channel_two_actors(self, ray_start_regular_shared):
"""
In this test, there are three 'inc' tasks on the two Ray actors, chained
together. Therefore, the DAG will look like this:

Driver -> a.inc -> b.inc -> a.inc -> Driver

All communication between the driver and actors will be done through remote
channels. Also, all communication between the actor tasks will be conducted
through remote channels, i.e., shared memory channel in this case because no
consecutive tasks are on the same actor.
"""
a = Actor.remote(0)
b = Actor.remote(100)
with InputNode() as inp:
dag = a.inc.bind(inp)
dag = b.inc.bind(dag)
dag = a.inc.bind(dag)

# a: 0+1 -> b: 100+1 -> a: 1+101
compiled_dag = dag.experimental_compile()
output_channel = compiled_dag.execute(1)
result = output_channel.begin_read()
assert result == 102
output_channel.end_read()

# a: 102+2 -> b: 101+104 -> a: 104+205
output_channel = compiled_dag.execute(2)
result = output_channel.begin_read()
assert result == 309
output_channel.end_read()

# a: 309+3 -> b: 205+312 -> a: 312+517
output_channel = compiled_dag.execute(3)
result = output_channel.begin_read()
assert result == 829
output_channel.end_read()

compiled_dag.teardown()

def test_composite_channel_multi_output(self, ray_start_regular_shared):
"""
Driver -> a.inc -> a.inc ---> Driver
| |
-> b.inc -

All communication in this DAG will be done through CompositeChannel.
Under the hood, the communication between two `a.inc` tasks will
be done through a local channel, i.e., IntraProcessChannel in this
case, while the communication between `a.inc` and `b.inc` will be
done through a shared memory channel.
"""
a = Actor.remote(0)
b = Actor.remote(100)
with InputNode() as inp:
dag = a.inc.bind(inp)
dag = MultiOutputNode([a.inc.bind(dag), b.inc.bind(dag)])

compiled_dag = dag.experimental_compile()
output_channel = compiled_dag.execute(1)
result = output_channel.begin_read()
assert result == [2, 101]
output_channel.end_read()

output_channel = compiled_dag.execute(3)
result = output_channel.begin_read()
assert result == [10, 106]
output_channel.end_read()

compiled_dag.teardown()


if __name__ == "__main__":
if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
Expand Down
5 changes: 1 addition & 4 deletions python/ray/experimental/channel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
SynchronousWriter,
WriterInterface,
)
from ray.experimental.channel.intra_process_channel import IntraProcessChannel
from ray.experimental.channel.shared_memory_channel import Channel, CompositeChannel
from ray.experimental.channel.shared_memory_channel import Channel
from ray.experimental.channel.torch_tensor_nccl_channel import TorchTensorNcclChannel

__all__ = [
Expand All @@ -23,6 +22,4 @@
"WriterInterface",
"ChannelContext",
"TorchTensorNcclChannel",
"IntraProcessChannel",
"CompositeChannel",
]
6 changes: 0 additions & 6 deletions python/ray/experimental/channel/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,9 @@ def __init__(
pass

def ensure_registered_as_writer(self):
"""
Check whether the process is a valid writer. This method must be idempotent.
"""
raise NotImplementedError

def ensure_registered_as_reader(self):
"""
Check whether the process is a valid reader. This method must be idempotent.
"""
raise NotImplementedError

def write(self, value: Any) -> None:
Expand Down
65 changes: 0 additions & 65 deletions python/ray/experimental/channel/intra_process_channel.py

This file was deleted.

23 changes: 1 addition & 22 deletions python/ray/experimental/channel/serialization_context.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Any, Dict, List, Union
from typing import TYPE_CHECKING, List, Union

if TYPE_CHECKING:
import numpy as np
Expand All @@ -9,31 +9,10 @@ class _SerializationContext:
def __init__(self):
self.use_external_transport: bool = False
self.tensors: List["torch.Tensor"] = []
# Buffer for transferring data between tasks in the same worker process.
# The key is the channel ID, and the value is the data. We don't use a
# lock when reading/writing the buffer because a DAG node actor will only
# execute one task at a time in `do_exec_tasks`. It will not execute multiple
# Ray tasks on a single actor simultaneously.
self.intra_process_channel_buffers: Dict[str, Any] = {}

def set_use_external_transport(self, use_external_transport: bool) -> None:
self.use_external_transport = use_external_transport

def set_data(self, channel_id: str, value: Any) -> None:
assert (
channel_id not in self.intra_process_channel_buffers
), f"Channel {channel_id} already exists in the buffer."
self.intra_process_channel_buffers[channel_id] = value

def get_data(self, channel_id: str) -> Any:
assert (
channel_id in self.intra_process_channel_buffers
), f"Channel {channel_id} does not exist in the buffer."
return self.intra_process_channel_buffers.pop(channel_id)

def reset_data(self, channel_id: str) -> None:
self.intra_process_channel_buffers.pop(channel_id, None)

def reset_tensors(self, tensors: List["torch.Tensor"]) -> List["torch.Tensor"]:
prev_tensors = self.tensors
self.tensors = tensors
Expand Down
Loading
Loading