Skip to content

Commit

Permalink
Avoid "p2p" shuffle as a default when dask_cudf is imported (#15469)
Browse files Browse the repository at this point in the history
I was looking through some dask-related test failures in rapidsai/cuml#5819 and noticed that the "p2p" shuffle is causing some problems when query-planning is enabled. This PR sets the global default to "tasks". It *may* make sense to roll back this change once we fix the underlying problem(s), but I doubt it.

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Peter Andreas Entschev (https://github.com/pentschev)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #15469
  • Loading branch information
rjzamora authored Apr 8, 2024
1 parent 7750afc commit 44e0640
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
3 changes: 3 additions & 0 deletions python/dask_cudf/dask_cudf/expr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

# Register custom expressions and collections
if QUERY_PLANNING_ON:
# Broadly avoid "p2p" and "disk" defaults for now
config.set({"dataframe.shuffle.method": "tasks"})

try:
import dask_cudf.expr._collection
import dask_cudf.expr._expr
Expand Down
25 changes: 22 additions & 3 deletions python/dask_cudf/dask_cudf/tests/test_distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
dask_cuda = pytest.importorskip("dask_cuda")


def more_than_two_gpus():
def at_least_n_gpus(n):
ngpus = len(numba.cuda.gpus)
return ngpus >= 2
return ngpus >= n


@pytest.mark.parametrize("delayed", [True, False])
Expand Down Expand Up @@ -54,7 +54,7 @@ def test_merge():


@pytest.mark.skipif(
not more_than_two_gpus(), reason="Machine does not have more than two GPUs"
not at_least_n_gpus(2), reason="Machine does not have two GPUs"
)
def test_ucx_seriesgroupby():
pytest.importorskip("ucp")
Expand Down Expand Up @@ -97,3 +97,22 @@ def test_p2p_shuffle():
ddf.compute().sort_values("x"),
check_index=False,
)


@pytest.mark.skipif(
not at_least_n_gpus(3),
reason="Machine does not have three GPUs",
)
def test_unique():
# Using `"p2p"` can produce dispatching problems
# TODO: Test "p2p" after dask > 2024.4.1 is required
# See: https://github.com/dask/dask/pull/11040
with dask_cuda.LocalCUDACluster(n_workers=3) as cluster:
with Client(cluster):
df = cudf.DataFrame({"x": ["a", "b", "c", "a", "a"]})
ddf = dask_cudf.from_cudf(df, npartitions=2)
dd.assert_eq(
df.x.unique(),
ddf.x.unique().compute(),
check_index=False,
)

0 comments on commit 44e0640

Please sign in to comment.