Remove "legacy" Dask DataFrame support from Dask cuDF #17558

Draft · wants to merge 13 commits into base: branch-25.02 · Changes from 8 commits
11 changes: 2 additions & 9 deletions ci/test_python_other.sh
@@ -24,8 +24,8 @@ EXITCODE=0
 trap "EXITCODE=1" ERR
 set +e
 
-rapids-logger "pytest dask_cudf (dask-expr)"
-DASK_DATAFRAME__QUERY_PLANNING=True ./ci/run_dask_cudf_pytests.sh \
+rapids-logger "pytest dask_cudf"
+./ci/run_dask_cudf_pytests.sh \
   --junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cudf.xml" \
   --numprocesses=8 \
   --dist=worksteal \
@@ -34,13 +34,6 @@ DASK_DATAFRAME__QUERY_PLANNING=True ./ci/run_dask_cudf_pytests.sh \
   --cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cudf-coverage.xml" \
   --cov-report=term
 
-rapids-logger "pytest dask_cudf (legacy)"
-DASK_DATAFRAME__QUERY_PLANNING=False ./ci/run_dask_cudf_pytests.sh \
-  --junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cudf-legacy.xml" \
-  --numprocesses=8 \
-  --dist=worksteal \
-  .
-
 rapids-logger "pytest cudf_kafka"
 ./ci/run_cudf_kafka_pytests.sh \
   --junitxml="${RAPIDS_TESTS_DIR}/junit-cudf-kafka.xml"
14 changes: 2 additions & 12 deletions ci/test_wheel_dask_cudf.sh
@@ -30,21 +30,11 @@ RAPIDS_TESTS_DIR=${RAPIDS_TESTS_DIR:-"${RESULTS_DIR}/test-results"}/
 mkdir -p "${RAPIDS_TESTS_DIR}"
 
 # Run tests in dask_cudf/tests and dask_cudf/io/tests
-rapids-logger "pytest dask_cudf (dask-expr)"
+rapids-logger "pytest dask_cudf"
 pushd python/dask_cudf/dask_cudf
-DASK_DATAFRAME__QUERY_PLANNING=True python -m pytest \
+python -m pytest \
   --junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cudf.xml" \
   --numprocesses=8 \
   --dist=worksteal \
   .
 popd
-
-# Run tests in dask_cudf/tests and dask_cudf/io/tests (legacy)
-rapids-logger "pytest dask_cudf (legacy)"
-pushd python/dask_cudf/dask_cudf
-DASK_DATAFRAME__QUERY_PLANNING=False python -m pytest \
-  --junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cudf-legacy.xml" \
-  --numprocesses=8 \
-  --dist=worksteal \
-  .
-popd
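
Both scripts above previously ran the test suite a second time with DASK_DATAFRAME__QUERY_PLANNING=False. For context, a minimal sketch of the same toggle done in-process (illustrative only; "dataframe.query-planning" is the Dask config key behind that environment variable, and it must be set before dask.dataframe is first imported):

import dask

# Equivalent of DASK_DATAFRAME__QUERY_PLANNING=True; must run before
# dask.dataframe is imported to have any effect.
dask.config.set({"dataframe.query-planning": True})

import dask.dataframe as dd  # noqa: E402, F401
import dask_cudf  # noqa: E402, F401  (with this PR, raises if the flag is False)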
52 changes: 13 additions & 39 deletions python/dask_cudf/dask_cudf/__init__.py
@@ -1,19 +1,21 @@
 # Copyright (c) 2018-2024, NVIDIA CORPORATION.
 
-import warnings
-from importlib import import_module
-
 import dask.dataframe as dd
 from dask import config
 from dask.dataframe import from_delayed
 
 import cudf
 
-from . import backends  # noqa: F401
+from . import backends, io  # noqa: F401
+from ._expr.expr import _patch_dask_expr
 from ._version import __git_commit__, __version__  # noqa: F401
-from .core import DataFrame, Index, Series, concat, from_cudf
+from .core import DataFrame, Index, Series, _deprecated_api, concat, from_cudf
 
-QUERY_PLANNING_ON = dd.DASK_EXPR_ENABLED
+if not (QUERY_PLANNING_ON := dd.DASK_EXPR_ENABLED):
+    raise ValueError(
+        "The legacy DataFrame API is not supported in dask_cudf>24.12. "
+        "Please enable query-planning, or downgrade to dask_cudf<=24.12"
[Review thread on lines +16 to +17]

Contributor suggested:
-        "The legacy DataFrame API is not supported in dask_cudf>24.12. "
-        "Please enable query-planning, or downgrade to dask_cudf<=24.12"
+        "The legacy DataFrame API is not supported in dask_cudf>24.12."
+        "Please enable query-planning, or downgrade to dask_cudf<=24.12"

Author (Member) replied: Don't we want a space between the period and the first word of the next sentence?
+    )
 
 
 def read_csv(*args, **kwargs):
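
The walrus-expression guard above replaces the old QUERY_PLANNING_ON flag: importing dask_cudf now fails fast whenever query planning is disabled. A hypothetical session showing what that looks like:

import dask

# Opting in to the removed legacy mode (hypothetical session):
dask.config.set({"dataframe.query-planning": False})

import dask_cudf  # noqa: E402, F401
# ValueError: The legacy DataFrame API is not supported in dask_cudf>24.12.
# Please enable query-planning, or downgrade to dask_cudf<=24.12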
@@ -36,46 +38,18 @@ def read_parquet(*args, **kwargs):
     return dd.read_parquet(*args, **kwargs)
 
 
-def _deprecated_api(old_api, new_api=None, rec=None):
-    def inner_func(*args, **kwargs):
-        if new_api:
-            # Use alternative
-            msg = f"{old_api} is now deprecated. "
-            msg += rec or f"Please use {new_api} instead."
-            warnings.warn(msg, FutureWarning)
-            new_attr = new_api.split(".")
-            module = import_module(".".join(new_attr[:-1]))
-            return getattr(module, new_attr[-1])(*args, **kwargs)
-
-        # No alternative - raise an error
-        raise NotImplementedError(
-            f"{old_api} is no longer supported. " + (rec or "")
-        )
-
-    return inner_func
-
-
-if QUERY_PLANNING_ON:
-    from . import io
-    from ._expr.expr import _patch_dask_expr
-
-    groupby_agg = _deprecated_api("dask_cudf.groupby_agg")
-    read_text = DataFrame.read_text
-    _patch_dask_expr()
-
-else:
-    from . import io  # noqa: F401
-    from ._legacy.groupby import groupby_agg  # noqa: F401
-    from ._legacy.io import read_text  # noqa: F401
-
-
+groupby_agg = _deprecated_api("dask_cudf.groupby_agg")
+read_text = DataFrame.read_text
 to_orc = _deprecated_api(
     "dask_cudf.to_orc",
     new_api="dask_cudf._legacy.io.to_orc",
     rec="Please use DataFrame.to_orc instead.",
 )
+
+
+_patch_dask_expr()
 
 
 __all__ = [
     "DataFrame",
     "Index",
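
After this hunk the deprecation shims are unconditional: _deprecated_api (now imported from dask_cudf.core) returns a callable that either forwards to a replacement API with a FutureWarning or raises NotImplementedError when no replacement exists. An illustrative sketch of both behaviors at call time:

import dask_cudf

# Shim with no replacement API: calling it raises.
try:
    dask_cudf.groupby_agg()
except NotImplementedError as err:
    print(err)  # -> dask_cudf.groupby_agg is no longer supported.

# Shim with a replacement API: dask_cudf.to_orc still works, but warns
# (FutureWarning) and forwards to dask_cudf._legacy.io.to_orc;
# DataFrame.to_orc is the recommended spelling.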
18 changes: 3 additions & 15 deletions python/dask_cudf/dask_cudf/_expr/collection.py
@@ -19,15 +19,6 @@
 
 import cudf
 
-_LEGACY_WORKAROUND = (
-    "To enable the 'legacy' dask-cudf API, set the "
-    "global 'dataframe.query-planning' config to "
-    "`False` before dask is imported. This can also "
-    "be done by setting an environment variable: "
-    "`DASK_DATAFRAME__QUERY_PLANNING=False` "
-)
-
-
 ##
 ## Custom collection classes
 ##
@@ -103,9 +94,8 @@ def set_index(
             divisions = None
             warnings.warn(
                 "Ignoring divisions='quantile'. This option is now "
-                "deprecated. Please use the legacy API and raise an "
-                "issue on github if this feature is necessary."
-                f"\n{_LEGACY_WORKAROUND}",
+                "deprecated. Please raise an issue on github if this "
+                "feature is necessary.",
                 FutureWarning,
             )
 
@@ -135,9 +125,7 @@ def groupby(
 
         if kwargs.pop("as_index") is not True:
             raise NotImplementedError(
-                f"{msg} Please reset the index after aggregating, or "
-                "use the legacy API if `as_index=False` is required.\n"
-                f"{_LEGACY_WORKAROUND}"
+                f"{msg} Please reset the index after aggregating."
             )
         else:
             warnings.warn(msg, FutureWarning)
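
Without the legacy fallback, as_index=False is rejected outright; the new message steers users to the equivalent pattern of aggregating with the default as_index=True and resetting the index afterwards. A small sketch (the data and column names are illustrative):

import cudf
import dask_cudf

df = dask_cudf.from_cudf(
    cudf.DataFrame({"k": [0, 0, 1], "v": [10, 20, 30]}), npartitions=1
)

# Instead of the removed df.groupby("k", as_index=False).sum():
out = df.groupby("k").sum().reset_index()
print(out.compute())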
126 changes: 21 additions & 105 deletions python/dask_cudf/dask_cudf/backends.py
@@ -11,7 +11,6 @@
 from packaging.version import Version
 from pandas.api.types import is_scalar
 
-import dask.dataframe as dd
 from dask import config
 from dask.array.dispatch import percentile_lookup
 from dask.dataframe.backends import (
@@ -28,6 +27,7 @@
     hash_object_dispatch,
     is_categorical_dtype_dispatch,
     make_meta_dispatch,
+    partd_encode_dispatch,
     pyarrow_schema_dispatch,
     to_pyarrow_table_dispatch,
     tolist_dispatch,
@@ -464,28 +464,21 @@ def sizeof_cudf_series_index(obj):
     return obj.memory_usage()
 
 
-# TODO: Remove try/except when cudf is pinned to dask>=2023.10.0
-try:
-    from dask.dataframe.dispatch import partd_encode_dispatch
-
-    @partd_encode_dispatch.register(cudf.DataFrame)
-    def _simple_cudf_encode(_):
-        # Basic pickle-based encoding for a partd k-v store
-        import pickle
+@partd_encode_dispatch.register(cudf.DataFrame)
+def _simple_cudf_encode(_):
+    # Basic pickle-based encoding for a partd k-v store
+    import pickle
 
-        import partd
+    import partd
 
-        def join(dfs):
-            if not dfs:
-                return cudf.DataFrame()
-            else:
-                return cudf.concat(dfs)
+    def join(dfs):
+        if not dfs:
+            return cudf.DataFrame()
+        else:
+            return cudf.concat(dfs)
 
-        dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
-        return partial(partd.Encode, dumps, pickle.loads, join)
-
-except ImportError:
-    pass
+    dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
+    return partial(partd.Encode, dumps, pickle.loads, join)
 
 
 def _default_backend(func, *args, **kwargs):
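
The registration above plugs cudf.DataFrame into Dask's partd-based disk shuffle: values are pickled into the underlying key-value store and joined back with cudf.concat on retrieval. A rough usage sketch of the factory the dispatch returns (driving it by hand like this is purely illustrative; Dask normally does it internally, and the partd calls shown are my reading of the partd Interface API):

import cudf
import partd

from dask_cudf.backends import _simple_cudf_encode

# The dispatch returns a factory equivalent to
# partial(partd.Encode, dumps, pickle.loads, join).
encode = _simple_cudf_encode(cudf.DataFrame)
store = encode(partd.Dict())  # wrap an in-memory partd store

store.append({"x": cudf.DataFrame({"a": [1, 2]})})
store.append({"x": cudf.DataFrame({"a": [3]})})
print(store.get("x"))  # parts are unpickled and joined with cudf.concat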
@@ -557,105 +550,28 @@ def to_cudf_dispatch_from_cudf(data, **kwargs):
     return data
 
 
-# Define "cudf" backend engine to be registered with Dask
-class CudfBackendEntrypoint(DataFrameBackendEntrypoint):
-    """Backend-entrypoint class for Dask-DataFrame
+# Define the "cudf" backend for "legacy" Dask DataFrame
+class LegacyCudfBackendEntrypoint(DataFrameBackendEntrypoint):
+    """Backend-entrypoint class for legacy Dask-DataFrame
 
     This class is registered under the name "cudf" for the
-    ``dask.dataframe.backends`` entrypoint in ``setup.cfg``.
-    Dask-DataFrame will use the methods defined in this class
-    in place of ``dask.dataframe.<creation-method>`` when the
-    "dataframe.backend" configuration is set to "cudf":
-
-    Examples
-    --------
-    >>> import dask
-    >>> import dask.dataframe as dd
-    >>> with dask.config.set({"dataframe.backend": "cudf"}):
-    ...     ddf = dd.from_dict({"a": range(10)})
-    >>> type(ddf)
-    <class 'dask_cudf._legacy.core.DataFrame'>
+    ``dask.dataframe.backends`` entrypoint in ``pyproject.toml``.
+    This "legacy" backend is only used for CSV support.
     """
 
     @classmethod
     def to_backend_dispatch(cls):
         return to_cudf_dispatch
 
     @classmethod
     def to_backend(cls, data: dd.core._Frame, **kwargs):
         if isinstance(data._meta, (cudf.DataFrame, cudf.Series, cudf.Index)):
             # Already a cudf-backed collection
             _unsupported_kwargs("cudf", "cudf", kwargs)
             return data
         return data.map_partitions(cls.to_backend_dispatch(), **kwargs)
 
-    @staticmethod
-    def from_dict(
-        data,
-        npartitions,
-        orient="columns",
-        dtype=None,
-        columns=None,
-        constructor=cudf.DataFrame,
-    ):
-        return _default_backend(
-            dd.from_dict,
-            data,
-            npartitions=npartitions,
-            orient=orient,
-            dtype=dtype,
-            columns=columns,
-            constructor=constructor,
-        )
-
-    @staticmethod
-    def read_parquet(*args, engine=None, **kwargs):
-        from dask_cudf._legacy.io.parquet import CudfEngine
-
-        _raise_unsupported_parquet_kwargs(**kwargs)
-        return _default_backend(
-            dd.read_parquet,
-            *args,
-            engine=CudfEngine,
-            **kwargs,
-        )
-
-    @staticmethod
-    def read_json(*args, **kwargs):
-        from dask_cudf._legacy.io.json import read_json
-
-        return read_json(*args, **kwargs)
-
-    @staticmethod
-    def read_orc(*args, **kwargs):
-        from dask_cudf._legacy.io import read_orc
-
-        return read_orc(*args, **kwargs)
-
-    @staticmethod
-    def read_csv(*args, **kwargs):
-        from dask_cudf._legacy.io import read_csv
-
-        return read_csv(*args, **kwargs)
-
-    @staticmethod
-    def read_hdf(*args, **kwargs):
-        # HDF5 reader not yet implemented in cudf
-        warnings.warn(
-            "read_hdf is not yet implemented in cudf/dask_cudf. "
-            "Moving to cudf from pandas. Expect poor performance!"
-        )
-        return _default_backend(dd.read_hdf, *args, **kwargs).to_backend(
-            "cudf"
-        )
 
 
-# Define "cudf" backend entrypoint for dask-expr
-class CudfDXBackendEntrypoint(DataFrameBackendEntrypoint):
+# Define the "cudf" backend for expr-based Dask DataFrame
+class CudfBackendEntrypoint(DataFrameBackendEntrypoint):
     """Backend-entrypoint class for Dask-Expressions
 
     This class is registered under the name "cudf" for the
-    ``dask-expr.dataframe.backends`` entrypoint in ``setup.cfg``.
+    ``dask_expr.dataframe.backends`` entrypoint in ``pyproject.toml``.
     Dask-DataFrame will use the methods defined in this class
     in place of ``dask_expr.<creation-method>`` when the
     "dataframe.backend" configuration is set to "cudf":

[remaining diff truncated in the page capture]
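
The renamed CudfBackendEntrypoint keeps the behavior shown in the docstring example removed above: with the "dataframe.backend" config set to "cudf", generic dask.dataframe creation routines produce cudf-backed collections. A sketch adapted from that removed example:

import dask
import dask.dataframe as dd

with dask.config.set({"dataframe.backend": "cudf"}):
    ddf = dd.from_dict({"a": range(10)}, npartitions=2)

# type(ddf) is the expr-based dask_cudf collection, not the removed
# dask_cudf._legacy.core.DataFrame shown in the old docstring example.
print(type(ddf))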