Fix dask_cudf.read_csv
#17612
Changes from all commits
NOTE: This code was translated from the "legacy" code in …
@@ -1,8 +1,193 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

-from dask_cudf import _deprecated_api
import os
from glob import glob
from warnings import warn

-read_csv = _deprecated_api(
-    "dask_cudf.io.csv.read_csv",
-    new_api="dask_cudf.read_csv",
-)
from fsspec.utils import infer_compression

from dask import dataframe as dd
from dask.dataframe.io.csv import make_reader
from dask.utils import parse_bytes

import cudf


def read_csv(path, blocksize="default", **kwargs):
    """
    Read CSV files into a :class:`.DataFrame`.

    This API parallelizes the :func:`cudf:cudf.read_csv` function in
    the following ways:

    It supports loading many files at once using globstrings:

    >>> import dask_cudf
    >>> df = dask_cudf.read_csv("myfiles.*.csv")

    In some cases it can break up large files:
What cases are those? Or is it always dependent upon the file size and the value of …

Sorry, I didn't spend any time reviewing these doc-strings, because they were directly copied from …
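As an aside on that question: in the implementation below, splitting depends only on file size relative to ``blocksize`` (and only for local, uncompressed files), giving roughly ``ceil(file_size / blocksize)`` partitions per file. A tiny sketch with hypothetical numbers:

```python
import math

file_size = 1_000_000_000   # hypothetical 1 GB CSV on local disk
blocksize = 256 * 2**20     # "256 MiB", as parsed by dask.utils.parse_bytes
# one byte_range chunk is created per blocksize-sized slice of the file
num_partitions = math.ceil(file_size / blocksize)  # -> 4
```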

    >>> df = dask_cudf.read_csv("largefile.csv", blocksize="256 MiB")

    It can read CSV files from external resources (e.g. S3, HTTP, FTP)

    >>> df = dask_cudf.read_csv("s3://bucket/myfiles.*.csv")
    >>> df = dask_cudf.read_csv("https://www.mycloud.com/sample.csv")

    Internally ``read_csv`` uses :func:`cudf:cudf.read_csv` and
    supports many of the same keyword arguments with the same
    performance guarantees. See the docstring for
    :func:`cudf:cudf.read_csv` for more information on available
    keyword arguments.

    Parameters
    ----------
    path : str, path object, or file-like object
        Either a path to a file (a str, :py:class:`pathlib.Path`, or
        py._path.local.LocalPath), URL (including http, ftp, and S3
Maybe it could also just be :py:class:`…`
        locations), or any object with a read() method (such as
        builtin :py:func:`open` file handler function or
        :py:class:`~io.StringIO`).
    blocksize : int or str, default "256 MiB"
        The target task partition size. If ``None``, a single block
        is used for each file.
    **kwargs : dict
        Passthrough key-word arguments that are sent to
        :func:`cudf:cudf.read_csv`.

    Notes
    -----
    If any of `skipfooter`/`skiprows`/`nrows` are passed,
    `blocksize` will default to None.

    Examples
    --------
    >>> import dask_cudf
    >>> ddf = dask_cudf.read_csv("sample.csv", usecols=["a", "b"])
    >>> ddf.compute()
       a      b
    0  1     hi
    1  2  hello
    2  3     ai

    """
    # Set default `blocksize`
    if blocksize == "default":
        if (
            kwargs.get("skipfooter", 0) != 0
            or kwargs.get("skiprows", 0) != 0
            or kwargs.get("nrows", None) is not None
        ):
            # Cannot read in blocks if skipfooter,
            # skiprows or nrows is passed.
            blocksize = None
        else:
            blocksize = "256 MiB"

    if "://" in str(path):
        func = make_reader(cudf.read_csv, "read_csv", "CSV")
        return func(path, blocksize=blocksize, **kwargs)
    else:
        return _internal_read_csv(path=path, blocksize=blocksize, **kwargs)


def _internal_read_csv(path, blocksize="256 MiB", **kwargs):
    if isinstance(blocksize, str):
        blocksize = parse_bytes(blocksize)

    if isinstance(path, list):
        filenames = path
    elif isinstance(path, str):
        filenames = sorted(glob(path))
    elif hasattr(path, "__fspath__"):
        filenames = sorted(glob(path.__fspath__()))
    else:
        raise TypeError(f"Path type not understood:{type(path)}")

    if not filenames:
        msg = f"A file in: {filenames} does not exist."
        raise FileNotFoundError(msg)

    compression = kwargs.get("compression", "infer")

    if compression == "infer":
        # Infer compression from first path by default
        compression = infer_compression(filenames[0])

    if compression and blocksize:
        # compressed CSVs reading must read the entire file
        kwargs.pop("byte_range", None)
        warn(
            "Warning %s compression does not support breaking apart files\n"
            "Please ensure that each individual file can fit in memory and\n"
            "use the keyword ``blocksize=None to remove this message``\n"
            "Setting ``blocksize=(size of file)``" % compression
        )
        blocksize = None

    if blocksize is None:
        return read_csv_without_blocksize(path, **kwargs)

    # Let dask.dataframe generate meta
    dask_reader = make_reader(cudf.read_csv, "read_csv", "CSV")
    kwargs1 = kwargs.copy()
    usecols = kwargs1.pop("usecols", None)
    dtype = kwargs1.pop("dtype", None)
    meta = dask_reader(filenames[0], **kwargs1)._meta
    names = meta.columns
    if usecols or dtype:
        # Regenerate meta with original kwargs if
        # `usecols` or `dtype` was specified
        meta = dask_reader(filenames[0], **kwargs)._meta

    i = 0
    path_list = []
    kwargs_list = []
    for fn in filenames:
        size = os.path.getsize(fn)
        for start in range(0, size, blocksize):
            kwargs2 = kwargs.copy()
            kwargs2["byte_range"] = (
                start,
                blocksize,
            )  # specify which chunk of the file we care about
            if start != 0:
                kwargs2["names"] = names  # no header in the middle of the file
                kwargs2["header"] = None
            path_list.append(fn)
            kwargs_list.append(kwargs2)
            i += 1

    return dd.from_map(_read_csv, path_list, kwargs_list, meta=meta)


def _read_csv(fn, kwargs):
    return cudf.read_csv(fn, **kwargs)


def read_csv_without_blocksize(path, **kwargs):
    """Read entire CSV with optional compression (gzip/zip)

    Parameters
    ----------
    path : str
        path to files (support for glob)
    """
    if isinstance(path, list):
        filenames = path
    elif isinstance(path, str):
        filenames = sorted(glob(path))
    elif hasattr(path, "__fspath__"):
        filenames = sorted(glob(path.__fspath__()))
    else:
        raise TypeError(f"Path type not understood:{type(path)}")

    meta_kwargs = kwargs.copy()
    if "skipfooter" in meta_kwargs:
        meta_kwargs.pop("skipfooter")
    if "nrows" in meta_kwargs:
        meta_kwargs.pop("nrows")
    # Read "head" of first file (first 5 rows).
    # Convert to empty df for metadata.
    meta = cudf.read_csv(filenames[0], nrows=5, **meta_kwargs).iloc[:0]
    return dd.from_map(cudf.read_csv, filenames, meta=meta, **kwargs)
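A short usage sketch of the behavior implemented above (file names are hypothetical, and this assumes the reader exactly as written in this diff):

```python
import dask_cudf

# Local glob: each file is split into ~256 MiB byte_range chunks by default.
ddf = dask_cudf.read_csv("data/part-*.csv")

# Any of skipfooter/skiprows/nrows switches the default to blocksize=None,
# i.e. one partition per file (read_csv_without_blocksize).
ddf_single = dask_cudf.read_csv("data/part-*.csv", nrows=1000)

# Compressed files also fall back to one partition per file, with a warning.
ddf_gz = dask_cudf.read_csv("data/part-*.csv.gz")

# A path containing "://" is dispatched to the dask.dataframe reader
# built around cudf.read_csv via make_reader.
ddf_s3 = dask_cudf.read_csv("s3://bucket/part-*.csv", blocksize="128 MiB")
```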
@@ -185,11 +185,6 @@ def test_read_csv_blocksize_none(tmp_path, compression, size):
    df2 = dask_cudf.read_csv(path, blocksize=None, dtype=typ)
    dd.assert_eq(df, df2)

-    # Test chunksize deprecation
-    with pytest.warns(FutureWarning, match="deprecated"):
-        df3 = dask_cudf.read_csv(path, chunksize=None, dtype=typ)
-        dd.assert_eq(df, df3)
Comment on lines -188 to -191: This was deprecated a long time ago, the new code path doesn't try to catch this anymore.


@pytest.mark.parametrize("dtype", [{"b": str, "c": int}, None])
def test_csv_reader_usecols(tmp_path, dtype):
@@ -275,7 +270,3 @@ def test_deprecated_api_paths(tmp_path):
    with pytest.warns(match="dask_cudf.io.read_csv is now deprecated"):
        df2 = dask_cudf.io.read_csv(csv_path)
    dd.assert_eq(df, df2, check_divisions=False)

-    with pytest.warns(match="dask_cudf.io.csv.read_csv is now deprecated"):
-        df2 = dask_cudf.io.csv.read_csv(csv_path)
-    dd.assert_eq(df, df2, check_divisions=False)
Comment on lines -279 to -281: The new csv code now lives at this "expected" path.
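Given that, the expectation (a hedged sketch, not a test from this PR) is that importing from the module path no longer warns:

```python
# Assumes the diff above: dask_cudf.io.csv.read_csv is the real
# implementation rather than a deprecated alias, so no deprecation
# warning is expected. "sample.csv" is a hypothetical local file.
from dask_cudf.io.csv import read_csv

ddf = read_csv("sample.csv", blocksize=None)
```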
Does it make sense for us to do this? rapids-dask-dependency should always pick the latest Dask, no?

By "when cudf is pinned", I mean "when rapids-dask-dependency is pinned". Is that your question? rapids-dask-dependency currently pins to >=2024.11.2. This means another package with a dask<2024.12.0 requirement can still give us dask-2024.11.2 in practice, no?
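If the concern is which Dask version actually gets resolved at install time, one purely illustrative option (not something this PR does) is to branch at runtime on the 2024.12.0 cutoff mentioned above:

```python
from packaging.version import Version

import dask

# Hypothetical guard: a package pinning dask<2024.12.0 can still resolve to
# dask-2024.11.2 under the current rapids-dask-dependency floor (>=2024.11.2),
# so key the behavior on whatever version was actually installed.
if Version(dask.__version__) >= Version("2024.12.0"):
    pass  # new code path
else:
    pass  # fallback for older dask
```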