Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Implementation #2

Merged
merged 24 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 103 additions & 7 deletions benchmarks/benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,110 @@
For more information on writing benchmarks:
https://asv.readthedocs.io/en/stable/writing_benchmarks.html."""

from dask_nested import example_benchmarks
import dask_nested as dn
import nested_pandas as npd
import numpy as np


def time_computation():
"""Time computations are prefixed with 'time'."""
example_benchmarks.runtime_computation()
def _generate_benchmark_data(add_nested=True, n_base=100, layer_size=1000, seed=1):
    """Generate a synthetic dataset for the benchmarks.

    Parameters
    ----------
    add_nested : bool, optional
        If True (default), pack the layer frame into the base frame as a
        nested column named "nested" and return the combined frame.
        If False, return the base and layer frames separately.
    n_base : int, optional
        Number of rows in the base frame. Default is 100.
    layer_size : int, optional
        Number of layer rows generated per base row. Default is 1000.
    seed : int, optional
        Seed for the random generator; fixed by default so benchmark
        inputs are identical across runs. Default is 1.

    Returns
    -------
    dask_nested.NestedFrame or tuple of dask_nested.NestedFrame
        The combined frame when ``add_nested`` is True, otherwise the
        ``(base_nf, layer_nf)`` pair.
    """
    randomstate = np.random.RandomState(seed=seed)

    # Generate base data: two uniform random columns over n_base rows.
    base_data = {"a": randomstate.random(n_base), "b": randomstate.random(n_base) * 2}
    base_nf = npd.NestedFrame(data=base_data)

    # Layer data: layer_size rows per base row. "index" maps each layer
    # row back to its base row so add_nested can align the two frames.
    layer_data = {
        "t": randomstate.random(layer_size * n_base) * 20,
        "flux": randomstate.random(layer_size * n_base) * 100,
        "band": randomstate.choice(["r", "g"], size=layer_size * n_base),
        "index": np.arange(layer_size * n_base) % n_base,
    }
    layer_nf = npd.NestedFrame(data=layer_data).set_index("index").sort_index()

    # Convert to Dask with a multi-partition layout so the benchmarks
    # exercise partition alignment, not just single-partition fast paths.
    base_nf = dn.NestedFrame.from_nested_pandas(base_nf).repartition(npartitions=5)
    layer_nf = dn.NestedFrame.from_nested_pandas(layer_nf).repartition(npartitions=50)

    # Return based on add_nested
    if add_nested:
        return base_nf.add_nested(layer_nf, "nested")
    return base_nf, layer_nf


class NestedFrameAddNested:
    """Benchmark the NestedFrame.add_nested function"""

    # Placeholder class attributes; setup() replaces these per benchmark run.
    n_base = 100
    layer_size = 1000
    base_nf = dn.NestedFrame
    layer_nf = dn.NestedFrame

    def setup(self):
        """Generate the unpacked base/layer frames used by the benchmark."""
        base, layer = _generate_benchmark_data(add_nested=False)
        self.base_nf = base
        self.layer_nf = layer

    def run(self):
        """Pack the layer frame into the base frame and force computation."""
        packed = self.base_nf.add_nested(self.layer_nf, "nested")
        packed.compute()

    def time_run(self):
        """ASV runtime benchmark for add_nested."""
        self.run()

    def peakmem_run(self):
        """ASV peak-memory benchmark for add_nested."""
        self.run()


class NestedFrameReduce:
    """Benchmark the NestedFrame.reduce function"""

    # Placeholder class attribute; setup() replaces it per benchmark run.
    nf = dn.NestedFrame

    def setup(self):
        """Build a nested benchmark frame to reduce over."""
        self.nf = _generate_benchmark_data(add_nested=True)

    def run(self):
        """Apply np.mean over the nested flux column and force computation."""
        reduced = self.nf.reduce(np.mean, "nested.flux")
        reduced.compute()

    def time_run(self):
        """ASV runtime benchmark for reduce."""
        self.run()

    def peakmem_run(self):
        """ASV peak-memory benchmark for reduce."""
        self.run()


