From 14be82633aba4790e824ee229a1ec119b2ba72bf Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 8 Feb 2024 10:19:07 -0500 Subject: [PATCH 01/17] Initial object-store implementation --- src/zarr/v3/store/object_store.py | 78 +++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 src/zarr/v3/store/object_store.py diff --git a/src/zarr/v3/store/object_store.py b/src/zarr/v3/store/object_store.py new file mode 100644 index 0000000000..3790f621be --- /dev/null +++ b/src/zarr/v3/store/object_store.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +import asyncio +from typing import List, Optional, Tuple + +from object_store import ObjectStore as _ObjectStore +from object_store import Path as ObjectPath + +from zarr.v3.abc.store import Store + + +class ObjectStore(Store): + supports_writes: bool = True + supports_partial_writes: bool = False + supports_listing: bool = True + + store: _ObjectStore + + def init(self, store: _ObjectStore): + self.store = store + + def __str__(self) -> str: + return f"object://{self.store}" + + def __repr__(self) -> str: + return f"ObjectStore({repr(str(self))})" + + async def get( + self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None + ) -> Optional[bytes]: + if byte_range is None: + return await self.store.get_async(ObjectPath(key)) + + start, end = byte_range + if end is None: + # Have to wrap a separate object-store function to support this + raise NotImplementedError + + return await self.store.get_range_async(ObjectPath(key), start, end - start) + + async def get_partial_values( + self, key_ranges: List[Tuple[str, Tuple[int, int]]] + ) -> List[bytes]: + # TODO: use rust-based concurrency inside object-store + futs = [self.get(key, byte_range=byte_range) for (key, byte_range) in key_ranges] + + # Seems like a weird type match where `get()` returns `Optional[bytes]` but + # `get_partial_values` is non-optional? + return await asyncio.gather(*futs) # type: ignore + + async def exists(self, key: str) -> bool: + try: + _ = await self.store.head_async(ObjectPath(key)) + return True + except FileNotFoundError: + return False + + async def set(self, key: str, value: bytes) -> None: + await self.store.put_async(ObjectPath(key), value) + + async def delete(self, key: str) -> None: + await self.store.delete_async(ObjectPath(key)) + + async def set_partial_values(self, key_start_values: List[Tuple[str, int, bytes]]) -> None: + raise NotImplementedError + + async def list(self) -> List[str]: + objects = await self.store.list_async(None) + return [str(obj.location) for obj in objects] + + async def list_prefix(self, prefix: str) -> List[str]: + objects = await self.store.list_async(ObjectPath(prefix)) + return [str(obj.location) for obj in objects] + + async def list_dir(self, prefix: str) -> List[str]: + list_result = await self.store.list_with_delimiter_async(ObjectPath(prefix)) + common_prefixes = set(list_result.common_prefixes) + return [str(obj.location) for obj in list_result.objects if obj not in common_prefixes] From afa79af04462778d1f69e5f739db20e09b7c7609 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 27 Feb 2024 17:10:07 -0500 Subject: [PATCH 02/17] Update src/zarr/v3/store/object_store.py Co-authored-by: Deepak Cherian --- src/zarr/v3/store/object_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zarr/v3/store/object_store.py b/src/zarr/v3/store/object_store.py index 3790f621be..a834b44102 100644 --- a/src/zarr/v3/store/object_store.py +++ b/src/zarr/v3/store/object_store.py @@ -16,7 +16,7 @@ class ObjectStore(Store): store: _ObjectStore - def init(self, store: _ObjectStore): + def __init__(self, store: _ObjectStore): self.store = store def __str__(self) -> str: From f5c884bc3626dc47f327d868d996b244850aa526 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 22 Oct 2024 14:03:32 -0400 Subject: [PATCH 03/17] update --- src/zarr/storage/object_store.py | 141 ++++++++++++++++++++++++++++++ src/zarr/v3/store/object_store.py | 78 ----------------- 2 files changed, 141 insertions(+), 78 deletions(-) create mode 100644 src/zarr/storage/object_store.py delete mode 100644 src/zarr/v3/store/object_store.py diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py new file mode 100644 index 0000000000..862aa3c74d --- /dev/null +++ b/src/zarr/storage/object_store.py @@ -0,0 +1,141 @@ +from __future__ import annotations + +import asyncio +from collections import defaultdict +from collections.abc import Iterable +from typing import TYPE_CHECKING, Any + +import object_store_rs as obs + +from zarr.abc.store import ByteRangeRequest, Store +from zarr.core.buffer import Buffer +from zarr.core.buffer.core import BufferPrototype + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator, Coroutine, Iterable + from typing import Any + + from object_store_rs.store import ObjectStore as _ObjectStore + + from zarr.core.buffer import Buffer, BufferPrototype + from zarr.core.common import BytesLike + + +class ObjectStore(Store): + store: _ObjectStore + + def __init__(self, store: _ObjectStore) -> None: + self.store = store + + def __str__(self) -> str: + return f"object://{self.store}" + + def __repr__(self) -> str: + return f"ObjectStore({self!r})" + + async def get( + self, key: str, prototype: BufferPrototype, byte_range: ByteRangeRequest | None = None + ) -> Buffer: + if byte_range is None: + resp = await obs.get_async(self.store, key) + return await resp.bytes_async() + + pass + + raise NotImplementedError + + async def get_partial_values( + self, + prototype: BufferPrototype, + key_ranges: Iterable[tuple[str, ByteRangeRequest]], + ) -> list[Buffer | None]: + # TODO: this is a bit hacky and untested. ObjectStore has a `get_ranges` method + # that will additionally merge nearby ranges, but it's _per_ file. So we need to + # split these key_ranges into **per-file** key ranges, and then reassemble the + # results in the original order. + key_ranges = list(key_ranges) + + per_file_requests: dict[str, list[tuple[int | None, int | None, int]]] = defaultdict(list) + for idx, (path, range_) in enumerate(key_ranges): + per_file_requests[path].append((range_[0], range_[1], idx)) + + futs: list[Coroutine[Any, Any, list[bytes]]] = [] + for path, ranges in per_file_requests.items(): + offsets = [r[0] for r in ranges] + lengths = [r[1] - r[0] for r in ranges] + fut = obs.get_ranges_async(self.store, path, offsets=offsets, lengths=lengths) + futs.append(fut) + + result = await asyncio.gather(*futs) + + output_buffers: list[bytes] = [b""] * len(key_ranges) + for per_file_request, buffers in zip(per_file_requests.items(), result, strict=True): + path, ranges = per_file_request + for buffer, ranges_ in zip(buffers, ranges, strict=True): + initial_index = ranges_[2] + output_buffers[initial_index] = buffer + + return output_buffers + + async def exists(self, key: str) -> bool: + try: + await obs.head_async(self.store, key) + except FileNotFoundError: + return False + else: + return True + + @property + def supports_writes(self) -> bool: + return True + + async def set(self, key: str, value: Buffer) -> None: + buf = value.to_bytes() + await obs.put_async(self.store, key, buf) + + # TODO: + # async def set_if_not_exists(self, key: str, value: Buffer) -> None: + + @property + def supports_deletes(self) -> bool: + return True + + async def delete(self, key: str) -> None: + await obs.delete_async(self.store, key) + + @property + def supports_partial_writes(self) -> bool: + return False + + async def set_partial_values( + self, key_start_values: Iterable[tuple[str, int, BytesLike]] + ) -> None: + raise NotImplementedError + + @property + def supports_listing(self) -> bool: + return True + + def list(self) -> AsyncGenerator[str, None]: + # object-store-rs does not yet support list results as an async generator + # https://github.com/apache/arrow-rs/issues/6587 + objects = obs.list(self.store) + paths = [object["path"] for object in objects] + # Not sure how to convert list to async generator + return paths + + def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: + # object-store-rs does not yet support list results as an async generator + # https://github.com/apache/arrow-rs/issues/6587 + objects = obs.list(self.store, prefix=prefix) + paths = [object["path"] for object in objects] + # Not sure how to convert list to async generator + return paths + + def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: + # object-store-rs does not yet support list results as an async generator + # https://github.com/apache/arrow-rs/issues/6587 + objects = obs.list_with_delimiter(self.store, prefix=prefix) + paths = [object["path"] for object in objects["objects"]] + # Not sure how to convert list to async generator + return paths diff --git a/src/zarr/v3/store/object_store.py b/src/zarr/v3/store/object_store.py deleted file mode 100644 index a834b44102..0000000000 --- a/src/zarr/v3/store/object_store.py +++ /dev/null @@ -1,78 +0,0 @@ -from __future__ import annotations - -import asyncio -from typing import List, Optional, Tuple - -from object_store import ObjectStore as _ObjectStore -from object_store import Path as ObjectPath - -from zarr.v3.abc.store import Store - - -class ObjectStore(Store): - supports_writes: bool = True - supports_partial_writes: bool = False - supports_listing: bool = True - - store: _ObjectStore - - def __init__(self, store: _ObjectStore): - self.store = store - - def __str__(self) -> str: - return f"object://{self.store}" - - def __repr__(self) -> str: - return f"ObjectStore({repr(str(self))})" - - async def get( - self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None - ) -> Optional[bytes]: - if byte_range is None: - return await self.store.get_async(ObjectPath(key)) - - start, end = byte_range - if end is None: - # Have to wrap a separate object-store function to support this - raise NotImplementedError - - return await self.store.get_range_async(ObjectPath(key), start, end - start) - - async def get_partial_values( - self, key_ranges: List[Tuple[str, Tuple[int, int]]] - ) -> List[bytes]: - # TODO: use rust-based concurrency inside object-store - futs = [self.get(key, byte_range=byte_range) for (key, byte_range) in key_ranges] - - # Seems like a weird type match where `get()` returns `Optional[bytes]` but - # `get_partial_values` is non-optional? - return await asyncio.gather(*futs) # type: ignore - - async def exists(self, key: str) -> bool: - try: - _ = await self.store.head_async(ObjectPath(key)) - return True - except FileNotFoundError: - return False - - async def set(self, key: str, value: bytes) -> None: - await self.store.put_async(ObjectPath(key), value) - - async def delete(self, key: str) -> None: - await self.store.delete_async(ObjectPath(key)) - - async def set_partial_values(self, key_start_values: List[Tuple[str, int, bytes]]) -> None: - raise NotImplementedError - - async def list(self) -> List[str]: - objects = await self.store.list_async(None) - return [str(obj.location) for obj in objects] - - async def list_prefix(self, prefix: str) -> List[str]: - objects = await self.store.list_async(ObjectPath(prefix)) - return [str(obj.location) for obj in objects] - - async def list_dir(self, prefix: str) -> List[str]: - list_result = await self.store.list_with_delimiter_async(ObjectPath(prefix)) - common_prefixes = set(list_result.common_prefixes) - return [str(obj.location) for obj in list_result.objects if obj not in common_prefixes] From af2a39b52ed3d5ac2f33d2c280ac5e0675e67a2d Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Fri, 1 Nov 2024 16:54:54 -0400 Subject: [PATCH 04/17] Handle list streams --- src/zarr/storage/object_store.py | 56 +++++++++++++++++++------------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 862aa3c74d..2bf709e470 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -5,7 +5,7 @@ from collections.abc import Iterable from typing import TYPE_CHECKING, Any -import object_store_rs as obs +import obstore as obs from zarr.abc.store import ByteRangeRequest, Store from zarr.core.buffer import Buffer @@ -15,7 +15,8 @@ from collections.abc import AsyncGenerator, Coroutine, Iterable from typing import Any - from object_store_rs.store import ObjectStore as _ObjectStore + from obstore import ListStream, ObjectMeta + from obstore.store import ObjectStore as _ObjectStore from zarr.core.buffer import Buffer, BufferPrototype from zarr.core.common import BytesLike @@ -93,8 +94,9 @@ async def set(self, key: str, value: Buffer) -> None: buf = value.to_bytes() await obs.put_async(self.store, key, buf) - # TODO: - # async def set_if_not_exists(self, key: str, value: Buffer) -> None: + async def set_if_not_exists(self, key: str, value: Buffer) -> None: + buf = value.to_bytes() + await obs.put_async(self.store, key, buf, mode="create") @property def supports_deletes(self) -> bool: @@ -117,25 +119,35 @@ def supports_listing(self) -> bool: return True def list(self) -> AsyncGenerator[str, None]: - # object-store-rs does not yet support list results as an async generator - # https://github.com/apache/arrow-rs/issues/6587 - objects = obs.list(self.store) - paths = [object["path"] for object in objects] - # Not sure how to convert list to async generator - return paths + objects: ListStream[list[ObjectMeta]] = obs.list(self.store) + return _transform_list(objects) def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: - # object-store-rs does not yet support list results as an async generator - # https://github.com/apache/arrow-rs/issues/6587 - objects = obs.list(self.store, prefix=prefix) - paths = [object["path"] for object in objects] - # Not sure how to convert list to async generator - return paths + objects: ListStream[list[ObjectMeta]] = obs.list(self.store, prefix=prefix) + return _transform_list(objects) def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: - # object-store-rs does not yet support list results as an async generator - # https://github.com/apache/arrow-rs/issues/6587 - objects = obs.list_with_delimiter(self.store, prefix=prefix) - paths = [object["path"] for object in objects["objects"]] - # Not sure how to convert list to async generator - return paths + objects: ListStream[list[ObjectMeta]] = obs.list(self.store, prefix=prefix) + return _transform_list_dir(objects, prefix) + + +async def _transform_list( + list_stream: AsyncGenerator[list[ObjectMeta], None], +) -> AsyncGenerator[str, None]: + async for batch in list_stream: + for item in batch: + yield item["path"] + + +async def _transform_list_dir( + list_stream: AsyncGenerator[list[ObjectMeta], None], prefix: str +) -> AsyncGenerator[str, None]: + # We assume that the underlying object-store implementation correctly handles the + # prefix, so we don't double-check that the returned results actually start with the + # given prefix. + prefix_len = len(prefix) + async for batch in list_stream: + for item in batch: + # Yield this item if "/" does not exist after the prefix. + if "/" not in item["path"][prefix_len:]: + yield item["path"] From d7cfbee1f37fe17f7c92335b8f1a569f88963608 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Fri, 1 Nov 2024 17:24:57 -0400 Subject: [PATCH 05/17] Update get --- src/zarr/storage/object_store.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 2bf709e470..8f40cd49d4 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -39,11 +39,22 @@ async def get( ) -> Buffer: if byte_range is None: resp = await obs.get_async(self.store, key) - return await resp.bytes_async() - - pass - - raise NotImplementedError + return prototype.buffer.from_buffer(memoryview(await resp.bytes_async())) # type: ignore not assignable to buffer + + start, end = byte_range + if start is not None and end is not None: + resp = await obs.get_range_async(self.store, key, start=start, end=end) + return prototype.buffer.from_buffer(memoryview(await resp.bytes_async())) # type: ignore not assignable to buffer + elif start is not None: + if start >= 0: + # Offset request + resp = await obs.get_async(self.store, key, options={"range": {"offset": start}}) + else: + resp = await obs.get_async(self.store, key, options={"range": {"suffix": start}}) + + return prototype.buffer.from_buffer(memoryview(await resp.bytes_async())) # type: ignore not assignable to buffer + else: + raise ValueError(f"Unexpected input to `get`: {start=}, {end=}") async def get_partial_values( self, From cb40015f77d4e3df11f6801906ca1b3eb2201a47 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Fri, 1 Nov 2024 17:29:10 -0400 Subject: [PATCH 06/17] wip refactor get_partial_values --- src/zarr/storage/object_store.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 8f40cd49d4..ac56affa2d 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -15,6 +15,7 @@ from collections.abc import AsyncGenerator, Coroutine, Iterable from typing import Any + from obstore import Buffer as ObjectStoreBuffer from obstore import ListStream, ObjectMeta from obstore.store import ObjectStore as _ObjectStore @@ -71,21 +72,21 @@ async def get_partial_values( for idx, (path, range_) in enumerate(key_ranges): per_file_requests[path].append((range_[0], range_[1], idx)) - futs: list[Coroutine[Any, Any, list[bytes]]] = [] + futs: list[Coroutine[Any, Any, list[ObjectStoreBuffer]]] = [] for path, ranges in per_file_requests.items(): - offsets = [r[0] for r in ranges] - lengths = [r[1] - r[0] for r in ranges] - fut = obs.get_ranges_async(self.store, path, offsets=offsets, lengths=lengths) + starts = [r[0] for r in ranges] + ends = [r[1] for r in ranges] + fut = obs.get_ranges_async(self.store, path, starts=starts, ends=ends) futs.append(fut) result = await asyncio.gather(*futs) - output_buffers: list[bytes] = [b""] * len(key_ranges) + output_buffers: list[type[BufferPrototype]] = [b""] * len(key_ranges) for per_file_request, buffers in zip(per_file_requests.items(), result, strict=True): path, ranges = per_file_request for buffer, ranges_ in zip(buffers, ranges, strict=True): initial_index = ranges_[2] - output_buffers[initial_index] = buffer + output_buffers[initial_index] = prototype.buffer.from_buffer(memoryview(buffer)) return output_buffers From b97645016717c37c6fa0a86c0edeb727fada10f0 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 7 Nov 2024 15:35:44 -0500 Subject: [PATCH 07/17] Fixes to _get_partial_values --- src/zarr/storage/object_store.py | 186 ++++++++++++++++++++++++++----- 1 file changed, 156 insertions(+), 30 deletions(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index ac56affa2d..7383ec3ac7 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -3,7 +3,7 @@ import asyncio from collections import defaultdict from collections.abc import Iterable -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, TypedDict import obstore as obs @@ -15,8 +15,7 @@ from collections.abc import AsyncGenerator, Coroutine, Iterable from typing import Any - from obstore import Buffer as ObjectStoreBuffer - from obstore import ListStream, ObjectMeta + from obstore import ListStream, ObjectMeta, OffsetRange, SuffixRange from obstore.store import ObjectStore as _ObjectStore from zarr.core.buffer import Buffer, BufferPrototype @@ -62,33 +61,7 @@ async def get_partial_values( prototype: BufferPrototype, key_ranges: Iterable[tuple[str, ByteRangeRequest]], ) -> list[Buffer | None]: - # TODO: this is a bit hacky and untested. ObjectStore has a `get_ranges` method - # that will additionally merge nearby ranges, but it's _per_ file. So we need to - # split these key_ranges into **per-file** key ranges, and then reassemble the - # results in the original order. - key_ranges = list(key_ranges) - - per_file_requests: dict[str, list[tuple[int | None, int | None, int]]] = defaultdict(list) - for idx, (path, range_) in enumerate(key_ranges): - per_file_requests[path].append((range_[0], range_[1], idx)) - - futs: list[Coroutine[Any, Any, list[ObjectStoreBuffer]]] = [] - for path, ranges in per_file_requests.items(): - starts = [r[0] for r in ranges] - ends = [r[1] for r in ranges] - fut = obs.get_ranges_async(self.store, path, starts=starts, ends=ends) - futs.append(fut) - - result = await asyncio.gather(*futs) - - output_buffers: list[type[BufferPrototype]] = [b""] * len(key_ranges) - for per_file_request, buffers in zip(per_file_requests.items(), result, strict=True): - path, ranges = per_file_request - for buffer, ranges_ in zip(buffers, ranges, strict=True): - initial_index = ranges_[2] - output_buffers[initial_index] = prototype.buffer.from_buffer(memoryview(buffer)) - - return output_buffers + return await _get_partial_values(self.store, prototype=prototype, key_ranges=key_ranges) async def exists(self, key: str) -> bool: try: @@ -163,3 +136,156 @@ async def _transform_list_dir( # Yield this item if "/" does not exist after the prefix. if "/" not in item["path"][prefix_len:]: yield item["path"] + + +class BoundedRequest(TypedDict): + """Range request with a known start and end byte. + + These requests can be multiplexed natively on the Rust side with + `obstore.get_ranges_async`. + """ + + original_request_index: int + """The positional index in the original key_ranges input""" + + start: int + """Start byte offset.""" + + end: int + """End byte offset.""" + + +class OtherRequest(TypedDict): + """Offset or suffix range requests. + + These requests cannot be concurrent on the Rust side, and each need their own call + to `obstore.get_async`, passing in the `range` parameter. + """ + + original_request_index: int + """The positional index in the original key_ranges input""" + + path: str + """The path to request from.""" + + range: OffsetRange | SuffixRange + """The range request type.""" + + +class Response(TypedDict): + """A response buffer associated with the original index that it should be restored to.""" + + original_request_index: int + """The positional index in the original key_ranges input""" + + buffer: Buffer + """The buffer returned from obstore's range request.""" + + +async def _make_bounded_requests( + store: obs.store.ObjectStore, + path: str, + requests: list[BoundedRequest], + prototype: BufferPrototype, +) -> list[Response]: + """Make all bounded requests for a specific file. + + `obstore.get_ranges_async` allows for making concurrent requests for multiple ranges + within a single file, and will e.g. merge concurrent requests. This only uses one + single Python coroutine. + """ + + starts = [r["start"] for r in requests] + ends = [r["end"] for r in requests] + responses = await obs.get_ranges_async(store, path=path, starts=starts, ends=ends) + + buffer_responses: list[Response] = [] + for request, response in zip(requests, responses, strict=True): + buffer_responses.append( + { + "original_request_index": request["original_request_index"], + "buffer": prototype.buffer.from_bytes(memoryview(response)), + } + ) + + return buffer_responses + + +async def _make_other_request( + store: obs.store.ObjectStore, + request: OtherRequest, + prototype: BufferPrototype, +) -> list[Response]: + """Make suffix or offset requests. + + We return a `list[Response]` for symmetry with `_make_bounded_requests` so that all + futures can be gathered together. + """ + resp = await obs.get_async(store, request["path"], options={"range": request["range"]}) + buffer = await resp.bytes_async() + return [ + { + "original_request_index": request["original_request_index"], + "buffer": prototype.buffer.from_bytes(buffer), + } + ] + + +async def _get_partial_values( + store: obs.store.ObjectStore, + prototype: BufferPrototype, + key_ranges: Iterable[tuple[str, ByteRangeRequest]], +) -> list[Buffer | None]: + """Make multiple range requests. + + ObjectStore has a `get_ranges` method that will additionally merge nearby ranges, + but it's _per_ file. So we need to split these key_ranges into **per-file** key + ranges, and then reassemble the results in the original order. + + We separate into different requests: + + - One call to `obstore.get_ranges_async` **per target file** + - One call to `obstore.get_async` for each other request. + """ + key_ranges = list(key_ranges) + per_file_bounded_requests: dict[str, list[BoundedRequest]] = defaultdict(list) + other_requests: list[OtherRequest] = [] + + for idx, (path, (start, end)) in enumerate(key_ranges): + if start is None: + raise ValueError("Cannot pass `None` for the start of the range request.") + + if end is not None: + # This is a bounded request with known start and end byte. + per_file_bounded_requests[path].append( + {"original_request_index": idx, "start": start, "end": end} + ) + elif end is None and start < 0: + # Suffix request from the end + other_requests.append( + {"original_request_index": idx, "path": path, "range": {"suffix": abs(start)}} + ) + elif end is None and start > 0: + # Offset request to the end + other_requests.append( + {"original_request_index": idx, "path": path, "range": {"offset": start}} + ) + else: + raise ValueError(f"Unsupported range input: {start=}, {end=}") + + futs: list[Coroutine[Any, Any, list[Response]]] = [] + for path, bounded_ranges in per_file_bounded_requests.items(): + futs.append(_make_bounded_requests(store, path, bounded_ranges, prototype)) + + for request in other_requests: + futs.append(_make_other_request(store, request, prototype)) # noqa: PERF401 + + buffers: list[Buffer | None] = [None] * len(key_ranges) + + # TODO: this gather a list of list of Response; not sure if there's a way to + # unpack these lists inside of an `asyncio.gather`? + for responses in await asyncio.gather(*futs): + for resp in responses: + buffers[resp["original_request_index"]] = resp["buffer"] + + return buffers From f2c827dea81b0c5d8d6c83a525b402b4fe59960b Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 7 Nov 2024 15:38:54 -0500 Subject: [PATCH 08/17] Fix constructing prototype from get --- src/zarr/storage/object_store.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 7383ec3ac7..dd56697321 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -39,12 +39,12 @@ async def get( ) -> Buffer: if byte_range is None: resp = await obs.get_async(self.store, key) - return prototype.buffer.from_buffer(memoryview(await resp.bytes_async())) # type: ignore not assignable to buffer + return prototype.buffer.from_bytes(await resp.bytes_async()) start, end = byte_range if start is not None and end is not None: resp = await obs.get_range_async(self.store, key, start=start, end=end) - return prototype.buffer.from_buffer(memoryview(await resp.bytes_async())) # type: ignore not assignable to buffer + return prototype.buffer.from_bytes(memoryview(resp)) elif start is not None: if start >= 0: # Offset request @@ -52,7 +52,7 @@ async def get( else: resp = await obs.get_async(self.store, key, options={"range": {"suffix": start}}) - return prototype.buffer.from_buffer(memoryview(await resp.bytes_async())) # type: ignore not assignable to buffer + return prototype.buffer.from_bytes(await resp.bytes_async()) else: raise ValueError(f"Unexpected input to `get`: {start=}, {end=}") @@ -265,7 +265,7 @@ async def _get_partial_values( other_requests.append( {"original_request_index": idx, "path": path, "range": {"suffix": abs(start)}} ) - elif end is None and start > 0: + elif end is None and start >= 0: # Offset request to the end other_requests.append( {"original_request_index": idx, "path": path, "range": {"offset": start}} From 5c8903f99bc647469f9ef7638ab57e7061b50662 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 7 Nov 2024 15:39:29 -0500 Subject: [PATCH 09/17] lint --- src/zarr/storage/object_store.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index dd56697321..7945a29568 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -260,12 +260,12 @@ async def _get_partial_values( per_file_bounded_requests[path].append( {"original_request_index": idx, "start": start, "end": end} ) - elif end is None and start < 0: + elif start < 0: # Suffix request from the end other_requests.append( {"original_request_index": idx, "path": path, "range": {"suffix": abs(start)}} ) - elif end is None and start >= 0: + elif start >= 0: # Offset request to the end other_requests.append( {"original_request_index": idx, "path": path, "range": {"offset": start}} From 8bb252e045f95c29d05b8613e491d079d578e4c3 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 18 Nov 2024 17:13:06 +0000 Subject: [PATCH 10/17] Add docstring --- src/zarr/storage/object_store.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 7945a29568..a915696a16 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -23,6 +23,14 @@ class ObjectStore(Store): + """A Zarr store that uses obstore for fast read/write from AWS, GCP, and Azure. + + Parameters + ---------- + store : obstore.store.ObjectStore + An obstore store instance that is set up with the proper credentials. + """ + store: _ObjectStore def __init__(self, store: _ObjectStore) -> None: From 559eafde892f16a3cdaa02d6336f85fb52a16c3c Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 18 Nov 2024 17:14:35 +0000 Subject: [PATCH 11/17] Make names private --- src/zarr/storage/object_store.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index a915696a16..2a728470ad 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -146,7 +146,7 @@ async def _transform_list_dir( yield item["path"] -class BoundedRequest(TypedDict): +class _BoundedRequest(TypedDict): """Range request with a known start and end byte. These requests can be multiplexed natively on the Rust side with @@ -163,7 +163,7 @@ class BoundedRequest(TypedDict): """End byte offset.""" -class OtherRequest(TypedDict): +class _OtherRequest(TypedDict): """Offset or suffix range requests. These requests cannot be concurrent on the Rust side, and each need their own call @@ -180,7 +180,7 @@ class OtherRequest(TypedDict): """The range request type.""" -class Response(TypedDict): +class _Response(TypedDict): """A response buffer associated with the original index that it should be restored to.""" original_request_index: int @@ -193,9 +193,9 @@ class Response(TypedDict): async def _make_bounded_requests( store: obs.store.ObjectStore, path: str, - requests: list[BoundedRequest], + requests: list[_BoundedRequest], prototype: BufferPrototype, -) -> list[Response]: +) -> list[_Response]: """Make all bounded requests for a specific file. `obstore.get_ranges_async` allows for making concurrent requests for multiple ranges @@ -207,7 +207,7 @@ async def _make_bounded_requests( ends = [r["end"] for r in requests] responses = await obs.get_ranges_async(store, path=path, starts=starts, ends=ends) - buffer_responses: list[Response] = [] + buffer_responses: list[_Response] = [] for request, response in zip(requests, responses, strict=True): buffer_responses.append( { @@ -221,12 +221,12 @@ async def _make_bounded_requests( async def _make_other_request( store: obs.store.ObjectStore, - request: OtherRequest, + request: _OtherRequest, prototype: BufferPrototype, -) -> list[Response]: +) -> list[_Response]: """Make suffix or offset requests. - We return a `list[Response]` for symmetry with `_make_bounded_requests` so that all + We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all futures can be gathered together. """ resp = await obs.get_async(store, request["path"], options={"range": request["range"]}) @@ -256,8 +256,8 @@ async def _get_partial_values( - One call to `obstore.get_async` for each other request. """ key_ranges = list(key_ranges) - per_file_bounded_requests: dict[str, list[BoundedRequest]] = defaultdict(list) - other_requests: list[OtherRequest] = [] + per_file_bounded_requests: dict[str, list[_BoundedRequest]] = defaultdict(list) + other_requests: list[_OtherRequest] = [] for idx, (path, (start, end)) in enumerate(key_ranges): if start is None: @@ -281,7 +281,7 @@ async def _get_partial_values( else: raise ValueError(f"Unsupported range input: {start=}, {end=}") - futs: list[Coroutine[Any, Any, list[Response]]] = [] + futs: list[Coroutine[Any, Any, list[_Response]]] = [] for path, bounded_ranges in per_file_bounded_requests.items(): futs.append(_make_bounded_requests(store, path, bounded_ranges, prototype)) From 5486e69081455bf4ef85102f98fbb6ae59cf2470 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 18 Nov 2024 17:35:21 +0000 Subject: [PATCH 12/17] Implement eq --- src/zarr/storage/object_store.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/zarr/storage/object_store.py b/src/zarr/storage/object_store.py index 2a728470ad..b7b9902aa0 100644 --- a/src/zarr/storage/object_store.py +++ b/src/zarr/storage/object_store.py @@ -32,6 +32,13 @@ class ObjectStore(Store): """ store: _ObjectStore + """The underlying obstore instance.""" + + def __eq__(self, value: object) -> bool: + if not isinstance(value, ObjectStore): + return False + + return self.store.__eq__(value.store) def __init__(self, store: _ObjectStore) -> None: self.store = store From 9a05c017e8755e941f89d7f1129efb05364c87e6 Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Mon, 18 Nov 2024 13:19:14 -0500 Subject: [PATCH 13/17] Add obstore as a test dep --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 09797ae3d4..92c5812c4d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,6 +70,7 @@ test = [ "mypy", "hypothesis", "universal-pathlib", + "obstore==0.3.0b5", ] jupyter = [ From 56b7a0bd365d4ee2f80530d357f37b8fc472d29b Mon Sep 17 00:00:00 2001 From: Max Jones <14077947+maxrjones@users.noreply.github.com> Date: Mon, 18 Nov 2024 13:21:52 -0500 Subject: [PATCH 14/17] Run store tests on ObjectStore --- tests/test_store/test_object.py | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 tests/test_store/test_object.py diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py new file mode 100644 index 0000000000..f4ff234313 --- /dev/null +++ b/tests/test_store/test_object.py @@ -0,0 +1,7 @@ +from zarr.core.buffer import cpu +from zarr.storage.object_store import ObjectStore +from zarr.testing.store import StoreTests + + +class TestObjectStore(StoreTests[ObjectStore, cpu.Buffer]): + store_cls = ObjectStore From b38ada1082bf7729106175336b6b1812dee66524 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 21 Nov 2024 14:55:01 +0000 Subject: [PATCH 15/17] import or skip --- tests/test_store/test_object.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py index f4ff234313..0fa5a0a098 100644 --- a/tests/test_store/test_object.py +++ b/tests/test_store/test_object.py @@ -1,3 +1,8 @@ +# ruff: noqa: E402 +import pytest + +pytest.importorskip("obstore") + from zarr.core.buffer import cpu from zarr.storage.object_store import ObjectStore from zarr.testing.store import StoreTests From ab00b46daff61054595ea57eebff1772b1d67658 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Fri, 22 Nov 2024 14:52:54 +0000 Subject: [PATCH 16/17] Bump obstore beta version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 92c5812c4d..41fbd70b15 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,7 +70,7 @@ test = [ "mypy", "hypothesis", "universal-pathlib", - "obstore==0.3.0b5", + "obstore==0.3.0b8", ] jupyter = [ From 9c65e4d50fffaabbd286e8f6be0dac10e5e4a518 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Fri, 22 Nov 2024 15:25:03 +0000 Subject: [PATCH 17/17] bump pre-commit --- .pre-commit-config.yaml | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index dd9660fa5f..df4f35a48a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -9,9 +9,9 @@ repos: - repo: https://github.com/astral-sh/ruff-pre-commit rev: v0.7.3 hooks: - - id: ruff - args: ["--fix", "--show-fixes"] - - id: ruff-format + - id: ruff + args: ["--fix", "--show-fixes"] + - id: ruff-format - repo: https://github.com/codespell-project/codespell rev: v2.3.0 hooks: @@ -20,7 +20,7 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v5.0.0 hooks: - - id: check-yaml + - id: check-yaml - repo: https://github.com/pre-commit/mirrors-mypy rev: v1.13.0 hooks: @@ -35,6 +35,7 @@ repos: - numpy - typing_extensions - universal-pathlib + - obstore==0.3.0-beta.8 # Tests - pytest - repo: https://github.com/scientific-python/cookie