Skip to content

Commit

Permalink
Merge pull request #41 from lincc-frameworks/more_wrappers
Browse files Browse the repository at this point in the history
add wrapper functions: `from_pandas`, `from_delayed`, `from_map`
  • Loading branch information
dougbrn authored Jul 3, 2024
2 parents 60cae9a + ddd764a commit 73e988c
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 16 deletions.
4 changes: 2 additions & 2 deletions benchmarks/benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def _generate_benchmark_data(add_nested=True):
layer_nf = npd.NestedFrame(data=layer_data).set_index("index").sort_index()

# Convert to Dask
base_nf = nd.NestedFrame.from_nested_pandas(base_nf).repartition(npartitions=5)
layer_nf = nd.NestedFrame.from_nested_pandas(layer_nf).repartition(npartitions=50)
base_nf = nd.NestedFrame.from_pandas(base_nf).repartition(npartitions=5)
layer_nf = nd.NestedFrame.from_pandas(layer_nf).repartition(npartitions=50)

# Return based on add_nested
if add_nested:
Expand Down
6 changes: 3 additions & 3 deletions docs/tutorials/loading_data.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"source": [
"## From Nested-Pandas\n",
"\n",
"Nested-Dask can load data from Nested-Pandas `NestedFrame` objects by using the `from_nested_pandas` class function."
"Nested-Dask can load data from Nested-Pandas `NestedFrame` objects by using the `from_pandas` class function."
]
},
{
Expand All @@ -48,7 +48,7 @@
"nf = nf.add_nested(nested, \"nested\")\n",
"\n",
"# Convert to Nested-Dask NestedFrame\n",
"nf = nd.NestedFrame.from_nested_pandas(nf)\n",
"nf = nd.NestedFrame.from_pandas(nf)\n",
"nf"
]
},
Expand Down Expand Up @@ -225,7 +225,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.3"
"version": "3.10.11"
}
},
"nbformat": 4,
Expand Down
2 changes: 1 addition & 1 deletion docs/tutorials/work_with_lsdb.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
"version": "3.10.11"
}
},
"nbformat": 4,
Expand Down
117 changes: 114 additions & 3 deletions src/nested_dask/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,19 @@ def __getitem__(self, key):
return result

