Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove legacy Dask-cuDF handling #1417

Open
wants to merge 10 commits into
base: branch-25.02
Choose a base branch
from
49 changes: 2 additions & 47 deletions ci/test_python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ set_exit_code() {
trap set_exit_code ERR
set +e

rapids-logger "pytest dask-cuda (dask-expr)"
rapids-logger "pytest dask-cuda"
pushd dask_cuda
DASK_DATAFRAME__QUERY_PLANNING=True \
DASK_CUDA_TEST_SINGLE_GPU=1 \
DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
UCXPY_IFNAME=eth0 \
Expand All @@ -65,67 +64,40 @@ timeout 90m pytest \
tests -k "not ucxx"
popd

rapids-logger "pytest explicit-comms (legacy dd)"
pushd dask_cuda
DASK_DATAFRAME__QUERY_PLANNING=False \
DASK_CUDA_TEST_SINGLE_GPU=1 \
DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
UCXPY_IFNAME=eth0 \
UCX_WARN_UNUSED_ENV_VARS=n \
UCX_MEMTYPE_CACHE=n \
timeout 60m pytest \
-vv \
--durations=50 \
--capture=no \
--cache-clear \
--junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cuda-legacy.xml" \
--cov-config=../pyproject.toml \
--cov=dask_cuda \
--cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cuda-coverage-legacy.xml" \
--cov-report=term \
tests/test_explicit_comms.py -k "not ucxx"
popd

rapids-logger "Run local benchmark (dask-expr)"
DASK_DATAFRAME__QUERY_PLANNING=True \
rapids-logger "Run local benchmark"
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend dask

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--disable-rmm \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--disable-rmm-pool \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--rmm-pool-size 2GiB \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--rmm-pool-size 2GiB \
--rmm-maximum-pool-size 4GiB \
Expand All @@ -134,7 +106,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--rmm-pool-size 2GiB \
--rmm-maximum-pool-size 4GiB \
Expand All @@ -144,7 +115,6 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \
--runs 1 \
--backend explicit-comms

DASK_DATAFRAME__QUERY_PLANNING=True \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--rmm-pool-size 2GiB \
--rmm-maximum-pool-size 4GiB \
Expand All @@ -154,20 +124,5 @@ python dask_cuda/benchmarks/local_cudf_shuffle.py \
--runs 1 \
--backend explicit-comms

rapids-logger "Run local benchmark (legacy dd)"
DASK_DATAFRAME__QUERY_PLANNING=False \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend dask

DASK_DATAFRAME__QUERY_PLANNING=False \
python dask_cuda/benchmarks/local_cudf_shuffle.py \
--partition-size="1 KiB" \
-d 0 \
--runs 1 \
--backend explicit-comms

rapids-logger "Test script exiting with latest error code: $EXITCODE"
exit ${EXITCODE}
34 changes: 9 additions & 25 deletions dask_cuda/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,44 +7,28 @@
import dask.utils
import dask.dataframe.core
import dask.dataframe.shuffle
import dask.dataframe.multi
import dask.bag.core
from .explicit_comms.dataframe.shuffle import patch_shuffle_expression
from dask.dataframe import DASK_EXPR_ENABLED
from distributed.protocol.cuda import cuda_deserialize, cuda_serialize
from distributed.protocol.serialize import dask_deserialize, dask_serialize

from ._version import __git_commit__, __version__
from .cuda_worker import CUDAWorker
from .explicit_comms.dataframe.shuffle import (
get_rearrange_by_column_wrapper,
get_default_shuffle_method,
)

from .local_cuda_cluster import LocalCUDACluster
from .proxify_device_objects import proxify_decorator, unproxify_decorator


if dask.config.get("dataframe.query-planning", None) is not False and dask.config.get(
"explicit-comms", False
):
raise NotImplementedError(
"The 'explicit-comms' config is not yet supported when "
"query-planning is enabled in dask. Please use the shuffle "
"API directly, or use the legacy dask-dataframe API "
"(set the 'dataframe.query-planning' config to `False`"
"before importing `dask.dataframe`).",
if not DASK_EXPR_ENABLED:
raise ValueError(
"Dask-CUDA no longer supports the legacy Dask DataFrame API. "
"Please set the 'dataframe.query-planning' config to `True` "
"or None, or downgrade RAPIDS to <=24.12."
)


# Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True`
dask.dataframe.shuffle.rearrange_by_column = get_rearrange_by_column_wrapper(
dask.dataframe.shuffle.rearrange_by_column
)
# We have to replace all modules that imports Dask's `get_default_shuffle_method()`
# TODO: introduce a shuffle-algorithm dispatcher in Dask so we don't need this hack
dask.dataframe.shuffle.get_default_shuffle_method = get_default_shuffle_method
dask.dataframe.multi.get_default_shuffle_method = get_default_shuffle_method
dask.bag.core.get_default_shuffle_method = get_default_shuffle_method


patch_shuffle_expression()
# Monkey patching Dask to make use of proxify and unproxify in compatibility mode
dask.dataframe.shuffle.shuffle_group = proxify_decorator(
dask.dataframe.shuffle.shuffle_group
Expand Down
1 change: 0 additions & 1 deletion dask_cuda/benchmarks/local_cudf_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ def parse_args():
return parse_benchmark_args(
description="Distributed shuffle (dask/cudf) benchmark",
args_list=special_args,
check_explicit_comms=False,
)


Expand Down
1 change: 0 additions & 1 deletion dask_cuda/benchmarks/read_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ def parse_args():
args = parse_benchmark_args(
description="Parquet read benchmark",
args_list=special_args,
check_explicit_comms=False,
)
args.no_show_p2p_bandwidth = True
return args
Expand Down
20 changes: 0 additions & 20 deletions dask_cuda/benchmarks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import numpy as np
import pandas as pd

from dask import config
from dask.distributed import Client, SSHCluster
from dask.utils import format_bytes, format_time, parse_bytes
from distributed.comm.addressing import get_address_host
Expand Down Expand Up @@ -52,7 +51,6 @@ def as_noop(dsk):
def parse_benchmark_args(
description="Generic dask-cuda Benchmark",
args_list=[],
check_explicit_comms=True,
):
parser = argparse.ArgumentParser(description=description)
worker_args = parser.add_argument_group(description="Worker configuration")
Expand Down Expand Up @@ -377,24 +375,6 @@ def parse_benchmark_args(
if args.multi_node and len(args.hosts.split(",")) < 2:
raise ValueError("--multi-node requires at least 2 hosts")

# Raise error early if "explicit-comms" is not allowed
if (
check_explicit_comms
and args.backend == "explicit-comms"
and config.get(
"dataframe.query-planning",
None,
)
is not False
):
raise NotImplementedError(
"The 'explicit-comms' config is not yet supported when "
"query-planning is enabled in dask. Please use the legacy "
"dask-dataframe API by setting the following environment "
"variable before executing:",
" DASK_DATAFRAME__QUERY_PLANNING=False",
)

return args


Expand Down
79 changes: 43 additions & 36 deletions dask_cuda/explicit_comms/dataframe/shuffle.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from __future__ import annotations

import asyncio
import functools
import inspect
from collections import defaultdict
from math import ceil
from operator import getitem
Expand Down Expand Up @@ -570,40 +568,49 @@ def _use_explicit_comms() -> bool:
return False


def get_rearrange_by_column_wrapper(func):
"""Returns a function wrapper that dispatch the shuffle to explicit-comms.
def patch_shuffle_expression() -> None:
"""Patch Dasks Shuffle expression.

Notice, this is monkey patched into Dask at dask_cuda import
Notice, this is monkey patched into Dask at dask_cuda
import, and it changes `Shuffle._layer` to lower into
an `ECShuffle` expression when the 'explicit-comms'
config is set to `True`.
"""
import dask_expr

class ECShuffle(dask_expr._shuffle.TaskShuffle):
"""Explicit-Comms Shuffle Expression."""

def _layer(self):
# Execute an explicit-comms shuffle
if not hasattr(self, "_ec_shuffled"):
on = self.partitioning_index
df = dask_expr._collection.new_collection(self.frame)
self._ec_shuffled = shuffle(
df,
[on] if isinstance(on, str) else on,
self.npartitions_out,
self.ignore_index,
)
graph = self._ec_shuffled.dask.copy()
shuffled_name = self._ec_shuffled._name
for i in range(self.npartitions_out):
graph[(self._name, i)] = graph[(shuffled_name, i)]
return graph

_base_lower = dask_expr._shuffle.Shuffle._lower

def _patched_lower(self):
if self.method in (None, "tasks") and _use_explicit_comms():
return ECShuffle(
self.frame,
self.partitioning_index,
self.npartitions_out,
self.ignore_index,
self.options,
self.original_partitioning_index,
)
else:
return _base_lower(self)

func_sig = inspect.signature(func)

@functools.wraps(func)
def wrapper(*args, **kwargs):
if _use_explicit_comms():
# Convert `*args, **kwargs` to a dict of `keyword -> values`
kw = func_sig.bind(*args, **kwargs)
kw.apply_defaults()
kw = kw.arguments
# Notice, we only overwrite the default and the "tasks" shuffle
# algorithm. The "disk" and "p2p" algorithm, we don't touch.
if kw["shuffle_method"] in ("tasks", None):
col = kw["col"]
if isinstance(col, str):
col = [col]
return shuffle(kw["df"], col, kw["npartitions"], kw["ignore_index"])
return func(*args, **kwargs)

return wrapper


def get_default_shuffle_method() -> str:
"""Return the default shuffle algorithm used by Dask

This changes the default shuffle algorithm from "p2p" to "tasks"
when explicit comms is enabled.
"""
ret = dask.config.get("dataframe.shuffle.algorithm", None)
if ret is None and _use_explicit_comms():
return "tasks"
return dask.utils.get_default_shuffle_method()
dask_expr._shuffle.Shuffle._lower = _patched_lower
15 changes: 1 addition & 14 deletions dask_cuda/tests/test_explicit_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,6 @@
mp = mp.get_context("spawn") # type: ignore
ucp = pytest.importorskip("ucp")

QUERY_PLANNING_ON = dask.config.get("dataframe.query-planning", None) is not False

# Skip these tests when dask-expr is active (for now)
query_planning_skip = pytest.mark.skipif(
QUERY_PLANNING_ON,
reason=(
"The 'explicit-comms' config is not supported "
"when query planning is enabled."
),
)

# Set default shuffle method to "tasks"
if dask.config.get("dataframe.shuffle.method", None) is None:
Expand Down Expand Up @@ -98,7 +88,6 @@ def _test_dataframe_merge_empty_partitions(nrows, npartitions):
pd.testing.assert_frame_equal(got, expected)


@query_planning_skip
def test_dataframe_merge_empty_partitions():
# Notice, we use more partitions than rows
p = mp.Process(target=_test_dataframe_merge_empty_partitions, args=(2, 4))
Expand Down Expand Up @@ -250,7 +239,7 @@ def check_shuffle():
):
dask.config.refresh() # Trigger re-read of the environment variables
with pytest.raises(ValueError, match="explicit-comms-batchsize"):
ddf.shuffle(on="key", npartitions=4)
ddf.shuffle(on="key", npartitions=4).dask
rjzamora marked this conversation as resolved.
Show resolved Hide resolved

if in_cluster:
with LocalCluster(
Expand All @@ -267,7 +256,6 @@ def check_shuffle():
check_shuffle()


@query_planning_skip
@pytest.mark.parametrize("in_cluster", [True, False])
def test_dask_use_explicit_comms(in_cluster):
def _timeout(process, function, timeout):
Expand Down Expand Up @@ -330,7 +318,6 @@ def _test_dataframe_shuffle_merge(backend, protocol, n_workers):
assert_eq(got, expected)


@query_planning_skip
@pytest.mark.parametrize("nworkers", [1, 2, 4])
@pytest.mark.parametrize("backend", ["pandas", "cudf"])
@pytest.mark.parametrize("protocol", ["tcp", "ucx", "ucxx"])
Expand Down
7 changes: 0 additions & 7 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,6 @@ filterwarnings = [
"error::FutureWarning",
# remove after https://github.com/rapidsai/dask-cuda/issues/1087 is closed
"ignore:There is no current event loop:DeprecationWarning:tornado",
# This warning must be filtered until dask-expr support
# is enabled in both dask-cudf and dask-cuda.
# See: https://github.com/rapidsai/dask-cuda/issues/1311
"ignore:Dask DataFrame implementation is deprecated:DeprecationWarning",
# Dask now loudly throws warnings: https://github.com/dask/dask/pull/11437
# When the legacy implementation is removed we can remove this warning and stop running pytests with `DASK_DATAFRAME__QUERY_PLANNING=False`
"ignore:The legacy Dask DataFrame implementation is deprecated and will be removed in a future version.*:FutureWarning",
]

[tool.rapids-build-backend]
Expand Down
Loading