Fix dask_cudf.read_csv (#17612)
Recent changes in dask and dask-expr have broken `dask_cudf.read_csv` (dask/dask-expr#1178, dask/dask#11603). Fortunately, the breaking changes help us avoid legacy CSV code in the long run.

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Peter Andreas Entschev (https://github.com/pentschev)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #17612
rjzamora authored Dec 17, 2024
1 parent becfacc commit 0058b52
Showing 3 changed files with 215 additions and 25 deletions.
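For orientation before the diffs, a minimal sketch of the user-facing call that this commit keeps working (the file pattern is hypothetical; the "256 MiB" default comes from the new `dask_cudf.io.csv.read_csv` below):

    import dask_cudf

    # Hypothetical CSV files; the same call works on both code paths
    # introduced by this commit (dask-expr ReadCSV or the cudf fallback).
    ddf = dask_cudf.read_csv("data.*.csv", blocksize="256 MiB")
    print(ddf.head())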
36 changes: 25 additions & 11 deletions python/dask_cudf/dask_cudf/backends.py
@@ -714,21 +714,35 @@ def read_csv(
        storage_options=None,
        **kwargs,
    ):
-        import dask_expr as dx
-        from fsspec.utils import stringify_path
-
-        if not isinstance(path, str):
-            path = stringify_path(path)
-        return dx.new_collection(
-            dx.io.csv.ReadCSV(
-                path,
-                dtype_backend=dtype_backend,
-                storage_options=storage_options,
-                kwargs=kwargs,
-                header=header,
-                dataframe_backend="cudf",
-            )
-        )
+        try:
+            # TODO: Remove when cudf is pinned to dask>2024.12.0
+            import dask_expr as dx
+            from dask_expr.io.csv import ReadCSV
+            from fsspec.utils import stringify_path
+
+            if not isinstance(path, str):
+                path = stringify_path(path)
+            return dx.new_collection(
+                ReadCSV(
+                    path,
+                    dtype_backend=dtype_backend,
+                    storage_options=storage_options,
+                    kwargs=kwargs,
+                    header=header,
+                    dataframe_backend="cudf",
+                )
+            )
+        except ImportError:
+            # Requires dask>2024.12.0
+            from dask_cudf.io.csv import read_csv
+
+            return read_csv(
+                path,
+                *args,
+                header=header,
+                storage_options=storage_options,
+                **kwargs,
+            )

    @staticmethod
    def read_json(*args, **kwargs):
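To make the try/except dispatch above concrete, here is an illustrative probe (not part of the commit) showing which branch a given environment will exercise; the interpretation of the two branches follows the TODO comments in the diff:

    # Illustrative only: check which branch of the new read_csv dispatch applies.
    try:
        from dask_expr.io.csv import ReadCSV  # noqa: F401  (legacy path, dask<=2024.12.0)
        branch = "dask-expr ReadCSV expression"
    except ImportError:
        branch = "dask_cudf.io.csv.read_csv fallback (requires dask>2024.12.0)"
    print(branch)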
195 changes: 190 additions & 5 deletions python/dask_cudf/dask_cudf/io/csv.py
@@ -1,8 +1,193 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

-from dask_cudf import _deprecated_api
-read_csv = _deprecated_api(
-    "dask_cudf.io.csv.read_csv",
-    new_api="dask_cudf.read_csv",
-)

import os
from glob import glob
from warnings import warn

from fsspec.utils import infer_compression

from dask import dataframe as dd
from dask.dataframe.io.csv import make_reader
from dask.utils import parse_bytes

import cudf


def read_csv(path, blocksize="default", **kwargs):
    """
    Read CSV files into a :class:`.DataFrame`.

    This API parallelizes the :func:`cudf:cudf.read_csv` function in
    the following ways:

    It supports loading many files at once using globstrings:

    >>> import dask_cudf
    >>> df = dask_cudf.read_csv("myfiles.*.csv")

    In some cases it can break up large files:

    >>> df = dask_cudf.read_csv("largefile.csv", blocksize="256 MiB")

    It can read CSV files from external resources (e.g. S3, HTTP, FTP)

    >>> df = dask_cudf.read_csv("s3://bucket/myfiles.*.csv")
    >>> df = dask_cudf.read_csv("https://www.mycloud.com/sample.csv")

    Internally ``read_csv`` uses :func:`cudf:cudf.read_csv` and
    supports many of the same keyword arguments with the same
    performance guarantees. See the docstring for
    :func:`cudf:cudf.read_csv` for more information on available
    keyword arguments.

    Parameters
    ----------
    path : str, path object, or file-like object
        Either a path to a file (a str, :py:class:`pathlib.Path`, or
        py._path.local.LocalPath), URL (including http, ftp, and S3
        locations), or any object with a read() method (such as
        builtin :py:func:`open` file handler function or
        :py:class:`~io.StringIO`).
    blocksize : int or str, default "256 MiB"
        The target task partition size. If ``None``, a single block
        is used for each file.
    **kwargs : dict
        Passthrough key-word arguments that are sent to
        :func:`cudf:cudf.read_csv`.

    Notes
    -----
    If any of `skipfooter`/`skiprows`/`nrows` are passed,
    `blocksize` will default to None.

    Examples
    --------
    >>> import dask_cudf
    >>> ddf = dask_cudf.read_csv("sample.csv", usecols=["a", "b"])
    >>> ddf.compute()
       a      b
    0  1     hi
    1  2  hello
    2  3     ai
    """
    # Set default `blocksize`
    if blocksize == "default":
        if (
            kwargs.get("skipfooter", 0) != 0
            or kwargs.get("skiprows", 0) != 0
            or kwargs.get("nrows", None) is not None
        ):
            # Cannot read in blocks if skipfooter,
            # skiprows or nrows is passed.
            blocksize = None
        else:
            blocksize = "256 MiB"

    if "://" in str(path):
        func = make_reader(cudf.read_csv, "read_csv", "CSV")
        return func(path, blocksize=blocksize, **kwargs)
    else:
        return _internal_read_csv(path=path, blocksize=blocksize, **kwargs)


def _internal_read_csv(path, blocksize="256 MiB", **kwargs):
    if isinstance(blocksize, str):
        blocksize = parse_bytes(blocksize)

    if isinstance(path, list):
        filenames = path
    elif isinstance(path, str):
        filenames = sorted(glob(path))
    elif hasattr(path, "__fspath__"):
        filenames = sorted(glob(path.__fspath__()))
    else:
        raise TypeError(f"Path type not understood:{type(path)}")

    if not filenames:
        msg = f"A file in: {filenames} does not exist."
        raise FileNotFoundError(msg)

    compression = kwargs.get("compression", "infer")

    if compression == "infer":
        # Infer compression from first path by default
        compression = infer_compression(filenames[0])

    if compression and blocksize:
        # compressed CSVs reading must read the entire file
        kwargs.pop("byte_range", None)
        warn(
            "Warning %s compression does not support breaking apart files\n"
            "Please ensure that each individual file can fit in memory and\n"
            "use the keyword ``blocksize=None to remove this message``\n"
            "Setting ``blocksize=(size of file)``" % compression
        )
        blocksize = None

    if blocksize is None:
        return read_csv_without_blocksize(path, **kwargs)

    # Let dask.dataframe generate meta
    dask_reader = make_reader(cudf.read_csv, "read_csv", "CSV")
    kwargs1 = kwargs.copy()
    usecols = kwargs1.pop("usecols", None)
    dtype = kwargs1.pop("dtype", None)
    meta = dask_reader(filenames[0], **kwargs1)._meta
    names = meta.columns
    if usecols or dtype:
        # Regenerate meta with original kwargs if
        # `usecols` or `dtype` was specified
        meta = dask_reader(filenames[0], **kwargs)._meta

    i = 0
    path_list = []
    kwargs_list = []
    for fn in filenames:
        size = os.path.getsize(fn)
        for start in range(0, size, blocksize):
            kwargs2 = kwargs.copy()
            kwargs2["byte_range"] = (
                start,
                blocksize,
            )  # specify which chunk of the file we care about
            if start != 0:
                kwargs2["names"] = names  # no header in the middle of the file
                kwargs2["header"] = None
            path_list.append(fn)
            kwargs_list.append(kwargs2)
            i += 1

    return dd.from_map(_read_csv, path_list, kwargs_list, meta=meta)


def _read_csv(fn, kwargs):
    return cudf.read_csv(fn, **kwargs)


def read_csv_without_blocksize(path, **kwargs):
    """Read entire CSV with optional compression (gzip/zip)

    Parameters
    ----------
    path : str
        path to files (support for glob)
    """
    if isinstance(path, list):
        filenames = path
    elif isinstance(path, str):
        filenames = sorted(glob(path))
    elif hasattr(path, "__fspath__"):
        filenames = sorted(glob(path.__fspath__()))
    else:
        raise TypeError(f"Path type not understood:{type(path)}")

    meta_kwargs = kwargs.copy()
    if "skipfooter" in meta_kwargs:
        meta_kwargs.pop("skipfooter")
    if "nrows" in meta_kwargs:
        meta_kwargs.pop("nrows")
    # Read "head" of first file (first 5 rows).
    # Convert to empty df for metadata.
    meta = cudf.read_csv(filenames[0], nrows=5, **meta_kwargs).iloc[:0]
    return dd.from_map(cudf.read_csv, filenames, meta=meta, **kwargs)
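As a rough illustration of the byte-range partitioning performed by `_internal_read_csv` above (the file name and its size are hypothetical), each computed range becomes one `cudf.read_csv` task:

    import os

    blocksize = 256 * 2**20  # 256 MiB, the default target partition size
    size = os.path.getsize("large.csv")  # hypothetical local file
    byte_ranges = [(start, blocksize) for start in range(0, size, blocksize)]
    # Every range after the first is read with header=None and the column
    # names taken from the first partition's metadata, as in the code above.
    print(len(byte_ranges), "partitions")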
9 changes: 0 additions & 9 deletions python/dask_cudf/dask_cudf/io/tests/test_csv.py
@@ -185,11 +185,6 @@ def test_read_csv_blocksize_none(tmp_path, compression, size):
    df2 = dask_cudf.read_csv(path, blocksize=None, dtype=typ)
    dd.assert_eq(df, df2)

-    # Test chunksize deprecation
-    with pytest.warns(FutureWarning, match="deprecated"):
-        df3 = dask_cudf.read_csv(path, chunksize=None, dtype=typ)
-    dd.assert_eq(df, df3)
-

@pytest.mark.parametrize("dtype", [{"b": str, "c": int}, None])
def test_csv_reader_usecols(tmp_path, dtype):

@@ -275,7 +270,3 @@ def test_deprecated_api_paths(tmp_path):
    with pytest.warns(match="dask_cudf.io.read_csv is now deprecated"):
        df2 = dask_cudf.io.read_csv(csv_path)
    dd.assert_eq(df, df2, check_divisions=False)
-
-    with pytest.warns(match="dask_cudf.io.csv.read_csv is now deprecated"):
-        df2 = dask_cudf.io.csv.read_csv(csv_path)
-    dd.assert_eq(df, df2, check_divisions=False)