@classmethod
def from_nested_pandas(
def from_pandas(
cls,
data,
npartitions=None,
chunksize=None,
sort=True,
) -> NestedFrame:
"""Returns an Nested-Dask NestedFrame constructed from a Nested-Pandas
NestedFrame.
NestedFrame or Pandas DataFrame.
Parameters
----------
data: `NestedFrame`
data: `NestedFrame` or `DataFrame`
Nested-Pandas NestedFrame containing the underlying data
npartitions: `int`, optional
The number of partitions of the index to create. Note that depending on
Expand Down Expand Up @@ -112,6 +112,117 @@ def from_dask_dataframe(cls, df: dd.DataFrame) -> NestedFrame:
"""
return df.map_partitions(npd.NestedFrame, meta=npd.NestedFrame(df._meta.copy()))

@classmethod
def from_delayed(cls, dfs, meta=None, divisions=None, prefix="from-delayed", verify_meta=True):
"""
Create Nested-Dask NestedFrames from many Dask Delayed objects.
Docstring is copied from `dask.dataframe.from_delayed`.
Parameters
----------
dfs :
A ``dask.delayed.Delayed``, a ``distributed.Future``, or an iterable of either
of these objects, e.g. returned by ``client.submit``. These comprise the
individual partitions of the resulting dataframe.
If a single object is provided (not an iterable), then the resulting dataframe
will have only one partition.
meta:
An empty NestedFrame, pd.DataFrame, or pd.Series that matches the dtypes and column names of
the output. This metadata is necessary for many algorithms in dask dataframe
to work. For ease of use, some alternative inputs are also available. Instead of a
DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided (note that
the order of the names should match the order of the columns). Instead of a series, a tuple of
(name, dtype) can be used. If not provided, dask will try to infer the metadata. This may lead
to unexpected results, so providing meta is recommended. For more information, see
dask.dataframe.utils.make_meta.
divisions :
Partition boundaries along the index.
For tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions
For string 'sorted' will compute the delayed values to find index
values. Assumes that the indexes are mutually sorted.
If None, then won't use index information
prefix :
Prefix to prepend to the keys.
verify_meta :
If True check that the partitions have consistent metadata, defaults to True.
"""
nf = dd.from_delayed(dfs=dfs, meta=meta, divisions=divisions, prefix=prefix, verify_meta=verify_meta)
return NestedFrame.from_dask_dataframe(nf)

@classmethod
def from_map(
cls,
func,
*iterables,
args=None,
meta=None,
divisions=None,
label=None,
enforce_metadata=True,
**kwargs,
):
"""
Create a DataFrame collection from a custom function map
WARNING: The ``from_map`` API is experimental, and stability is not
yet guaranteed. Use at your own risk!
Parameters
----------
func : callable
Function used to create each partition. If ``func`` satisfies the
``DataFrameIOFunction`` protocol, column projection will be enabled.
*iterables : Iterable objects
Iterable objects to map to each output partition. All iterables must
be the same length. This length determines the number of partitions
in the output collection (only one element of each iterable will
be passed to ``func`` for each partition).
args : list or tuple, optional
Positional arguments to broadcast to each output partition. Note
that these arguments will always be passed to ``func`` after the
``iterables`` positional arguments.
meta:
An empty NestedFrame, pd.DataFrame, or pd.Series that matches the dtypes and column names of
the output. This metadata is necessary for many algorithms in dask dataframe
to work. For ease of use, some alternative inputs are also available. Instead of a
DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided (note that
the order of the names should match the order of the columns). Instead of a series, a tuple of
(name, dtype) can be used. If not provided, dask will try to infer the metadata. This may lead
to unexpected results, so providing meta is recommended. For more information, see
dask.dataframe.utils.make_meta.
divisions : tuple, str, optional
Partition boundaries along the index.
For tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions
For string 'sorted' will compute the delayed values to find index
values. Assumes that the indexes are mutually sorted.
If None, then won't use index information
label : str, optional
String to use as the function-name label in the output
collection-key names.
enforce_metadata : bool, default True
Whether to enforce at runtime that the structure of the DataFrame
produced by ``func`` actually matches the structure of ``meta``.
This will rename and reorder columns for each partition,
and will raise an error if this doesn't work,
but it won't raise if dtypes don't match.
**kwargs:
Key-word arguments to broadcast to each output partition. These
same arguments will be passed to ``func`` for every output partition.
"""
nf = dd.from_map(
func,
*iterables,
args=args,
meta=meta,
divisions=divisions,
label=label,
enforce_metadata=enforce_metadata,
**kwargs,
)
return NestedFrame.from_dask_dataframe(nf)

def compute(self, **kwargs):
"""Compute this Dask collection, returning the underlying dataframe or series."""
return npd.NestedFrame(super().compute(**kwargs))
Expand Down
2 changes: 1 addition & 1 deletion src/nested_dask/datasets/generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def generate_data(n_base, n_layer, npartitions=1, seed=None) -> nd.NestedFrame:
base_nf = datasets.generate_data(n_base, n_layer, seed=seed)

# Convert to nested-dask
base_nf = nd.NestedFrame.from_nested_pandas(base_nf).repartition(npartitions=npartitions)
base_nf = nd.NestedFrame.from_pandas(base_nf).repartition(npartitions=npartitions)

return base_nf

Expand Down
12 changes: 6 additions & 6 deletions tests/nested_dask/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ def test_dataset():
}
layer_nf = npd.NestedFrame(data=layer_data).set_index("index").sort_index()

