Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make AsyncArray.nchunks_initialized async #2449

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 49 additions & 40 deletions src/zarr/core/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
T_ArrayMetadata,
)
from zarr.core.metadata.v3 import parse_node_type_array
from zarr.core.sync import collect_aiterator, sync
from zarr.core.sync import sync
from zarr.errors import MetadataValidationError
from zarr.registry import get_pipeline_class
from zarr.storage import StoreLike, make_store_path
Expand Down Expand Up @@ -829,17 +829,31 @@ def nchunks(self) -> int:
"""
return product(self.cdata_shape)

@property
def nchunks_initialized(self) -> int:
async def nchunks_initialized(self) -> int:
"""
The number of chunks that have been persisted in storage.
Calculate the number of chunks that have been initialized, i.e. the number of chunks that have
been persisted to the storage backend.

Returns
-------
int
The number of initialized chunks in the array.
nchunks_initialized : int
The number of chunks that have been initialized.

Notes
-----
On :class:`AsyncArray` this is an asynchronous method, unlike the (synchronous)
property :attr:`Array.nchunks_initialized`.

Examples
--------
>>> arr = await zarr.api.asynchronous.create(shape=(10,), chunks=(2,))
>>> await arr.nchunks_initialized()
0
>>> await arr.setitem(slice(5), 1)
>>> await arr.nchunks_initialized()
3
"""
return nchunks_initialized(self)
return len(await chunks_initialized(self))

def _iter_chunk_coords(
self, *, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None
Expand Down Expand Up @@ -1492,9 +1506,29 @@ def nbytes(self) -> int:
@property
def nchunks_initialized(self) -> int:
"""
The number of chunks that have been initialized in the stored representation of this array.
Calculate the number of chunks that have been initialized, i.e. the number of chunks that have
been persisted to the storage backend.

Returns
-------
nchunks_initialized : int
The number of chunks that have been initialized.

Notes
-----
On :class:`Array` this is a (synchronous) property, unlike the asynchronous method
:meth:`AsyncArray.nchunks_initialized`.

Examples
--------
>>> arr = zarr.create(shape=(10,), chunks=(2,))
>>> arr.nchunks_initialized
0
>>> arr[:5] = 1
>>> arr.nchunks_initialized
3
"""
return self._async_array.nchunks_initialized
return sync(self._async_array.nchunks_initialized())

def _iter_chunk_keys(
self, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None
Expand Down Expand Up @@ -2905,39 +2939,15 @@ def info(self) -> None:
)


def nchunks_initialized(
array: AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata] | Array,
) -> int:
"""
Calculate the number of chunks that have been initialized, i.e. the number of chunks that have
been persisted to the storage backend.

Parameters
----------
array : Array
The array to inspect.

Returns
-------
nchunks_initialized : int
The number of chunks that have been initialized.

See Also
--------
chunks_initialized
"""
return len(chunks_initialized(array))


def chunks_initialized(
array: Array | AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata],
async def chunks_initialized(
array: AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata],
) -> tuple[str, ...]:
"""
Return the keys of the chunks that have been persisted to the storage backend.

Parameters
----------
array : Array
array : AsyncArray
The array to inspect.

Returns
Expand All @@ -2950,10 +2960,9 @@ def chunks_initialized(
nchunks_initialized

"""
# TODO: make this compose with the underlying async iterator
store_contents = list(
collect_aiterator(array.store_path.store.list_prefix(prefix=array.store_path.path))
)
store_contents = [
x async for x in array.store_path.store.list_prefix(prefix=array.store_path.path)
]
return tuple(chunk_key for chunk_key in array._iter_chunk_keys() if chunk_key in store_contents)


Expand Down
16 changes: 5 additions & 11 deletions tests/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ def test_nchunks(test_cls: type[Array] | type[AsyncArray[Any]], nchunks: int) ->


@pytest.mark.parametrize("test_cls", [Array, AsyncArray[Any]])
def test_nchunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> None:
async def test_nchunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> None:
"""
Test that nchunks_initialized accurately returns the number of stored chunks.
"""
Expand All @@ -337,7 +337,7 @@ def test_nchunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> N
if test_cls == Array:
observed = arr.nchunks_initialized
else:
observed = arr._async_array.nchunks_initialized
observed = await arr._async_array.nchunks_initialized()
assert observed == expected

# delete chunks
Expand All @@ -346,13 +346,12 @@ def test_nchunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> N
if test_cls == Array:
observed = arr.nchunks_initialized
else:
observed = arr._async_array.nchunks_initialized
observed = await arr._async_array.nchunks_initialized()
expected = arr.nchunks - idx - 1
assert observed == expected


@pytest.mark.parametrize("test_cls", [Array, AsyncArray[Any]])
def test_chunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> None:
async def test_chunks_initialized() -> None:
"""
Test that chunks_initialized accurately returns the keys of stored chunks.
"""
Expand All @@ -364,12 +363,7 @@ def test_chunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> No
)
for keys, region in zip(chunks_accumulated, arr._iter_chunk_regions(), strict=False):
arr[region] = 1

if test_cls == Array:
observed = sorted(chunks_initialized(arr))
else:
observed = sorted(chunks_initialized(arr._async_array))

observed = sorted(await chunks_initialized(arr._async_array))
expected = sorted(keys)
assert observed == expected

Expand Down