
[aDAG] Support all reduce collective in aDAG #47621

Merged
96 commits merged into ray-project:master on Oct 21, 2024

Conversation

@dengwxn (Contributor) commented on Sep 12, 2024

Why are these changes needed?

aDAG currently does not support collective APIs. We would like to add support for collective APIs, starting with allreduce.

This PR adds support for allreduce by introducing syntactic sugar, `ray.experimental.collective.allreduce.bind`. The `bind` call accepts the arguments `input_nodes`, `op`, and `transport`. It returns a list of `CollectiveOutputNode`s as the allreduce results, with the same size as `input_nodes`. The allreduce results are written to newly allocated tensors. In the `COMPUTE` operation of a `CollectiveOutputNode`, the corresponding NCCL collective API is called. No changes are required to the input and output channels of `CollectiveOutputNode`.

Proposed new API:

```python
import ray.experimental.collective as collective

with InputNode() as inp:
    dag = [worker.return_tensor.bind(inp) for worker in workers]
    dag = collective.allreduce.bind(dag, ReduceOp.SUM)
    dag = MultiOutputNode(dag)
```
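For context, a minimal sketch of how such a DAG might then be compiled and executed with the usual aDAG flow, assuming `workers` are actor handles whose `return_tensor` method returns a CUDA tensor (the snippet continues the example above):

```python
# Sketch only: compile the graph once, then execute it.
compiled_dag = dag.experimental_compile()

# The result is a list with one entry per input node, each holding that
# worker's allreduced tensor.
ref = compiled_dag.execute(1)
results = ray.get(ref)
```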

API Requirements:

  1. Input nodes are unique.
  2. Actor handles are unique.
  3. Actor handles match the custom NCCL group if specified.
  4. All tensors have the same shape.

Requirements 1-3 are checked in the `_CollectiveOperation` constructor. Requirement 4 is checked at runtime via a timeout.
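As a rough illustration (not the actual `_CollectiveOperation` code), the constructor-time checks for requirements 1-3 could look like the sketch below; the parameter and helper names, such as `get_actor_handles`, are assumptions for illustration:

```python
def _check_collective_inputs(input_nodes, actor_handles, custom_nccl_group=None):
    # Requirement 1: input nodes must be unique.
    if len(set(input_nodes)) != len(input_nodes):
        raise ValueError("Expected unique input nodes for allreduce")

    # Requirement 2: each input node must come from a distinct actor.
    if len(set(actor_handles)) != len(actor_handles):
        raise ValueError("Expected unique actor handles for allreduce")

    # Requirement 3: a custom NCCL group, if given, must cover exactly these actors.
    if custom_nccl_group is not None:
        if set(actor_handles) != set(custom_nccl_group.get_actor_handles()):
            raise ValueError("Expected actor handles to match the custom NCCL group")
```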

Operation scheduling is also updated to account for NCCL collective operations. When an NCCL collective node is selected, all the other collective nodes in its collective group must be selected as well.
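A simplified sketch of that scheduling rule (illustrative only, not the actual code in `dag_node_operation.py`):

```python
def select_ready_operations(ready_ops, collective_group_of, priority_of):
    """Toy model: when the chosen operation is part of an NCCL collective,
    schedule its whole collective group together so every participant
    reaches the collective call at the same step and no rank is left
    waiting, which would deadlock the NCCL call."""
    op = min(ready_ops, key=priority_of)
    group = collective_group_of(op)
    if group is None:
        # Not a collective operation; schedule it on its own.
        return [op]
    # Schedule every ready operation that belongs to the same collective group.
    return [o for o in ready_ops if collective_group_of(o) == group]
```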

Related issue number

Meta-issue: #47983

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

The input index is used when a task returns multiple values; the index selects the corresponding value of the returned tuple.

Signed-off-by: Weixin Deng <[email protected]>
The downstream tasks of a TaskReturnNode should be readers of an output channel. The task of a TaskReturnNode should have a copy of the output channel from the task of its upstream ClassMethodNode.

Signed-off-by: Weixin Deng <[email protected]>
AndyUB and others added 9 commits October 17, 2024 16:26
@stephanie-wang (Contributor) left a comment


Nice work! Left a few remaining comments for cleanups and to see if we can further cut down on the tests that need GPUs to run. The only functionality we really need to test for GPUs is:

  • does it provide the expected allreduce result?
  • does passing a custom communicator actually result in calling the custom communicator during execution?

Otherwise, all logic should go in unit tests that don't need GPUs to run.
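Purely for illustration, a GPU-free check along the lines suggested above might use a fake communicator that records calls instead of doing real NCCL communication. The `FakeCommunicator` interface below is an assumption for the sketch, not the communicator API this PR actually wires in:

```python
class FakeCommunicator:
    """Records allreduce calls so a unit test can assert the collective
    path was exercised, without real NCCL or GPUs."""

    def __init__(self):
        self.allreduce_calls = []

    def allreduce(self, send_buf, recv_buf, op):
        # Record the call instead of launching an NCCL kernel.
        self.allreduce_calls.append(op)
        recv_buf[:] = send_buf  # trivial single-participant "reduction"


def test_allreduce_uses_custom_communicator():
    comm = FakeCommunicator()
    send, recv = [1.0, 2.0, 3.0], [0.0, 0.0, 0.0]
    comm.allreduce(send, recv, op="sum")
    assert comm.allreduce_calls == ["sum"]
    assert recv == send
```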

Review comments (since resolved) on:

  • python/ray/dag/dag_node_operation.py
  • python/ray/dag/tests/experimental/test_collective_dag.py
  • python/ray/experimental/collective/allreduce.py
  • python/ray/dag/tests/experimental/test_torch_tensor_dag.py
AndyUB and others added 12 commits October 18, 2024 16:40
Signed-off-by: Yuhan Ruan <[email protected]>
Signed-off-by: Yuhan Ruan <[email protected]>
Signed-off-by: Weixin Deng <[email protected]>
Signed-off-by: Yuhan Ruan <[email protected]>
Signed-off-by: Yuhan Ruan <[email protected]>
Signed-off-by: Yuhan Ruan <[email protected]>
Move Deduplicate P2P & Collective
Signed-off-by: Weixin Deng <[email protected]>
Signed-off-by: Weixin Deng <[email protected]>
Signed-off-by: Weixin Deng <[email protected]>
Signed-off-by: Weixin Deng <[email protected]>
@stephanie-wang merged commit 2c68a4b into ray-project:master on Oct 21, 2024
5 checks passed
@dengwxn deleted the ccar-0905 branch on October 25, 2024
Jay-ju pushed a commit to Jay-ju/ray that referenced this pull request Nov 5, 2024
JP-sDEV pushed a commit to JP-sDEV/ray that referenced this pull request Nov 14, 2024
mohitjain2504 pushed a commit to mohitjain2504/ray that referenced this pull request Nov 15, 2024
Labels
  • @author-action-required: The PR author is responsible for the next step. Remove tag to send back to the reviewer.
  • compiled-graphs
  • core: Issues that should be addressed in Ray Core
  • go: add ONLY when ready to merge, run all tests
  • P1: Issue that should be fixed within a few weeks
7 participants