From 1af30be62db94c0faa0ba1f8da692ee07fbec038 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 16 Dec 2024 12:06:43 -0800 Subject: [PATCH 1/8] remove legacy support and update explicit-comms config support --- ci/test_python.sh | 49 +--------------- dask_cuda/__init__.py | 18 +----- dask_cuda/benchmarks/local_cudf_shuffle.py | 1 - dask_cuda/benchmarks/read_parquet.py | 1 - dask_cuda/benchmarks/utils.py | 20 ------- dask_cuda/explicit_comms/dataframe/shuffle.py | 57 +++++++++---------- dask_cuda/tests/test_explicit_comms.py | 13 ----- pyproject.toml | 7 --- 8 files changed, 32 insertions(+), 134 deletions(-) diff --git a/ci/test_python.sh b/ci/test_python.sh index 319efef2..fadd473e 100755 --- a/ci/test_python.sh +++ b/ci/test_python.sh @@ -44,9 +44,8 @@ set_exit_code() { trap set_exit_code ERR set +e -rapids-logger "pytest dask-cuda (dask-expr)" +rapids-logger "pytest dask-cuda" pushd dask_cuda -DASK_DATAFRAME__QUERY_PLANNING=True \ DASK_CUDA_TEST_SINGLE_GPU=1 \ DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \ UCXPY_IFNAME=eth0 \ @@ -65,43 +64,19 @@ timeout 90m pytest \ tests -k "not ucxx" popd -rapids-logger "pytest explicit-comms (legacy dd)" -pushd dask_cuda -DASK_DATAFRAME__QUERY_PLANNING=False \ -DASK_CUDA_TEST_SINGLE_GPU=1 \ -DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \ -UCXPY_IFNAME=eth0 \ -UCX_WARN_UNUSED_ENV_VARS=n \ -UCX_MEMTYPE_CACHE=n \ -timeout 60m pytest \ - -vv \ - --durations=50 \ - --capture=no \ - --cache-clear \ - --junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cuda-legacy.xml" \ - --cov-config=../pyproject.toml \ - --cov=dask_cuda \ - --cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cuda-coverage-legacy.xml" \ - --cov-report=term \ - tests/test_explicit_comms.py -k "not ucxx" -popd - -rapids-logger "Run local benchmark (dask-expr)" -DASK_DATAFRAME__QUERY_PLANNING=True \ +rapids-logger "Run local benchmark" python dask_cuda/benchmarks/local_cudf_shuffle.py \ --partition-size="1 KiB" \ -d 0 \ --runs 1 \ --backend dask -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --partition-size="1 KiB" \ -d 0 \ --runs 1 \ --backend explicit-comms -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --disable-rmm \ --partition-size="1 KiB" \ @@ -109,7 +84,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --runs 1 \ --backend explicit-comms -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --disable-rmm-pool \ --partition-size="1 KiB" \ @@ -117,7 +91,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --runs 1 \ --backend explicit-comms -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --rmm-pool-size 2GiB \ --partition-size="1 KiB" \ @@ -125,7 +98,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --runs 1 \ --backend explicit-comms -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --rmm-pool-size 2GiB \ --rmm-maximum-pool-size 4GiB \ @@ -134,7 +106,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --runs 1 \ --backend explicit-comms -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --rmm-pool-size 2GiB \ --rmm-maximum-pool-size 4GiB \ @@ -144,7 +115,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --runs 1 \ --backend explicit-comms -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --rmm-pool-size 2GiB \ --rmm-maximum-pool-size 4GiB \ @@ -154,20 +124,5 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --runs 1 \ --backend explicit-comms -rapids-logger "Run local benchmark (legacy dd)" -DASK_DATAFRAME__QUERY_PLANNING=False \ -python dask_cuda/benchmarks/local_cudf_shuffle.py \ - --partition-size="1 KiB" \ - -d 0 \ - --runs 1 \ - --backend dask - -DASK_DATAFRAME__QUERY_PLANNING=False \ -python dask_cuda/benchmarks/local_cudf_shuffle.py \ - --partition-size="1 KiB" \ - -d 0 \ - --runs 1 \ - --backend explicit-comms - rapids-logger "Test script exiting with latest error code: $EXITCODE" exit ${EXITCODE} diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py index 5711ac08..d9a775ff 100644 --- a/dask_cuda/__init__.py +++ b/dask_cuda/__init__.py @@ -15,29 +15,15 @@ from ._version import __git_commit__, __version__ from .cuda_worker import CUDAWorker from .explicit_comms.dataframe.shuffle import ( - get_rearrange_by_column_wrapper, get_default_shuffle_method, + patch_shuffle_expression, ) from .local_cuda_cluster import LocalCUDACluster from .proxify_device_objects import proxify_decorator, unproxify_decorator -if dask.config.get("dataframe.query-planning", None) is not False and dask.config.get( - "explicit-comms", False -): - raise NotImplementedError( - "The 'explicit-comms' config is not yet supported when " - "query-planning is enabled in dask. Please use the shuffle " - "API directly, or use the legacy dask-dataframe API " - "(set the 'dataframe.query-planning' config to `False`" - "before importing `dask.dataframe`).", - ) - - # Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True` -dask.dataframe.shuffle.rearrange_by_column = get_rearrange_by_column_wrapper( - dask.dataframe.shuffle.rearrange_by_column -) +patch_shuffle_expression() # We have to replace all modules that imports Dask's `get_default_shuffle_method()` # TODO: introduce a shuffle-algorithm dispatcher in Dask so we don't need this hack dask.dataframe.shuffle.get_default_shuffle_method = get_default_shuffle_method diff --git a/dask_cuda/benchmarks/local_cudf_shuffle.py b/dask_cuda/benchmarks/local_cudf_shuffle.py index 3a0955c4..25c47fd8 100644 --- a/dask_cuda/benchmarks/local_cudf_shuffle.py +++ b/dask_cuda/benchmarks/local_cudf_shuffle.py @@ -246,7 +246,6 @@ def parse_args(): return parse_benchmark_args( description="Distributed shuffle (dask/cudf) benchmark", args_list=special_args, - check_explicit_comms=False, ) diff --git a/dask_cuda/benchmarks/read_parquet.py b/dask_cuda/benchmarks/read_parquet.py index bce69673..4b34fd26 100644 --- a/dask_cuda/benchmarks/read_parquet.py +++ b/dask_cuda/benchmarks/read_parquet.py @@ -251,7 +251,6 @@ def parse_args(): args = parse_benchmark_args( description="Parquet read benchmark", args_list=special_args, - check_explicit_comms=False, ) args.no_show_p2p_bandwidth = True return args diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py index 4f87a025..84557f05 100644 --- a/dask_cuda/benchmarks/utils.py +++ b/dask_cuda/benchmarks/utils.py @@ -11,7 +11,6 @@ import numpy as np import pandas as pd -from dask import config from dask.distributed import Client, SSHCluster from dask.utils import format_bytes, format_time, parse_bytes from distributed.comm.addressing import get_address_host @@ -52,7 +51,6 @@ def as_noop(dsk): def parse_benchmark_args( description="Generic dask-cuda Benchmark", args_list=[], - check_explicit_comms=True, ): parser = argparse.ArgumentParser(description=description) worker_args = parser.add_argument_group(description="Worker configuration") @@ -377,24 +375,6 @@ def parse_benchmark_args( if args.multi_node and len(args.hosts.split(",")) < 2: raise ValueError("--multi-node requires at least 2 hosts") - # Raise error early if "explicit-comms" is not allowed - if ( - check_explicit_comms - and args.backend == "explicit-comms" - and config.get( - "dataframe.query-planning", - None, - ) - is not False - ): - raise NotImplementedError( - "The 'explicit-comms' config is not yet supported when " - "query-planning is enabled in dask. Please use the legacy " - "dask-dataframe API by setting the following environment " - "variable before executing:", - " DASK_DATAFRAME__QUERY_PLANNING=False", - ) - return args diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index 70f12335..ba596dd0 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -1,8 +1,6 @@ from __future__ import annotations import asyncio -import functools -import inspect from collections import defaultdict from math import ceil from operator import getitem @@ -570,33 +568,6 @@ def _use_explicit_comms() -> bool: return False -def get_rearrange_by_column_wrapper(func): - """Returns a function wrapper that dispatch the shuffle to explicit-comms. - - Notice, this is monkey patched into Dask at dask_cuda import - """ - - func_sig = inspect.signature(func) - - @functools.wraps(func) - def wrapper(*args, **kwargs): - if _use_explicit_comms(): - # Convert `*args, **kwargs` to a dict of `keyword -> values` - kw = func_sig.bind(*args, **kwargs) - kw.apply_defaults() - kw = kw.arguments - # Notice, we only overwrite the default and the "tasks" shuffle - # algorithm. The "disk" and "p2p" algorithm, we don't touch. - if kw["shuffle_method"] in ("tasks", None): - col = kw["col"] - if isinstance(col, str): - col = [col] - return shuffle(kw["df"], col, kw["npartitions"], kw["ignore_index"]) - return func(*args, **kwargs) - - return wrapper - - def get_default_shuffle_method() -> str: """Return the default shuffle algorithm used by Dask @@ -607,3 +578,31 @@ def get_default_shuffle_method() -> str: if ret is None and _use_explicit_comms(): return "tasks" return dask.utils.get_default_shuffle_method() + + +def patch_shuffle_expression() -> None: + """Patch Dasks Shuffle expression. + + This changes ``Shuffle._lower`` to apply explicit-comms + shuffling when the 'explicit-comms' config is enabled. + """ + from dask_expr._collection import new_collection + from dask_expr._shuffle import Shuffle as DXShuffle + + _base_lower = DXShuffle._lower + + def _lower(self): + if self.method in ("tasks", None) and _use_explicit_comms(): + on = self.partitioning_index + on = [on] if isinstance(on, str) else on + return shuffle( + new_collection(self.frame), + on, + self.npartitions_out, + self.ignore_index, + ).expr + else: + # Use upstream lowering logic + return _base_lower(self) + + DXShuffle._lower = _lower diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py index 2806dc1c..96ff91e9 100644 --- a/dask_cuda/tests/test_explicit_comms.py +++ b/dask_cuda/tests/test_explicit_comms.py @@ -25,16 +25,6 @@ mp = mp.get_context("spawn") # type: ignore ucp = pytest.importorskip("ucp") -QUERY_PLANNING_ON = dask.config.get("dataframe.query-planning", None) is not False - -# Skip these tests when dask-expr is active (for now) -query_planning_skip = pytest.mark.skipif( - QUERY_PLANNING_ON, - reason=( - "The 'explicit-comms' config is not supported " - "when query planning is enabled." - ), -) # Set default shuffle method to "tasks" if dask.config.get("dataframe.shuffle.method", None) is None: @@ -98,7 +88,6 @@ def _test_dataframe_merge_empty_partitions(nrows, npartitions): pd.testing.assert_frame_equal(got, expected) -@query_planning_skip def test_dataframe_merge_empty_partitions(): # Notice, we use more partitions than rows p = mp.Process(target=_test_dataframe_merge_empty_partitions, args=(2, 4)) @@ -267,7 +256,6 @@ def check_shuffle(): check_shuffle() -@query_planning_skip @pytest.mark.parametrize("in_cluster", [True, False]) def test_dask_use_explicit_comms(in_cluster): def _timeout(process, function, timeout): @@ -330,7 +318,6 @@ def _test_dataframe_shuffle_merge(backend, protocol, n_workers): assert_eq(got, expected) -@query_planning_skip @pytest.mark.parametrize("nworkers", [1, 2, 4]) @pytest.mark.parametrize("backend", ["pandas", "cudf"]) @pytest.mark.parametrize("protocol", ["tcp", "ucx", "ucxx"]) diff --git a/pyproject.toml b/pyproject.toml index 01c8d956..cfe5397c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -124,13 +124,6 @@ filterwarnings = [ "error::FutureWarning", # remove after https://github.com/rapidsai/dask-cuda/issues/1087 is closed "ignore:There is no current event loop:DeprecationWarning:tornado", - # This warning must be filtered until dask-expr support - # is enabled in both dask-cudf and dask-cuda. - # See: https://github.com/rapidsai/dask-cuda/issues/1311 - "ignore:Dask DataFrame implementation is deprecated:DeprecationWarning", - # Dask now loudly throws warnings: https://github.com/dask/dask/pull/11437 - # When the legacy implementation is removed we can remove this warning and stop running pytests with `DASK_DATAFRAME__QUERY_PLANNING=False` - "ignore:The legacy Dask DataFrame implementation is deprecated and will be removed in a future version.*:FutureWarning", ] [tool.rapids-build-backend] From 3cf956b643eaf1f4c0ad6f47cf70e9eb6a9bd243 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 17 Dec 2024 07:26:33 -0800 Subject: [PATCH 2/8] introduce ExplicitCommsShuffle wrapper --- dask_cuda/explicit_comms/dataframe/shuffle.py | 57 +++++++++++++++---- 1 file changed, 47 insertions(+), 10 deletions(-) diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index ba596dd0..d9154e00 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -583,24 +583,61 @@ def get_default_shuffle_method() -> str: def patch_shuffle_expression() -> None: """Patch Dasks Shuffle expression. - This changes ``Shuffle._lower`` to apply explicit-comms - shuffling when the 'explicit-comms' config is enabled. + Notice, this is monkey patched into Dask at dask_cuda + import, and it changes `Shuffle._lower` to wrap the + original shuffle expression in `ExplicitCommsShuffle`. """ from dask_expr._collection import new_collection + from dask_expr._expr import Expr from dask_expr._shuffle import Shuffle as DXShuffle + class ExplicitCommsShuffle(Expr): + """Explicit Comms Shuffle.""" + + _parameters = ["wrapped"] + + @property + def original(self): + assert len(self.wrapped) == 1, f"Unexpected parameters: {self.wrapped[1:]}" + return self.wrapped[0] + + @property + def _meta(self): + return self.original.frame._meta + + def _lower(self): + return None + + def _divisions(self): + return (None,) * (self.original.frame.npartitions + 1) + + def _layer(self): + if not hasattr(self, "_shuffle_cache"): + self._shuffle_cache = {} + try: + expr = self._shuffle_cache[self._name] + except KeyError: + on = self.original.partitioning_index + expr = shuffle( + new_collection(self.original.frame), + [on] if isinstance(on, str) else on, + self.original.npartitions_out, + self.original.ignore_index, + ) + self._shuffle_cache[self._name] = expr + graph = expr.dask.copy() + graph.update( + {(self._name, i): (expr._name, i) for i in range(self.npartitions)} + ) + return graph + _base_lower = DXShuffle._lower def _lower(self): if self.method in ("tasks", None) and _use_explicit_comms(): - on = self.partitioning_index - on = [on] if isinstance(on, str) else on - return shuffle( - new_collection(self.frame), - on, - self.npartitions_out, - self.ignore_index, - ).expr + # Wrap the original Shuffle in an ExplicitCommsShuffle + # (Use list argument to encapsulate dependencies) + return ExplicitCommsShuffle([self]) else: # Use upstream lowering logic return _base_lower(self) From 504cbb70da816588c45de4fdb89aacd536d49851 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 17 Dec 2024 13:42:41 -0800 Subject: [PATCH 3/8] progress - maybe --- dask_cuda/explicit_comms/dataframe/shuffle.py | 75 ++++++------------- 1 file changed, 24 insertions(+), 51 deletions(-) diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index d9154e00..a7c15f4b 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -584,62 +584,35 @@ def patch_shuffle_expression() -> None: """Patch Dasks Shuffle expression. Notice, this is monkey patched into Dask at dask_cuda - import, and it changes `Shuffle._lower` to wrap the - original shuffle expression in `ExplicitCommsShuffle`. + import, and it changes `TaskShuffle._layer` to execute + an explicit-comms shuffle. """ from dask_expr._collection import new_collection - from dask_expr._expr import Expr - from dask_expr._shuffle import Shuffle as DXShuffle - - class ExplicitCommsShuffle(Expr): - """Explicit Comms Shuffle.""" - - _parameters = ["wrapped"] - - @property - def original(self): - assert len(self.wrapped) == 1, f"Unexpected parameters: {self.wrapped[1:]}" - return self.wrapped[0] - - @property - def _meta(self): - return self.original.frame._meta - - def _lower(self): - return None - - def _divisions(self): - return (None,) * (self.original.frame.npartitions + 1) - - def _layer(self): - if not hasattr(self, "_shuffle_cache"): - self._shuffle_cache = {} - try: - expr = self._shuffle_cache[self._name] - except KeyError: - on = self.original.partitioning_index - expr = shuffle( - new_collection(self.original.frame), + from dask_expr._shuffle import TaskShuffle + + _base_layer = TaskShuffle._layer + + def _layer(self): + with open("debug.txt", "a") as f: + f.write(f"USING: {_use_explicit_comms()}\n") + if _use_explicit_comms(): + # Execute an explicit-comms shuffle + if not hasattr(self, "_ec_shuffled"): + on = self.partitioning_index + df = new_collection(self.frame) + self._ec_shuffled = shuffle( + df, [on] if isinstance(on, str) else on, - self.original.npartitions_out, - self.original.ignore_index, + self.npartitions_out, + self.ignore_index, ) - self._shuffle_cache[self._name] = expr - graph = expr.dask.copy() - graph.update( - {(self._name, i): (expr._name, i) for i in range(self.npartitions)} - ) + graph = self._ec_shuffled.dask.copy() + shuffled_name = self._ec_shuffled._name + for i in range(self.npartitions_out): + graph[(self._name, i)] = graph[(shuffled_name, i)] return graph - - _base_lower = DXShuffle._lower - - def _lower(self): - if self.method in ("tasks", None) and _use_explicit_comms(): - # Wrap the original Shuffle in an ExplicitCommsShuffle - # (Use list argument to encapsulate dependencies) - return ExplicitCommsShuffle([self]) else: # Use upstream lowering logic - return _base_lower(self) + return _base_layer(self) - DXShuffle._lower = _lower + TaskShuffle._layer = _layer From 20df12ebca71d776745e36c2efd2931243b32504 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 17 Dec 2024 13:45:06 -0800 Subject: [PATCH 4/8] remove write statement --- dask_cuda/explicit_comms/dataframe/shuffle.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index a7c15f4b..227b67d1 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -593,8 +593,6 @@ def patch_shuffle_expression() -> None: _base_layer = TaskShuffle._layer def _layer(self): - with open("debug.txt", "a") as f: - f.write(f"USING: {_use_explicit_comms()}\n") if _use_explicit_comms(): # Execute an explicit-comms shuffle if not hasattr(self, "_ec_shuffled"): From 32bcb22e94c2e2eb80fe4eafa36f7e9a06da62c9 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 17 Dec 2024 14:05:50 -0800 Subject: [PATCH 5/8] cleanup --- dask_cuda/explicit_comms/dataframe/shuffle.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index 227b67d1..79e54060 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -587,17 +587,16 @@ def patch_shuffle_expression() -> None: import, and it changes `TaskShuffle._layer` to execute an explicit-comms shuffle. """ - from dask_expr._collection import new_collection - from dask_expr._shuffle import TaskShuffle + import dask_expr - _base_layer = TaskShuffle._layer + _base_layer = dask_expr._shuffle.TaskShuffle._layer - def _layer(self): + def _patched_layer(self): if _use_explicit_comms(): # Execute an explicit-comms shuffle if not hasattr(self, "_ec_shuffled"): on = self.partitioning_index - df = new_collection(self.frame) + df = dask_expr._collection.new_collection(self.frame) self._ec_shuffled = shuffle( df, [on] if isinstance(on, str) else on, @@ -613,4 +612,4 @@ def _layer(self): # Use upstream lowering logic return _base_layer(self) - TaskShuffle._layer = _layer + dask_expr._shuffle.TaskShuffle._layer = _patched_layer From 1c0d9744e408a3284649cbce8ae3a8ab7fce347c Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 17 Dec 2024 14:23:43 -0800 Subject: [PATCH 6/8] trigger explicit-comms-batchsize error --- dask_cuda/tests/test_explicit_comms.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py index 96ff91e9..2f79251d 100644 --- a/dask_cuda/tests/test_explicit_comms.py +++ b/dask_cuda/tests/test_explicit_comms.py @@ -239,7 +239,7 @@ def check_shuffle(): ): dask.config.refresh() # Trigger re-read of the environment variables with pytest.raises(ValueError, match="explicit-comms-batchsize"): - ddf.shuffle(on="key", npartitions=4) + ddf.shuffle(on="key", npartitions=4).dask if in_cluster: with LocalCluster( From a7b20f762742b7782c0edfe3600ed3c779396064 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 18 Dec 2024 06:57:27 -0800 Subject: [PATCH 7/8] avoid get_default_shuffle_method patching by patching Shuffle._lower --- dask_cuda/__init__.py | 6 --- dask_cuda/explicit_comms/dataframe/shuffle.py | 41 ++++++++++--------- 2 files changed, 21 insertions(+), 26 deletions(-) diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py index d9a775ff..d6ac9655 100644 --- a/dask_cuda/__init__.py +++ b/dask_cuda/__init__.py @@ -15,7 +15,6 @@ from ._version import __git_commit__, __version__ from .cuda_worker import CUDAWorker from .explicit_comms.dataframe.shuffle import ( - get_default_shuffle_method, patch_shuffle_expression, ) from .local_cuda_cluster import LocalCUDACluster @@ -24,11 +23,6 @@ # Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True` patch_shuffle_expression() -# We have to replace all modules that imports Dask's `get_default_shuffle_method()` -# TODO: introduce a shuffle-algorithm dispatcher in Dask so we don't need this hack -dask.dataframe.shuffle.get_default_shuffle_method = get_default_shuffle_method -dask.dataframe.multi.get_default_shuffle_method = get_default_shuffle_method -dask.bag.core.get_default_shuffle_method = get_default_shuffle_method # Monkey patching Dask to make use of proxify and unproxify in compatibility mode diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index 79e54060..600da07d 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -568,31 +568,20 @@ def _use_explicit_comms() -> bool: return False -def get_default_shuffle_method() -> str: - """Return the default shuffle algorithm used by Dask - - This changes the default shuffle algorithm from "p2p" to "tasks" - when explicit comms is enabled. - """ - ret = dask.config.get("dataframe.shuffle.algorithm", None) - if ret is None and _use_explicit_comms(): - return "tasks" - return dask.utils.get_default_shuffle_method() - - def patch_shuffle_expression() -> None: """Patch Dasks Shuffle expression. Notice, this is monkey patched into Dask at dask_cuda - import, and it changes `TaskShuffle._layer` to execute - an explicit-comms shuffle. + import, and it changes `Shuffle._layer` to lower into + an `ECShuffle` expression when the 'explicit-comms' + config is set to `True`. """ import dask_expr - _base_layer = dask_expr._shuffle.TaskShuffle._layer + class ECShuffle(dask_expr._shuffle.TaskShuffle): + """Explicit-Comms Shuffle Expression.""" - def _patched_layer(self): - if _use_explicit_comms(): + def _layer(self): # Execute an explicit-comms shuffle if not hasattr(self, "_ec_shuffled"): on = self.partitioning_index @@ -608,8 +597,20 @@ def _patched_layer(self): for i in range(self.npartitions_out): graph[(self._name, i)] = graph[(shuffled_name, i)] return graph + + _base_lower = dask_expr._shuffle.Shuffle._lower + + def _patched_lower(self): + if self.method in (None, "tasks") and _use_explicit_comms(): + return ECShuffle( + self.frame, + self.partitioning_index, + self.npartitions_out, + self.ignore_index, + self.options, + self.original_partitioning_index, + ) else: - # Use upstream lowering logic - return _base_layer(self) + return _base_lower(self) - dask_expr._shuffle.TaskShuffle._layer = _patched_layer + dask_expr._shuffle.Shuffle._lower = _patched_lower From 524aed4c9742bc025edbac8404191b4fb7932268 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 18 Dec 2024 08:50:32 -0800 Subject: [PATCH 8/8] add ValueError for legacy dask-dataframe when dask_cuda is imported --- dask_cuda/__init__.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py index d6ac9655..d07634f2 100644 --- a/dask_cuda/__init__.py +++ b/dask_cuda/__init__.py @@ -7,24 +7,28 @@ import dask.utils import dask.dataframe.core import dask.dataframe.shuffle -import dask.dataframe.multi -import dask.bag.core +from .explicit_comms.dataframe.shuffle import patch_shuffle_expression +from dask.dataframe import DASK_EXPR_ENABLED from distributed.protocol.cuda import cuda_deserialize, cuda_serialize from distributed.protocol.serialize import dask_deserialize, dask_serialize from ._version import __git_commit__, __version__ from .cuda_worker import CUDAWorker -from .explicit_comms.dataframe.shuffle import ( - patch_shuffle_expression, -) + from .local_cuda_cluster import LocalCUDACluster from .proxify_device_objects import proxify_decorator, unproxify_decorator -# Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True` -patch_shuffle_expression() +if not DASK_EXPR_ENABLED: + raise ValueError( + "Dask-CUDA no longer supports the legacy Dask DataFrame API. " + "Please set the 'dataframe.query-planning' config to `True` " + "or None, or downgrade RAPIDS to <=24.12." + ) +# Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True` +patch_shuffle_expression() # Monkey patching Dask to make use of proxify and unproxify in compatibility mode dask.dataframe.shuffle.shuffle_group = proxify_decorator( dask.dataframe.shuffle.shuffle_group