Added Store.getsize #2426

Merged: 17 commits, Nov 14, 2024
src/zarr/abc/store.py (68 additions, 0 deletions)

@@ -5,6 +5,10 @@
from itertools import starmap
from typing import TYPE_CHECKING, NamedTuple, Protocol, runtime_checkable

from zarr.core.buffer.core import default_buffer_prototype
from zarr.core.common import concurrent_map
from zarr.core.config import config

if TYPE_CHECKING:
    from collections.abc import AsyncGenerator, Iterable
    from types import TracebackType
@@ -396,6 +400,70 @@ async def _get_many(
        for req in requests:
            yield (req[0], await self.get(*req))

    async def getsize(self, key: str) -> int:
"""
Return the size, in bytes, of a value in a Store.

Parameters
----------
key : str

Returns
-------
nbytes : int
The size of the value (in bytes).

Raises
------
FileNotFoundError
When the given key does not exist in the store.
"""
# Note to implementers: this default implementation is very inefficient since
# it requires reading the entire object. Many systems will have ways to get the
# size of an object without reading it.
value = await self.get(key, prototype=default_buffer_prototype())
        if value is None:
            raise FileNotFoundError(key)
        return len(value)
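As the note to implementers suggests, a concrete store can usually answer getsize from metadata rather than by reading the value. A minimal sketch of such an override, assuming a hypothetical dict-backed store (DictStore and _store_dict are illustrative names; only the getsize override is shown, not the full Store interface):

    from zarr.abc.store import Store
    from zarr.core.buffer import Buffer


    class DictStore(Store):
        # Hypothetical store that keeps Buffer values in a plain dict.
        _store_dict: dict[str, Buffer]

        async def getsize(self, key: str) -> int:
            # Look the value up directly; no bytes are copied or read from disk.
            try:
                return len(self._store_dict[key])
            except KeyError:
                # Per the interface contract, missing keys raise FileNotFoundError.
                raise FileNotFoundError(key) from None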

    async def getsize_prefix(self, prefix: str) -> int:
        """
        Return the size, in bytes, of all values under a prefix.

        Parameters
        ----------
        prefix : str
            The prefix of the directory to measure.
[Review thread on this docstring]

Contributor:

    Can we offer implementers the following in documentation?:

        This function will be called by zarr using a prefix that is the path of a
        group, an array, or the root. Implementations can choose to do undefined
        behavior when that is not the case.

TomAugspurger (author):

    Sure... I was hoping we could somehow ensure that we don't call it with
    anything other than a group / array / root path, but users can directly use
    Store.getsize_prefix and they can do whatever.

    LMK if you want any more specific guidance on what to do (e.g. raise a
    ValueError). I'm hesitant about trying to force required exceptions into an
    ABC / interface.

TomAugspurger (author), Nov 5, 2024:

    > I'm hesitant about trying to force required exceptions into an ABC / interface.

    And now I'm noticing that I've done exactly that in getsize, by requiring
    implementations to raise FileNotFoundError if the key isn't found :)
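For concreteness, the ValueError guardrail floated above might look like this in a hypothetical implementation (the _is_node_path helper is invented purely for illustration; nothing like it is required by the interface):

    class StrictStore(Store):
        async def getsize_prefix(self, prefix: str) -> int:
            # Hypothetical guardrail: reject prefixes that are not a group,
            # array, or root path instead of leaving the behavior undefined.
            if not await self._is_node_path(prefix):  # invented helper
                raise ValueError(f"prefix {prefix!r} is not a group, array, or root path")
            return await super().getsize_prefix(prefix)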


        Returns
        -------
        nbytes : int
            The sum of the sizes of the values in the directory (in bytes).

        See Also
        --------
        zarr.Array.nbytes_stored
        Store.getsize

        Notes
        -----
        ``getsize_prefix`` is just provided as a potentially faster alternative to
        listing all the keys under a prefix and calling :meth:`Store.getsize` on
        each.

        In general, ``prefix`` should be the path of an Array or Group in the Store.
        Implementations may differ on the behavior when some other ``prefix``
        is provided.
        """
        # TODO: Overlap listing keys with getsize calls.
        # Currently, we load the list of keys into memory and only then move
        # on to getting sizes. Ideally we would overlap those two, which should
        # improve tail latency and might reduce memory pressure (since not all keys
        # would be in memory at once).
        keys = [(x,) async for x in self.list_prefix(prefix)]
        limit = config.get("async.concurrency")
        sizes = await concurrent_map(keys, self.getsize, limit=limit)
        return sum(sizes)
[Review thread on this implementation]

Contributor:

    This materializes the full list of keys in memory, can we maintain the
    generator longer to avoid that?

    Also, this has unlimited concurrency, for a potentially very large number of
    keys. It could easily create millions of async tasks. We should probably run
    in chunks limited by the value of the concurrency setting.

Contributor:

    See concurrent_map for an example.

TomAugspurger (author), Nov 5, 2024:

    > This materializes the full list of keys in memory, can we maintain the
    > generator longer to avoid that?

    I don't immediately see how that's possible.

    The best I'm coming up with is a fold-like function that asynchronously
    iterates through keys from list_prefix and (asynchronously) calls
    self.getsize to update the size. Sounds kinda complicated.

    FWIW, it looks like concurrent_map wants an iterable of items:

        >       return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])
        E       TypeError: 'async_generator' object is not iterable

    In 7cbc500 I've hacked in some support for AsyncIterable there. I haven't had
    enough coffee to figure out what the flow of

        return await asyncio.gather(*[asyncio.ensure_future(run(item)) async for item in items])

    is. I'm a bit worried the ``async for item in items`` is happening
    immediately, so we end up building that list of keys in memory anyway.

    > We should probably run in chunks limited by the value of the concurrency
    > setting.

    Fixed. We should probably replace all instances of asyncio.gather with a
    concurrency-limited version. I'll make a separate issue for that.

TomAugspurger (author), Nov 8, 2024:

    5f1d036 removed support for AsyncIterable in concurrent_map, replacing it
    with a TODO.

    I think there's some discussion around improving our use of asyncio to handle
    cases like this (using queues to mediate task producers like list_prefix and
    consumers like getsize) that will address this.

    The unbounded concurrency issue you raised is still fixed; it's just the
    loading of keys into memory that's not yet addressed.
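A minimal sketch of the queue-mediated producer/consumer approach described in that comment, using only methods from the Store interface in this diff. The function name, the one-sentinel-per-consumer scheme, and the queue size are illustrative choices, not zarr's implementation:

    import asyncio

    from zarr.abc.store import Store


    async def getsize_prefix_streaming(store: Store, prefix: str, limit: int) -> int:
        # A bounded queue mediates between the producer (list_prefix) and the
        # consumers (getsize), so only about `limit` keys are buffered at once.
        queue: asyncio.Queue[str | None] = asyncio.Queue(maxsize=limit)
        total = 0

        async def produce() -> None:
            async for key in store.list_prefix(prefix):
                await queue.put(key)
            for _ in range(limit):
                await queue.put(None)  # one sentinel per consumer signals completion

        async def consume() -> None:
            nonlocal total
            while (key := await queue.get()) is not None:
                total += await store.getsize(key)

        await asyncio.gather(produce(), *(consume() for _ in range(limit)))
        return total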



@runtime_checkable
class ByteGetter(Protocol):
src/zarr/core/array.py (13 additions, 0 deletions)

@@ -865,6 +865,9 @@ async def nchunks_initialized(self) -> int:
"""
return len(await chunks_initialized(self))

    async def nbytes_stored(self) -> int:
        return await self.store_path.store.getsize_prefix(self.store_path.path)

    def _iter_chunk_coords(
        self, *, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None
    ) -> Iterator[ChunkCoords]:

@@ -1540,6 +1543,16 @@ def nchunks_initialized(self) -> int:
"""
return sync(self._async_array.nchunks_initialized())

def nbytes_stored(self) -> int:
"""
Determine the size, in bytes, of the array actually written to the store.

Returns
-------
size : int
"""
return sync(self._async_array.nbytes_stored())
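A short usage sketch, mirroring the tests later in this diff (the exact byte counts depend on the stored metadata, so they are omitted here):

    import zarr

    arr = zarr.create(shape=(100,), chunks=(10,), dtype="i4")
    print(arr.nbytes_stored())  # size of the metadata document only; no chunks yet
    arr[:50] = 1
    print(arr.nbytes_stored())  # metadata plus the five initialized chunks
    arr[50:] = 2
    print(arr.nbytes_stored())  # metadata plus all ten chunks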

    def _iter_chunk_keys(
        self, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None
    ) -> Iterator[str]:

src/zarr/core/common.py (3 additions, 1 deletion)

@@ -50,7 +50,9 @@ def product(tup: ChunkCoords) -> int:


 async def concurrent_map(
-    items: Iterable[T], func: Callable[..., Awaitable[V]], limit: int | None = None
+    items: Iterable[T],
+    func: Callable[..., Awaitable[V]],
+    limit: int | None = None,
 ) -> list[V]:
     if limit is None:
         return await asyncio.gather(*list(starmap(func, items)))
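The body for the non-None ``limit`` branch is elided from this diff, but the traceback quoted in the review thread above shows its shape: ``ensure_future(run(item))`` inside ``gather``. A sketch of that pattern under the same signature; the semaphore detail is an assumption, not necessarily zarr's exact code:

    import asyncio
    from collections.abc import Awaitable, Callable, Iterable
    from typing import Any, TypeVar

    V = TypeVar("V")


    async def concurrent_map_limited(
        items: Iterable[tuple[Any, ...]],
        func: Callable[..., Awaitable[V]],
        limit: int,
    ) -> list[V]:
        # At most `limit` coroutines may hold the semaphore at once, so the
        # number of in-flight calls to func is bounded even for huge listings.
        sem = asyncio.Semaphore(limit)

        async def run(item: tuple[Any, ...]) -> V:
            async with sem:
                return await func(*item)

        return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])

Note that this still creates one task per item up front; it bounds how many execute concurrently, which matches the comment above that the unbounded-concurrency issue is fixed while loading all keys into memory is not.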
src/zarr/storage/local.py (7 additions, 1 deletion)

@@ -9,6 +9,7 @@

from zarr.abc.store import ByteRangeRequest, Store
from zarr.core.buffer import Buffer
from zarr.core.buffer.core import default_buffer_prototype
from zarr.core.common import concurrent_map

if TYPE_CHECKING:
@@ -143,10 +144,12 @@ def __eq__(self, other: object) -> bool:
     async def get(
         self,
         key: str,
-        prototype: BufferPrototype,
+        prototype: BufferPrototype | None = None,
         byte_range: tuple[int | None, int | None] | None = None,
     ) -> Buffer | None:
         # docstring inherited
+        if prototype is None:
+            prototype = default_buffer_prototype()
         if not self._is_open:
             await self._open()
         assert isinstance(key, str)
@@ -241,3 +244,6 @@ async def list_dir(self, prefix: str) -> AsyncGenerator[str]:
                yield key.relative_to(base).as_posix()
        except (FileNotFoundError, NotADirectoryError):
            pass

    async def getsize(self, key: str) -> int:
        return os.path.getsize(self.root / key)
src/zarr/storage/remote.py (13 additions, 0 deletions)

@@ -344,3 +344,16 @@ async def list_prefix(self, prefix: str) -> AsyncGenerator[str]:
f"{self.path}/{prefix}", detail=False, maxdepth=None, withdirs=False
):
yield onefile.removeprefix(f"{self.path}/")

    async def getsize(self, key: str) -> int:
        path = _dereference_path(self.path, key)
        info = await self.fs._info(path)

        size = info.get("size")

        if size is None:
            # Not all filesystems report a size. Fall back to reading the entire object.
            return await super().getsize(key)
        else:
            # fsspec is not typed, so we assume the reported size is integral.
            return int(size)
src/zarr/testing/store.py (23 additions, 0 deletions)

@@ -352,3 +352,26 @@ async def test_set_if_not_exists(self, store: S) -> None:

        result = await store.get("k2", default_buffer_prototype())
        assert result == new

    async def test_getsize(self, store: S) -> None:
        key = "k"
        data = self.buffer_cls.from_bytes(b"0" * 10)
        await self.set(store, key, data)

        result = await store.getsize(key)
        assert isinstance(result, int)
        assert result > 0

    async def test_getsize_raises(self, store: S) -> None:
        with pytest.raises(FileNotFoundError):
            await store.getsize("not-a-real-key")

    async def test_getsize_prefix(self, store: S) -> None:
        prefix = "array/c/"
        for i in range(10):
            data = self.buffer_cls.from_bytes(b"0" * 10)
            await self.set(store, f"{prefix}{i}", data)  # prefix already ends with "/"

        result = await store.getsize_prefix(prefix)
        assert isinstance(result, int)
        assert result > 0
tests/test_array.py (24 additions, 0 deletions)

@@ -370,6 +370,30 @@ async def test_chunks_initialized() -> None:
    assert observed == expected


def test_nbytes_stored() -> None:
    arr = zarr.create(shape=(100,), chunks=(10,), dtype="i4")
    result = arr.nbytes_stored()
    assert result == 366  # the size of the metadata document. This is a fragile test.
    arr[:50] = 1
    result = arr.nbytes_stored()
    assert result == 566  # the size with 5 chunks filled.
    arr[50:] = 2
    result = arr.nbytes_stored()
    assert result == 766  # the size with all chunks filled.


async def test_nbytes_stored_async() -> None:
    arr = await zarr.api.asynchronous.create(shape=(100,), chunks=(10,), dtype="i4")
    result = await arr.nbytes_stored()
    assert result == 366  # the size of the metadata document. This is a fragile test.
    await arr.setitem(slice(50), 1)
    result = await arr.nbytes_stored()
    assert result == 566  # the size with 5 chunks filled.
    await arr.setitem(slice(50, 100), 2)
    result = await arr.nbytes_stored()
    assert result == 766  # the size with all chunks filled.


def test_default_fill_values() -> None:
    a = Array.create(MemoryStore({}, mode="w"), shape=5, chunk_shape=5, dtype="<U4")
    assert a.fill_value == ""
tests/test_indexing.py (1 addition, 0 deletions)

@@ -32,6 +32,7 @@
if TYPE_CHECKING:
    from collections.abc import AsyncGenerator

    from zarr.core.buffer import BufferPrototype
    from zarr.core.buffer.core import Buffer
    from zarr.core.common import ChunkCoords
tests/test_metadata/test_consolidated.py (1 addition, 1 deletion)

@@ -16,7 +16,7 @@
     open,
     open_consolidated,
 )
-from zarr.core.buffer.core import default_buffer_prototype
+from zarr.core.buffer import default_buffer_prototype
from zarr.core.group import ConsolidatedMetadata, GroupMetadata
from zarr.core.metadata import ArrayV3Metadata
from zarr.core.metadata.v2 import ArrayV2Metadata
tests/test_v2.py (1 addition, 0 deletions)

@@ -9,6 +9,7 @@
from numcodecs.blosc import Blosc

import zarr
import zarr.core.buffer
import zarr.storage
from zarr import Array
from zarr.storage import MemoryStore, StorePath