Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove "legacy" Dask DataFrame support from Dask cuDF #17558

Draft
wants to merge 13 commits into
base: branch-25.02
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions ci/test_python_other.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ EXITCODE=0
trap "EXITCODE=1" ERR
set +e

rapids-logger "pytest dask_cudf (dask-expr)"
DASK_DATAFRAME__QUERY_PLANNING=True ./ci/run_dask_cudf_pytests.sh \
rapids-logger "pytest dask_cudf"
./ci/run_dask_cudf_pytests.sh \
--junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cudf.xml" \
--numprocesses=8 \
--dist=worksteal \
Expand All @@ -34,13 +34,6 @@ DASK_DATAFRAME__QUERY_PLANNING=True ./ci/run_dask_cudf_pytests.sh \
--cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cudf-coverage.xml" \
--cov-report=term

rapids-logger "pytest dask_cudf (legacy)"
DASK_DATAFRAME__QUERY_PLANNING=False ./ci/run_dask_cudf_pytests.sh \
--junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cudf-legacy.xml" \
--numprocesses=8 \
--dist=worksteal \
.

rapids-logger "pytest cudf_kafka"
./ci/run_cudf_kafka_pytests.sh \
--junitxml="${RAPIDS_TESTS_DIR}/junit-cudf-kafka.xml"
Expand Down
14 changes: 2 additions & 12 deletions ci/test_wheel_dask_cudf.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,11 @@ RAPIDS_TESTS_DIR=${RAPIDS_TESTS_DIR:-"${RESULTS_DIR}/test-results"}/
mkdir -p "${RAPIDS_TESTS_DIR}"

# Run tests in dask_cudf/tests and dask_cudf/io/tests
rapids-logger "pytest dask_cudf (dask-expr)"
rapids-logger "pytest dask_cudf"
pushd python/dask_cudf/dask_cudf
DASK_DATAFRAME__QUERY_PLANNING=True python -m pytest \
python -m pytest \
--junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cudf.xml" \
--numprocesses=8 \
--dist=worksteal \
.
popd

# Run tests in dask_cudf/tests and dask_cudf/io/tests (legacy)
rapids-logger "pytest dask_cudf (legacy)"
pushd python/dask_cudf/dask_cudf
DASK_DATAFRAME__QUERY_PLANNING=False python -m pytest \
--junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cudf-legacy.xml" \
--numprocesses=8 \
--dist=worksteal \
.
popd
54 changes: 14 additions & 40 deletions python/dask_cudf/dask_cudf/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
# Copyright (c) 2018-2024, NVIDIA CORPORATION.

import warnings
from importlib import import_module

import dask.dataframe as dd
from dask import config
from dask.dataframe import from_delayed

import cudf

from . import backends # noqa: F401
from . import backends, io # noqa: F401
from ._expr.expr import _patch_dask_expr
from ._version import __git_commit__, __version__ # noqa: F401
from .core import DataFrame, Index, Series, concat, from_cudf
from .core import DataFrame, Index, Series, _deprecated_api, concat, from_cudf

QUERY_PLANNING_ON = dd.DASK_EXPR_ENABLED
if not (QUERY_PLANNING_ON := dd.DASK_EXPR_ENABLED):
raise ValueError(
"The legacy DataFrame API is not supported in dask_cudf>24.12. "
"Please enable query-planning, or downgrade to dask_cudf<=24.12"
Comment on lines +16 to +17
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"The legacy DataFrame API is not supported in dask_cudf>24.12. "
"Please enable query-planning, or downgrade to dask_cudf<=24.12"
"The legacy DataFrame API is not supported in dask_cudf>24.12."
"Please enable query-planning, or downgrade to dask_cudf<=24.12"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want a space between the period and the first word of the next sentence?

)


def read_csv(*args, **kwargs):
Expand All @@ -36,46 +38,18 @@ def read_parquet(*args, **kwargs):
return dd.read_parquet(*args, **kwargs)


def _deprecated_api(old_api, new_api=None, rec=None):
def inner_func(*args, **kwargs):
if new_api:
# Use alternative
msg = f"{old_api} is now deprecated. "
msg += rec or f"Please use {new_api} instead."
warnings.warn(msg, FutureWarning)
new_attr = new_api.split(".")
module = import_module(".".join(new_attr[:-1]))
return getattr(module, new_attr[-1])(*args, **kwargs)

# No alternative - raise an error
raise NotImplementedError(
f"{old_api} is no longer supported. " + (rec or "")
)

return inner_func


if QUERY_PLANNING_ON:
from . import io
from ._expr.expr import _patch_dask_expr

groupby_agg = _deprecated_api("dask_cudf.groupby_agg")
read_text = DataFrame.read_text
_patch_dask_expr()

else:
from . import io # noqa: F401
from ._legacy.groupby import groupby_agg # noqa: F401
from ._legacy.io import read_text # noqa: F401


groupby_agg = _deprecated_api("dask_cudf.groupby_agg")
read_text = DataFrame.read_text
to_orc = _deprecated_api(
"dask_cudf.to_orc",
new_api="dask_cudf._legacy.io.to_orc",
new_api="dask_cudf.io.to_orc",
rec="Please use DataFrame.to_orc instead.",
)


_patch_dask_expr()


__all__ = [
"DataFrame",
"Index",
Expand Down
26 changes: 7 additions & 19 deletions python/dask_cudf/dask_cudf/_expr/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,6 @@

import cudf

_LEGACY_WORKAROUND = (
"To enable the 'legacy' dask-cudf API, set the "
"global 'dataframe.query-planning' config to "
"`False` before dask is imported. This can also "
"be done by setting an environment variable: "
"`DASK_DATAFRAME__QUERY_PLANNING=False` "
)


##
## Custom collection classes
##
Expand Down Expand Up @@ -103,9 +94,8 @@ def set_index(
divisions = None
warnings.warn(
"Ignoring divisions='quantile'. This option is now "
"deprecated. Please use the legacy API and raise an "
"issue on github if this feature is necessary."
f"\n{_LEGACY_WORKAROUND}",
"deprecated. Please raise an issue on github if this "
"feature is necessary.",
FutureWarning,
)

Expand Down Expand Up @@ -135,9 +125,7 @@ def groupby(

if kwargs.pop("as_index") is not True:
raise NotImplementedError(
f"{msg} Please reset the index after aggregating, or "
"use the legacy API if `as_index=False` is required.\n"
f"{_LEGACY_WORKAROUND}"
f"{msg} Please reset the index after aggregating."
)
else:
warnings.warn(msg, FutureWarning)
Expand All @@ -153,15 +141,15 @@ def groupby(
)

def to_orc(self, *args, **kwargs):
from dask_cudf._legacy.io import to_orc
from dask_cudf.io.orc import to_orc as to_orc_impl

return to_orc(self, *args, **kwargs)
return to_orc_impl(self, *args, **kwargs)

@staticmethod
def read_text(*args, **kwargs):
from dask_cudf._legacy.io.text import read_text as legacy_read_text
from dask_cudf.io.text import read_text as read_text_impl

return legacy_read_text(*args, **kwargs)
return read_text_impl(*args, **kwargs)

def clip(self, lower=None, upper=None, axis=1):
if axis not in (None, 1):
Expand Down
Loading
Loading