[Data] Allow specifying both num_cpus and num_gpus for map APIs (#47995)

## Why are these changes needed?

Currently, we enforce that `num_cpus` and `num_gpus` cannot both be set
for map operations. This PR enables users to specify both of these
parameters, as we believe that with the recent improvements to the
scheduler, Ray Data should be able to support this scenario smoothly.
However, we will warn users that this is still an experimental feature.
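
For illustration, a minimal sketch of the newly allowed pattern (hypothetical
values; fractional GPUs follow standard Ray resource semantics, and tasks won't
schedule unless the cluster actually has GPU resources):

```python
import ray

ds = ray.data.range(8)

# Before this PR, passing both num_cpus and num_gpus raised a ValueError;
# now it is allowed, and an experimental-feature warning is logged instead.
ds = ds.map_batches(lambda batch: batch, num_cpus=1, num_gpus=0.5)
print(ds.take_all())
```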

Updated doc example (applied to all map APIs):
![Screenshot at Oct 11 19-00-23](https://github.com/user-attachments/assets/2bfd6902-e2b4-4ae8-9550-604ec59d3b6b)

## Related issue number

## Checks

- [x] I've signed off every commit (by using the `-s` flag, i.e., `git
commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [x] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a method in Tune, I've added it in `doc/source/tune/api/` under the
corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: Scott Lee <[email protected]>
scottjlee authored Oct 15, 2024
1 parent 680468e commit 5a65213
Showing 4 changed files with 49 additions and 18 deletions.
23 changes: 13 additions & 10 deletions python/ray/data/_internal/execution/operators/map_operator.py
@@ -1,6 +1,7 @@
import copy
import functools
import itertools
+import logging
from abc import ABC, abstractmethod
from collections import defaultdict, deque
from typing import Any, Callable, Deque, Dict, Iterator, List, Optional, Set, Union
@@ -37,6 +38,8 @@
from ray.data.context import DataContext
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

+logger = logging.getLogger(__name__)
+

class MapOperator(OneToOneOperator, ABC):
    """A streaming operator that maps input bundles 1:1 to output bundles.
@@ -645,16 +648,16 @@ def _canonicalize_ray_remote_args(ray_remote_args: Dict[str, Any]) -> Dict[str, Any]:
    and should not be a serious limitation for users.
    """
    ray_remote_args = ray_remote_args.copy()

-    if ray_remote_args.get("num_gpus", 0) > 0:
-        if ray_remote_args.get("num_cpus", 0) != 0:
-            raise ValueError(
-                "It is not allowed to specify both num_cpus and num_gpus for map tasks."
-            )
-    elif ray_remote_args.get("num_cpus", 0) > 0:
-        if ray_remote_args.get("num_gpus", 0) != 0:
-            raise ValueError(
-                "It is not allowed to specify both num_cpus and num_gpus for map tasks."
-            )
+    if ray_remote_args.get("num_cpus") and ray_remote_args.get("num_gpus"):
+        logger.warning(
+            "Specifying both num_cpus and num_gpus for map tasks is experimental, "
+            "and may result in scheduling or stability issues. "
+            "Please report any issues to the Ray team: "
+            "https://github.com/ray-project/ray/issues/new/choose"
+        )
+
+    if "num_cpus" not in ray_remote_args and "num_gpus" not in ray_remote_args:
+        ray_remote_args["num_cpus"] = 1

    return ray_remote_args
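
For readers skimming the diff, a standalone sketch of the new canonicalization
behavior (simplified names, not the actual Ray internals): warn instead of
raising when both resources are set, and default to one CPU when neither is
requested.

```python
import logging
from typing import Any, Dict

logger = logging.getLogger(__name__)


def canonicalize_remote_args(ray_remote_args: Dict[str, Any]) -> Dict[str, Any]:
    """Simplified sketch of _canonicalize_ray_remote_args after this change."""
    ray_remote_args = ray_remote_args.copy()

    # New behavior: an experimental-feature warning replaces the old ValueError.
    if ray_remote_args.get("num_cpus") and ray_remote_args.get("num_gpus"):
        logger.warning(
            "Specifying both num_cpus and num_gpus for map tasks is experimental."
        )

    # Default to one CPU per task when neither resource is requested.
    if "num_cpus" not in ray_remote_args and "num_gpus" not in ray_remote_args:
        ray_remote_args["num_cpus"] = 1
    return ray_remote_args


assert canonicalize_remote_args({}) == {"num_cpus": 1}
assert canonicalize_remote_args({"num_cpus": 1, "num_gpus": 2}) == {
    "num_cpus": 1,
    "num_gpus": 2,
}
```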
18 changes: 18 additions & 0 deletions python/ray/data/dataset.py
@@ -272,6 +272,12 @@ def map(
        If your transformation is vectorized like most NumPy or pandas operations,
        :meth:`~Dataset.map_batches` might be faster.

+        .. warning::
+            Specifying both ``num_cpus`` and ``num_gpus`` for map tasks is experimental,
+            and may result in scheduling or stability issues. Please
+            `report any issues <https://github.com/ray-project/ray/issues/new/choose>`_
+            to the Ray team.
+
        Examples:

            .. testcode::
@@ -417,6 +423,12 @@ def map_batches(
        If ``fn`` doesn't mutate its input, set ``zero_copy_batch=True`` to improve
        performance and decrease memory utilization.

+        .. warning::
+            Specifying both ``num_cpus`` and ``num_gpus`` for map tasks is experimental,
+            and may result in scheduling or stability issues. Please
+            `report any issues <https://github.com/ray-project/ray/issues/new/choose>`_
+            to the Ray team.
+
        Examples:
            Call :meth:`~Dataset.map_batches` to transform your data.
@@ -973,6 +985,12 @@ def flat_map(
        transformation is vectorized like most NumPy and pandas operations,
        it might be faster.

+        .. warning::
+            Specifying both ``num_cpus`` and ``num_gpus`` for map tasks is experimental,
+            and may result in scheduling or stability issues. Please
+            `report any issues <https://github.com/ray-project/ray/issues/new/choose>`_
+            to the Ray team.
+
        Examples:

            .. testcode::
6 changes: 6 additions & 0 deletions python/ray/data/grouped_data.py
@@ -129,6 +129,12 @@ def map_groups(
        In general, prefer to use aggregate() instead of map_groups().

+        .. warning::
+            Specifying both ``num_cpus`` and ``num_gpus`` for map tasks is experimental,
+            and may result in scheduling or stability issues. Please
+            `report any issues <https://github.com/ray-project/ray/issues/new/choose>`_
+            to the Ray team.
+
        Examples:
            >>> # Return a single record per group (list of multiple records in,
            >>> # list of a single record out).
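
For context, a hedged usage sketch of what this grouped-data doc warning covers
(hypothetical values; tasks need a cluster with GPU resources to schedule):

```python
import ray

ds = ray.data.from_items([{"group": i % 2, "value": i} for i in range(10)])

# map_groups forwards num_cpus/num_gpus to its map tasks; per this PR,
# setting both now logs an experimental-feature warning instead of raising.
result = ds.groupby("group").map_groups(
    lambda group: group, num_cpus=1, num_gpus=0.25
)
```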
20 changes: 12 additions & 8 deletions python/ray/data/tests/test_executor_resource_management.py
@@ -123,14 +123,18 @@ def test_resource_canonicalization(ray_start_10_cpus_shared):
    )
    assert op._ray_remote_args == {"num_gpus": 2}

-    with pytest.raises(ValueError):
-        MapOperator.create(
-            _mul2_map_data_prcessor,
-            input_op=input_op,
-            name="TestMapper",
-            compute_strategy=TaskPoolStrategy(),
-            ray_remote_args={"num_gpus": 2, "num_cpus": 1},
-        )
+    op = MapOperator.create(
+        _mul2_map_data_prcessor,
+        input_op=input_op,
+        name="TestMapper",
+        compute_strategy=TaskPoolStrategy(),
+        ray_remote_args={"num_gpus": 2, "num_cpus": 1},
+    )
+    assert op.base_resource_usage() == ExecutionResources()
+    assert op.incremental_resource_usage() == ExecutionResources(
+        cpu=1, gpu=2, object_store_memory=inc_obj_store_mem
+    )
+    assert op._ray_remote_args == {"num_gpus": 2, "num_cpus": 1}


def test_execution_options_resource_limit():
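
A hypothetical companion check (not part of this PR) could also assert that the
experimental warning actually fires, for example against the standalone
canonicalization sketch shown earlier, using pytest's `caplog` fixture:

```python
import logging


def test_warns_when_both_resources_set(caplog):
    # Assumes the canonicalize_remote_args sketch from above is in scope.
    with caplog.at_level(logging.WARNING):
        args = canonicalize_remote_args({"num_cpus": 1, "num_gpus": 2})
    assert args == {"num_cpus": 1, "num_gpus": 2}
    assert "experimental" in caplog.text
```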
