[ADAG] Enable NPU (hccl) communication for CG #47658

Open · wants to merge 32 commits into base: master

Changes from 3 commits (of 32 total)

Commits:
e1743e9 fix (Bye-legumes, Sep 13, 2024)
38bd814 Merge branch 'master' into AcceleratorCommunicator (Bye-legumes, Sep 19, 2024)
90cc57a Merge branch 'master' into AcceleratorCommunicator (Bye-legumes, Sep 23, 2024)
305a930 Merge branch 'master' into AcceleratorCommunicator (Bye-legumes, Sep 25, 2024)
3ef20be fix (Bye-legumes, Sep 25, 2024)
791a728 fix (Bye-legumes, Sep 25, 2024)
6df1ee6 fix (Bye-legumes, Sep 25, 2024)
eed8f50 fix (Bye-legumes, Sep 25, 2024)
20a9d24 fix (Bye-legumes, Sep 25, 2024)
90d8536 fix (Bye-legumes, Sep 25, 2024)
66b3a90 Merge branch 'master' into AcceleratorCommunicator (Bye-legumes, Sep 26, 2024)
94eb57a Merge branch 'master' into AcceleratorCommunicator (Bye-legumes, Oct 2, 2024)
e3cfcce Merge branch 'master' into AcceleratorCommunicator (Bye-legumes, Oct 4, 2024)
1adf211 Merge branch 'master' into AcceleratorCommunicator (Bye-legumes, Oct 4, 2024)
2a84356 Merge branch 'master' into AcceleratorCommunicator (Bye-legumes, Oct 9, 2024)
b846fd6 fix (Bye-legumes, Oct 9, 2024)
5084059 fix (Bye-legumes, Oct 10, 2024)
f1cf7a3 Merge branch 'master' into AcceleratorCommunicator (Bye-legumes, Oct 11, 2024)
c5729e3 Merge branch 'master' into AcceleratorCommunicator (Bye-legumes, Oct 15, 2024)
aeb12de fix (Bye-legumes, Nov 18, 2024)
3bcc211 fix (Bye-legumes, Nov 18, 2024)
13c77e8 fix (Bye-legumes, Nov 18, 2024)
4c8fd87 fix (Bye-legumes, Nov 18, 2024)
81da1f0 Merge branch 'master' into AcceleratorCommunicator (Bye-legumes, Nov 19, 2024)
ff13bed Merge branch 'master' into AcceleratorCommunicator (Bye-legumes, Nov 19, 2024)
408f82b Merge branch 'master' into AcceleratorCommunicator (Bye-legumes, Nov 19, 2024)
f225933 Update python/ray/experimental/channel/torch_tensor_type.py (Bye-legumes, Nov 22, 2024)
bc1a005 Dfix (Bye-legumes, Nov 22, 2024)
e55ea68 Merge branch 'master' into AcceleratorCommunicator (Bye-legumes, Nov 22, 2024)
ed8def5 Merge branch 'master' into AcceleratorCommunicator (Bye-legumes, Nov 22, 2024)
87cddcf fix (Bye-legumes, Nov 22, 2024)
3ffdef5 fix (Bye-legumes, Nov 22, 2024)
2 changes: 1 addition & 1 deletion python/ray/experimental/channel/gpu_communicator.py
@@ -16,7 +16,7 @@
@DeveloperAPI
class GPUCommunicator(ABC):
"""
Communicator for a group of aDAG actors on Nvidia GPU.
Communicator for a group of aDAG actors on Nvidia GPU or other XPUs.
Contributor:
We should probably change the class name to a more general one if this is to support other XPUs. This is not yet used externally so backward compatibility is not an issue.

Contributor (author):

I agree. As a next step I'd prefer to change it to AcceleratorCommunicator, or just Communicator, for all devices. Currently GPUCommunicator is also called from some top-level code, so I'm keeping the name for now.


The aDAG execution leverages this internally to support communication
between actors in the group.
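Purely as an illustration of the rename discussed in the comments above (not code from this PR or from Ray), a device-agnostic base class could keep the same interface under a more general name; AcceleratorCommunicator below is only the hypothetical name mentioned in the thread.

```python
# Hypothetical sketch only: a renamed, device-agnostic version of the existing
# GPUCommunicator ABC. The method set mirrors what _HcclGroup implements below;
# nothing ships in Ray under this name.
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, List, Optional, Tuple

import ray

if TYPE_CHECKING:
    import torch


class AcceleratorCommunicator(ABC):
    """Communicator for a group of aDAG actors on any accelerator (GPU, NPU, ...)."""

    @abstractmethod
    def initialize(self, rank: int) -> None: ...

    @abstractmethod
    def get_actor_handles(self) -> List["ray.actor.ActorHandle"]: ...

    @abstractmethod
    def get_rank(self, actor: "ray.actor.ActorHandle") -> int: ...

    @abstractmethod
    def get_self_rank(self) -> Optional[int]: ...

    @abstractmethod
    def get_world_size(self) -> int: ...

    @abstractmethod
    def send(self, value: "torch.Tensor", peer_rank: int) -> None: ...

    @abstractmethod
    def recv(
        self, shape: Tuple[int], dtype: "torch.dtype", peer_rank: int
    ) -> "torch.Tensor": ...

    @abstractmethod
    def destroy(self) -> None: ...
```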
168 changes: 168 additions & 0 deletions python/ray/experimental/channel/hccl_group.py
@@ -0,0 +1,168 @@
import logging
from typing import TYPE_CHECKING, Callable, List, Optional, Tuple
import os

import ray
from ray.exceptions import RayChannelError
from ray.experimental.channel.gpu_communicator import GPUCommunicator


if TYPE_CHECKING:
import torch


logger = logging.getLogger(__name__)


class _HcclGroup(GPUCommunicator):
"""
Represents an actor's HCCL communicator using NPUs.

This is the default HCCL communicator to be used in aDAG if a custom communicator is not provided.

This class is not thread-safe.
"""

def __init__(
self,
world_size: int,
comm_id: int,
rank: Optional[int],
actor_handles: List["ray.actor.ActorHandle"],
device_id: Optional[int],
):
"""
Initialize an HCCL communicator that can be used to communicate p2p with
other NPU actors.

This method blocks until the same call has been made on all other
actors in the group, with the same arguments for world_size and comm_id.

Args:
world_size: The number of participating actors/devices.
comm_id: A unique communicator ID.
rank: The rank of this actor. If None, then the caller is not a
participant of the HCCL group.
actor_handles: A list of actor handles, in rank order.
device_id: The NPU device id to use for HCCL operations.
"""
self._world_size = world_size
self._rank: Optional[int] = rank
self._actor_handles = actor_handles
self._device_id = device_id

if rank is not None:
assert ray.get_gpu_ids(), "HCCL actor has no NPUs assigned"
Contributor:

ray.get_gpu_ids() seems to only get GPU IDs?

Contributor (author):

True, I just changed it. Also, I think there should be an API to get all accelerator IDs.
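As a hedged illustration of that point (not part of this PR): recent Ray versions expose per-resource accelerator IDs on the runtime context, which could replace the GPU-specific check above; the exact resource key for Ascend NPUs should be verified against the Ray version in use.

```python
# Sketch only: checking assigned accelerator IDs without ray.get_gpu_ids().
# get_accelerator_ids() returns a dict keyed by resource name in recent Ray
# releases (e.g. {"GPU": [...], "NPU": [...]}); verify the "NPU" key against
# the Ray version actually in use.
import ray

ctx = ray.get_runtime_context()
accelerator_ids = ctx.get_accelerator_ids()
npu_ids = accelerator_ids.get("NPU", [])
assert npu_ids, "HCCL actor has no NPUs assigned"
```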

assert device_id is not None, "HCCL actor must specify device_id"

expected_rank = self.get_rank(ray.get_runtime_context().current_actor)
assert (
rank == expected_rank
), f"HCCL actor's rank {rank} does not match expected rank {expected_rank}"

import torch
import torch_npu
import torch.distributed as dist

# Initialize HCCL process group
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '29500'
os.environ['HCCL_WHITELIST_DISABLE'] = '1'
torch_npu.npu.set_device(device_id)
dist.init_process_group(backend='hccl', world_size=world_size, rank=rank)

self._comm = dist
else:
self._comm = None

self._closed = False

def initialize(self, rank: int) -> None:
# No additional initialization is needed.
pass

def get_actor_handles(self) -> List["ray.actor.ActorHandle"]:
return self._actor_handles

def get_rank(self, actor: ray.actor.ActorHandle) -> int:
"""
Return the given actor's rank in the HCCL communicator.

Args:
actor: The actor handle to look up.
"""
actor_ids = [a._ray_actor_id for a in self._actor_handles]
try:
rank = actor_ids.index(actor._ray_actor_id)
except ValueError:
raise ValueError("Actor is not in the HCCL group.")
return rank

def get_self_rank(self) -> Optional[int]:
"""
Return this actor's rank.
"""
return self._rank

def get_world_size(self) -> int:
"""
Return the number of ranks in the HCCL communicator.
"""
return self._world_size

def send(self, value: "torch.Tensor", peer_rank: int) -> None:
"""
Send a torch.Tensor to a peer.

Args:
value: The torch.Tensor to send. It should already be on this
actor's NPU device.
peer_rank: The rank of the actor to send to.
"""
if self._closed:
raise RayChannelError("HCCL group has been destroyed.")

self._comm.send(tensor=value, dst=peer_rank)

Reviewer:

One question I have: how is this different from the nccl_collective_group send/recv? It seems nccl_collective_group just abstracts it at a higher level as _point2point, but is otherwise identical to nccl_group.

If this is supposed to be channel-only, then we can merge this hccl_group and later open another PR for hccl_collective_group.

Contributor (author):

I think collective is a more general module that can be used by other Ray modules, while here we need a module specific to the aDAG channel. We can have another PR for hccl_collective_group so it can be used as a utility that makes NPUs easier to use. In collective we can also try to solve the double import and other problems that we have met.

Reviewer:

So in yesterday's aDAG meeting someone mentioned that nccl_collective_group is actually old code, and nccl_group send/receive is what's currently used. We can discuss more to see how to extend it to support collectives as part of the refactor proposal.

@arcyleung (Sep 27, 2024):

There is another PR to support collective functions as a node type: #47621.

I see they implemented collective/allreduce.py, which calls allreduce on the GPUCommunicator in nccl_group.py.


def recv(
self,
shape: Tuple[int],
dtype: "torch.dtype",
peer_rank: int,
allocator: Optional[Callable[[Tuple[int], "torch.dtype"], "torch.Tensor"]] = None,
) -> "torch.Tensor":
"""
Receive a torch.Tensor from a peer.

Args:
shape: The shape of the tensor to receive.
dtype: The dtype of the tensor to receive.
peer_rank: The rank of the actor to receive from.
allocator: A function to allocate the tensor to receive into.
"""
if self._closed:
raise RayChannelError("HCCL group has been destroyed.")
assert allocator is not None, "HCCL group requires a tensor allocator"

# Allocate the receive buffer
buf = allocator(shape, dtype)
self._comm.recv(tensor=buf, src=peer_rank)
return buf

def destroy(self) -> None:
"""
Destroy the HCCL group.
"""
if self._closed:
return

self._closed = True

if self._comm is not None:
logger.info(
"Destructing HCCL group on actor: "
f"{ray.get_runtime_context().current_actor}"
)
# Clean up the HCCL process group
self._comm.destroy_process_group()
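
For reference, a minimal end-to-end sketch of how two Ray actors might exchange a tensor through the _HcclGroup class above. This is illustrative only: it assumes a single node with torch and torch_npu installed and two NPUs available, the actor and helper names are made up, and the NPU-assignment assertion discussed in the review thread may need the fix mentioned there.

```python
# Illustrative usage sketch, not part of this PR. Assumes torch and torch_npu
# are installed and two NPUs are visible on one node.
import ray
import torch

from ray.experimental.channel.hccl_group import _HcclGroup


@ray.remote(resources={"NPU": 1})
class Worker:
    def setup(self, world_size, comm_id, rank, handles, device_id):
        # Blocks until every actor in `handles` has called setup() with the
        # same world_size and comm_id. device_id is the index within this
        # actor's visible NPUs.
        self._group = _HcclGroup(world_size, comm_id, rank, handles, device_id)

    def send_ones(self, peer_rank):
        t = torch.ones(4, dtype=torch.float16).to("npu")
        self._group.send(t, peer_rank)

    def recv_from(self, peer_rank):
        alloc = lambda shape, dtype: torch.zeros(shape, dtype=dtype).to("npu")
        return self._group.recv((4,), torch.float16, peer_rank, allocator=alloc).cpu()


workers = [Worker.remote() for _ in range(2)]
ray.get([
    w.setup.remote(world_size=2, comm_id=0, rank=i, handles=workers, device_id=0)
    for i, w in enumerate(workers)
])
recv_ref = workers[1].recv_from.remote(peer_rank=0)
workers[0].send_ones.remote(peer_rank=1)
print(ray.get(recv_ref))  # tensor([1., 1., 1., 1.], dtype=torch.float16)
```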

28 changes: 27 additions & 1 deletion python/ray/experimental/channel/torch_tensor_nccl_channel.py
@@ -12,7 +12,6 @@
GPUCommunicator,
TorchTensorAllocator,
)
from ray.experimental.channel.nccl_group import _NcclGroup
from ray.experimental.channel.shared_memory_channel import SharedMemoryType
from ray.experimental.channel.torch_tensor_type import TENSOR_METADATA_SIZE_BYTES
from ray.util.annotations import DeveloperAPI
@@ -29,6 +28,33 @@
# entry/init points.
logger = logging.getLogger(__name__)

def _get_current_device_type() -> str:
"""
Check the current device type (GPU or NPU) and return its name.

Returns:
A string indicating the device type, either 'cuda' for GPU or 'npu' for NPU.
"""
import torch

# Get the current device type
if torch.cuda.is_available():
return "cuda"
elif hasattr(torch, "npu") and torch.npu.is_available():
return "npu"
else:
raise RuntimeError("No supported accelerator device (GPU or NPU) found.")


# Determine which communicator to use based on the current device type
device_type = _get_current_device_type()
if device_type == "npu":
    from ray.experimental.channel.hccl_group import _HcclGroup as _NcclGroup
else:
    from ray.experimental.channel.nccl_group import _NcclGroup
Contributor:

hmm, this looks like a hack. Do you plan to change to a cleaner approach?

Contributor (author):

OK, I just removed this hack and left a comment in the test. After we refactor the channel we can have a better solution.
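
One possible shape of such a cleaner approach, purely a sketch and not what this PR or Ray implements: resolve the communicator class with a small factory at group-creation time instead of aliasing the import at module load. The helper name below is invented, and the two classes currently take slightly different constructor arguments, so a real call site would still need to account for that.

```python
# Hypothetical sketch of a cleaner dispatch than the module-level import alias:
# pick the communicator class only when a group is actually created.
def _default_communicator_cls():
    import torch

    if torch.cuda.is_available():
        from ray.experimental.channel.nccl_group import _NcclGroup

        return _NcclGroup
    if hasattr(torch, "npu") and torch.npu.is_available():
        from ray.experimental.channel.hccl_group import _HcclGroup

        return _HcclGroup
    raise RuntimeError("No supported accelerator device (GPU or NPU) found.")
```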




class NestedTorchTensorNcclChannel(ChannelInterface):
def __init__(