From 9ba0f7c572039d1edfd83f4469e69f8ae807708c Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Wed, 3 Jul 2024 10:37:38 -0700 Subject: [PATCH 1/2] add wrapper functions --- docs/tutorials/loading_data.ipynb | 6 +- docs/tutorials/work_with_lsdb.ipynb | 2 +- src/nested_dask/core.py | 117 ++++++++++++++++++++++++- src/nested_dask/datasets/generation.py | 2 +- tests/nested_dask/conftest.py | 12 +-- tests/nested_dask/test_nestedframe.py | 51 +++++++++++ 6 files changed, 176 insertions(+), 14 deletions(-) diff --git a/docs/tutorials/loading_data.ipynb b/docs/tutorials/loading_data.ipynb index ddd9843..242314a 100644 --- a/docs/tutorials/loading_data.ipynb +++ b/docs/tutorials/loading_data.ipynb @@ -28,7 +28,7 @@ "source": [ "## From Nested-Pandas\n", "\n", - "Nested-Dask can load data from Nested-Pandas `NestedFrame` objects by using the `from_nested_pandas` class function." + "Nested-Dask can load data from Nested-Pandas `NestedFrame` objects by using the `from_pandas` class function." ] }, { @@ -48,7 +48,7 @@ "nf = nf.add_nested(nested, \"nested\")\n", "\n", "# Convert to Nested-Dask NestedFrame\n", - "nf = nd.NestedFrame.from_nested_pandas(nf)\n", + "nf = nd.NestedFrame.from_pandas(nf)\n", "nf" ] }, @@ -225,7 +225,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.3" + "version": "3.10.11" } }, "nbformat": 4, diff --git a/docs/tutorials/work_with_lsdb.ipynb b/docs/tutorials/work_with_lsdb.ipynb index f028496..c3fed88 100644 --- a/docs/tutorials/work_with_lsdb.ipynb +++ b/docs/tutorials/work_with_lsdb.ipynb @@ -439,7 +439,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", - "version": "2.7.6" + "version": "3.10.11" } }, "nbformat": 4, diff --git a/src/nested_dask/core.py b/src/nested_dask/core.py index 814aadc..13f418e 100644 --- a/src/nested_dask/core.py +++ b/src/nested_dask/core.py @@ -64,7 +64,7 @@ def __getitem__(self, key): return result @classmethod - def from_nested_pandas( + def from_pandas( cls, data, npartitions=None, @@ -72,11 +72,11 @@ def from_nested_pandas( sort=True, ) -> NestedFrame: """Returns an Nested-Dask NestedFrame constructed from a Nested-Pandas - NestedFrame. + NestedFrame or Pandas DataFrame. Parameters ---------- - data: `NestedFrame` + data: `NestedFrame` or `DataFrame` Nested-Pandas NestedFrame containing the underlying data npartitions: `int`, optional The number of partitions of the index to create. Note that depending on @@ -112,6 +112,117 @@ def from_dask_dataframe(cls, df: dd.DataFrame) -> NestedFrame: """ return df.map_partitions(npd.NestedFrame, meta=npd.NestedFrame(df._meta.copy())) + @classmethod + def from_delayed(cls, dfs, meta=None, divisions=None, prefix="from-delayed", verify_meta=True): + """ + Create Nested-Dask NestedFrames from many Dask Delayed objects. + + Docstring is copied from `dask.dataframe.from_delayed`. + + Parameters + ---------- + dfs : + A ``dask.delayed.Delayed``, a ``distributed.Future``, or an iterable of either + of these objects, e.g. returned by ``client.submit``. These comprise the + individual partitions of the resulting dataframe. + If a single object is provided (not an iterable), then the resulting dataframe + will have only one partition. + meta: + An empty NestedFrame, pd.DataFrame, or pd.Series that matches the dtypes and column names of + the output. This metadata is necessary for many algorithms in dask dataframe + to work. For ease of use, some alternative inputs are also available. 
Instead of a + DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided (note that + the order of the names should match the order of the columns). Instead of a series, a tuple of + (name, dtype) can be used. If not provided, dask will try to infer the metadata. This may lead + to unexpected results, so providing meta is recommended. For more information, see + dask.dataframe.utils.make_meta. + divisions : + Partition boundaries along the index. + For tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions + For string 'sorted' will compute the delayed values to find index + values. Assumes that the indexes are mutually sorted. + If None, then won't use index information + prefix : + Prefix to prepend to the keys. + verify_meta : + If True check that the partitions have consistent metadata, defaults to True. + + """ + nf = dd.from_delayed(dfs=dfs, meta=meta, divisions=divisions, prefix=prefix, verify_meta=verify_meta) + return NestedFrame.from_dask_dataframe(nf) + + @classmethod + def from_map( + cls, + func, + *iterables, + args=None, + meta=None, + divisions=None, + label=None, + enforce_metadata=True, + **kwargs, + ): + """ + Create a DataFrame collection from a custom function map + + WARNING: The ``from_map`` API is experimental, and stability is not + yet guaranteed. Use at your own risk! + + Parameters + ---------- + func : callable + Function used to create each partition. If ``func`` satisfies the + ``DataFrameIOFunction`` protocol, column projection will be enabled. + *iterables : Iterable objects + Iterable objects to map to each output partition. All iterables must + be the same length. This length determines the number of partitions + in the output collection (only one element of each iterable will + be passed to ``func`` for each partition). + args : list or tuple, optional + Positional arguments to broadcast to each output partition. Note + that these arguments will always be passed to ``func`` after the + ``iterables`` positional arguments. + meta: + An empty NestedFrame, pd.DataFrame, or pd.Series that matches the dtypes and column names of + the output. This metadata is necessary for many algorithms in dask dataframe + to work. For ease of use, some alternative inputs are also available. Instead of a + DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided (note that + the order of the names should match the order of the columns). Instead of a series, a tuple of + (name, dtype) can be used. If not provided, dask will try to infer the metadata. This may lead + to unexpected results, so providing meta is recommended. For more information, see + dask.dataframe.utils.make_meta. + divisions : tuple, str, optional + Partition boundaries along the index. + For tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions + For string 'sorted' will compute the delayed values to find index + values. Assumes that the indexes are mutually sorted. + If None, then won't use index information + label : str, optional + String to use as the function-name label in the output + collection-key names. + enforce_metadata : bool, default True + Whether to enforce at runtime that the structure of the DataFrame + produced by ``func`` actually matches the structure of ``meta``. + This will rename and reorder columns for each partition, + and will raise an error if this doesn't work, + but it won't raise if dtypes don't match. + **kwargs: + Key-word arguments to broadcast to each output partition. 
These + same arguments will be passed to ``func`` for every output partition. + """ + nf = dd.from_map( + func, + *iterables, + args=args, + meta=meta, + divisions=divisions, + label=label, + enforce_metadata=enforce_metadata, + **kwargs, + ) + return NestedFrame.from_dask_dataframe(nf) + def compute(self, **kwargs): """Compute this Dask collection, returning the underlying dataframe or series.""" return npd.NestedFrame(super().compute(**kwargs)) diff --git a/src/nested_dask/datasets/generation.py b/src/nested_dask/datasets/generation.py index 6c283f3..70873d7 100644 --- a/src/nested_dask/datasets/generation.py +++ b/src/nested_dask/datasets/generation.py @@ -37,7 +37,7 @@ def generate_data(n_base, n_layer, npartitions=1, seed=None) -> nd.NestedFrame: base_nf = datasets.generate_data(n_base, n_layer, seed=seed) # Convert to nested-dask - base_nf = nd.NestedFrame.from_nested_pandas(base_nf).repartition(npartitions=npartitions) + base_nf = nd.NestedFrame.from_pandas(base_nf).repartition(npartitions=npartitions) return base_nf diff --git a/tests/nested_dask/conftest.py b/tests/nested_dask/conftest.py index 778553b..bdcfad7 100644 --- a/tests/nested_dask/conftest.py +++ b/tests/nested_dask/conftest.py @@ -23,8 +23,8 @@ def test_dataset(): } layer_nf = npd.NestedFrame(data=layer_data).set_index("index").sort_index() - base_nd = nd.NestedFrame.from_nested_pandas(base_nf, npartitions=5) - layer_nd = nd.NestedFrame.from_nested_pandas(layer_nf, npartitions=10) + base_nd = nd.NestedFrame.from_pandas(base_nf, npartitions=5) + layer_nd = nd.NestedFrame.from_pandas(layer_nf, npartitions=10) return base_nd.add_nested(layer_nd, "nested") @@ -53,8 +53,8 @@ def test_dataset_with_nans(): } layer_nf = npd.NestedFrame(data=layer_data).set_index("index") - base_nd = nd.NestedFrame.from_nested_pandas(base_nf, npartitions=5) - layer_nd = nd.NestedFrame.from_nested_pandas(layer_nf, npartitions=10) + base_nd = nd.NestedFrame.from_pandas(base_nf, npartitions=5) + layer_nd = nd.NestedFrame.from_pandas(layer_nf, npartitions=10) return base_nd.add_nested(layer_nd, "nested") @@ -78,7 +78,7 @@ def test_dataset_no_add_nested(): } layer_nf = npd.NestedFrame(data=layer_data).set_index("index") - base_nd = nd.NestedFrame.from_nested_pandas(base_nf, npartitions=5) - layer_nd = nd.NestedFrame.from_nested_pandas(layer_nf, npartitions=10) + base_nd = nd.NestedFrame.from_pandas(base_nf, npartitions=5) + layer_nd = nd.NestedFrame.from_pandas(layer_nf, npartitions=10) return (base_nd, layer_nd) diff --git a/tests/nested_dask/test_nestedframe.py b/tests/nested_dask/test_nestedframe.py index cc21a4d..d110ecd 100644 --- a/tests/nested_dask/test_nestedframe.py +++ b/tests/nested_dask/test_nestedframe.py @@ -1,5 +1,6 @@ import dask.dataframe as dd import nested_dask as nd +import nested_pandas as npd import numpy as np import pandas as pd import pytest @@ -186,3 +187,53 @@ def test_from_epyc(): # just make sure the result was successfully computed assert len(result) == 9817 + + +@pytest.mark.parametrize("pkg", ["pandas", "nested-pandas"]) +@pytest.mark.parametrize("with_nested", [True, False]) +def test_from_pandas(pkg, with_nested): + """Test that from_pandas returns a NestedFrame""" + + if pkg == "pandas": + df = pd.DataFrame({"a": [1, 2, 3]}, index=[1, 2, 3]) + elif pkg == "nested-pandas": + df = npd.NestedFrame({"a": [1, 2, 3]}, index=[1, 2, 3]) + if with_nested: + nested = npd.NestedFrame({"b": [5, 10, 15, 20, 25, 30]}, index=[1, 1, 2, 2, 3, 3]) + df = df.add_nested(nested, "nested") + + ndf = nd.NestedFrame.from_pandas(df) + 
assert isinstance(ndf, nd.NestedFrame) + + +@pytest.mark.parametrize("with_nested", [True, False]) +def test_from_delayed(with_nested): + """Test that from_delayed returns a NestedFrame""" + + nf = nd.datasets.generate_data(10, 10) + if not with_nested: + nf = nf.drop("nested", axis=1) + + delayed = nf.to_delayed() + + ndf = nd.NestedFrame.from_delayed(dfs=delayed, meta=nf._meta) + assert isinstance(ndf, nd.NestedFrame) + + +def test_from_map(test_dataset, tmp_path): + """Test that from_map returns a NestedFrame""" + + # Setup a temporary directory for files + test_save_path = tmp_path / "test_dataset" + + # Save Base to Parquet + test_dataset[["a", "b"]].to_parquet(test_save_path, write_index=True) + + # Load from_map + paths = [ + tmp_path / "test_dataset" / "0.parquet", + tmp_path / "test_dataset" / "1.parquet", + tmp_path / "test_dataset" / "2.parquet", + ] + ndf = nd.NestedFrame.from_map(nd.read_parquet, paths, meta=test_dataset[["a", "b"]]._meta) + assert isinstance(ndf, nd.NestedFrame) From ddd764a7e871788ab0b999c7220500cf3efe240e Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Wed, 3 Jul 2024 10:44:42 -0700 Subject: [PATCH 2/2] from_pandas switch --- benchmarks/benchmarks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmarks/benchmarks.py b/benchmarks/benchmarks.py index 26fc7be..a220f98 100644 --- a/benchmarks/benchmarks.py +++ b/benchmarks/benchmarks.py @@ -31,8 +31,8 @@ def _generate_benchmark_data(add_nested=True): layer_nf = npd.NestedFrame(data=layer_data).set_index("index").sort_index() # Convert to Dask - base_nf = nd.NestedFrame.from_nested_pandas(base_nf).repartition(npartitions=5) - layer_nf = nd.NestedFrame.from_nested_pandas(layer_nf).repartition(npartitions=50) + base_nf = nd.NestedFrame.from_pandas(base_nf).repartition(npartitions=5) + layer_nf = nd.NestedFrame.from_pandas(layer_nf).repartition(npartitions=50) # Return based on add_nested if add_nested:
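
For reviewers, a minimal usage sketch of the three new wrapper constructors introduced in patch 1/2, mirroring the tests added above. This is illustrative only: it assumes `nested_dask`/`nested_pandas` are importable as in the tests, and the "example_dir" output directory, file names, and partition counts are placeholder choices, not part of the patch.

    import nested_dask as nd
    import nested_pandas as npd

    # Build a small nested-pandas frame, as in test_from_pandas
    base = npd.NestedFrame({"a": [1, 2, 3]}, index=[1, 2, 3])
    nested = npd.NestedFrame({"b": [5, 10, 15, 20, 25, 30]}, index=[1, 1, 2, 2, 3, 3])
    base = base.add_nested(nested, "nested")

    # from_pandas: wrap an in-memory pandas / nested-pandas frame (npartitions is arbitrary here)
    ndf = nd.NestedFrame.from_pandas(base, npartitions=2)

    # from_delayed: rebuild a NestedFrame from its delayed partitions, as in test_from_delayed
    ndf2 = nd.NestedFrame.from_delayed(dfs=ndf.to_delayed(), meta=ndf._meta)

    # from_map: one partition per input, e.g. one parquet file each, as in test_from_map
    # (the directory name below is a hypothetical placeholder)
    ndf[["a"]].to_parquet("example_dir", write_index=True)
    paths = ["example_dir/0.parquet", "example_dir/1.parquet"]
    ndf3 = nd.NestedFrame.from_map(nd.read_parquet, paths, meta=ndf[["a"]]._meta)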