From 5a65213b91bf0ec27a29b048c05ed2676065f5fc Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Tue, 15 Oct 2024 14:10:53 -0700 Subject: [PATCH] [Data] Allow specifing both `num_cpus` and `num_gpus` for map APIs (#47995) ## Why are these changes needed? Currently, we enforce that `num_cpus` and `num_gpus` cannot be both set for map operations. This PR enables the user to specify both of these parameters, as we believe that with the recent improvements to the scheduler, Ray Data should be able to smoothly support this scenario. However, we will warn users that this is still an experimental feature. Updated doc example (updated for all map APIs): ![Screenshot at Oct 11 19-00-23](https://github.com/user-attachments/assets/2bfd6902-e2b4-4ae8-9550-604ec59d3b6b) ## Related issue number ## Checks - [x] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [x] I've run `scripts/format.sh` to lint the changes in this PR. - [x] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Scott Lee --- .../execution/operators/map_operator.py | 23 +++++++++++-------- python/ray/data/dataset.py | 18 +++++++++++++++ python/ray/data/grouped_data.py | 6 +++++ .../test_executor_resource_management.py | 20 +++++++++------- 4 files changed, 49 insertions(+), 18 deletions(-) diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index a3d98bbeee3b..6a42e0c760af 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -1,6 +1,7 @@ import copy import functools import itertools +import logging from abc import ABC, abstractmethod from collections import defaultdict, deque from typing import Any, Callable, Deque, Dict, Iterator, List, Optional, Set, Union @@ -37,6 +38,8 @@ from ray.data.context import DataContext from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy +logger = logging.getLogger(__name__) + class MapOperator(OneToOneOperator, ABC): """A streaming operator that maps input bundles 1:1 to output bundles. @@ -645,16 +648,16 @@ def _canonicalize_ray_remote_args(ray_remote_args: Dict[str, Any]) -> Dict[str, and should not be a serious limitation for users. """ ray_remote_args = ray_remote_args.copy() + + if ray_remote_args.get("num_cpus") and ray_remote_args.get("num_gpus"): + logger.warning( + "Specifying both num_cpus and num_gpus for map tasks is experimental, " + "and may result in scheduling or stability issues. " + "Please report any issues to the Ray team: " + "https://github.com/ray-project/ray/issues/new/choose" + ) + if "num_cpus" not in ray_remote_args and "num_gpus" not in ray_remote_args: ray_remote_args["num_cpus"] = 1 - if ray_remote_args.get("num_gpus", 0) > 0: - if ray_remote_args.get("num_cpus", 0) != 0: - raise ValueError( - "It is not allowed to specify both num_cpus and num_gpus for map tasks." - ) - elif ray_remote_args.get("num_cpus", 0) > 0: - if ray_remote_args.get("num_gpus", 0) != 0: - raise ValueError( - "It is not allowed to specify both num_cpus and num_gpus for map tasks." - ) + return ray_remote_args diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 2316afe958b8..ea0efdca8a14 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -272,6 +272,12 @@ def map( If your transformation is vectorized like most NumPy or pandas operations, :meth:`~Dataset.map_batches` might be faster. + .. warning:: + Specifying both ``num_cpus`` and ``num_gpus`` for map tasks is experimental, + and may result in scheduling or stability issues. Please + `report any issues `_ + to the Ray team. + Examples: .. testcode:: @@ -417,6 +423,12 @@ def map_batches( If ``fn`` doesn't mutate its input, set ``zero_copy_batch=True`` to improve performance and decrease memory utilization. + .. warning:: + Specifying both ``num_cpus`` and ``num_gpus`` for map tasks is experimental, + and may result in scheduling or stability issues. Please + `report any issues `_ + to the Ray team. + Examples: Call :meth:`~Dataset.map_batches` to transform your data. @@ -973,6 +985,12 @@ def flat_map( transformation is vectorized like most NumPy and pandas operations, it might be faster. + .. warning:: + Specifying both ``num_cpus`` and ``num_gpus`` for map tasks is experimental, + and may result in scheduling or stability issues. Please + `report any issues `_ + to the Ray team. + Examples: .. testcode:: diff --git a/python/ray/data/grouped_data.py b/python/ray/data/grouped_data.py index c76d6cee7615..e479908136a2 100644 --- a/python/ray/data/grouped_data.py +++ b/python/ray/data/grouped_data.py @@ -129,6 +129,12 @@ def map_groups( In general, prefer to use aggregate() instead of map_groups(). + .. warning:: + Specifying both ``num_cpus`` and ``num_gpus`` for map tasks is experimental, + and may result in scheduling or stability issues. Please + `report any issues `_ + to the Ray team. + Examples: >>> # Return a single record per group (list of multiple records in, >>> # list of a single record out). diff --git a/python/ray/data/tests/test_executor_resource_management.py b/python/ray/data/tests/test_executor_resource_management.py index d4d73956d684..8bf3984ba7cc 100644 --- a/python/ray/data/tests/test_executor_resource_management.py +++ b/python/ray/data/tests/test_executor_resource_management.py @@ -123,14 +123,18 @@ def test_resource_canonicalization(ray_start_10_cpus_shared): ) assert op._ray_remote_args == {"num_gpus": 2} - with pytest.raises(ValueError): - MapOperator.create( - _mul2_map_data_prcessor, - input_op=input_op, - name="TestMapper", - compute_strategy=TaskPoolStrategy(), - ray_remote_args={"num_gpus": 2, "num_cpus": 1}, - ) + op = MapOperator.create( + _mul2_map_data_prcessor, + input_op=input_op, + name="TestMapper", + compute_strategy=TaskPoolStrategy(), + ray_remote_args={"num_gpus": 2, "num_cpus": 1}, + ) + assert op.base_resource_usage() == ExecutionResources() + assert op.incremental_resource_usage() == ExecutionResources( + cpu=1, gpu=2, object_store_memory=inc_obj_store_mem + ) + assert op._ray_remote_args == {"num_gpus": 2, "num_cpus": 1} def test_execution_options_resource_limit():