From 0cc93e54be52e50c57ef5a2427195d0f7306fcc1 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Tue, 17 Dec 2024 08:43:59 -0800
Subject: [PATCH 1/2] address breaking changes upstream

---
 python/dask_cudf/dask_cudf/backends.py        |  36 ++-
 python/dask_cudf/dask_cudf/io/csv.py          | 207 +++++++++++++++++-
 .../dask_cudf/dask_cudf/io/tests/test_csv.py  |   4 -
 3 files changed, 227 insertions(+), 20 deletions(-)

diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py
index 962a229a839..fceaaf185e8 100644
--- a/python/dask_cudf/dask_cudf/backends.py
+++ b/python/dask_cudf/dask_cudf/backends.py
@@ -714,21 +714,35 @@ def read_csv(
         storage_options=None,
         **kwargs,
     ):
-        import dask_expr as dx
-        from fsspec.utils import stringify_path
+        try:
+            # TODO: Remove when cudf is pinned to dask>2024.12.0
+            import dask_expr as dx
+            from dask_expr.io.csv import ReadCSV
+            from fsspec.utils import stringify_path
+
+            if not isinstance(path, str):
+                path = stringify_path(path)
+            return dx.new_collection(
+                ReadCSV(
+                    path,
+                    dtype_backend=dtype_backend,
+                    storage_options=storage_options,
+                    kwargs=kwargs,
+                    header=header,
+                    dataframe_backend="cudf",
+                )
+            )
+        except ImportError:
+            # Requires dask>2024.12.0
+            from dask_cudf.io.csv import read_csv
 
-        if not isinstance(path, str):
-            path = stringify_path(path)
-        return dx.new_collection(
-            dx.io.csv.ReadCSV(
+            return read_csv(
                 path,
-                dtype_backend=dtype_backend,
-                storage_options=storage_options,
-                kwargs=kwargs,
+                *args,
                 header=header,
-                dataframe_backend="cudf",
+                storage_options=storage_options,
+                **kwargs,
             )
-        )
 
     @staticmethod
     def read_json(*args, **kwargs):
diff --git a/python/dask_cudf/dask_cudf/io/csv.py b/python/dask_cudf/dask_cudf/io/csv.py
index b22b31a591f..b0936c2ec90 100644
--- a/python/dask_cudf/dask_cudf/io/csv.py
+++ b/python/dask_cudf/dask_cudf/io/csv.py
@@ -1,8 +1,205 @@
 # Copyright (c) 2024, NVIDIA CORPORATION.
 
-from dask_cudf import _deprecated_api
+import os
+from glob import glob
+from warnings import warn
 
-read_csv = _deprecated_api(
-    "dask_cudf.io.csv.read_csv",
-    new_api="dask_cudf.read_csv",
-)
+from fsspec.utils import infer_compression
+
+from dask import dataframe as dd
+from dask.dataframe.io.csv import make_reader
+from dask.utils import parse_bytes
+
+import cudf
+
+
+def read_csv(path, blocksize="default", **kwargs):
+    """
+    Read CSV files into a :class:`.DataFrame`.
+
+    This API parallelizes the :func:`cudf:cudf.read_csv` function in
+    the following ways:
+
+    It supports loading many files at once using globstrings:
+
+    >>> import dask_cudf
+    >>> df = dask_cudf.read_csv("myfiles.*.csv")
+
+    In some cases it can break up large files:
+
+    >>> df = dask_cudf.read_csv("largefile.csv", blocksize="256 MiB")
+
+    It can read CSV files from external resources (e.g. S3, HTTP, FTP):
+
+    >>> df = dask_cudf.read_csv("s3://bucket/myfiles.*.csv")
+    >>> df = dask_cudf.read_csv("https://www.mycloud.com/sample.csv")
+
+    Internally ``read_csv`` uses :func:`cudf:cudf.read_csv` and
+    supports many of the same keyword arguments with the same
+    performance guarantees. See the docstring for
+    :func:`cudf:cudf.read_csv` for more information on available
+    keyword arguments.
+
+    Parameters
+    ----------
+    path : str, path object, or file-like object
+        Either a path to a file (a str, :py:class:`pathlib.Path`, or
+        py._path.local.LocalPath), a URL (including http, ftp, and S3
+        locations), or any object with a read() method (such as a
+        file handle from the built-in :py:func:`open` function or a
+        :py:class:`~io.StringIO`).
+    blocksize : int or str, default "256 MiB"
+        The target task partition size. If ``None``, a single block
+        is used for each file.
+    **kwargs : dict
+        Passthrough keyword arguments that are sent to
+        :func:`cudf:cudf.read_csv`.
+
+    Notes
+    -----
+    If any of ``skipfooter``/``skiprows``/``nrows`` are passed,
+    ``blocksize`` will default to ``None``.
+
+    Examples
+    --------
+    >>> import dask_cudf
+    >>> ddf = dask_cudf.read_csv("sample.csv", usecols=["a", "b"])
+    >>> ddf.compute()
+       a      b
+    0  1     hi
+    1  2  hello
+    2  3     ai
+
+    """
+
+    # Handle `chunksize` deprecation
+    if "chunksize" in kwargs:
+        chunksize = kwargs.pop("chunksize", "default")
+        warn(
+            "`chunksize` is deprecated and will be removed in the future. "
+            "Please use `blocksize` instead.",
+            FutureWarning,
+        )
+        if blocksize == "default":
+            blocksize = chunksize
+
+    # Set default `blocksize`
+    if blocksize == "default":
+        if (
+            kwargs.get("skipfooter", 0) != 0
+            or kwargs.get("skiprows", 0) != 0
+            or kwargs.get("nrows", None) is not None
+        ):
+            # Cannot read in blocks if skipfooter,
+            # skiprows or nrows is passed.
+            blocksize = None
+        else:
+            blocksize = "256 MiB"
+
+    if "://" in str(path):
+        func = make_reader(cudf.read_csv, "read_csv", "CSV")
+        return func(path, blocksize=blocksize, **kwargs)
+    else:
+        return _internal_read_csv(path=path, blocksize=blocksize, **kwargs)
+
+
+def _internal_read_csv(path, blocksize="256 MiB", **kwargs):
+    if isinstance(blocksize, str):
+        blocksize = parse_bytes(blocksize)
+
+    if isinstance(path, list):
+        filenames = path
+    elif isinstance(path, str):
+        filenames = sorted(glob(path))
+    elif hasattr(path, "__fspath__"):
+        filenames = sorted(glob(path.__fspath__()))
+    else:
+        raise TypeError(f"Path type not understood: {type(path)}")
+
+    if not filenames:
+        msg = f"No files found matching path: {path}"
+        raise FileNotFoundError(msg)
+
+    compression = kwargs.get("compression", "infer")
+
+    if compression == "infer":
+        # Infer compression from first path by default
+        compression = infer_compression(filenames[0])
+
+    if compression and blocksize:
+        # Compressed files cannot be split, so each file must be read whole
+        kwargs.pop("byte_range", None)
+        warn(
+            "%s compression does not support breaking apart files.\n"
+            "Please ensure that each individual file can fit in memory and\n"
+            "use the keyword ``blocksize=None`` to remove this message.\n"
+            "Setting ``blocksize=(size of file)``." % compression
+        )
+        blocksize = None
+
+    if blocksize is None:
+        return read_csv_without_blocksize(path, **kwargs)
+
+    # Let dask.dataframe generate meta
+    dask_reader = make_reader(cudf.read_csv, "read_csv", "CSV")
+    kwargs1 = kwargs.copy()
+    usecols = kwargs1.pop("usecols", None)
+    dtype = kwargs1.pop("dtype", None)
+    meta = dask_reader(filenames[0], **kwargs1)._meta
+    names = meta.columns
+    if usecols or dtype:
+        # Regenerate meta with original kwargs if
+        # `usecols` or `dtype` was specified
+        meta = dask_reader(filenames[0], **kwargs)._meta
+
+    i = 0
+    path_list = []
+    kwargs_list = []
+    for fn in filenames:
+        size = os.path.getsize(fn)
+        for start in range(0, size, blocksize):
+            kwargs2 = kwargs.copy()
+            kwargs2["byte_range"] = (
+                start,
+                blocksize,
+            )  # specify which chunk of the file we care about
+            if start != 0:
+                kwargs2["names"] = names  # no header in the middle of the file
+                kwargs2["header"] = None
+            path_list.append(fn)
+            kwargs_list.append(kwargs2)
+            i += 1
+
+    return dd.from_map(_read_csv, path_list, kwargs_list, meta=meta)
+
+
+def _read_csv(fn, kwargs):
+    return cudf.read_csv(fn, **kwargs)
+
+
+def read_csv_without_blocksize(path, **kwargs):
+    """Read entire CSV with optional compression (gzip/zip)
+
+    Parameters
+    ----------
+    path : str
+        Path to the file(s); glob patterns are supported.
+    """
+    if isinstance(path, list):
+        filenames = path
+    elif isinstance(path, str):
+        filenames = sorted(glob(path))
+    elif hasattr(path, "__fspath__"):
+        filenames = sorted(glob(path.__fspath__()))
+    else:
+        raise TypeError(f"Path type not understood: {type(path)}")
+
+    meta_kwargs = kwargs.copy()
+    if "skipfooter" in meta_kwargs:
+        meta_kwargs.pop("skipfooter")
+    if "nrows" in meta_kwargs:
+        meta_kwargs.pop("nrows")
+    # Read "head" of first file (first 5 rows).
+    # Convert to empty df for metadata.
+    meta = cudf.read_csv(filenames[0], nrows=5, **meta_kwargs).iloc[:0]
+    return dd.from_map(cudf.read_csv, filenames, meta=meta, **kwargs)
diff --git a/python/dask_cudf/dask_cudf/io/tests/test_csv.py b/python/dask_cudf/dask_cudf/io/tests/test_csv.py
index a0acb86f5a9..bf8c0e0b320 100644
--- a/python/dask_cudf/dask_cudf/io/tests/test_csv.py
+++ b/python/dask_cudf/dask_cudf/io/tests/test_csv.py
@@ -275,7 +275,3 @@ def test_deprecated_api_paths(tmp_path):
     with pytest.warns(match="dask_cudf.io.read_csv is now deprecated"):
         df2 = dask_cudf.io.read_csv(csv_path)
     dd.assert_eq(df, df2, check_divisions=False)
-
-    with pytest.warns(match="dask_cudf.io.csv.read_csv is now deprecated"):
-        df2 = dask_cudf.io.csv.read_csv(csv_path)
-    dd.assert_eq(df, df2, check_divisions=False)

From 7f3b9686136247c0004edd05be3dede5e7ad36bd Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Tue, 17 Dec 2024 08:47:29 -0800
Subject: [PATCH 2/2] remove chunksize support

---
 python/dask_cudf/dask_cudf/io/csv.py            | 12 ------------
 python/dask_cudf/dask_cudf/io/tests/test_csv.py |  5 -----
 2 files changed, 17 deletions(-)

diff --git a/python/dask_cudf/dask_cudf/io/csv.py b/python/dask_cudf/dask_cudf/io/csv.py
index b0936c2ec90..29f98b14511 100644
--- a/python/dask_cudf/dask_cudf/io/csv.py
+++ b/python/dask_cudf/dask_cudf/io/csv.py
@@ -71,18 +71,6 @@ def read_csv(path, blocksize="default", **kwargs):
     2  3     ai
 
     """
-
-    # Handle `chunksize` deprecation
-    if "chunksize" in kwargs:
-        chunksize = kwargs.pop("chunksize", "default")
-        warn(
-            "`chunksize` is deprecated and will be removed in the future. "
-            "Please use `blocksize` instead.",
-            FutureWarning,
-        )
-        if blocksize == "default":
-            blocksize = chunksize
-
     # Set default `blocksize`
     if blocksize == "default":
         if (
diff --git a/python/dask_cudf/dask_cudf/io/tests/test_csv.py b/python/dask_cudf/dask_cudf/io/tests/test_csv.py
index bf8c0e0b320..ddfd1c1adac 100644
--- a/python/dask_cudf/dask_cudf/io/tests/test_csv.py
+++ b/python/dask_cudf/dask_cudf/io/tests/test_csv.py
@@ -185,11 +185,6 @@ def test_read_csv_blocksize_none(tmp_path, compression, size):
         df2 = dask_cudf.read_csv(path, blocksize=None, dtype=typ)
         dd.assert_eq(df, df2)
 
-    # Test chunksize deprecation
-    with pytest.warns(FutureWarning, match="deprecated"):
-        df3 = dask_cudf.read_csv(path, chunksize=None, dtype=typ)
-    dd.assert_eq(df, df3)
-
 
 @pytest.mark.parametrize("dtype", [{"b": str, "c": int}, None])
 def test_csv_reader_usecols(tmp_path, dtype):
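Note: with both patches applied, ``dask_cudf.read_csv`` does its own file globbing and byte-range
splitting instead of routing through ``dask_expr.io.csv.ReadCSV``. A minimal usage sketch of the
reintroduced reader follows; the file names and the column name are hypothetical, and a
CUDA-capable environment with dask_cudf installed is assumed.

    import dask_cudf

    # Split each matching file into ~256 MiB byte ranges (the default blocksize).
    ddf = dask_cudf.read_csv("data-*.csv", blocksize="256 MiB")

    # Compressed files cannot be split, so each file becomes a single partition.
    gdf = dask_cudf.read_csv("data.csv.gz", blocksize=None)

    # Trigger computation on the hypothetical column "x".
    print(ddf["x"].sum().compute())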