Introduce basic "cudf" backend for Dask Expressions #14805
Changes from 50 commits
dask_cudf/__init__.py:

@@ -1,29 +1,69 @@
-# Copyright (c) 2018-2023, NVIDIA CORPORATION.
+# Copyright (c) 2018-2024, NVIDIA CORPORATION.
 
+import dask.dataframe as dd
+from dask import config
 from dask.dataframe import from_delayed
 
 import cudf
 
 from . import backends
 from ._version import __git_commit__, __version__
-from .core import DataFrame, Series, concat, from_cudf, from_dask_dataframe
-from .groupby import groupby_agg
-from .io import read_csv, read_json, read_orc, read_text, to_orc
+from .core import concat, from_cudf, from_dask_dataframe
+from .expr import DASK_EXPR_ENABLED
+
+
+def read_csv(*args, **kwargs):
+    with config.set({"dataframe.backend": "cudf"}):
+        return dd.read_csv(*args, **kwargs)
+
+
+def read_json(*args, **kwargs):
+    with config.set({"dataframe.backend": "cudf"}):
+        return dd.read_json(*args, **kwargs)
+
+
+def read_orc(*args, **kwargs):
+    with config.set({"dataframe.backend": "cudf"}):
+        return dd.read_orc(*args, **kwargs)
+
+
+def read_parquet(*args, **kwargs):
+    with config.set({"dataframe.backend": "cudf"}):
+        return dd.read_parquet(*args, **kwargs)
+
+
+def raise_not_implemented_error(attr_name):
+    def inner_func(*args, **kwargs):
+        raise NotImplementedError(
+            f"Top-level {attr_name} API is not available for dask-expr."
+        )
+
+    return inner_func
+
+
+if DASK_EXPR_ENABLED:
+    from .expr._collection import DataFrame, Index, Series
+
+    groupby_agg = raise_not_implemented_error("groupby_agg")
+    read_text = raise_not_implemented_error("read_text")
+    to_orc = raise_not_implemented_error("to_orc")
+else:
+    from .core import DataFrame, Index, Series
+    from .groupby import groupby_agg
+    from .io import read_text, to_orc
 
 try:
     from .io import read_parquet
 except ImportError:
     pass
 
 __all__ = [
     "DataFrame",
     "Series",
+    "Index",
     "from_cudf",
     "from_dask_dataframe",
     "concat",
     "from_delayed",
 ]
 
 
 if not hasattr(cudf.DataFrame, "mean"):
     cudf.DataFrame.mean = None
 del cudf
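For orientation (not part of the diff): each new top-level reader simply scopes Dask's `dataframe.backend` option to `"cudf"` and forwards to the matching `dask.dataframe` function. A minimal usage sketch, assuming a GPU environment with cudf, dask, and dask-expr installed and a hypothetical file `data.parquet`:

```python
# Sketch only: the two reads below should be equivalent, since
# dask_cudf.read_parquet just wraps dd.read_parquet with the
# "dataframe.backend" option set to "cudf".
import dask.dataframe as dd
from dask import config

import dask_cudf

path = "data.parquet"  # hypothetical input file

df1 = dask_cudf.read_parquet(path)

with config.set({"dataframe.backend": "cudf"}):
    df2 = dd.read_parquet(path)
```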
dask_cudf/core.py:

@@ -8,7 +8,7 @@
 import pandas as pd
 from tlz import partition_all
 
-from dask import dataframe as dd
+from dask import config, dataframe as dd
 from dask.base import normalize_token, tokenize
 from dask.dataframe.core import (
     Scalar,
@@ -690,13 +690,20 @@ def from_cudf(data, npartitions=None, chunksize=None, sort=True, name=None):
             "dask_cudf does not support MultiIndex Dataframes."
         )
 
-    name = name or ("from_cudf-" + tokenize(data, npartitions or chunksize))
+    # Dask-expr doesn't support the `name` argument
+    name = {}
+    if not config.get("dataframe.query-planning", False):
+        name = {
+            "name": name
+            or ("from_cudf-" + tokenize(data, npartitions or chunksize))
+        }
 
     return dd.from_pandas(
         data,
         npartitions=npartitions,
         chunksize=chunksize,
         sort=sort,
-        name=name,
+        **name,
     )
 
@@ -711,7 +718,9 @@ def from_cudf(data, npartitions=None, chunksize=None, sort=True, name=None):
     rather than pandas objects.\n
     """
     )
-    + textwrap.dedent(dd.from_pandas.__doc__)
+    # TODO: `dd.from_pandas.__doc__` is empty when
+    # `DASK_DATAFRAME__QUERY_PLANNING=True`
+    + textwrap.dedent(dd.from_pandas.__doc__ or "")
 )

Review comment (on the `__doc__ or ""` fallback): Honestly haven't had time to understand this.

Reply: This is because the doc-string is currently missing in dask-expr.
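As a quick illustration of the `from_cudf` change above (a sketch, assuming a GPU environment with cudf and dask_cudf installed): the `name` keyword is now forwarded to `dd.from_pandas` only in the legacy (non-query-planning) mode, but typical calls that omit `name` behave the same either way.

```python
# Sketch: from_cudf works identically with and without query planning
# when the `name` argument is left unspecified.
import cudf
import dask_cudf

gdf = cudf.DataFrame({"a": [1, 2, 3, 4], "b": [10, 20, 30, 40]})
ddf = dask_cudf.from_cudf(gdf, npartitions=2)

print(ddf["a"].sum().compute())  # 10
```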
dask_cudf/expr/__init__.py (new file):

@@ -0,0 +1,17 @@
+# Copyright (c) 2024, NVIDIA CORPORATION.
+
+from dask import config
+
+DASK_EXPR_ENABLED = False
+if config.get("dataframe.query-planning", False):
+    # Make sure custom expressions and collections are defined
+    try:
+        import dask_cudf.expr._collection
+        import dask_cudf.expr._expr
+
+        DASK_EXPR_ENABLED = True
+    except ImportError:
+        # Dask Expressions not installed.
+        # Dask DataFrame should have already thrown an error
+        # before we got here.
+        pass
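In other words, the new expression backend only switches on when Dask's query-planning option is set before `dask_cudf` is imported. A rough sketch of how one might opt in (assuming dask-expr is installed; the environment-variable spelling `DASK_DATAFRAME__QUERY_PLANNING=True` mentioned in the TODO above should work as well):

```python
# Opt in to query planning *before* importing dask_cudf, so that
# dask_cudf.expr can import the dask-expr-based collections.
import dask

dask.config.set({"dataframe.query-planning": True})

import dask_cudf

# True if the custom expressions/collections were importable.
print(dask_cudf.DASK_EXPR_ENABLED)
```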
dask_cudf/expr/_collection.py (new file):

@@ -0,0 +1,66 @@
+# Copyright (c) 2024, NVIDIA CORPORATION.
+
+from dask_expr import (
+    DataFrame as DXDataFrame,
+    FrameBase,
+    Index as DXIndex,
+    Series as DXSeries,
+    get_collection_type,
+)
+
+from dask import config
+
+import cudf
+
+##
+## Custom collection classes
+##
+
+
+class DataFrame(DXDataFrame):
+    @classmethod
+    def from_dict(cls, *args, **kwargs):
+        with config.set({"dataframe.backend": "cudf"}):
+            return DXDataFrame.from_dict(*args, **kwargs)
+
+    def groupby(
+        self,
+        by,
+        group_keys=True,
+        sort=None,
+        observed=None,
+        dropna=None,
+        **kwargs,
+    ):
+        from dask_cudf.expr._groupby import GroupBy
+
+        if isinstance(by, FrameBase) and not isinstance(by, DXSeries):
+            raise ValueError(
+                f"`by` must be a column name or list of columns, got {by}."
+            )
+
+        return GroupBy(
+            self,
+            by,
+            group_keys=group_keys,
+            sort=sort,
+            observed=observed,
+            dropna=dropna,
+            **kwargs,
+        )
+
+
+class Series(DXSeries):
+    def groupby(self, by, **kwargs):
+        from dask_cudf.expr._groupby import SeriesGroupBy
+
+        return SeriesGroupBy(self, by, **kwargs)
+
+
+class Index(DXIndex):
+    pass  # Same as pandas (for now)
+
+
+get_collection_type.register(cudf.DataFrame, lambda _: DataFrame)
+get_collection_type.register(cudf.Series, lambda _: Series)
+get_collection_type.register(cudf.BaseIndex, lambda _: Index)

Review comment on lines +69 to +72 (the `by` type check) — question (non-blocking): What are the things that you might otherwise group on that we don't support?

Reply: I think this is mostly guarding against a …
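A hedged sketch of what the `get_collection_type` registrations accomplish (assuming the same query-planning setup as above and a GPU environment): once a cudf object backs the collection, dask-expr hands back the dask_cudf subclasses defined in this file, so `groupby` routes through `dask_cudf.expr._groupby`.

```python
import dask

dask.config.set({"dataframe.query-planning": True})

import cudf
import dask_cudf

gdf = cudf.DataFrame({"k": [0, 0, 1], "v": [1.0, 2.0, 3.0]})
ddf = dask_cudf.from_cudf(gdf, npartitions=2)

# Dispatched on the cudf-backed metadata, so this should be the
# dask_cudf.expr._collection.DataFrame subclass rather than the
# plain dask-expr DataFrame.
print(type(ddf))

# groupby on the custom DataFrame uses the cudf-specific GroupBy.
print(ddf.groupby("k").mean().compute())
```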
Review comment: I would consider the logic in this class to be the most "fragile". We are literally patching …
dask_cudf/expr/_expr.py (new file):

@@ -0,0 +1,34 @@
+# Copyright (c) 2024, NVIDIA CORPORATION.
+
+from dask_expr._cumulative import CumulativeBlockwise, TakeLast
+
+##
+## Custom expression patching
+##
+
+
+class PatchCumulativeBlockwise(CumulativeBlockwise):
+    @property
+    def _args(self) -> list:
+        return self.operands[:1]
+
+    @property
+    def _kwargs(self) -> dict:
+        # Must pass axis and skipna as kwargs in cudf
+        return {"axis": self.axis, "skipna": self.skipna}
+
+
+CumulativeBlockwise._args = PatchCumulativeBlockwise._args
+CumulativeBlockwise._kwargs = PatchCumulativeBlockwise._kwargs
+
+
+def _takelast(a, skipna=True):
+    if not len(a):
+        return a
+    if skipna:
+        a = a.bfill()
+    # Cannot use `squeeze` with cudf
+    return a.tail(n=1).iloc[0]
+
+
+TakeLast.operation = staticmethod(_takelast)
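Rough sketch of where these patches matter (again assuming a GPU environment, cudf, and the query-planning setup shown earlier): cumulative reductions such as `cumsum` are lowered to `CumulativeBlockwise`/`TakeLast` expressions, and the patched `_kwargs` and `operation` make those expressions call cudf with keyword arguments and without `squeeze`.

```python
import dask

dask.config.set({"dataframe.query-planning": True})

import cudf
import dask_cudf

ser = dask_cudf.from_cudf(cudf.Series([1, 2, None, 4]), npartitions=2)

# Exercises CumulativeBlockwise (per-partition cumsum) and TakeLast
# (carrying the last partial result across partition boundaries).
print(ser.cumsum().compute())
```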
Review comment: It would be nice to use `dask_expr_tip` here to cut down on duplication, but I have no idea if something like … would work here, as I'm not the most familiar with rapids-dependency-file-generator.

Reply: Sounds fine to me. As you know, I'm just deferring to your advice for these kinds of changes :)

Another background note: I'm expecting dask-expr to be copied entirely into dask/dask proper before the "query-planning" default is changed. When that happens, we should be able to remove all this stuff.

Follow-up: Hmm - I'm not having much luck with this. Not sure if we can avoid the duplication in this case.