Skip to content

Commit

Permalink
Support block_id for general_blockwise
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwhite committed Oct 8, 2024
1 parent c2d042c commit 6681920
Showing 1 changed file with 77 additions and 30 deletions.
107 changes: 77 additions & 30 deletions cubed/core/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,9 @@
from cubed.spec import spec_from_config
from cubed.storage.backend import open_backend_array
from cubed.types import T_RegularChunks, T_Shape
from cubed.utils import (
_concatenate2,
array_memory,
array_size,
get_item,
offset_to_block_id,
to_chunksize,
)
from cubed.utils import _concatenate2, array_memory, array_size, get_item
from cubed.utils import numblocks as compute_numblocks
from cubed.utils import offset_to_block_id, to_chunksize
from cubed.vendor.dask.array.core import normalize_chunks
from cubed.vendor.dask.array.utils import validate_axis
from cubed.vendor.dask.blockwise import broadcast_dimensions, lol_product
Expand Down Expand Up @@ -342,6 +337,77 @@ def general_blockwise(
target_paths=None,
extra_func_kwargs=None,
**kwargs,
) -> Union["Array", Tuple["Array", ...]]:
if has_keyword(func, "block_id"):
from cubed.array_api.creation_functions import offsets_virtual_array

# Create an array of index offsets with the same chunk structure as the args,
# which we convert to block ids (chunk coordinates) later.
array0 = arrays[0]
# note that primitive general_blockwise checks that all chunkss have same numblocks
numblocks = compute_numblocks(chunkss[0])
offsets = offsets_virtual_array(numblocks, array0.spec)
new_arrays = arrays + (offsets,)

def key_function_with_offset(key_function):
def wrap(out_key):
out_coords = out_key[1:]
offset_in_key = ((offsets.name,) + out_coords,)
return key_function(out_key) + offset_in_key

return wrap

def func_with_block_id(func):
def wrap(*a, **kw):
offset = int(a[-1]) # convert from 0-d array
block_id = offset_to_block_id(offset, numblocks)
return func(*a[:-1], block_id=block_id, **kw)

return wrap

num_input_blocks = kwargs.pop("num_input_blocks", None)
if num_input_blocks is not None:
num_input_blocks = num_input_blocks + (1,) # for offsets array

return _general_blockwise(
func_with_block_id(func),
key_function_with_offset(key_function),
*new_arrays,
shapes=shapes,
dtypes=dtypes,
chunkss=chunkss,
target_stores=target_stores,
target_paths=target_paths,
extra_func_kwargs=extra_func_kwargs,
num_input_blocks=num_input_blocks,
**kwargs,
)

return _general_blockwise(
func,
key_function,
*arrays,
shapes=shapes,
dtypes=dtypes,
chunkss=chunkss,
target_stores=target_stores,
target_paths=target_paths,
extra_func_kwargs=extra_func_kwargs,
**kwargs,
)


def _general_blockwise(
func,
key_function,
*arrays,
shapes,
dtypes,
chunkss,
target_stores=None,
target_paths=None,
extra_func_kwargs=None,
**kwargs,
) -> Union["Array", Tuple["Array", ...]]:
assert len(arrays) > 0

Expand Down Expand Up @@ -504,12 +570,6 @@ def merged_chunk_len_for_indexer(ia, c):
if _is_chunk_aligned_selection(idx):
# use general_blockwise, which allows more opportunities for optimization than map_direct

from cubed.array_api.creation_functions import offsets_virtual_array

# general_blockwise doesn't support block_id, so emulate it ourselves
numblocks = tuple(map(len, target_chunks))
offsets = offsets_virtual_array(numblocks, x.spec)

def key_function(out_key):
out_coords = out_key[1:]

Expand All @@ -521,24 +581,17 @@ def key_function(out_key):
in_sel, x.zarray_maybe_lazy.shape, x.zarray_maybe_lazy.chunks
)

offset_in_key = ((offsets.name,) + out_coords,)
return (
tuple((x.name,) + chunk_coords for (chunk_coords, _, _) in indexer)
+ offset_in_key
return tuple(
(x.name,) + chunk_coords for (chunk_coords, _, _) in indexer
)

# since selection is chunk-aligned, we know that we only read one block of x
num_input_blocks = (1, 1) # x, offsets

out = general_blockwise(
_assemble_index_chunk,
key_function,
x,
offsets,
shapes=[shape],
dtypes=[x.dtype],
chunkss=[target_chunks],
num_input_blocks=num_input_blocks,
target_chunks=target_chunks,
selection=selection,
in_shape=x.shape,
Expand Down Expand Up @@ -622,14 +675,8 @@ def _assemble_index_chunk(
selection=None,
in_shape=None,
in_chunksize=None,
block_id=None,
):
# last array contains the offset for the block_id
offset = int(arrs[-1]) # convert from 0-d array
numblocks = tuple(map(len, target_chunks))
block_id = offset_to_block_id(offset, numblocks)

arrs = arrs[:-1] # drop offset array

# compute the selection on x required to get the relevant chunk for out_coords
out_coords = block_id
in_sel = _target_chunk_selection(target_chunks, out_coords, selection)
Expand Down

0 comments on commit 6681920

Please sign in to comment.