base_nd = nd.NestedFrame.from_nested_pandas(base_nf, npartitions=5)
layer_nd = nd.NestedFrame.from_nested_pandas(layer_nf, npartitions=10)
base_nd = nd.NestedFrame.from_pandas(base_nf, npartitions=5)
layer_nd = nd.NestedFrame.from_pandas(layer_nf, npartitions=10)

return base_nd.add_nested(layer_nd, "nested")

Expand Down Expand Up @@ -53,8 +53,8 @@ def test_dataset_with_nans():
}
layer_nf = npd.NestedFrame(data=layer_data).set_index("index")

base_nd = nd.NestedFrame.from_nested_pandas(base_nf, npartitions=5)
layer_nd = nd.NestedFrame.from_nested_pandas(layer_nf, npartitions=10)
base_nd = nd.NestedFrame.from_pandas(base_nf, npartitions=5)
layer_nd = nd.NestedFrame.from_pandas(layer_nf, npartitions=10)

return base_nd.add_nested(layer_nd, "nested")

Expand All @@ -78,7 +78,7 @@ def test_dataset_no_add_nested():
}
layer_nf = npd.NestedFrame(data=layer_data).set_index("index")

base_nd = nd.NestedFrame.from_nested_pandas(base_nf, npartitions=5)
layer_nd = nd.NestedFrame.from_nested_pandas(layer_nf, npartitions=10)
base_nd = nd.NestedFrame.from_pandas(base_nf, npartitions=5)
layer_nd = nd.NestedFrame.from_pandas(layer_nf, npartitions=10)

return (base_nd, layer_nd)
51 changes: 51 additions & 0 deletions tests/nested_dask/test_nestedframe.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import dask.dataframe as dd
import nested_dask as nd
import nested_pandas as npd
import numpy as np
import pandas as pd
import pytest
Expand Down Expand Up @@ -186,3 +187,53 @@ def test_from_epyc():

# just make sure the result was successfully computed
assert len(result) == 9817


@pytest.mark.parametrize("pkg", ["pandas", "nested-pandas"])
@pytest.mark.parametrize("with_nested", [True, False])
def test_from_pandas(pkg, with_nested):
"""Test that from_pandas returns a NestedFrame"""

if pkg == "pandas":
df = pd.DataFrame({"a": [1, 2, 3]}, index=[1, 2, 3])
elif pkg == "nested-pandas":
df = npd.NestedFrame({"a": [1, 2, 3]}, index=[1, 2, 3])
if with_nested:
nested = npd.NestedFrame({"b": [5, 10, 15, 20, 25, 30]}, index=[1, 1, 2, 2, 3, 3])
df = df.add_nested(nested, "nested")

ndf = nd.NestedFrame.from_pandas(df)
assert isinstance(ndf, nd.NestedFrame)


@pytest.mark.parametrize("with_nested", [True, False])
def test_from_delayed(with_nested):
"""Test that from_delayed returns a NestedFrame"""

nf = nd.datasets.generate_data(10, 10)
if not with_nested:
nf = nf.drop("nested", axis=1)

delayed = nf.to_delayed()

ndf = nd.NestedFrame.from_delayed(dfs=delayed, meta=nf._meta)
assert isinstance(ndf, nd.NestedFrame)


def test_from_map(test_dataset, tmp_path):
"""Test that from_map returns a NestedFrame"""

# Setup a temporary directory for files
test_save_path = tmp_path / "test_dataset"

# Save Base to Parquet
test_dataset[["a", "b"]].to_parquet(test_save_path, write_index=True)

# Load from_map
paths = [
tmp_path / "test_dataset" / "0.parquet",
tmp_path / "test_dataset" / "1.parquet",
tmp_path / "test_dataset" / "2.parquet",
]
ndf = nd.NestedFrame.from_map(nd.read_parquet, paths, meta=test_dataset[["a", "b"]]._meta)
assert isinstance(ndf, nd.NestedFrame)

0 comments on commit 73e988c

Please sign in to comment.