Skip to content

Commit

Permalink
Introduce VirtualEmptyArray (#290)
Browse files Browse the repository at this point in the history
Don't show virtual arrays in DAG visualization
  • Loading branch information
tomwhite authored Aug 14, 2023
1 parent 1b2f871 commit 6ad5601
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 10 deletions.
22 changes: 19 additions & 3 deletions cubed/array_api/creation_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from cubed.core import Plan, gensym, map_blocks
from cubed.core.ops import map_direct
from cubed.core.plan import new_temp_path
from cubed.storage.virtual import virtual_offsets
from cubed.storage.virtual import virtual_empty, virtual_offsets
from cubed.storage.zarr import lazy_from_array, lazy_full
from cubed.utils import to_chunksize
from cubed.vendor.dask.array.core import normalize_chunks
Expand Down Expand Up @@ -88,6 +88,22 @@ def empty_like(x, /, *, dtype=None, device=None, chunks=None, spec=None) -> "Arr
return empty(**_like_args(x, dtype, device, chunks, spec))


def empty_virtual_array(
    shape, *, dtype=None, device=None, chunks="auto", spec=None
) -> "Array":
    """Return an Array whose target is a virtual empty array.

    The target is built with ``virtual_empty`` rather than a Zarr-backed
    store. ``dtype`` falls back to ``np.float64`` when it is not given, and
    ``chunks`` is normalized against ``shape`` and the resolved dtype, then
    converted with ``to_chunksize``. The plan node is registered under the
    "empty" op name with ``hidden=True``. ``device`` is accepted but is not
    used in this function.
    """
    # Imported inside the function rather than at module level.
    from .array_object import Array

    resolved_dtype = np.float64 if dtype is None else dtype
    normalized = normalize_chunks(chunks, shape=shape, dtype=resolved_dtype)
    chunksize = to_chunksize(normalized)

    name = gensym()
    target = virtual_empty(shape, dtype=resolved_dtype, chunks=chunksize)

    return Array(name, target, spec, Plan._new(name, "empty", target, hidden=True))


def eye(
n_rows, n_cols=None, /, *, k=0, dtype=None, device=None, chunks="auto", spec=None
) -> "Array":
Expand Down Expand Up @@ -154,13 +170,13 @@ def full(
return Array(name, target, spec, plan)


def offsets_array(shape, spec=None) -> "Array":
def offsets_virtual_array(shape, spec=None) -> "Array":
name = gensym()
target = virtual_offsets(shape)

from .array_object import Array

plan = Plan._new(name, "block_ids", target)
plan = Plan._new(name, "block_ids", target, hidden=True)
return Array(name, target, spec, plan)


Expand Down
12 changes: 8 additions & 4 deletions cubed/core/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ def blockwise(
"blockwise",
pipeline.target_array,
pipeline,
False,
*source_arrays,
)
from cubed.array_api import Array
Expand Down Expand Up @@ -454,12 +455,12 @@ def map_blocks(
) -> "Array":
"""Apply a function to corresponding blocks from multiple input arrays."""
if has_keyword(func, "block_id"):
from cubed.array_api.creation_functions import offsets_array
from cubed.array_api.creation_functions import offsets_virtual_array

# Create an array of index offsets with the same chunk structure as the args,
# which we convert to block ids (chunk coordinates) later.
a = args[0]
offsets = offsets_array(a.numblocks, a.spec)
offsets = offsets_virtual_array(a.numblocks, a.spec)
new_args = args + (offsets,)

def offset_to_block_id(offset):
Expand Down Expand Up @@ -588,12 +589,12 @@ def map_direct(
(`args`) will be used (if any).
"""

from cubed.array_api.creation_functions import empty
from cubed.array_api.creation_functions import empty_virtual_array

if spec is None and len(args) > 0 and hasattr(args[0], "spec"):
spec = args[0].spec

out = empty(shape, dtype=dtype, chunks=chunks, spec=spec)
out = empty_virtual_array(shape, dtype=dtype, chunks=chunks, spec=spec)

kwargs["arrays"] = args

Expand Down Expand Up @@ -646,6 +647,7 @@ def rechunk(x, chunks, target_store=None):
"rechunk",
pipeline.target_array,
pipeline,
False,
x,
)
return Array(name, pipeline.target_array, spec, plan)
Expand All @@ -657,6 +659,7 @@ def rechunk(x, chunks, target_store=None):
"rechunk",
pipeline1.target_array,
pipeline1,
False,
x,
)
x_int = Array(name_int, pipeline1.target_array, spec, plan1)
Expand All @@ -667,6 +670,7 @@ def rechunk(x, chunks, target_store=None):
"rechunk",
pipeline2.target_array,
pipeline2,
False,
x_int,
)
return Array(name, pipeline2.target_array, spec, plan2)
Expand Down
3 changes: 3 additions & 0 deletions cubed/core/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def _new(
op_name,
target,
pipeline=None,
hidden=False,
*source_arrays,
):
# create an empty DAG or combine from sources
Expand All @@ -60,6 +61,7 @@ def _new(
op_name=op_name,
target=target,
stack_summaries=stack_summaries,
hidden=hidden,
)
else:
dag.add_node(
Expand All @@ -68,6 +70,7 @@ def _new(
op_name=op_name,
target=target,
stack_summaries=stack_summaries,
hidden=hidden,
pipeline=pipeline,
)
for x in source_arrays:
Expand Down
35 changes: 33 additions & 2 deletions cubed/storage/virtual.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,34 @@

import numpy as np
import zarr
from zarr.indexing import is_slice
from zarr.indexing import BasicIndexer, is_slice

from cubed.types import T_Shape
from cubed.types import T_DType, T_RegularChunks, T_Shape


class VirtualEmptyArray:
    """Array-like object with a shape, dtype and chunks but no stored data.

    Nothing is materialized in memory or on disk; indexing hands back a
    freshly allocated, uninitialized NumPy array of the selected shape.
    """

    def __init__(
        self,
        shape: T_Shape,
        dtype: T_DType,
        chunks: T_RegularChunks,
    ):
        # An empty in-memory Zarr array is kept as a template because Zarr
        # normalizes shape, dtype and chunks for us.
        self.template = zarr.empty(
            shape, dtype=dtype, chunks=chunks, store=zarr.storage.MemoryStore()
        )
        self.shape = self.template.shape
        self.dtype = self.template.dtype
        self.chunks = self.template.chunks

    def __getitem__(self, key):
        """Return an uninitialized NumPy array shaped like the selection ``key``."""
        # BasicIndexer expects a tuple selection, so wrap a lone index.
        selection = key if isinstance(key, tuple) else (key,)
        selected_shape = BasicIndexer(selection, self.template).shape
        return np.empty(selected_shape, dtype=self.dtype)


class VirtualOffsetsArray:
Expand Down Expand Up @@ -43,5 +68,11 @@ def _key_to_index_tuple(selection):
return tuple(sel)


def virtual_empty(
    shape: T_Shape, *, dtype: T_DType, chunks: T_RegularChunks, **kwargs
) -> VirtualEmptyArray:
    """Build a ``VirtualEmptyArray`` with the given shape, dtype and chunks.

    Any extra keyword arguments are forwarded unchanged to the
    ``VirtualEmptyArray`` constructor.
    """
    array = VirtualEmptyArray(shape, dtype, chunks, **kwargs)
    return array


def virtual_offsets(shape: T_Shape) -> VirtualOffsetsArray:
    """Build a ``VirtualOffsetsArray`` for the given shape."""
    offsets = VirtualOffsetsArray(shape)
    return offsets
21 changes: 20 additions & 1 deletion cubed/tests/storage/test_virtual.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,26 @@
import numpy as np
import pytest

from cubed.storage.virtual import virtual_offsets
from cubed.storage.virtual import virtual_empty, virtual_offsets


@pytest.mark.parametrize(
    "shape,chunks,index",
    [
        ((3,), (2,), 2),
        ((3, 2), (2, 1), (2, 1)),
        ((3, 2), (2, 1), (2, slice(0, 1))),
        ((3, 2), (2, 1), (slice(1, 3), 1)),
        ((3, 2), (2, 1), (slice(1, 3), slice(0, 1))),
    ],
)
def test_virtual_empty(shape, chunks, index):
    """Indexing a virtual empty array yields the same shapes as ``np.empty``."""
    # The contents are uninitialized and may be anything, so only the
    # shapes of the selections are compared, never the values.
    expected = np.empty(shape, dtype=np.int32)
    actual = virtual_empty(shape, dtype=np.int32, chunks=chunks)
    for selection in (index, ...):
        assert actual[selection].shape == expected[selection].shape


@pytest.mark.parametrize("shape", [(), (3,), (3, 2)])
Expand Down

0 comments on commit 6ad5601

Please sign in to comment.