From a51086d97f4c65169f6461f25bc2a57e30090944 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Thu, 18 Apr 2024 18:12:55 -0700 Subject: [PATCH 01/24] WIP: initial implementation start --- src/dask_nested/__init__.py | 5 +- src/dask_nested/accessor.py | 7 ++ src/dask_nested/backends.py | 43 +++++++++++ src/dask_nested/core.py | 116 ++++++++++++++++++++++++++++++ src/dask_nested/example_module.py | 23 ------ 5 files changed, 168 insertions(+), 26 deletions(-) create mode 100644 src/dask_nested/accessor.py create mode 100644 src/dask_nested/backends.py create mode 100644 src/dask_nested/core.py delete mode 100644 src/dask_nested/example_module.py diff --git a/src/dask_nested/__init__.py b/src/dask_nested/__init__.py index b564b85..111072e 100644 --- a/src/dask_nested/__init__.py +++ b/src/dask_nested/__init__.py @@ -1,3 +1,2 @@ -from .example_module import greetings, meaning - -__all__ = ["greetings", "meaning"] +from . import backends, accessor +from .core import NestedFrame \ No newline at end of file diff --git a/src/dask_nested/accessor.py b/src/dask_nested/accessor.py new file mode 100644 index 0000000..b9fd87c --- /dev/null +++ b/src/dask_nested/accessor.py @@ -0,0 +1,7 @@ +from dask.dataframe.extensions import make_array_nonempty, make_scalar, register_series_accessor +import nested_pandas as npd + + +@register_series_accessor("nest") +class DaskNestSeriesAccessor(npd.NestSeriesAccessor): + pass diff --git a/src/dask_nested/backends.py b/src/dask_nested/backends.py new file mode 100644 index 0000000..fdf4ae8 --- /dev/null +++ b/src/dask_nested/backends.py @@ -0,0 +1,43 @@ +from dask.dataframe.utils import meta_nonempty +from dask.dataframe.dispatch import make_meta_dispatch, pyarrow_schema_dispatch +from dask.dataframe.backends import _nonempty_index, meta_nonempty_dataframe, _nonempty_series +from dask.dataframe.extensions import make_array_nonempty, make_scalar, register_series_accessor +import dask.dataframe as dd + +from dask_expr import get_collection_type + +import nested_pandas as npd +from nested_pandas.series.ext_array import NestedExtensionArray +from nested_pandas import NestedDtype, NestSeriesAccessor + +import pandas as pd + +from .core import NestedFrame + +get_collection_type.register(npd.NestedFrame, lambda _: NestedFrame) + + +@make_meta_dispatch.register(npd.NestedFrame) +def make_meta_frame(x, index=None): + # Create an empty NestedFrame to use as Dask's underlying object meta. + result = x.head(0) + return result + + +@meta_nonempty.register(npd.NestedFrame) +def _nonempty_nestedframe(x, index=None): + # Construct a new NestedFrame with the same underlying data. + df = meta_nonempty_dataframe(x) + return npd.NestedFrame(df) + + +@make_array_nonempty.register(npd.NestedDtype) +def _(dtype): + #return pd.DataFrame() # should it be dd.Dataframe()? + return NestedExtensionArray._from_sequence([pd.DataFrame()], dtype=dtype) + #return NestedExtensionArray. 
#TODO: Figure out + +#@register_series_accessor("nest") +#class NestSeriesAccessor(npd.NestSeriesAccessor) + + diff --git a/src/dask_nested/core.py b/src/dask_nested/core.py new file mode 100644 index 0000000..526c90d --- /dev/null +++ b/src/dask_nested/core.py @@ -0,0 +1,116 @@ +import dask +import dask.dataframe as dd +import dask_expr as dx +from dask_expr import get_collection_type +from dask_expr._collection import new_collection, from_dict +from dask_expr._expr import _emulate, ApplyConcatApply + +import nested_pandas as npd + + +class _Frame(dx.FrameBase): + """Base class for extensions of Dask Dataframes that track additional + Ensemble-related metadata. + """ + + _partition_type = npd.NestedFrame + + def __init__(self, expr, label=None, ensemble=None): + super().__init__(expr) + + @property + def _args(self): + # Ensure our Dask extension can correctly be used by pickle. + # See https://github.com/geopandas/dask-geopandas/issues/237 + return super()._args + + def optimize(self, fuse: bool = True): + result = new_collection(self.expr.optimize(fuse=fuse)) + return result + + def __dask_postpersist__(self): + func, args = super().__dask_postpersist__() + + return self._rebuild, (func, args) + + def _rebuild(self, graph, func, args): + collection = func(graph, *args) + return collection + + +class NestedFrame( + _Frame, dd.DataFrame +): # can use dd.DataFrame instead of dx.DataFrame if the config is set true (default in >=2024.3.0) + """An extension for a Dask Dataframe for Nested. + + The underlying non-parallel dataframes are TapeFrames and TapeSeries which extend Pandas frames. + + Examples + ---------- + Instatiation:: + + import tape + ens = tape.Ensemble() + data = {...} # Some data you want tracked by the Ensemble + ensemble_frame = tape.EnsembleFrame.from_dict(data, label="my_frame", ensemble=ens) + """ + + _partition_type = npd.NestedFrame # Tracks the underlying data type + + def __getitem__(self, key): + result = super().__getitem__(key) + return result + + @classmethod + def from_nestedpandas(cls, data, npartitions=None, chunksize=None, sort=True, label=None, ensemble=None): + """Returns an EnsembleFrame constructed from a TapeFrame. + + Parameters + ---------- + data: `TapeFrame` + Frame containing the underlying data fro the EnsembleFram + npartitions: `int`, optional + The number of partitions of the index to create. Note that depending on + the size and index of the dataframe, the output may have fewer + partitions than requested. + chunksize: `int`, optional + Size of the individual chunks of data in non-parallel objects that make up Dask frames. + sort: `bool`, optional + Whether to sort the frame by a default index. + label: `str`, optional + The label used to by the Ensemble to identify the frame. + ensemble: `tape.Ensemble`, optional + A link to the Ensemble object that owns this frame. + + Returns + ---------- + result: `tape.EnsembleFrame` + The constructed EnsembleFrame object. + """ + result = dd.from_pandas(data, npartitions=npartitions, chunksize=chunksize, sort=sort) + return result + + @classmethod + def from_dask_dataframe(cl, df, ensemble=None, label=None): + """Returns an EnsembleFrame constructed from a Dask dataframe. + + Parameters + ---------- + df: `dask.dataframe.DataFrame` or `list` + a Dask dataframe to convert to an EnsembleFrame + ensemble: `tape.ensemble.Ensemble`, optional + A link to the Ensemble object that owns this frame. + label: `str`, optional + The label used to by the Ensemble to identify the frame. 
+ + Returns + ---------- + result: `tape.EnsembleFrame` + The constructed EnsembleFrame object. + """ + # Create a EnsembleFrame by mapping the partitions to the appropriate meta, TapeFrame + # TODO(wbeebe@uw.edu): Determine if there is a better method + result = df.map_partitions(npd.NestedFrame) + result.ensemble = ensemble + result.label = label + return result \ No newline at end of file diff --git a/src/dask_nested/example_module.py b/src/dask_nested/example_module.py deleted file mode 100644 index f76e837..0000000 --- a/src/dask_nested/example_module.py +++ /dev/null @@ -1,23 +0,0 @@ -"""An example module containing simplistic functions.""" - - -def greetings() -> str: - """A friendly greeting for a future friend. - - Returns - ------- - str - A typical greeting from a software engineer. - """ - return "Hello from LINCC-Frameworks!" - - -def meaning() -> int: - """The meaning of life, the universe, and everything. - - Returns - ------- - int - The meaning of life. - """ - return 42 From 5d40646cd1a98442770054c14a84ea04af8172e3 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Wed, 24 Apr 2024 12:25:46 -0700 Subject: [PATCH 02/24] add fixed array_nonempty --- src/dask_nested/accessor.py | 26 +++++++++++++++++++++++++- src/dask_nested/backends.py | 6 +++--- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/src/dask_nested/accessor.py b/src/dask_nested/accessor.py index b9fd87c..acdc0f8 100644 --- a/src/dask_nested/accessor.py +++ b/src/dask_nested/accessor.py @@ -1,7 +1,31 @@ from dask.dataframe.extensions import make_array_nonempty, make_scalar, register_series_accessor import nested_pandas as npd +from nested_pandas import NestedDtype, NestSeriesAccessor +import dask @register_series_accessor("nest") class DaskNestSeriesAccessor(npd.NestSeriesAccessor): - pass + + def __init__(self, series): + self._check_series(series) + + self._series = series + + @staticmethod + def _check_series(series): + dtype = series.dtype + if not isinstance(dtype, NestedDtype): + raise AttributeError(f"Can only use .nest accessor with a Series of NestedDtype, got {dtype}") + + @property + def fields(self) -> list[str]: + """Names of the nested columns""" + return self._series.head(0).nest.fields + #hacky + #return self._series.partitions[0:1].map_partitions(lambda x: x.nest.fields) + #return self._series.array.field_names + + @dask.delayed + def test_fields(self): + return self._series.head(0).nest.fields diff --git a/src/dask_nested/backends.py b/src/dask_nested/backends.py index fdf4ae8..92f73d4 100644 --- a/src/dask_nested/backends.py +++ b/src/dask_nested/backends.py @@ -33,9 +33,9 @@ def _nonempty_nestedframe(x, index=None): @make_array_nonempty.register(npd.NestedDtype) def _(dtype): - #return pd.DataFrame() # should it be dd.Dataframe()? - return NestedExtensionArray._from_sequence([pd.DataFrame()], dtype=dtype) - #return NestedExtensionArray. 
#TODO: Figure out + # must be two values + return NestedExtensionArray._from_sequence([pd.NA, pd.NA], dtype=dtype) + #@register_series_accessor("nest") #class NestSeriesAccessor(npd.NestSeriesAccessor) From 68ff5f705d0f5c46c6cb066b74b28a612eea6ea6 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Tue, 30 Apr 2024 10:13:18 -0700 Subject: [PATCH 03/24] adding functions --- src/dask_nested/__init__.py | 5 +- src/dask_nested/backends.py | 21 +--- src/dask_nested/core.py | 198 ++++++++++++++++++++++++++++++++++-- src/dask_nested/io.py | 21 ++++ 4 files changed, 219 insertions(+), 26 deletions(-) create mode 100644 src/dask_nested/io.py diff --git a/src/dask_nested/__init__.py b/src/dask_nested/__init__.py index 111072e..cb7fe12 100644 --- a/src/dask_nested/__init__.py +++ b/src/dask_nested/__init__.py @@ -1,2 +1,3 @@ -from . import backends, accessor -from .core import NestedFrame \ No newline at end of file +from . import backends, accessor # noqa +from .core import NestedFrame # noqa +from .io import read_parquet # noqa diff --git a/src/dask_nested/backends.py b/src/dask_nested/backends.py index 92f73d4..d040b19 100644 --- a/src/dask_nested/backends.py +++ b/src/dask_nested/backends.py @@ -1,16 +1,11 @@ +import nested_pandas as npd +import pandas as pd +from dask.dataframe.backends import meta_nonempty_dataframe +from dask.dataframe.dispatch import make_meta_dispatch +from dask.dataframe.extensions import make_array_nonempty from dask.dataframe.utils import meta_nonempty -from dask.dataframe.dispatch import make_meta_dispatch, pyarrow_schema_dispatch -from dask.dataframe.backends import _nonempty_index, meta_nonempty_dataframe, _nonempty_series -from dask.dataframe.extensions import make_array_nonempty, make_scalar, register_series_accessor -import dask.dataframe as dd - from dask_expr import get_collection_type - -import nested_pandas as npd from nested_pandas.series.ext_array import NestedExtensionArray -from nested_pandas import NestedDtype, NestSeriesAccessor - -import pandas as pd from .core import NestedFrame @@ -35,9 +30,3 @@ def _nonempty_nestedframe(x, index=None): def _(dtype): # must be two values return NestedExtensionArray._from_sequence([pd.NA, pd.NA], dtype=dtype) - - -#@register_series_accessor("nest") -#class NestSeriesAccessor(npd.NestSeriesAccessor) - - diff --git a/src/dask_nested/core.py b/src/dask_nested/core.py index 526c90d..5af4390 100644 --- a/src/dask_nested/core.py +++ b/src/dask_nested/core.py @@ -1,11 +1,11 @@ -import dask import dask.dataframe as dd import dask_expr as dx -from dask_expr import get_collection_type -from dask_expr._collection import new_collection, from_dict -from dask_expr._expr import _emulate, ApplyConcatApply - import nested_pandas as npd +from dask_expr._collection import new_collection +from nested_pandas.series.packer import pack_flat +from pandas._libs import lib +from pandas._typing import AnyAll, Axis, IndexLabel +from pandas.api.extensions import no_default class _Frame(dx.FrameBase): @@ -23,7 +23,7 @@ def _args(self): # Ensure our Dask extension can correctly be used by pickle. 
# See https://github.com/geopandas/dask-geopandas/issues/237 return super()._args - + def optimize(self, fuse: bool = True): result = new_collection(self.expr.optimize(fuse=fuse)) return result @@ -89,7 +89,7 @@ def from_nestedpandas(cls, data, npartitions=None, chunksize=None, sort=True, la """ result = dd.from_pandas(data, npartitions=npartitions, chunksize=chunksize, sort=sort) return result - + @classmethod def from_dask_dataframe(cl, df, ensemble=None, label=None): """Returns an EnsembleFrame constructed from a Dask dataframe. @@ -113,4 +113,186 @@ def from_dask_dataframe(cl, df, ensemble=None, label=None): result = df.map_partitions(npd.NestedFrame) result.ensemble = ensemble result.label = label - return result \ No newline at end of file + return result + + def add_nested(self, nested, name): + """Packs a dataframe into a nested column + + Parameters + ---------- + nested: + A flat dataframe to pack into a nested column + name: + The name given to the nested column + + Returns + ------- + `dask_nested.NestedFrame` + """ + nested = nested.map_partitions(lambda x: pack_flat(x)).rename(name) + return self.join(nested, how="outer") + + def query(self, expr): + """ + Query the columns of a NestedFrame with a boolean expression. Specified + queries can target nested columns in addition to the typical column set + + Docstring copied from nested-pandas query + + Parameters + ---------- + expr : str + The query string to evaluate. + + Access nested columns using `nested_df.nested_col` (where + `nested_df` refers to a particular nested dataframe and + `nested_col` is a column of that nested dataframe). + + You can refer to variables + in the environment by prefixing them with an '@' character like + ``@a + b``. + + You can refer to column names that are not valid Python variable names + by surrounding them in backticks. Thus, column names containing spaces + or punctuations (besides underscores) or starting with digits must be + surrounded by backticks. (For example, a column named "Area (cm^2)" would + be referenced as ```Area (cm^2)```). Column names which are Python keywords + (like "list", "for", "import", etc) cannot be used. + + For example, if one of your columns is called ``a a`` and you want + to sum it with ``b``, your query should be ```a a` + b``. + + Returns + ------- + DataFrame + DataFrame resulting from the provided query expression. + + Notes + ----- + Queries that target a particular nested structure return a dataframe + with rows of that particular nested structure filtered. For example, + querying the NestedFrame "df" with nested structure "my_nested" as + below will return all rows of df, but with mynested filtered by the + condition: + + >>> df.query("mynested.a > 2") + """ + return self.map_partitions(lambda x: x.query(expr)) + + def dropna( + self, + *, + axis: Axis = 0, + how: AnyAll | lib.NoDefault = no_default, + thresh: int | lib.NoDefault = no_default, + on_nested: bool = False, + subset: IndexLabel | None = None, + inplace: bool = False, + ignore_index: bool = False, + ): + """ + Remove missing values for one layer of the NestedFrame. + + Parameters + ---------- + axis : {0 or 'index', 1 or 'columns'}, default 0 + Determine if rows or columns which contain missing values are + removed. + + * 0, or 'index' : Drop rows which contain missing values. + * 1, or 'columns' : Drop columns which contain missing value. + + Only a single axis is allowed. 
+ + how : {'any', 'all'}, default 'any' + Determine if row or column is removed from DataFrame, when we have + at least one NA or all NA. + + * 'any' : If any NA values are present, drop that row or column. + * 'all' : If all values are NA, drop that row or column. + thresh : int, optional + Require that many non-NA values. Cannot be combined with how. + on_nested : str or bool, optional + If not False, applies the call to the nested dataframe in the + column with label equal to the provided string. If specified, + the nested dataframe should align with any columns given in + `subset`. + subset : column label or sequence of labels, optional + Labels along other axis to consider, e.g. if you are dropping rows + these would be a list of columns to include. + + Access nested columns using `nested_df.nested_col` (where + `nested_df` refers to a particular nested dataframe and + `nested_col` is a column of that nested dataframe). + inplace : bool, default False + Whether to modify the DataFrame rather than creating a new one. + ignore_index : bool, default ``False`` + If ``True``, the resulting axis will be labeled 0, 1, …, n - 1. + + .. versionadded:: 2.0.0 + + Returns + ------- + DataFrame or None + DataFrame with NA entries dropped from it or None if ``inplace=True``. + + Notes + ----- + Operations that target a particular nested structure return a dataframe + with rows of that particular nested structure affected. + + Values for `on_nested` and `subset` should be consistent in pointing + to a single layer, multi-layer operations are not supported at this + time. + """ + # grab meta from head, assumes row-based operation + meta = self.head(0) + return self.map_partitions( + lambda x: x.dropna( + axis=axis, + how=how, + thresh=thresh, + on_nested=on_nested, + subset=subset, + inplace=inplace, + ignore_index=ignore_index, + ), + meta=meta, + ) + + def reduce(self, func, *args, **kwargs): + """ + Takes a function and applies it to each top-level row of the NestedFrame. + + docstring copied from nested-pandas + + The user may specify which columns the function is applied to, with + columns from the 'base' layer being passsed to the function as + scalars and columns from the nested layers being passed as numpy arrays. + + Parameters + ---------- + func : callable + Function to apply to each nested dataframe. The first arguments to `func` should be which + columns to apply the function to. + args : positional arguments + Positional arguments to pass to the function, the first *args should be the names of the + columns to apply the function to. + kwargs : keyword arguments, optional + Keyword arguments to pass to the function. + + Returns + ------- + `NestedFrame` + `NestedFrame` with the results of the function applied to the columns of the frame. + + Notes + ----- + The recommend return value of func should be a `pd.Series` where the indices are the names of the + output columns in the dataframe returned by `reduce`. Note however that in cases where func + returns a single value there may be a performance benefit to returning the scalar value + rather than a `pd.Series`. 
+ """ + + # apply nested_pandas reduce via map_partitions + return self.map_partitions(lambda x: x.reduce(func, *args, **kwargs)) diff --git a/src/dask_nested/io.py b/src/dask_nested/io.py new file mode 100644 index 0000000..5249adb --- /dev/null +++ b/src/dask_nested/io.py @@ -0,0 +1,21 @@ +def read_parquet( + path, + columns=None, + filters=None, + categories=None, + index=None, + storage_options=None, + engine="auto", + use_nullable_dtypes: bool | None = None, + dtype_backend=None, + calculate_divisions=None, + ignore_metadata_file=False, + metadata_task_size=None, + split_row_groups="infer", + blocksize="default", + aggregate_files=None, + parquet_file_extension=(".parq", ".parquet", ".pq"), + filesystem=None, + **kwargs, +): + pass From 7f82621b1efb51409bd25a7c13b3cf74d4e83c90 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Tue, 30 Apr 2024 15:10:07 -0700 Subject: [PATCH 04/24] further implementation --- src/dask_nested/core.py | 54 ++++++++++++++++++++++++++--------------- src/dask_nested/io.py | 28 ++++++++++++++++++++- 2 files changed, 61 insertions(+), 21 deletions(-) diff --git a/src/dask_nested/core.py b/src/dask_nested/core.py index 5af4390..2b758bb 100644 --- a/src/dask_nested/core.py +++ b/src/dask_nested/core.py @@ -2,6 +2,7 @@ import dask_expr as dx import nested_pandas as npd from dask_expr._collection import new_collection +from nested_pandas.series.dtype import NestedDtype from nested_pandas.series.packer import pack_flat from pandas._libs import lib from pandas._typing import AnyAll, Axis, IndexLabel @@ -91,29 +92,42 @@ def from_nestedpandas(cls, data, npartitions=None, chunksize=None, sort=True, la return result @classmethod - def from_dask_dataframe(cl, df, ensemble=None, label=None): - """Returns an EnsembleFrame constructed from a Dask dataframe. + def from_dask_dataframe(cl, df): + """Converts a Dask Dataframe to a Dask-Nested NestedFrame Parameters ---------- - df: `dask.dataframe.DataFrame` or `list` - a Dask dataframe to convert to an EnsembleFrame - ensemble: `tape.ensemble.Ensemble`, optional - A link to the Ensemble object that owns this frame. - label: `str`, optional - The label used to by the Ensemble to identify the frame. + df: + A Dask Dataframe to convert Returns - ---------- - result: `tape.EnsembleFrame` - The constructed EnsembleFrame object. 
+ ------- + `dask_nested.NestedFrame` """ - # Create a EnsembleFrame by mapping the partitions to the appropriate meta, TapeFrame - # TODO(wbeebe@uw.edu): Determine if there is a better method - result = df.map_partitions(npd.NestedFrame) - result.ensemble = ensemble - result.label = label - return result + return df.map_partitions(npd.NestedFrame) + + def compute(self, **kwargs): + """Compute this Dask collection, returning the underlying dataframe or series.""" + return npd.NestedFrame(super().compute(**kwargs)) + + @property + def all_columns(self) -> dict: + """returns a dictionary of columns for each base/nested dataframe""" + all_columns = {"base": self.columns} + for column in self.columns: + if isinstance(self[column].dtype, NestedDtype): + nest_cols = list(self.dtypes[column].fields.keys()) + all_columns[column] = nest_cols + return all_columns + + @property + def nested_columns(self) -> list: + """retrieves the base column names for all nested dataframes""" + nest_cols = [] + for column in self.columns: + if isinstance(self[column].dtype, NestedDtype): + nest_cols.append(column) + return nest_cols def add_nested(self, nested, name): """Packs a dataframe into a nested column @@ -177,7 +191,7 @@ def query(self, expr): >>> df.query("mynested.a > 2") """ - return self.map_partitions(lambda x: x.query(expr)) + return self.map_partitions(lambda x: x.query(expr), meta=self._meta) def dropna( self, @@ -260,7 +274,7 @@ def dropna( meta=meta, ) - def reduce(self, func, *args, **kwargs): + def reduce(self, func, *args, meta=None, **kwargs): """ Takes a function and applies it to each top-level row of the NestedFrame. @@ -295,4 +309,4 @@ def reduce(self, func, *args, **kwargs): """ # apply nested_pandas reduce via map_partitions - return self.map_partitions(lambda x: x.reduce(func, *args, **kwargs)) + return self.map_partitions(lambda x: x.reduce(func, *args, **kwargs), meta=meta) diff --git a/src/dask_nested/io.py b/src/dask_nested/io.py index 5249adb..8a46e64 100644 --- a/src/dask_nested/io.py +++ b/src/dask_nested/io.py @@ -1,3 +1,8 @@ +import dask.dataframe as dd + +from .core import NestedFrame + + def read_parquet( path, columns=None, @@ -18,4 +23,25 @@ def read_parquet( filesystem=None, **kwargs, ): - pass + return NestedFrame.from_dask_dataframe( + dd.read_parquet( + path=path, + columns=columns, + filters=filters, + categories=categories, + index=index, + storage_options=storage_options, + engine=engine, + use_nullable_dtypes=use_nullable_dtypes, + dtype_backend=dtype_backend, + calculate_divisions=calculate_divisions, + ignore_metadata_file=ignore_metadata_file, + metadata_task_size=metadata_task_size, + split_row_groups=split_row_groups, + blocksize=blocksize, + aggregate_files=aggregate_files, + parquet_file_extension=parquet_file_extension, + filesystem=filesystem, + **kwargs, + ) + ) From 18fc5d29e2c906e4d8b6c6326789b879d121203a Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Tue, 7 May 2024 15:51:34 -0700 Subject: [PATCH 05/24] more work, starting tests --- src/dask_nested/io.py | 187 +++++++++++++++++++++++ tests/dask_nested/conftest.py | 54 +++++++ tests/dask_nested/test_example_module.py | 13 -- tests/dask_nested/test_nestedframe.py | 51 +++++++ 4 files changed, 292 insertions(+), 13 deletions(-) delete mode 100644 tests/dask_nested/test_example_module.py create mode 100644 tests/dask_nested/test_nestedframe.py diff --git a/src/dask_nested/io.py b/src/dask_nested/io.py index 8a46e64..6eef903 100644 --- a/src/dask_nested/io.py +++ b/src/dask_nested/io.py @@ -23,6 +23,193 
@@ def read_parquet( filesystem=None, **kwargs, ): + """ + Read a Parquet file into a Dask DataFrame + + This reads a directory of Parquet data into a Dask.dataframe, one file per + partition. It selects the index among the sorted columns if any exist. + + Docstring copied from `dask.dataframe.read_parquet` + + Parameters + ---------- + path : str or list + Source directory for data, or path(s) to individual parquet files. + Prefix with a protocol like ``s3://`` to read from alternative + filesystems. To read from multiple files you can pass a globstring or a + list of paths, with the caveat that they must all have the same + protocol. + columns : str or list, default None + Field name(s) to read in as columns in the output. By default all + non-index fields will be read (as determined by the pandas parquet + metadata, if present). Provide a single field name instead of a list to + read in the data as a Series. + filters : Union[List[Tuple[str, str, Any]], List[List[Tuple[str, str, Any]]]], default None + List of filters to apply, like ``[[('col1', '==', 0), ...], ...]``. + Using this argument will result in row-wise filtering of the final partitions. + + Predicates can be expressed in disjunctive normal form (DNF). This means that + the inner-most tuple describes a single column predicate. These inner predicates + are combined with an AND conjunction into a larger predicate. The outer-most + list then combines all of the combined filters with an OR disjunction. + + Predicates can also be expressed as a ``List[Tuple]``. These are evaluated + as an AND conjunction. To express OR in predicates, one must use the + (preferred for "pyarrow") ``List[List[Tuple]]`` notation. + index : str, list or False, default None + Field name(s) to use as the output frame index. By default will be + inferred from the pandas parquet file metadata, if present. Use ``False`` + to read all fields as columns. + categories : list or dict, default None + For any fields listed here, if the parquet encoding is Dictionary, + the column will be created with dtype category. Use only if it is + guaranteed that the column is encoded as dictionary in all row-groups. + If a list, assumes up to 2**16-1 labels; if a dict, specify the number + of labels expected; if None, will load categories automatically for + data written by dask, not otherwise. + storage_options : dict, default None + Key/value pairs to be passed on to the file-system backend, if any. + Note that the default file-system backend can be configured with the + ``filesystem`` argument, described below. + open_file_options : dict, default None + Key/value arguments to be passed along to ``AbstractFileSystem.open`` + when each parquet data file is open for reading. Experimental + (optimized) "precaching" for remote file systems (e.g. S3, GCS) can + be enabled by adding ``{"method": "parquet"}`` under the + ``"precache_options"`` key. Also, a custom file-open function can be + used (instead of ``AbstractFileSystem.open``), by specifying the + desired function under the ``"open_file_func"`` key. + engine : {'auto', 'pyarrow'} + Parquet library to use. Defaults to 'auto', which uses ``pyarrow`` if + it is installed, and falls back to the deprecated ``fastparquet`` otherwise. + Note that ``fastparquet`` does not support all functionality offered by + ``pyarrow``. + This is also used by third-party packages (e.g. CuDF) to inject bespoke engines. + use_nullable_dtypes : {False, True} + Whether to use extension dtypes for the resulting ``DataFrame``. + + .. 
note:: + + This option is deprecated. Use "dtype_backend" instead. + + dtype_backend : {'numpy_nullable', 'pyarrow'}, defaults to NumPy backed DataFrames + Which dtype_backend to use, e.g. whether a DataFrame should have NumPy arrays, + nullable dtypes are used for all dtypes that have a nullable implementation + when 'numpy_nullable' is set, pyarrow is used for all dtypes if 'pyarrow' + is set. + ``dtype_backend="pyarrow"`` requires ``pandas`` 1.5+. + calculate_divisions : bool, default False + Whether to use min/max statistics from the footer metadata (or global + ``_metadata`` file) to calculate divisions for the output DataFrame + collection. Divisions will not be calculated if statistics are missing. + This option will be ignored if ``index`` is not specified and there is + no physical index column specified in the custom "pandas" Parquet + metadata. Note that ``calculate_divisions=True`` may be extremely slow + when no global ``_metadata`` file is present, especially when reading + from remote storage. Set this to ``True`` only when known divisions + are needed for your workload (see :ref:`dataframe-design-partitions`). + ignore_metadata_file : bool, default False + Whether to ignore the global ``_metadata`` file (when one is present). + If ``True``, or if the global ``_metadata`` file is missing, the parquet + metadata may be gathered and processed in parallel. Parallel metadata + processing is currently supported for ``ArrowDatasetEngine`` only. + metadata_task_size : int, default configurable + If parquet metadata is processed in parallel (see ``ignore_metadata_file`` + description above), this argument can be used to specify the number of + dataset files to be processed by each task in the Dask graph. If this + argument is set to ``0``, parallel metadata processing will be disabled. + The default values for local and remote filesystems can be specified + with the "metadata-task-size-local" and "metadata-task-size-remote" + config fields, respectively (see "dataframe.parquet"). + split_row_groups : 'infer', 'adaptive', bool, or int, default 'infer' + If True, then each output dataframe partition will correspond to a single + parquet-file row-group. If False, each partition will correspond to a + complete file. If a positive integer value is given, each dataframe + partition will correspond to that number of parquet row-groups (or fewer). + If 'adaptive', the metadata of each file will be used to ensure that every + partition satisfies ``blocksize``. If 'infer' (the default), the + uncompressed storage-size metadata in the first file will be used to + automatically set ``split_row_groups`` to either 'adaptive' or ``False``. + blocksize : int or str, default 'default' + The desired size of each output ``DataFrame`` partition in terms of total + (uncompressed) parquet storage space. This argument is currently used to + set the default value of ``split_row_groups`` (using row-group metadata + from a single file), and will be ignored if ``split_row_groups`` is not + set to 'infer' or 'adaptive'. Default is 256 MiB. + aggregate_files : bool or str, default None + WARNING: Passing a string argument to ``aggregate_files`` will result + in experimental behavior. This behavior may change in the future. + + Whether distinct file paths may be aggregated into the same output + partition. This parameter is only used when `split_row_groups` is set to + 'infer', 'adaptive' or to an integer >1. 
A setting of True means that any + two file paths may be aggregated into the same output partition, while + False means that inter-file aggregation is prohibited. + + For "hive-partitioned" datasets, a "partition"-column name can also be + specified. In this case, we allow the aggregation of any two files + sharing a file path up to, and including, the corresponding directory name. + For example, if ``aggregate_files`` is set to ``"section"`` for the + directory structure below, ``03.parquet`` and ``04.parquet`` may be + aggregated together, but ``01.parquet`` and ``02.parquet`` cannot be. + If, however, ``aggregate_files`` is set to ``"region"``, ``01.parquet`` + may be aggregated with ``02.parquet``, and ``03.parquet`` may be aggregated + with ``04.parquet``:: + + dataset-path/ + ├── region=1/ + │ ├── section=a/ + │ │ └── 01.parquet + │ ├── section=b/ + │ └── └── 02.parquet + └── region=2/ + ├── section=a/ + │ ├── 03.parquet + └── └── 04.parquet + + Note that the default behavior of ``aggregate_files`` is ``False``. + parquet_file_extension: str, tuple[str], or None, default (".parq", ".parquet", ".pq") + A file extension or an iterable of extensions to use when discovering + parquet files in a directory. Files that don't match these extensions + will be ignored. This argument only applies when ``paths`` corresponds + to a directory and no ``_metadata`` file is present (or + ``ignore_metadata_file=True``). Passing in ``parquet_file_extension=None`` + will treat all files in the directory as parquet files. + + The purpose of this argument is to ensure that the engine will ignore + unsupported metadata files (like Spark's '_SUCCESS' and 'crc' files). + It may be necessary to change this argument if the data files in your + parquet dataset do not end in ".parq", ".parquet", or ".pq". + filesystem: "fsspec", "arrow", or fsspec.AbstractFileSystem backend to use. + dataset: dict, default None + Dictionary of options to use when creating a ``pyarrow.dataset.Dataset`` object. + These options may include a "filesystem" key to configure the desired + file-system backend. However, the top-level ``filesystem`` argument will always + take precedence. + + **Note**: The ``dataset`` options may include a "partitioning" key. + However, since ``pyarrow.dataset.Partitioning`` + objects cannot be serialized, the value can be a dict of key-word + arguments for the ``pyarrow.dataset.partitioning`` API + (e.g. ``dataset={"partitioning": {"flavor": "hive", "schema": ...}}``). + Note that partitioned columns will not be converted to categorical + dtypes when a custom partitioning schema is specified in this way. + read: dict, default None + Dictionary of options to pass through to ``engine.read_partitions`` + using the ``read`` key-word argument. + arrow_to_pandas: dict, default None + Dictionary of options to use when converting from ``pyarrow.Table`` to + a pandas ``DataFrame`` object. Only used by the "arrow" engine. + **kwargs: dict (of dicts) + Options to pass through to ``engine.read_partitions`` as stand-alone + key-word arguments. Note that these options will be ignored by the + engines defined in ``dask.dataframe``, but may be used by other custom + implementations. 
+ + Examples + -------- + >>> df = dd.read_parquet('s3://bucket/my-parquet-data') # doctest: +SKIP + """ return NestedFrame.from_dask_dataframe( dd.read_parquet( path=path, diff --git a/tests/dask_nested/conftest.py b/tests/dask_nested/conftest.py index e69de29..b025941 100644 --- a/tests/dask_nested/conftest.py +++ b/tests/dask_nested/conftest.py @@ -0,0 +1,54 @@ +import dask_nested as dn +import nested_pandas as npd +import numpy as np +import pytest + + +@pytest.fixture +def test_dataset(): + """create a toy dataset for testing purposes""" + n_base = 50 + layer_size = 500 + randomstate = np.random.RandomState(seed=1) + + # Generate base data + base_data = {"a": randomstate.random(n_base), "b": randomstate.random(n_base) * 2} + base_nf = npd.NestedFrame(data=base_data) + + layer_data = { + "t": randomstate.random(layer_size * n_base) * 20, + "flux": randomstate.random(layer_size * n_base) * 100, + "band": randomstate.choice(["r", "g"], size=layer_size * n_base), + "index": np.arange(layer_size * n_base) % n_base, + } + layer_nf = npd.NestedFrame(data=layer_data).set_index("index") + + base_dn = dn.NestedFrame.from_nestedpandas(base_nf, npartitions=5) + layer_dn = dn.NestedFrame.from_nestedpandas(layer_nf, npartitions=10) + + return base_dn.add_nested(layer_dn, "nested") + + +@pytest.fixture +def test_dataset_no_add_nested(): + """stop before add_nested""" + n_base = 50 + layer_size = 500 + randomstate = np.random.RandomState(seed=1) + + # Generate base data + base_data = {"a": randomstate.random(n_base), "b": randomstate.random(n_base) * 2} + base_nf = npd.NestedFrame(data=base_data) + + layer_data = { + "t": randomstate.random(layer_size * n_base) * 20, + "flux": randomstate.random(layer_size * n_base) * 100, + "band": randomstate.choice(["r", "g"], size=layer_size * n_base), + "index": np.arange(layer_size * n_base) % n_base, + } + layer_nf = npd.NestedFrame(data=layer_data).set_index("index") + + base_dn = dn.NestedFrame.from_nestedpandas(base_nf, npartitions=5) + layer_dn = dn.NestedFrame.from_nestedpandas(layer_nf, npartitions=10) + + return (base_dn, layer_dn) diff --git a/tests/dask_nested/test_example_module.py b/tests/dask_nested/test_example_module.py deleted file mode 100644 index 6c5f230..0000000 --- a/tests/dask_nested/test_example_module.py +++ /dev/null @@ -1,13 +0,0 @@ -from dask_nested import example_module - - -def test_greetings() -> None: - """Verify the output of the `greetings` function""" - output = example_module.greetings() - assert output == "Hello from LINCC-Frameworks!" 
- - -def test_meaning() -> None: - """Verify the output of the `meaning` function""" - output = example_module.meaning() - assert output == 42 diff --git a/tests/dask_nested/test_nestedframe.py b/tests/dask_nested/test_nestedframe.py new file mode 100644 index 0000000..d6a641c --- /dev/null +++ b/tests/dask_nested/test_nestedframe.py @@ -0,0 +1,51 @@ +import dask_nested as dn +from nested_pandas.series.dtype import NestedDtype + + +def test_nestedframe_construction(test_dataset): + """test the construction of a nestedframe""" + pass + + +def test_all_columns(test_dataset): + """all_columns property test""" + pass + + +def test_nested_columns(test_dataset): + """nested_columns property test""" + pass + + +def test_add_nested(test_dataset_no_add_nested): + """test the add_nested function""" + base, layer = test_dataset_no_add_nested + + base_with_nested = base.add_nested(layer, "nested") + + # Check that the result is a nestedframe + assert isinstance(base_with_nested, dn.NestedFrame) + + # Check that there's a new nested column with the correct dtype + assert "nested" in base_with_nested.columns + assert isinstance(base_with_nested.dtypes["nested"], NestedDtype) + + # Check that the nested partitions were used + assert base_with_nested.npartitions == 10 + + assert len(base_with_nested.compute()) == 50 + + +def test_query(test_dataset): + """test the query function""" + pass + + +def test_dropna(test_dataset): + """test the dropna function""" + pass + + +def test_reduce(test_dataset): + """test the reduce function""" + pass From a2a27a8376c254e67139143570994be6f57a22be Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Fri, 10 May 2024 13:36:21 -0700 Subject: [PATCH 06/24] fill out unit test suite --- tests/dask_nested/conftest.py | 30 ++++++++++++ tests/dask_nested/test_nestedframe.py | 68 +++++++++++++++++++++++---- 2 files changed, 89 insertions(+), 9 deletions(-) diff --git a/tests/dask_nested/conftest.py b/tests/dask_nested/conftest.py index b025941..bf918b6 100644 --- a/tests/dask_nested/conftest.py +++ b/tests/dask_nested/conftest.py @@ -29,6 +29,36 @@ def test_dataset(): return base_dn.add_nested(layer_dn, "nested") +@pytest.fixture +def test_dataset_with_nans(): + """stop before add_nested""" + n_base = 50 + layer_size = 500 + randomstate = np.random.RandomState(seed=1) + + # Generate base data + a = randomstate.random(n_base) + a[10] = np.nan # add a nan + base_data = {"a": a, "b": randomstate.random(n_base) * 2} + base_nf = npd.NestedFrame(data=base_data) + + t = randomstate.random(layer_size * n_base) * 20 + t[50] = np.nan # add a nan + + layer_data = { + "t": t, + "flux": randomstate.random(layer_size * n_base) * 100, + "band": randomstate.choice(["r", "g"], size=layer_size * n_base), + "index": np.arange(layer_size * n_base) % n_base, + } + layer_nf = npd.NestedFrame(data=layer_data).set_index("index") + + base_dn = dn.NestedFrame.from_nestedpandas(base_nf, npartitions=5) + layer_dn = dn.NestedFrame.from_nestedpandas(layer_nf, npartitions=10) + + return base_dn.add_nested(layer_dn, "nested") + + @pytest.fixture def test_dataset_no_add_nested(): """stop before add_nested""" diff --git a/tests/dask_nested/test_nestedframe.py b/tests/dask_nested/test_nestedframe.py index d6a641c..7e56f65 100644 --- a/tests/dask_nested/test_nestedframe.py +++ b/tests/dask_nested/test_nestedframe.py @@ -1,20 +1,27 @@ import dask_nested as dn +import numpy as np +import pytest from nested_pandas.series.dtype import NestedDtype def test_nestedframe_construction(test_dataset): """test the construction 
of a nestedframe""" - pass + assert len(test_dataset) == 50 + assert test_dataset.columns.to_list() == ["a", "b", "nested"] + assert isinstance(test_dataset["nested"].dtype, NestedDtype) def test_all_columns(test_dataset): """all_columns property test""" - pass + all_cols = test_dataset.all_columns + + assert all_cols["base"].to_list() == test_dataset.columns.to_list() + assert all_cols["nested"] == ["t", "flux", "band"] def test_nested_columns(test_dataset): """nested_columns property test""" - pass + assert test_dataset.nested_columns == ["nested"] def test_add_nested(test_dataset_no_add_nested): @@ -36,16 +43,59 @@ def test_add_nested(test_dataset_no_add_nested): assert len(base_with_nested.compute()) == 50 -def test_query(test_dataset): - """test the query function""" - pass +def test_query_on_base(test_dataset): + """test the query function on base columns""" + + # Try a few basic queries + assert len(test_dataset.query("a > 0.5").compute()) == 22 + assert len(test_dataset.query("a > 0.5 & b > 1").compute()) == 13 + assert len(test_dataset.query("a > 2").compute()) == 0 + + +def test_query_on_nested(test_dataset): + """test the query function on nested columns""" + # Try a few nested queries + res = test_dataset.query("nested.flux>75").compute() + assert len(res.loc[1]["nested"]) == 127 -def test_dropna(test_dataset): + res = test_dataset.query("nested.band == 'g'").compute() + + assert len(res.loc[1]["nested"]) == 232 + assert len(res) == 50 # make sure the base df remains unchanged + + +def test_dropna(test_dataset_with_nans): """test the dropna function""" - pass + + nan_free_base = test_dataset_with_nans.dropna(subset=["a"]) + # should just remove one row + assert len(nan_free_base) == len(test_dataset_with_nans) - 1 + + meta = test_dataset_with_nans.loc[0].head(0).nested.nest.to_flat() + + nan_free_nested = test_dataset_with_nans.dropna(subset=["nested.t"]) + + # import pdb;pdb.set_trace() + flat_nested_nan_free = nan_free_nested.map_partitions(lambda x: x.nested.nest.to_flat(), meta=meta) + flat_nested = test_dataset_with_nans.map_partitions(lambda x: x.nested.nest.to_flat(), meta=meta) + # should just remove one row + assert len(flat_nested_nan_free) == len(flat_nested) - 1 def test_reduce(test_dataset): """test the reduce function""" - pass + + def reflect_inputs(*args): + return args + + res = test_dataset.reduce(reflect_inputs, "a", "nested.t", meta=("inputs", float)) + + assert len(res) == 50 + assert isinstance(res.compute().loc[0][0], float) + assert isinstance(res.compute().loc[0][1], np.ndarray) + + res2 = test_dataset.reduce(np.mean, "nested.flux", meta=("mean", float)) + + assert pytest.approx(res2.compute()[15], 0.1) == 53.635174 + assert pytest.approx(sum(res2.compute()), 0.1) == 2488.960119 From fe39cd4b85e2bf9ef689989da49206f527211387 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Mon, 13 May 2024 12:22:23 -0700 Subject: [PATCH 07/24] typing and pre-commit fixes --- src/dask_nested/accessor.py | 28 ++++++++++++++++------------ src/dask_nested/core.py | 30 +++++++++++++++++++----------- 2 files changed, 35 insertions(+), 23 deletions(-) diff --git a/src/dask_nested/accessor.py b/src/dask_nested/accessor.py index acdc0f8..f68f162 100644 --- a/src/dask_nested/accessor.py +++ b/src/dask_nested/accessor.py @@ -1,12 +1,21 @@ -from dask.dataframe.extensions import make_array_nonempty, make_scalar, register_series_accessor import nested_pandas as npd -from nested_pandas import NestedDtype, NestSeriesAccessor -import dask +from dask.dataframe.extensions import 
register_series_accessor +from nested_pandas import NestedDtype @register_series_accessor("nest") class DaskNestSeriesAccessor(npd.NestSeriesAccessor): - + """The nested-dask version of the nested-pandas NestSeriesAccessor. + + Note that this has a very limited implementation relative to nested-pandas. + + Parameters + ---------- + series: dd.series + A series to tie to the accessor + + """ + def __init__(self, series): self._check_series(series) @@ -14,6 +23,7 @@ def __init__(self, series): @staticmethod def _check_series(series): + """chcek the validity of the tied series dtype""" dtype = series.dtype if not isinstance(dtype, NestedDtype): raise AttributeError(f"Can only use .nest accessor with a Series of NestedDtype, got {dtype}") @@ -21,11 +31,5 @@ def _check_series(series): @property def fields(self) -> list[str]: """Names of the nested columns""" - return self._series.head(0).nest.fields - #hacky - #return self._series.partitions[0:1].map_partitions(lambda x: x.nest.fields) - #return self._series.array.field_names - - @dask.delayed - def test_fields(self): - return self._series.head(0).nest.fields + + return self._series.head(0).nest.fields # hacky diff --git a/src/dask_nested/core.py b/src/dask_nested/core.py index 2b758bb..c7b59f4 100644 --- a/src/dask_nested/core.py +++ b/src/dask_nested/core.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import dask.dataframe as dd import dask_expr as dx import nested_pandas as npd @@ -8,8 +10,11 @@ from pandas._typing import AnyAll, Axis, IndexLabel from pandas.api.extensions import no_default +# need this for the base _Frame class +# mypy: disable-error-code="misc" + -class _Frame(dx.FrameBase): +class _Frame(dx.FrameBase): # type: ignore """Base class for extensions of Dask Dataframes that track additional Ensemble-related metadata. """ @@ -34,7 +39,7 @@ def __dask_postpersist__(self): return self._rebuild, (func, args) - def _rebuild(self, graph, func, args): + def _rebuild(self, graph, func, args): # type: ignore collection = func(graph, *args) return collection @@ -63,7 +68,9 @@ def __getitem__(self, key): return result @classmethod - def from_nestedpandas(cls, data, npartitions=None, chunksize=None, sort=True, label=None, ensemble=None): + def from_nestedpandas( + cls, data, npartitions=None, chunksize=None, sort=True, label=None, ensemble=None + ) -> NestedFrame: """Returns an EnsembleFrame constructed from a TapeFrame. Parameters @@ -89,10 +96,10 @@ def from_nestedpandas(cls, data, npartitions=None, chunksize=None, sort=True, la The constructed EnsembleFrame object. """ result = dd.from_pandas(data, npartitions=npartitions, chunksize=chunksize, sort=sort) - return result + return NestedFrame.from_dask_dataframe(result) @classmethod - def from_dask_dataframe(cl, df): + def from_dask_dataframe(cls, df) -> NestedFrame: """Converts a Dask Dataframe to a Dask-Nested NestedFrame Parameters @@ -129,7 +136,7 @@ def nested_columns(self) -> list: nest_cols.append(column) return nest_cols - def add_nested(self, nested, name): + def add_nested(self, nested, name) -> NestedFrame: # type: ignore[name-defined] # noqa: F821 """Packs a dataframe into a nested column Parameters @@ -146,7 +153,7 @@ def add_nested(self, nested, name): nested = nested.map_partitions(lambda x: pack_flat(x)).rename(name) return self.join(nested, how="outer") - def query(self, expr): + def query(self, expr) -> Self: # type: ignore # noqa: F821: """ Query the columns of a NestedFrame with a boolean expression. 
Specified queries can target nested columns in addition to the typical column set @@ -203,7 +210,7 @@ def dropna( subset: IndexLabel | None = None, inplace: bool = False, ignore_index: bool = False, - ): + ) -> Self: # type: ignore[name-defined] # noqa: F821: """ Remove missing values for one layer of the NestedFrame. @@ -260,7 +267,6 @@ def dropna( time. """ # grab meta from head, assumes row-based operation - meta = self.head(0) return self.map_partitions( lambda x: x.dropna( axis=axis, @@ -271,10 +277,10 @@ def dropna( inplace=inplace, ignore_index=ignore_index, ), - meta=meta, + meta=self._meta, ) - def reduce(self, func, *args, meta=None, **kwargs): + def reduce(self, func, *args, meta=None, **kwargs) -> NestedFrame: """ Takes a function and applies it to each top-level row of the NestedFrame. @@ -292,6 +298,8 @@ def reduce(self, func, *args, meta=None, **kwargs): args : positional arguments Positional arguments to pass to the function, the first *args should be the names of the columns to apply the function to. + meta : dataframe or series-like, optional + The dask meta of the output. kwargs : keyword arguments, optional Keyword arguments to pass to the function. From f43551a039447b60ddf108fb62955c47397c3048 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Mon, 13 May 2024 14:09:26 -0700 Subject: [PATCH 08/24] add accessor to_lists and to_flat + tests --- src/dask_nested/accessor.py | 32 +++++++++- tests/dask_nested/conftest.py | 2 +- tests/dask_nested/test_accessor.py | 93 ++++++++++++++++++++++++++++++ 3 files changed, 125 insertions(+), 2 deletions(-) create mode 100644 tests/dask_nested/test_accessor.py diff --git a/src/dask_nested/accessor.py b/src/dask_nested/accessor.py index f68f162..5844845 100644 --- a/src/dask_nested/accessor.py +++ b/src/dask_nested/accessor.py @@ -1,3 +1,4 @@ +import dask.dataframe as dd import nested_pandas as npd from dask.dataframe.extensions import register_series_accessor from nested_pandas import NestedDtype @@ -13,7 +14,6 @@ class DaskNestSeriesAccessor(npd.NestSeriesAccessor): ---------- series: dd.series A series to tie to the accessor - """ def __init__(self, series): @@ -33,3 +33,33 @@ def fields(self) -> list[str]: """Names of the nested columns""" return self._series.head(0).nest.fields # hacky + + def to_lists(self, fields: list[str] | None = None) -> dd.DataFrame: + """Convert nested series into dataframe of list-array columns + + Parameters + ---------- + fields : list[str] or None, optional + Names of the fields to include. Default is None, which means all fields. + + Returns + ------- + dd.DataFrame + Dataframe of list-arrays. + """ + return self._series.map_partitions(lambda x: x.nest.to_lists(fields=fields)) + + def to_flat(self, fields: list[str] | None = None) -> dd.DataFrame: + """Convert nested series into dataframe of flat arrays + + Parameters + ---------- + fields : list[str] or None, optional + Names of the fields to include. Default is None, which means all fields. + + Returns + ------- + dd.DataFrame + Dataframe of flat arrays. 
+ """ + return self._series.map_partitions(lambda x: x.nest.to_flat(fields=fields)) diff --git a/tests/dask_nested/conftest.py b/tests/dask_nested/conftest.py index bf918b6..9ff9611 100644 --- a/tests/dask_nested/conftest.py +++ b/tests/dask_nested/conftest.py @@ -21,7 +21,7 @@ def test_dataset(): "band": randomstate.choice(["r", "g"], size=layer_size * n_base), "index": np.arange(layer_size * n_base) % n_base, } - layer_nf = npd.NestedFrame(data=layer_data).set_index("index") + layer_nf = npd.NestedFrame(data=layer_data).set_index("index").sort_index() base_dn = dn.NestedFrame.from_nestedpandas(base_nf, npartitions=5) layer_dn = dn.NestedFrame.from_nestedpandas(layer_nf, npartitions=10) diff --git a/tests/dask_nested/test_accessor.py b/tests/dask_nested/test_accessor.py new file mode 100644 index 0000000..10d4b7b --- /dev/null +++ b/tests/dask_nested/test_accessor.py @@ -0,0 +1,93 @@ +import pandas as pd +import pyarrow as pa +import pytest + + +def test_nest_accessor(test_dataset): + """test that the nest accessor is correctly tied to columns""" + + # Make sure that nested columns have the accessor available + assert hasattr(test_dataset.nested, "nest") + + # Make sure we get an attribute error when trying to use the wrong column + with pytest.raises(AttributeError): + test_dataset.ra.nest + + +def test_fields(test_dataset): + """test the fields accessor property""" + assert test_dataset.nested.nest.fields == ["t", "flux", "band"] + + +def test_to_flat(test_dataset): + """test the to_flat function""" + flat_ztf = test_dataset.nested.nest.to_flat() + + # check dtypes + assert flat_ztf.dtypes["t"] == pd.ArrowDtype(pa.float64()) + assert flat_ztf.dtypes["flux"] == pd.ArrowDtype(pa.float64()) + assert flat_ztf.dtypes["band"] == pd.ArrowDtype(pa.large_string()) + + # Make sure we retain all rows + assert len(flat_ztf.loc[1]) == 500 + + one_row = flat_ztf.loc[1].compute().iloc[1] + assert pytest.approx(one_row["t"], 0.01) == 5.4584 + assert pytest.approx(one_row["flux"], 0.01) == 84.1573 + assert one_row["band"] == "r" + + +def test_to_flat_with_fields(test_dataset): + """test the to_flat function""" + flat_ztf = test_dataset.nested.nest.to_flat(fields=["t", "flux"]) + + # check dtypes + assert flat_ztf.dtypes["t"] == pd.ArrowDtype(pa.float64()) + assert flat_ztf.dtypes["flux"] == pd.ArrowDtype(pa.float64()) + + # Make sure we retain all rows + assert len(flat_ztf.loc[1]) == 500 + + one_row = flat_ztf.loc[1].compute().iloc[1] + assert pytest.approx(one_row["t"], 0.01) == 5.4584 + assert pytest.approx(one_row["flux"], 0.01) == 84.1573 + + +def test_to_lists(test_dataset): + """test the to_lists function""" + list_ztf = test_dataset.nested.nest.to_lists() + + # check dtypes + assert list_ztf.dtypes["t"] == pd.ArrowDtype(pa.list_(pa.float64())) + assert list_ztf.dtypes["flux"] == pd.ArrowDtype(pa.list_(pa.float64())) + assert list_ztf.dtypes["band"] == pd.ArrowDtype(pa.list_(pa.large_string())) + + # Make sure we have a single row for an id + assert len(list_ztf.loc[1]) == 1 + + # Make sure we retain all rows -- double loc for speed and pandas get_item + assert len(list_ztf.loc[1].compute().loc[1]["t"]) == 500 + + # spot-check values + assert pytest.approx(list_ztf.loc[1].compute().loc[1]["t"][0], 0.01) == 7.5690279 + assert pytest.approx(list_ztf.loc[1].compute().loc[1]["flux"][0], 0.01) == 79.6886 + assert list_ztf.loc[1].compute().loc[1]["band"][0] == "g" + + +def test_to_lists_with_fields(test_dataset): + """test the to_lists function""" + list_ztf = 
test_dataset.nested.nest.to_lists(fields=["t", "flux"]) + + # check dtypes + assert list_ztf.dtypes["t"] == pd.ArrowDtype(pa.list_(pa.float64())) + assert list_ztf.dtypes["flux"] == pd.ArrowDtype(pa.list_(pa.float64())) + + # Make sure we have a single row for an id + assert len(list_ztf.loc[1]) == 1 + + # Make sure we retain all rows -- double loc for speed and pandas get_item + assert len(list_ztf.loc[1].compute().loc[1]["t"]) == 500 + + # spot-check values + assert pytest.approx(list_ztf.loc[1].compute().loc[1]["t"][0], 0.01) == 7.5690279 + assert pytest.approx(list_ztf.loc[1].compute().loc[1]["flux"][0], 0.01) == 79.6886 From 4b98df8959bead84799cee425fe629afba071a4e Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Tue, 14 May 2024 10:31:38 -0700 Subject: [PATCH 09/24] add a read_parquet test --- tests/dask_nested/test_io.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 tests/dask_nested/test_io.py diff --git a/tests/dask_nested/test_io.py b/tests/dask_nested/test_io.py new file mode 100644 index 0000000..d7f11ec --- /dev/null +++ b/tests/dask_nested/test_io.py @@ -0,0 +1,31 @@ +import dask_nested as dn + + +def test_read_parquet(test_dataset, tmp_path): + """test the reproducibility of read_parquet""" + + # Setup a temporary directory for files + nested_save_path = tmp_path / "nested" + test_save_path = tmp_path / "test_dataset" + + # Save Nested to Parquet + flat_nested = test_dataset.nested.nest.to_flat() + flat_nested.to_parquet(nested_save_path, write_index=True) + + # Save Base to Parquet + test_dataset[["a", "b"]].to_parquet(test_save_path, write_index=True) + + # Now read + base = dn.read_parquet(test_save_path, calculate_divisions=True) + nested = dn.read_parquet(nested_save_path, calculate_divisions=True) + + base = base.add_nested(nested, "nested") + + # Check the loaded dataset against the original + assert base.divisions == test_dataset.divisions # equal divisions + assert base.compute().equals(test_dataset.compute()) # equal data + + # Check the flat nested datasets + base_nested_flat = base.nested.nest.to_flat().compute() + test_nested_flat = base.nested.nest.to_flat().compute() + assert base_nested_flat.equals(test_nested_flat) From 9cdba359cc9256b5c1609df2e2ca8942097ab72e Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Tue, 14 May 2024 10:41:48 -0700 Subject: [PATCH 10/24] typing and pre-commit fixes --- src/dask_nested/backends.py | 11 ++++++----- src/dask_nested/io.py | 3 ++- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/dask_nested/backends.py b/src/dask_nested/backends.py index d040b19..0f4e8d3 100644 --- a/src/dask_nested/backends.py +++ b/src/dask_nested/backends.py @@ -13,20 +13,21 @@ @make_meta_dispatch.register(npd.NestedFrame) -def make_meta_frame(x, index=None): - # Create an empty NestedFrame to use as Dask's underlying object meta. +def make_meta_frame(x, index=None) -> npd.NestedFrame: + """Create an empty NestedFrame to use as Dask's underlying object meta.""" result = x.head(0) return result @meta_nonempty.register(npd.NestedFrame) -def _nonempty_nestedframe(x, index=None): - # Construct a new NestedFrame with the same underlying data. 
+def _nonempty_nestedframe(x, index=None) -> npd.NestedFrame: + """Construct a new NestedFrame with the same underlying data.""" df = meta_nonempty_dataframe(x) return npd.NestedFrame(df) @make_array_nonempty.register(npd.NestedDtype) -def _(dtype): +def _(dtype) -> NestedExtensionArray: + """Register a valid dtype for the NestedExtensionArray""" # must be two values return NestedExtensionArray._from_sequence([pd.NA, pd.NA], dtype=dtype) diff --git a/src/dask_nested/io.py b/src/dask_nested/io.py index 6eef903..3457d4b 100644 --- a/src/dask_nested/io.py +++ b/src/dask_nested/io.py @@ -22,7 +22,7 @@ def read_parquet( parquet_file_extension=(".parq", ".parquet", ".pq"), filesystem=None, **kwargs, -): +) -> NestedFrame: """ Read a Parquet file into a Dask DataFrame @@ -181,6 +181,7 @@ def read_parquet( It may be necessary to change this argument if the data files in your parquet dataset do not end in ".parq", ".parquet", or ".pq". filesystem: "fsspec", "arrow", or fsspec.AbstractFileSystem backend to use. + Specifies the backend to use dataset: dict, default None Dictionary of options to use when creating a ``pyarrow.dataset.Dataset`` object. These options may include a "filesystem" key to configure the desired From 775bbe0d59037887f7d79d7344ded5c243d4a5c6 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Tue, 14 May 2024 11:53:22 -0700 Subject: [PATCH 11/24] ruff fixes --- src/dask_nested/io.py | 2 +- tests/dask_nested/test_accessor.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dask_nested/io.py b/src/dask_nested/io.py index 3457d4b..5e0d12e 100644 --- a/src/dask_nested/io.py +++ b/src/dask_nested/io.py @@ -181,7 +181,7 @@ def read_parquet( It may be necessary to change this argument if the data files in your parquet dataset do not end in ".parq", ".parquet", or ".pq". filesystem: "fsspec", "arrow", or fsspec.AbstractFileSystem backend to use. - Specifies the backend to use + Specifies the backend to use. dataset: dict, default None Dictionary of options to use when creating a ``pyarrow.dataset.Dataset`` object. 
These options may include a "filesystem" key to configure the desired diff --git a/tests/dask_nested/test_accessor.py b/tests/dask_nested/test_accessor.py index 10d4b7b..8cc7603 100644 --- a/tests/dask_nested/test_accessor.py +++ b/tests/dask_nested/test_accessor.py @@ -11,7 +11,7 @@ def test_nest_accessor(test_dataset): # Make sure we get an attribute error when trying to use the wrong column with pytest.raises(AttributeError): - test_dataset.ra.nest + test_dataset.ra.nest # noqa def test_fields(test_dataset): From cc6d3007b8c4a2be8199a1acd08e959696339747 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Wed, 15 May 2024 11:14:35 -0700 Subject: [PATCH 12/24] add benchmarks and dataset generation --- benchmarks/benchmarks.py | 110 +++++++++++++++++++++++-- src/dask_nested/__init__.py | 1 + src/dask_nested/datasets/__init__.py | 1 + src/dask_nested/datasets/generation.py | 42 ++++++++++ src/dask_nested/example_benchmarks.py | 14 ---- tests/dask_nested/test_datasets.py | 20 +++++ 6 files changed, 167 insertions(+), 21 deletions(-) create mode 100644 src/dask_nested/datasets/__init__.py create mode 100644 src/dask_nested/datasets/generation.py delete mode 100644 src/dask_nested/example_benchmarks.py create mode 100644 tests/dask_nested/test_datasets.py diff --git a/benchmarks/benchmarks.py b/benchmarks/benchmarks.py index 71f2616..eff71d6 100644 --- a/benchmarks/benchmarks.py +++ b/benchmarks/benchmarks.py @@ -3,14 +3,110 @@ For more information on writing benchmarks: https://asv.readthedocs.io/en/stable/writing_benchmarks.html.""" -from dask_nested import example_benchmarks +import dask_nested as dn +import nested_pandas as npd +import numpy as np -def time_computation(): - """Time computations are prefixed with 'time'.""" - example_benchmarks.runtime_computation() +def _generate_benchmark_data(add_nested=True): + """generate a dataset for benchmarks""" + n_base = 100 + layer_size = 1000 -def mem_list(): - """Memory computations are prefixed with 'mem' or 'peakmem'.""" - return example_benchmarks.memory_computation() + # use provided seed, "None" acts as if no seed is provided + randomstate = np.random.RandomState(seed=1) + + # Generate base data + base_data = {"a": randomstate.random(n_base), "b": randomstate.random(n_base) * 2} + base_nf = npd.NestedFrame(data=base_data) + + layer_data = { + "t": randomstate.random(layer_size * n_base) * 20, + "flux": randomstate.random(layer_size * n_base) * 100, + "band": randomstate.choice(["r", "g"], size=layer_size * n_base), + "index": np.arange(layer_size * n_base) % n_base, + } + layer_nf = npd.NestedFrame(data=layer_data).set_index("index").sort_index() + + # Convert to Dask + base_nf = dn.NestedFrame.from_nestedpandas(base_nf).repartition(npartitions=5) + layer_nf = dn.NestedFrame.from_nestedpandas(layer_nf).repartition(npartitions=50) + + # Return based on add_nested + if add_nested: + base_nf = base_nf.add_nested(layer_nf, "nested") + return base_nf + else: + return base_nf, layer_nf + + +class NestedFrameAddNested: + """Benchmark the NestedFrame.add_nested function""" + + n_base = 100 + layer_size = 1000 + base_nf = dn.NestedFrame + layer_nf = dn.NestedFrame + + def setup(self): + """Set up the benchmark environment""" + self.base_nf, self.layer_nf = _generate_benchmark_data(add_nested=False) + + def run(self): + """Run the benchmark.""" + self.base_nf.add_nested(self.layer_nf, "nested").compute() + + def time_run(self): + """Benchmark the runtime of adding a nested layer""" + self.run() + + def peakmem_run(self): + """Benchmark the memory 
usage of adding a nested layer"""
+        self.run()
+
+
+class NestedFrameReduce:
+    """Benchmark the NestedFrame.reduce function"""
+
+    nf = dn.NestedFrame
+
+    def setup(self):
+        """Set up the benchmark environment"""
+        self.nf = _generate_benchmark_data(add_nested=True)
+
+    def run(self):
+        """Run the benchmark."""
+        self.nf.reduce(np.mean, "nested.flux").compute()
+
+    def time_run(self):
+        """Benchmark the runtime of applying the reduce function"""
+        self.run()
+
+    def peakmem_run(self):
+        """Benchmark the memory usage of applying the reduce function"""
+        self.run()
+
+
+class NestedFrameQuery:
+    """Benchmark the NestedFrame.query function"""
+
+    nf = dn.NestedFrame
+
+    def setup(self):
+        """Set up the benchmark environment"""
+        self.nf = _generate_benchmark_data(add_nested=True)
+
+    def run(self):
+        """Run the benchmark."""
+
+        # Apply nested layer query
+        self.nf = self.nf.query("nested.band == 'g'").compute()
+
+    def time_run(self):
+        """Benchmark the runtime of applying the nested query"""
+        self.run()
+
+    def peakmem_run(self):
+        """Benchmark the memory usage of applying the nested query"""
+        self.run()
diff --git a/src/dask_nested/__init__.py b/src/dask_nested/__init__.py
index cb7fe12..faad669 100644
--- a/src/dask_nested/__init__.py
+++ b/src/dask_nested/__init__.py
@@ -1,3 +1,4 @@
 from . import backends, accessor  # noqa
 from .core import NestedFrame  # noqa
 from .io import read_parquet  # noqa
+from .datasets import generate_data  # noqa
diff --git a/src/dask_nested/datasets/__init__.py b/src/dask_nested/datasets/__init__.py
new file mode 100644
index 0000000..4b8e827
--- /dev/null
+++ b/src/dask_nested/datasets/__init__.py
@@ -0,0 +1 @@
+from .generation import *  # noqa
diff --git a/src/dask_nested/datasets/generation.py b/src/dask_nested/datasets/generation.py
new file mode 100644
index 0000000..205ee1b
--- /dev/null
+++ b/src/dask_nested/datasets/generation.py
@@ -0,0 +1,42 @@
+from nested_pandas import datasets
+
+import dask_nested as dn
+
+
+def generate_data(n_base, n_layer, npartitions=1, seed=None) -> dn.NestedFrame:
+    """Generates a toy dataset.
+
+    Docstring copied from nested-pandas.
+
+    Parameters
+    ----------
+    n_base : int
+        The number of rows to generate for the base layer
+    n_layer : int, or dict
+        The number of rows per n_base row to generate for a nested layer.
+        Alternatively, a dictionary of layer label, layer_size pairs may be
+        specified to create multiple nested columns with custom sizing.
+    npartitions: int
+        The number of partitions to split the data into.
+    seed : int
+        A seed to use for random generation of data
+
+    Returns
+    -------
+    NestedFrame
+        The constructed Dask NestedFrame.
+ + Examples + -------- + >>> import dask_nested as dn + >>> dn.datasets.generate_data(10,100) + >>> dn.datasets.generate_data(10, {"nested_a": 100, "nested_b": 200}) + """ + + # Use nested-pandas generator + base_nf = datasets.generate_data(n_base, n_layer, seed=seed) + + # Convert to dask-nested + base_nf = dn.NestedFrame.from_nestedpandas(base_nf).repartition(npartitions=npartitions) + + return base_nf diff --git a/src/dask_nested/example_benchmarks.py b/src/dask_nested/example_benchmarks.py deleted file mode 100644 index 5a77b06..0000000 --- a/src/dask_nested/example_benchmarks.py +++ /dev/null @@ -1,14 +0,0 @@ -"""An example module containing simplistic methods under benchmarking.""" - -import random -import time - - -def runtime_computation(): - """Runtime computation consuming between 0 and 5 seconds.""" - time.sleep(random.uniform(0, 5)) - - -def memory_computation(): - """Memory computation for a random list up to 512 samples.""" - return [0] * random.randint(0, 512) diff --git a/tests/dask_nested/test_datasets.py b/tests/dask_nested/test_datasets.py new file mode 100644 index 0000000..b174d27 --- /dev/null +++ b/tests/dask_nested/test_datasets.py @@ -0,0 +1,20 @@ +import dask_nested as dn + + +def test_generate_data(): + """test the dataset generator function""" + + # test the seed + generate_1 = dn.datasets.generate_data(10, 100, npartitions=2, seed=1) + generate_2 = dn.datasets.generate_data(10, 100, npartitions=2, seed=1) + generate_3 = dn.datasets.generate_data(10, 100, npartitions=2, seed=2) + + assert generate_1.compute().equals(generate_2.compute()) + assert not generate_1.compute().equals(generate_3.compute()) + + # test npartitions + assert generate_1.npartitions == 2 + + # test the length + assert len(generate_1) == 10 + assert len(generate_1.nested.nest.to_flat()) == 1000 From 1b02e9874b047e4ccccc87897495d287582b499d Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Thu, 16 May 2024 10:03:36 -0700 Subject: [PATCH 13/24] from_nestedpandas -> from_nested_pandas --- benchmarks/benchmarks.py | 4 ++-- src/dask_nested/core.py | 2 +- src/dask_nested/datasets/generation.py | 2 +- tests/dask_nested/conftest.py | 12 ++++++------ 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/benchmarks/benchmarks.py b/benchmarks/benchmarks.py index eff71d6..e9cff75 100644 --- a/benchmarks/benchmarks.py +++ b/benchmarks/benchmarks.py @@ -30,8 +30,8 @@ def _generate_benchmark_data(add_nested=True): layer_nf = npd.NestedFrame(data=layer_data).set_index("index").sort_index() # Convert to Dask - base_nf = dn.NestedFrame.from_nestedpandas(base_nf).repartition(npartitions=5) - layer_nf = dn.NestedFrame.from_nestedpandas(layer_nf).repartition(npartitions=50) + base_nf = dn.NestedFrame.from_nested_pandas(base_nf).repartition(npartitions=5) + layer_nf = dn.NestedFrame.from_nested_pandas(layer_nf).repartition(npartitions=50) # Return based on add_nested if add_nested: diff --git a/src/dask_nested/core.py b/src/dask_nested/core.py index c7b59f4..d84cd93 100644 --- a/src/dask_nested/core.py +++ b/src/dask_nested/core.py @@ -68,7 +68,7 @@ def __getitem__(self, key): return result @classmethod - def from_nestedpandas( + def from_nested_pandas( cls, data, npartitions=None, chunksize=None, sort=True, label=None, ensemble=None ) -> NestedFrame: """Returns an EnsembleFrame constructed from a TapeFrame. 
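As a quick orientation for the rename above, here is a minimal usage sketch of the `from_nested_pandas` constructor; the toy frame, column names, and partition count are illustrative assumptions, not part of the patch:

    import nested_pandas as npd
    import numpy as np
    import dask_nested as dn

    # Build a small nested-pandas frame (illustrative data only).
    flat = npd.NestedFrame(data={"a": np.arange(5), "b": np.arange(5) * 2.0})

    # Wrap it as a Dask-backed NestedFrame split across two partitions.
    ddf = dn.NestedFrame.from_nested_pandas(flat, npartitions=2)
    print(ddf.compute())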
diff --git a/src/dask_nested/datasets/generation.py b/src/dask_nested/datasets/generation.py index 205ee1b..d5b296f 100644 --- a/src/dask_nested/datasets/generation.py +++ b/src/dask_nested/datasets/generation.py @@ -37,6 +37,6 @@ def generate_data(n_base, n_layer, npartitions=1, seed=None) -> dn.NestedFrame: base_nf = datasets.generate_data(n_base, n_layer, seed=seed) # Convert to dask-nested - base_nf = dn.NestedFrame.from_nestedpandas(base_nf).repartition(npartitions=npartitions) + base_nf = dn.NestedFrame.from_nested_pandas(base_nf).repartition(npartitions=npartitions) return base_nf diff --git a/tests/dask_nested/conftest.py b/tests/dask_nested/conftest.py index 9ff9611..30b2196 100644 --- a/tests/dask_nested/conftest.py +++ b/tests/dask_nested/conftest.py @@ -23,8 +23,8 @@ def test_dataset(): } layer_nf = npd.NestedFrame(data=layer_data).set_index("index").sort_index() - base_dn = dn.NestedFrame.from_nestedpandas(base_nf, npartitions=5) - layer_dn = dn.NestedFrame.from_nestedpandas(layer_nf, npartitions=10) + base_dn = dn.NestedFrame.from_nested_pandas(base_nf, npartitions=5) + layer_dn = dn.NestedFrame.from_nested_pandas(layer_nf, npartitions=10) return base_dn.add_nested(layer_dn, "nested") @@ -53,8 +53,8 @@ def test_dataset_with_nans(): } layer_nf = npd.NestedFrame(data=layer_data).set_index("index") - base_dn = dn.NestedFrame.from_nestedpandas(base_nf, npartitions=5) - layer_dn = dn.NestedFrame.from_nestedpandas(layer_nf, npartitions=10) + base_dn = dn.NestedFrame.from_nested_pandas(base_nf, npartitions=5) + layer_dn = dn.NestedFrame.from_nested_pandas(layer_nf, npartitions=10) return base_dn.add_nested(layer_dn, "nested") @@ -78,7 +78,7 @@ def test_dataset_no_add_nested(): } layer_nf = npd.NestedFrame(data=layer_data).set_index("index") - base_dn = dn.NestedFrame.from_nestedpandas(base_nf, npartitions=5) - layer_dn = dn.NestedFrame.from_nestedpandas(layer_nf, npartitions=10) + base_dn = dn.NestedFrame.from_nested_pandas(base_nf, npartitions=5) + layer_dn = dn.NestedFrame.from_nested_pandas(layer_nf, npartitions=10) return (base_dn, layer_dn) From 7a12a96a322ce3f2b1613dbe5be24e982a35d5c5 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Thu, 16 May 2024 15:59:42 -0700 Subject: [PATCH 14/24] add to_parquet --- src/dask_nested/core.py | 58 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/src/dask_nested/core.py b/src/dask_nested/core.py index d84cd93..0c538d6 100644 --- a/src/dask_nested/core.py +++ b/src/dask_nested/core.py @@ -1,5 +1,7 @@ from __future__ import annotations +import os + import dask.dataframe as dd import dask_expr as dx import nested_pandas as npd @@ -318,3 +320,59 @@ def reduce(self, func, *args, meta=None, **kwargs) -> NestedFrame: # apply nested_pandas reduce via map_partitions return self.map_partitions(lambda x: x.reduce(func, *args, **kwargs), meta=meta) + + def to_parquet(self, path, by_layer=False, **kwargs) -> None: + """Creates parquet file(s) with the data of a NestedFrame, either + as a single parquet file directory where each nested dataset is packed + into its own column or as an individual parquet file directory for each + layer. + + Docstring copied from nested-pandas. + + Note that here we always opt to use the pyarrow engine for writing + parquet files. + + Parameters + ---------- + path : str + The path to the parquet directory to be written if 'by_layer' is + False. If 'by_layer' is True, this should be the path to an + existing directory. 
+ by_layer : bool, default False + If False, writes the entire NestedFrame to a single parquet + directory. + + If True, writes each layer to a separate parquet sub-directory + within the directory specified by path. The filename for each + outputted file will be named after its layer. For example for the + base layer this is always "base". + kwargs : keyword arguments, optional + Keyword arguments to pass to the function. + + Returns + ------- + None + """ + + # code copied from nested-pandas rather than wrapped + # reason being that a map_partitions call is probably not well-behaved here? + + if not by_layer: + # We just defer to the pandas to_parquet method if we're not writing by layer + # or there is only one layer in the NestedFrame. + super().to_parquet(path, engine="pyarrow", **kwargs) + else: + # If we're writing by layer, path must be an existing directory + if not os.path.isdir(path): + raise ValueError("The provided path must be an existing directory if by_layer=True") + + # Write the base layer to a parquet file + base_frame = self.drop(columns=self.nested_columns) + base_frame.to_parquet(os.path.join(path, "base"), by_layer=False, **kwargs) + + # Write each nested layer to a parquet file + for layer in self.all_columns: + if layer != "base": + path_layer = os.path.join(path, f"{layer}") + self[layer].nest.to_flat().to_parquet(path_layer, engine="pyarrow", **kwargs) + return None From 3068845e6197e16723cadb1fe09aca110fc5182c Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Fri, 17 May 2024 10:39:32 -0700 Subject: [PATCH 15/24] add to_parquet --- src/dask_nested/core.py | 25 +++++++++----- tests/dask_nested/test_nestedframe.py | 47 ++++++++++++++++++++++++++- 2 files changed, 62 insertions(+), 10 deletions(-) diff --git a/src/dask_nested/core.py b/src/dask_nested/core.py index 0c538d6..adfb3d7 100644 --- a/src/dask_nested/core.py +++ b/src/dask_nested/core.py @@ -321,7 +321,7 @@ def reduce(self, func, *args, meta=None, **kwargs) -> NestedFrame: # apply nested_pandas reduce via map_partitions return self.map_partitions(lambda x: x.reduce(func, *args, **kwargs), meta=meta) - def to_parquet(self, path, by_layer=False, **kwargs) -> None: + def to_parquet(self, path, by_layer=True, **kwargs) -> None: """Creates parquet file(s) with the data of a NestedFrame, either as a single parquet file directory where each nested dataset is packed into its own column or as an individual parquet file directory for each @@ -335,10 +335,8 @@ def to_parquet(self, path, by_layer=False, **kwargs) -> None: Parameters ---------- path : str - The path to the parquet directory to be written if 'by_layer' is - False. If 'by_layer' is True, this should be the path to an - existing directory. - by_layer : bool, default False + The path to the parquet directory to be written. + by_layer : bool, default True If False, writes the entire NestedFrame to a single parquet directory. @@ -358,14 +356,23 @@ def to_parquet(self, path, by_layer=False, **kwargs) -> None: # reason being that a map_partitions call is probably not well-behaved here? if not by_layer: + # Todo: Investigate this more + # Divisions cannot be generated from a parquet file that stores + # nested information without a reset_index().set_index() loop. It + # seems like this happens at the to_parquet level rather than + # in read_parquet as dropping the nested columns from the dataframe + # to save does enable divisions to be found, but removing the + # nested columns from the set of columns to load does not. 
+ # Divisions are going to be crucial, and so I think it's best to + # not support this until this is resolved. However the non-by_layer + # mode is needed for by_layer so it may be best to just settle for + # changing the default and filing a higher-priority bug. + # raise NotImplementedError + # We just defer to the pandas to_parquet method if we're not writing by layer # or there is only one layer in the NestedFrame. super().to_parquet(path, engine="pyarrow", **kwargs) else: - # If we're writing by layer, path must be an existing directory - if not os.path.isdir(path): - raise ValueError("The provided path must be an existing directory if by_layer=True") - # Write the base layer to a parquet file base_frame = self.drop(columns=self.nested_columns) base_frame.to_parquet(os.path.join(path, "base"), by_layer=False, **kwargs) diff --git a/tests/dask_nested/test_nestedframe.py b/tests/dask_nested/test_nestedframe.py index 7e56f65..7f2a4fa 100644 --- a/tests/dask_nested/test_nestedframe.py +++ b/tests/dask_nested/test_nestedframe.py @@ -76,7 +76,6 @@ def test_dropna(test_dataset_with_nans): nan_free_nested = test_dataset_with_nans.dropna(subset=["nested.t"]) - # import pdb;pdb.set_trace() flat_nested_nan_free = nan_free_nested.map_partitions(lambda x: x.nested.nest.to_flat(), meta=meta) flat_nested = test_dataset_with_nans.map_partitions(lambda x: x.nested.nest.to_flat(), meta=meta) # should just remove one row @@ -99,3 +98,49 @@ def reflect_inputs(*args): assert pytest.approx(res2.compute()[15], 0.1) == 53.635174 assert pytest.approx(sum(res2.compute()), 0.1) == 2488.960119 + + +def test_to_parquet_combined(test_dataset, tmp_path): + """test to_parquet when saving all layers to a single directory""" + + test_save_path = tmp_path / "test_dataset" + + # send to parquet + test_dataset.to_parquet(test_save_path, by_layer=False) + + # load back from parquet + loaded_dataset = dn.read_parquet(test_save_path, calculate_divisions=True) + # todo: file bug for this and investigate + loaded_dataset = loaded_dataset.reset_index().set_index("index") + + # Check for equivalence + assert test_dataset.divisions == loaded_dataset.divisions + + test_dataset = test_dataset.compute() + loaded_dataset = loaded_dataset.compute() + + assert test_dataset.equals(loaded_dataset) + + +def test_to_parquet_by_layer(test_dataset, tmp_path): + """test to_parquet when saving layers to subdirectories""" + + test_save_path = tmp_path / "test_dataset" + + # send to parquet + test_dataset.to_parquet(test_save_path, by_layer=True, write_index=True) + + # load back from parquet + loaded_base = dn.read_parquet(test_save_path / "base", calculate_divisions=True) + loaded_nested = dn.read_parquet(test_save_path / "nested", calculate_divisions=True) + + loaded_dataset = loaded_base.add_nested(loaded_nested, "nested") + + # Check for equivalence + assert test_dataset.divisions == loaded_dataset.divisions + + test_dataset = test_dataset.compute() + loaded_dataset = loaded_dataset.compute() + + # import pdb; pdb.set_trace() + assert test_dataset.equals(loaded_dataset) From a68af561f1bce86d67919d8148ca903a847666ea Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Fri, 17 May 2024 10:49:43 -0700 Subject: [PATCH 16/24] add docstring note on by_layer=False issues --- src/dask_nested/core.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/dask_nested/core.py b/src/dask_nested/core.py index adfb3d7..dfa58d5 100644 --- a/src/dask_nested/core.py +++ b/src/dask_nested/core.py @@ -337,6 +337,11 @@ def to_parquet(self, path, 
by_layer=True, **kwargs) -> None: path : str The path to the parquet directory to be written. by_layer : bool, default True + NOTE: by_layer=False will not reliably preserve divisions currently, + be warned when using it that loading from such a dataset will + likely require you to reset and set the index to generate divisions + information. + If False, writes the entire NestedFrame to a single parquet directory. From 583eadd0ad643cf3bbf243267b4a1dfd8d10e898 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Fri, 17 May 2024 15:18:06 -0700 Subject: [PATCH 17/24] add dependencies --- pyproject.toml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index c5ac0c4..0e4d535 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,11 @@ classifiers = [ dynamic = ["version"] requires-python = ">=3.9" dependencies = [ + 'nested-pandas', + 'numpy', + 'dask>=2024.3.0', + 'dask[distributed]>=2024.3.0', + 'pyarrow', ] [project.urls] From 5993c7e449feb5ab8fae9cf5992fcfb8e2d39a78 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Fri, 17 May 2024 15:23:07 -0700 Subject: [PATCH 18/24] add dask_expr --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 0e4d535..bf8e911 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ dependencies = [ 'numpy', 'dask>=2024.3.0', 'dask[distributed]>=2024.3.0', + 'dask_expr', 'pyarrow', ] From 687d8448070005d51bb49c96bebf88a5ad1ddf04 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Fri, 17 May 2024 15:32:26 -0700 Subject: [PATCH 19/24] add annotations for python 3.9 --- src/dask_nested/accessor.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/dask_nested/accessor.py b/src/dask_nested/accessor.py index 5844845..11082fe 100644 --- a/src/dask_nested/accessor.py +++ b/src/dask_nested/accessor.py @@ -1,3 +1,6 @@ +# Python 3.9 doesn't support "|" for types +from __future__ import annotations + import dask.dataframe as dd import nested_pandas as npd from dask.dataframe.extensions import register_series_accessor From 5ba990413fa3b2a125ed6ab5d41771e78fbc27d8 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Fri, 17 May 2024 15:35:26 -0700 Subject: [PATCH 20/24] add annotations for python 3.9 --- src/dask_nested/backends.py | 3 +++ src/dask_nested/io.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/src/dask_nested/backends.py b/src/dask_nested/backends.py index 0f4e8d3..dd03890 100644 --- a/src/dask_nested/backends.py +++ b/src/dask_nested/backends.py @@ -1,3 +1,6 @@ +# Python 3.9 doesn't support "|" for types +from __future__ import annotations + import nested_pandas as npd import pandas as pd from dask.dataframe.backends import meta_nonempty_dataframe diff --git a/src/dask_nested/io.py b/src/dask_nested/io.py index 5e0d12e..325c450 100644 --- a/src/dask_nested/io.py +++ b/src/dask_nested/io.py @@ -1,3 +1,6 @@ +# Python 3.9 doesn't support "|" for types +from __future__ import annotations + import dask.dataframe as dd from .core import NestedFrame From 28020c9a0beaeb7ff80855257609e728cb68dd2f Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Fri, 17 May 2024 15:47:01 -0700 Subject: [PATCH 21/24] remove pdb --- tests/dask_nested/test_nestedframe.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/dask_nested/test_nestedframe.py b/tests/dask_nested/test_nestedframe.py index 7f2a4fa..5dfed86 100644 --- a/tests/dask_nested/test_nestedframe.py +++ b/tests/dask_nested/test_nestedframe.py @@ -142,5 +142,4 @@ def test_to_parquet_by_layer(test_dataset, 
tmp_path):
     test_dataset = test_dataset.compute()
     loaded_dataset = loaded_dataset.compute()
 
-    # import pdb; pdb.set_trace()
     assert test_dataset.equals(loaded_dataset)

From 8192366cf63709b9339f398fabeb28a0b5b8420f Mon Sep 17 00:00:00 2001
From: Doug Branton
Date: Mon, 20 May 2024 14:17:46 -0700
Subject: [PATCH 22/24] address review comments

---
 src/dask_nested/backends.py | 6 ++++-
 src/dask_nested/core.py | 45 +++++++++++++++++--------------------
 2 files changed, 25 insertions(+), 26 deletions(-)

diff --git a/src/dask_nested/backends.py b/src/dask_nested/backends.py
index dd03890..991d182 100644
--- a/src/dask_nested/backends.py
+++ b/src/dask_nested/backends.py
@@ -14,6 +14,9 @@
 
 get_collection_type.register(npd.NestedFrame, lambda _: NestedFrame)
 
+# The following dispatch functions are defined as per the Dask extension guide:
+# https://docs.dask.org/en/latest/dataframe-extend.html
+
 
 @make_meta_dispatch.register(npd.NestedFrame)
 def make_meta_frame(x, index=None) -> npd.NestedFrame:
@@ -32,5 +35,6 @@ def _nonempty_nestedframe(x, index=None) -> npd.NestedFrame:
 @make_array_nonempty.register(npd.NestedDtype)
 def _(dtype) -> NestedExtensionArray:
     """Register a valid dtype for the NestedExtensionArray"""
-    # must be two values
+    # must be two values to avoid a length error in meta inference
+    # Dask seems to explicitly require meta dtypes to have length 2.
     return NestedExtensionArray._from_sequence([pd.NA, pd.NA], dtype=dtype)
diff --git a/src/dask_nested/core.py b/src/dask_nested/core.py
index dfa58d5..2f3cc80 100644
--- a/src/dask_nested/core.py
+++ b/src/dask_nested/core.py
@@ -17,13 +17,11 @@
 
 
 class _Frame(dx.FrameBase):  # type: ignore
-    """Base class for extensions of Dask Dataframes that track additional
-    Ensemble-related metadata.
-    """
+    """Base class for extensions of Dask Dataframes."""
 
     _partition_type = npd.NestedFrame
 
-    def __init__(self, expr, label=None, ensemble=None):
+    def __init__(self, expr):
         super().__init__(expr)
 
     @property
@@ -49,18 +47,14 @@ def _rebuild(self, graph, func, args):  # type: ignore
 class NestedFrame(
     _Frame, dd.DataFrame
 ):  # can use dd.DataFrame instead of dx.DataFrame if the config is set true (default in >=2024.3.0)
-    """An extension for a Dask Dataframe for Nested.
-
-    The underlying non-parallel dataframes are TapeFrames and TapeSeries which extend Pandas frames.
+    """An extension for a Dask Dataframe that has Nested-Pandas functionality.
 
     Examples
-    ----------
-    Instatiation::
-
-        import tape
-        ens = tape.Ensemble()
-        data = {...} # Some data you want tracked by the Ensemble
-        ensemble_frame = tape.EnsembleFrame.from_dict(data, label="my_frame", ensemble=ens)
+    --------
+    >>> import nested_dask as nd
+    >>> base = nd.NestedFrame(base_data)
+    >>> layer = nd.NestedFrame(layer_data)
+    >>> base.add_nested(layer, "layer")
     """
 
     _partition_type = npd.NestedFrame  # Tracks the underlying data type
@@ -71,14 +65,19 @@ def __getitem__(self, key):
         return result
 
     @classmethod
     def from_nested_pandas(
-        cls, data, npartitions=None, chunksize=None, sort=True, label=None, ensemble=None
+        cls,
+        data,
+        npartitions=None,
+        chunksize=None,
+        sort=True,
     ) -> NestedFrame:
-        """Returns an EnsembleFrame constructed from a TapeFrame.
+        """Returns a Nested-Dask NestedFrame constructed from a Nested-Pandas
+        NestedFrame.
 
         Parameters
         ----------
         data: `NestedFrame`
-            Frame containing the underlying data fro the EnsembleFram
+            Nested-Pandas NestedFrame containing the underlying data
         npartitions: `int`, optional
             The number of partitions of the index to create.
Note that depending on the size and index of the dataframe, the output may have fewer @@ -87,15 +86,11 @@ def from_nested_pandas( Size of the individual chunks of data in non-parallel objects that make up Dask frames. sort: `bool`, optional Whether to sort the frame by a default index. - label: `str`, optional - The label used to by the Ensemble to identify the frame. - ensemble: `tape.Ensemble`, optional - A link to the Ensemble object that owns this frame. Returns ---------- - result: `tape.EnsembleFrame` - The constructed EnsembleFrame object. + result: `NestedFrame` + The constructed Dask-Nested NestedFrame object. """ result = dd.from_pandas(data, npartitions=npartitions, chunksize=chunksize, sort=sort) return NestedFrame.from_dask_dataframe(result) @@ -268,7 +263,7 @@ def dropna( to a single layer, multi-layer operations are not supported at this time. """ - # grab meta from head, assumes row-based operation + # propagate meta, assumes row-based operation return self.map_partitions( lambda x: x.dropna( axis=axis, From f6a21e3183d9bbb7f8510b9688bcf93aaa71396c Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Tue, 21 May 2024 09:29:20 -0700 Subject: [PATCH 23/24] address review comments --- src/dask_nested/accessor.py | 2 +- src/dask_nested/backends.py | 4 +++- src/dask_nested/core.py | 24 +++++++++++++++++++++--- 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/src/dask_nested/accessor.py b/src/dask_nested/accessor.py index 11082fe..ceb8fb9 100644 --- a/src/dask_nested/accessor.py +++ b/src/dask_nested/accessor.py @@ -35,7 +35,7 @@ def _check_series(series): def fields(self) -> list[str]: """Names of the nested columns""" - return self._series.head(0).nest.fields # hacky + return list(self._series.dtype.fields) def to_lists(self, fields: list[str] | None = None) -> dd.DataFrame: """Convert nested series into dataframe of list-array columns diff --git a/src/dask_nested/backends.py b/src/dask_nested/backends.py index 991d182..bd589f2 100644 --- a/src/dask_nested/backends.py +++ b/src/dask_nested/backends.py @@ -21,7 +21,9 @@ @make_meta_dispatch.register(npd.NestedFrame) def make_meta_frame(x, index=None) -> npd.NestedFrame: """Create an empty NestedFrame to use as Dask's underlying object meta.""" - result = x.head(0) + + dtypes = x.dtypes.to_dict() + result = npd.NestedFrame({key: pd.Series(dtype=d) for key, d in dtypes.items()}) return result diff --git a/src/dask_nested/core.py b/src/dask_nested/core.py index 2f3cc80..a93b3da 100644 --- a/src/dask_nested/core.py +++ b/src/dask_nested/core.py @@ -83,7 +83,9 @@ def from_nested_pandas( the size and index of the dataframe, the output may have fewer partitions than requested. chunksize: `int`, optional - Size of the individual chunks of data in non-parallel objects that make up Dask frames. + The desired number of rows per index partition to use. Note that + depending on the size and index of the dataframe, actual partition + sizes may vary. sort: `bool`, optional Whether to sort the frame by a default index. 
@@ -133,7 +135,7 @@ def nested_columns(self) -> list: nest_cols.append(column) return nest_cols - def add_nested(self, nested, name) -> NestedFrame: # type: ignore[name-defined] # noqa: F821 + def add_nested(self, nested, name, how="outer") -> NestedFrame: # type: ignore[name-defined] # noqa: F821 """Packs a dataframe into a nested column Parameters @@ -142,13 +144,29 @@ def add_nested(self, nested, name) -> NestedFrame: # type: ignore[name-defined] A flat dataframe to pack into a nested column name: The name given to the nested column + how: {‘left’, ‘right’, ‘outer’, ‘inner’, ‘cross’}, default ‘outer’ + How to handle the operation of the two objects. + + * left: use calling frame’s index (or column if on is specified) + + * right: use other’s index. + + * outer: form union of calling frame’s index (or column if on is + specified) with other’s index, and sort it lexicographically. + + * inner: form intersection of calling frame’s index (or column if + on is specified) with other’s index, preserving the order of the + calling’s one. + + * cross: creates the cartesian product from both frames, preserves + the order of the left keys. Returns ------- `dask_nested.NestedFrame` """ nested = nested.map_partitions(lambda x: pack_flat(x)).rename(name) - return self.join(nested, how="outer") + return self.join(nested, how=how) def query(self, expr) -> Self: # type: ignore # noqa: F821: """ From 414d15d67f374f0827dd3e51b6543c86a7a5d246 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Tue, 21 May 2024 09:39:43 -0700 Subject: [PATCH 24/24] add meta to reduce benchmark --- benchmarks/benchmarks.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/benchmarks/benchmarks.py b/benchmarks/benchmarks.py index e9cff75..c07f397 100644 --- a/benchmarks/benchmarks.py +++ b/benchmarks/benchmarks.py @@ -6,6 +6,7 @@ import dask_nested as dn import nested_pandas as npd import numpy as np +import pandas as pd def _generate_benchmark_data(add_nested=True): @@ -77,7 +78,8 @@ def setup(self): def run(self): """Run the benchmark.""" - self.nf.reduce(np.mean, "nested.flux").compute() + meta = pd.Series(name="mean", dtype=float) + self.nf.reduce(np.mean, "nested.flux", meta=meta).compute() def time_run(self): """Benchmark the runtime of applying the reduce function"""
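To round off the series, here is a minimal sketch of how the explicit `meta` added in the benchmark above is used against the package's own toy dataset generator; the generator arguments mirror the tests and are otherwise illustrative:

    import numpy as np
    import pandas as pd
    import dask_nested as dn

    # Small synthetic dataset: 10 base rows, 100 nested rows each, 2 partitions.
    nf = dn.datasets.generate_data(10, 100, npartitions=2, seed=1)

    # Passing meta tells Dask the output schema up front, so it does not have
    # to infer it by emulating the reduction on empty partitions.
    meta = pd.Series(name="mean", dtype=float)
    flux_means = nf.reduce(np.mean, "nested.flux", meta=meta)
    print(flux_means.compute())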