diff --git a/ci/test_python.sh b/ci/test_python.sh index 319efef2..fadd473e 100755 --- a/ci/test_python.sh +++ b/ci/test_python.sh @@ -44,9 +44,8 @@ set_exit_code() { trap set_exit_code ERR set +e -rapids-logger "pytest dask-cuda (dask-expr)" +rapids-logger "pytest dask-cuda" pushd dask_cuda -DASK_DATAFRAME__QUERY_PLANNING=True \ DASK_CUDA_TEST_SINGLE_GPU=1 \ DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \ UCXPY_IFNAME=eth0 \ @@ -65,43 +64,19 @@ timeout 90m pytest \ tests -k "not ucxx" popd -rapids-logger "pytest explicit-comms (legacy dd)" -pushd dask_cuda -DASK_DATAFRAME__QUERY_PLANNING=False \ -DASK_CUDA_TEST_SINGLE_GPU=1 \ -DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \ -UCXPY_IFNAME=eth0 \ -UCX_WARN_UNUSED_ENV_VARS=n \ -UCX_MEMTYPE_CACHE=n \ -timeout 60m pytest \ - -vv \ - --durations=50 \ - --capture=no \ - --cache-clear \ - --junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cuda-legacy.xml" \ - --cov-config=../pyproject.toml \ - --cov=dask_cuda \ - --cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cuda-coverage-legacy.xml" \ - --cov-report=term \ - tests/test_explicit_comms.py -k "not ucxx" -popd - -rapids-logger "Run local benchmark (dask-expr)" -DASK_DATAFRAME__QUERY_PLANNING=True \ +rapids-logger "Run local benchmark" python dask_cuda/benchmarks/local_cudf_shuffle.py \ --partition-size="1 KiB" \ -d 0 \ --runs 1 \ --backend dask -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --partition-size="1 KiB" \ -d 0 \ --runs 1 \ --backend explicit-comms -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --disable-rmm \ --partition-size="1 KiB" \ @@ -109,7 +84,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --runs 1 \ --backend explicit-comms -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --disable-rmm-pool \ --partition-size="1 KiB" \ @@ -117,7 +91,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --runs 1 \ --backend explicit-comms -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --rmm-pool-size 2GiB \ --partition-size="1 KiB" \ @@ -125,7 +98,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --runs 1 \ --backend explicit-comms -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --rmm-pool-size 2GiB \ --rmm-maximum-pool-size 4GiB \ @@ -134,7 +106,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --runs 1 \ --backend explicit-comms -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --rmm-pool-size 2GiB \ --rmm-maximum-pool-size 4GiB \ @@ -144,7 +115,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --runs 1 \ --backend explicit-comms -DASK_DATAFRAME__QUERY_PLANNING=True \ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --rmm-pool-size 2GiB \ --rmm-maximum-pool-size 4GiB \ @@ -154,20 +124,5 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \ --runs 1 \ --backend explicit-comms -rapids-logger "Run local benchmark (legacy dd)" -DASK_DATAFRAME__QUERY_PLANNING=False \ -python dask_cuda/benchmarks/local_cudf_shuffle.py \ - --partition-size="1 KiB" \ - -d 0 \ - --runs 1 \ - --backend dask - -DASK_DATAFRAME__QUERY_PLANNING=False \ -python dask_cuda/benchmarks/local_cudf_shuffle.py \ - --partition-size="1 KiB" \ - -d 0 \ - --runs 1 \ - --backend explicit-comms - rapids-logger "Test script exiting with latest error code: $EXITCODE" exit ${EXITCODE} diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py index 5711ac08..d07634f2 100644 --- a/dask_cuda/__init__.py +++ b/dask_cuda/__init__.py @@ -7,44 +7,28 @@ import dask.utils import dask.dataframe.core import dask.dataframe.shuffle -import dask.dataframe.multi -import dask.bag.core +from .explicit_comms.dataframe.shuffle import patch_shuffle_expression +from dask.dataframe import DASK_EXPR_ENABLED from distributed.protocol.cuda import cuda_deserialize, cuda_serialize from distributed.protocol.serialize import dask_deserialize, dask_serialize from ._version import __git_commit__, __version__ from .cuda_worker import CUDAWorker -from .explicit_comms.dataframe.shuffle import ( - get_rearrange_by_column_wrapper, - get_default_shuffle_method, -) + from .local_cuda_cluster import LocalCUDACluster from .proxify_device_objects import proxify_decorator, unproxify_decorator -if dask.config.get("dataframe.query-planning", None) is not False and dask.config.get( - "explicit-comms", False -): - raise NotImplementedError( - "The 'explicit-comms' config is not yet supported when " - "query-planning is enabled in dask. Please use the shuffle " - "API directly, or use the legacy dask-dataframe API " - "(set the 'dataframe.query-planning' config to `False`" - "before importing `dask.dataframe`).", +if not DASK_EXPR_ENABLED: + raise ValueError( + "Dask-CUDA no longer supports the legacy Dask DataFrame API. " + "Please set the 'dataframe.query-planning' config to `True` " + "or None, or downgrade RAPIDS to <=24.12." ) # Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True` -dask.dataframe.shuffle.rearrange_by_column = get_rearrange_by_column_wrapper( - dask.dataframe.shuffle.rearrange_by_column -) -# We have to replace all modules that imports Dask's `get_default_shuffle_method()` -# TODO: introduce a shuffle-algorithm dispatcher in Dask so we don't need this hack -dask.dataframe.shuffle.get_default_shuffle_method = get_default_shuffle_method -dask.dataframe.multi.get_default_shuffle_method = get_default_shuffle_method -dask.bag.core.get_default_shuffle_method = get_default_shuffle_method - - +patch_shuffle_expression() # Monkey patching Dask to make use of proxify and unproxify in compatibility mode dask.dataframe.shuffle.shuffle_group = proxify_decorator( dask.dataframe.shuffle.shuffle_group diff --git a/dask_cuda/benchmarks/local_cudf_shuffle.py b/dask_cuda/benchmarks/local_cudf_shuffle.py index 3a0955c4..25c47fd8 100644 --- a/dask_cuda/benchmarks/local_cudf_shuffle.py +++ b/dask_cuda/benchmarks/local_cudf_shuffle.py @@ -246,7 +246,6 @@ def parse_args(): return parse_benchmark_args( description="Distributed shuffle (dask/cudf) benchmark", args_list=special_args, - check_explicit_comms=False, ) diff --git a/dask_cuda/benchmarks/read_parquet.py b/dask_cuda/benchmarks/read_parquet.py index bce69673..4b34fd26 100644 --- a/dask_cuda/benchmarks/read_parquet.py +++ b/dask_cuda/benchmarks/read_parquet.py @@ -251,7 +251,6 @@ def parse_args(): args = parse_benchmark_args( description="Parquet read benchmark", args_list=special_args, - check_explicit_comms=False, ) args.no_show_p2p_bandwidth = True return args diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py index 4f87a025..84557f05 100644 --- a/dask_cuda/benchmarks/utils.py +++ b/dask_cuda/benchmarks/utils.py @@ -11,7 +11,6 @@ import numpy as np import pandas as pd -from dask import config from dask.distributed import Client, SSHCluster from dask.utils import format_bytes, format_time, parse_bytes from distributed.comm.addressing import get_address_host @@ -52,7 +51,6 @@ def as_noop(dsk): def parse_benchmark_args( description="Generic dask-cuda Benchmark", args_list=[], - check_explicit_comms=True, ): parser = argparse.ArgumentParser(description=description) worker_args = parser.add_argument_group(description="Worker configuration") @@ -377,24 +375,6 @@ def parse_benchmark_args( if args.multi_node and len(args.hosts.split(",")) < 2: raise ValueError("--multi-node requires at least 2 hosts") - # Raise error early if "explicit-comms" is not allowed - if ( - check_explicit_comms - and args.backend == "explicit-comms" - and config.get( - "dataframe.query-planning", - None, - ) - is not False - ): - raise NotImplementedError( - "The 'explicit-comms' config is not yet supported when " - "query-planning is enabled in dask. Please use the legacy " - "dask-dataframe API by setting the following environment " - "variable before executing:", - " DASK_DATAFRAME__QUERY_PLANNING=False", - ) - return args diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index 70f12335..600da07d 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -1,8 +1,6 @@ from __future__ import annotations import asyncio -import functools -import inspect from collections import defaultdict from math import ceil from operator import getitem @@ -570,40 +568,49 @@ def _use_explicit_comms() -> bool: return False -def get_rearrange_by_column_wrapper(func): - """Returns a function wrapper that dispatch the shuffle to explicit-comms. +def patch_shuffle_expression() -> None: + """Patch Dasks Shuffle expression. - Notice, this is monkey patched into Dask at dask_cuda import + Notice, this is monkey patched into Dask at dask_cuda + import, and it changes `Shuffle._layer` to lower into + an `ECShuffle` expression when the 'explicit-comms' + config is set to `True`. """ + import dask_expr + + class ECShuffle(dask_expr._shuffle.TaskShuffle): + """Explicit-Comms Shuffle Expression.""" + + def _layer(self): + # Execute an explicit-comms shuffle + if not hasattr(self, "_ec_shuffled"): + on = self.partitioning_index + df = dask_expr._collection.new_collection(self.frame) + self._ec_shuffled = shuffle( + df, + [on] if isinstance(on, str) else on, + self.npartitions_out, + self.ignore_index, + ) + graph = self._ec_shuffled.dask.copy() + shuffled_name = self._ec_shuffled._name + for i in range(self.npartitions_out): + graph[(self._name, i)] = graph[(shuffled_name, i)] + return graph + + _base_lower = dask_expr._shuffle.Shuffle._lower + + def _patched_lower(self): + if self.method in (None, "tasks") and _use_explicit_comms(): + return ECShuffle( + self.frame, + self.partitioning_index, + self.npartitions_out, + self.ignore_index, + self.options, + self.original_partitioning_index, + ) + else: + return _base_lower(self) - func_sig = inspect.signature(func) - - @functools.wraps(func) - def wrapper(*args, **kwargs): - if _use_explicit_comms(): - # Convert `*args, **kwargs` to a dict of `keyword -> values` - kw = func_sig.bind(*args, **kwargs) - kw.apply_defaults() - kw = kw.arguments - # Notice, we only overwrite the default and the "tasks" shuffle - # algorithm. The "disk" and "p2p" algorithm, we don't touch. - if kw["shuffle_method"] in ("tasks", None): - col = kw["col"] - if isinstance(col, str): - col = [col] - return shuffle(kw["df"], col, kw["npartitions"], kw["ignore_index"]) - return func(*args, **kwargs) - - return wrapper - - -def get_default_shuffle_method() -> str: - """Return the default shuffle algorithm used by Dask - - This changes the default shuffle algorithm from "p2p" to "tasks" - when explicit comms is enabled. - """ - ret = dask.config.get("dataframe.shuffle.algorithm", None) - if ret is None and _use_explicit_comms(): - return "tasks" - return dask.utils.get_default_shuffle_method() + dask_expr._shuffle.Shuffle._lower = _patched_lower diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py index 2806dc1c..2f79251d 100644 --- a/dask_cuda/tests/test_explicit_comms.py +++ b/dask_cuda/tests/test_explicit_comms.py @@ -25,16 +25,6 @@ mp = mp.get_context("spawn") # type: ignore ucp = pytest.importorskip("ucp") -QUERY_PLANNING_ON = dask.config.get("dataframe.query-planning", None) is not False - -# Skip these tests when dask-expr is active (for now) -query_planning_skip = pytest.mark.skipif( - QUERY_PLANNING_ON, - reason=( - "The 'explicit-comms' config is not supported " - "when query planning is enabled." - ), -) # Set default shuffle method to "tasks" if dask.config.get("dataframe.shuffle.method", None) is None: @@ -98,7 +88,6 @@ def _test_dataframe_merge_empty_partitions(nrows, npartitions): pd.testing.assert_frame_equal(got, expected) -@query_planning_skip def test_dataframe_merge_empty_partitions(): # Notice, we use more partitions than rows p = mp.Process(target=_test_dataframe_merge_empty_partitions, args=(2, 4)) @@ -250,7 +239,7 @@ def check_shuffle(): ): dask.config.refresh() # Trigger re-read of the environment variables with pytest.raises(ValueError, match="explicit-comms-batchsize"): - ddf.shuffle(on="key", npartitions=4) + ddf.shuffle(on="key", npartitions=4).dask if in_cluster: with LocalCluster( @@ -267,7 +256,6 @@ def check_shuffle(): check_shuffle() -@query_planning_skip @pytest.mark.parametrize("in_cluster", [True, False]) def test_dask_use_explicit_comms(in_cluster): def _timeout(process, function, timeout): @@ -330,7 +318,6 @@ def _test_dataframe_shuffle_merge(backend, protocol, n_workers): assert_eq(got, expected) -@query_planning_skip @pytest.mark.parametrize("nworkers", [1, 2, 4]) @pytest.mark.parametrize("backend", ["pandas", "cudf"]) @pytest.mark.parametrize("protocol", ["tcp", "ucx", "ucxx"]) diff --git a/pyproject.toml b/pyproject.toml index 01c8d956..cfe5397c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -124,13 +124,6 @@ filterwarnings = [ "error::FutureWarning", # remove after https://github.com/rapidsai/dask-cuda/issues/1087 is closed "ignore:There is no current event loop:DeprecationWarning:tornado", - # This warning must be filtered until dask-expr support - # is enabled in both dask-cudf and dask-cuda. - # See: https://github.com/rapidsai/dask-cuda/issues/1311 - "ignore:Dask DataFrame implementation is deprecated:DeprecationWarning", - # Dask now loudly throws warnings: https://github.com/dask/dask/pull/11437 - # When the legacy implementation is removed we can remove this warning and stop running pytests with `DASK_DATAFRAME__QUERY_PLANNING=False` - "ignore:The legacy Dask DataFrame implementation is deprecated and will be removed in a future version.*:FutureWarning", ] [tool.rapids-build-backend]