class NestedFrameQuery:
    """Benchmark the NestedFrame.query function"""

    # Placeholder class attribute; setup() replaces it per benchmark run.
    nf = dn.NestedFrame

    def setup(self):
        """Build a nested benchmark frame to query."""
        self.nf = _generate_benchmark_data(add_nested=True)

    def run(self):
        """Filter the nested layer on band and force computation."""
        queried = self.nf.query("nested.band == 'g'")
        self.nf = queried.compute()

    def time_run(self):
        """ASV runtime benchmark for the nested-layer query."""
        self.run()

    def peakmem_run(self):
        """ASV peak-memory benchmark for the nested-layer query."""
        self.run()
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ classifiers = [
dynamic = ["version"]
requires-python = ">=3.9"
dependencies = [
'nested-pandas',
'numpy',
'dask>=2024.3.0',
'dask[distributed]>=2024.3.0',
'dask_expr',
'pyarrow',
]

[project.urls]
Expand Down
7 changes: 4 additions & 3 deletions src/dask_nested/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .example_module import greetings, meaning

__all__ = ["greetings", "meaning"]
from . import backends, accessor # noqa
from .core import NestedFrame # noqa
from .io import read_parquet # noqa
from .datasets import generate_data # noqa
68 changes: 68 additions & 0 deletions src/dask_nested/accessor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Python 3.9 doesn't support "|" for types
from __future__ import annotations

import dask.dataframe as dd
import nested_pandas as npd
from dask.dataframe.extensions import register_series_accessor
from nested_pandas import NestedDtype


@register_series_accessor("nest")
class DaskNestSeriesAccessor(npd.NestSeriesAccessor):
    """The nested-dask version of the nested-pandas NestSeriesAccessor.

    Note that this has a very limited implementation relative to nested-pandas.

    Parameters
    ----------
    series: dd.series
        A series to tie to the accessor
    """

    def __init__(self, series):
        # Validate the dtype up front so misuse fails at accessor creation.
        self._check_series(series)
        self._series = series

    @staticmethod
    def _check_series(series):
        """Check the validity of the tied series dtype."""
        dtype = series.dtype
        if not isinstance(dtype, NestedDtype):
            raise AttributeError(f"Can only use .nest accessor with a Series of NestedDtype, got {dtype}")

    @property
    def fields(self) -> list[str]:
        """Names of the nested columns"""
        # Peek at a zero-row head so the pandas-level accessor can report
        # the field names without computing any partition data.
        return self._series.head(0).nest.fields

    def to_lists(self, fields: list[str] | None = None) -> dd.DataFrame:
        """Convert nested series into dataframe of list-array columns

        Parameters
        ----------
        fields : list[str] or None, optional
            Names of the fields to include. Default is None, which means all fields.

        Returns
        -------
        dd.DataFrame
            Dataframe of list-arrays.
        """
        # Delegate to the nested-pandas accessor partition by partition.
        return self._series.map_partitions(lambda part: part.nest.to_lists(fields=fields))

    def to_flat(self, fields: list[str] | None = None) -> dd.DataFrame:
        """Convert nested series into dataframe of flat arrays

        Parameters
        ----------
        fields : list[str] or None, optional
            Names of the fields to include. Default is None, which means all fields.

        Returns
        -------
        dd.DataFrame
            Dataframe of flat arrays.
        """
        # Delegate to the nested-pandas accessor partition by partition.
        return self._series.map_partitions(lambda part: part.nest.to_flat(fields=fields))
40 changes: 40 additions & 0 deletions src/dask_nested/backends.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Python 3.9 doesn't support "|" for types
from __future__ import annotations

import nested_pandas as npd
import pandas as pd
from dask.dataframe.backends import meta_nonempty_dataframe
from dask.dataframe.dispatch import make_meta_dispatch
from dask.dataframe.extensions import make_array_nonempty
from dask.dataframe.utils import meta_nonempty
from dask_expr import get_collection_type
from nested_pandas.series.ext_array import NestedExtensionArray

from .core import NestedFrame

dougbrn marked this conversation as resolved.
Show resolved Hide resolved
get_collection_type.register(npd.NestedFrame, lambda _: NestedFrame)

# The following dispatch functions are defined as per the Dask extension guide:
# https://docs.dask.org/en/latest/dataframe-extend.html


@make_meta_dispatch.register(npd.NestedFrame)
def make_meta_frame(x, index=None) -> npd.NestedFrame:
    """Create an empty NestedFrame to use as Dask's underlying object meta."""
    # head(0) keeps the columns and dtypes while dropping all rows.
    return x.head(0)


@meta_nonempty.register(npd.NestedFrame)
def _nonempty_nestedframe(x, index=None) -> npd.NestedFrame:
    """Construct a new NestedFrame with the same underlying data."""
    # Reuse Dask's generic non-empty meta builder, then wrap the result
    # back into a NestedFrame so the collection type is preserved.
    return npd.NestedFrame(meta_nonempty_dataframe(x))


@make_array_nonempty.register(npd.NestedDtype)
def _(dtype) -> NestedExtensionArray:
    """Register a valid dtype for the NestedExtensionArray"""
    # Dask seems to explicitly require meta dtypes to have length 2, so
    # supply two missing values to avoid a length error in meta inference.
    return NestedExtensionArray._from_sequence([pd.NA] * 2, dtype=dtype)
Loading