diff --git a/activestorage/active.py b/activestorage/active.py index 798a9820..102d5f88 100644 --- a/activestorage/active.py +++ b/activestorage/active.py @@ -1,11 +1,14 @@ import os import numpy as np +import pathlib #FIXME: Consider using h5py throughout, for more generality from netCDF4 import Dataset from zarr.indexing import ( OrthogonalIndexer, ) +from activestorage.config import * +from activestorage.s3 import reduce_chunk as s3_reduce_chunk from activestorage.storage import reduce_chunk from activestorage import netcdf_to_zarr as nz @@ -333,13 +336,24 @@ def _process_chunk(self, fsref, chunk_coords, chunk_selection, out, counts, key = f"{self.ncvar}/{coord}" rfile, offset, size = tuple(fsref[key]) - # note there is an ongoing discussion about this interface, and what it returns - # see https://github.com/valeriupredoi/PyActiveStorage/issues/33 - # so neither the returned data or the interface should be considered stable - # although we will version changes. - tmp, count = reduce_chunk(rfile, offset, size, compressor, filters, missing, - self.zds._dtype, self.zds._chunks, self.zds._order, - chunk_selection, method=self.method) + if USE_S3: + object = os.path.basename(rfile) + tmp, count = s3_reduce_chunk(S3_ACTIVE_STORAGE_URL, S3_ACCESS_KEY, + S3_SECRET_KEY, S3_URL, S3_BUCKET, + object, offset, size, + compressor, filters, missing, + self.zds._dtype, self.zds._chunks, + self.zds._order, chunk_selection, + operation=self._method) + else: + # note there is an ongoing discussion about this interface, and what it returns + # see https://github.com/valeriupredoi/PyActiveStorage/issues/33 + # so neither the returned data or the interface should be considered stable + # although we will version changes. + tmp, count = reduce_chunk(rfile, offset, size, compressor, filters, + missing, self.zds._dtype, + self.zds._chunks, self.zds._order, + chunk_selection, method=self.method) if self.method is not None: out.append(tmp) diff --git a/activestorage/config.py b/activestorage/config.py new file mode 100644 index 00000000..0b76dfe3 --- /dev/null +++ b/activestorage/config.py @@ -0,0 +1,19 @@ +# This file contains configuration for PyActiveStorage. + +# Whether to use the S3 Active Storage interface. +USE_S3 = False + +# URL of S3 Active Storage server. +S3_ACTIVE_STORAGE_URL = "http://localhost:8080" + +# URL of S3 object store. +S3_URL = "http://localhost:9000" + +# S3 access key / username. +S3_ACCESS_KEY = "minioadmin" + +# S3 secret key / password. +S3_SECRET_KEY = "minioadmin" + +# S3 bucket. +S3_BUCKET = "pyactivestorage" diff --git a/activestorage/s3.py b/activestorage/s3.py new file mode 100644 index 00000000..a6f58210 --- /dev/null +++ b/activestorage/s3.py @@ -0,0 +1,127 @@ +"""S3 active storage module.""" + +import http.client +import json +import requests +import numpy as np +import sys + + +def reduce_chunk(server, username, password, source, bucket, object, offset, + size, compression, filters, missing, dtype, shape, order, + chunk_selection, operation): + """Perform a reduction on a chunk using S3 Active Storage. + + :param server: S3 active storage server URL + :param username: S3 username / access key + :param password: S3 password / secret key + :param source: S3 URL + :param bucket: S3 bucket + :param object: S3 object + :param offset: offset of data in object + :param size: size of data in object + :param compression: name of compression, unsupported + :param filters: name of filters, unsupported + :param missing: 4-tuple describing missing data, unsupported + :param dtype: data type name + :param shape: will be a tuple, something like (3,3,1), this is the + dimensionality of the chunk itself + :param order: typically 'C' for c-type ordering + :param chunk_selection: N-tuple where N is the length of `shape`, and each + item is an integer or slice. e.g. (slice(0, 2, + 1), slice(1, 3, 1), slice(0, 1, 1)) + this defines the part of the chunk which is to be + obtained or operated upon. + :param operation: name of operation to perform + :returns: the reduced data as a numpy array or scalar + :raises S3ActiveStorageError: if the request to S3 active storage fails + """ + + if compression is not None: + raise NotImplementedError("Compression is not yet supported!") + if filters is not None: + raise NotImplementedError("Filters are not yet supported!") + + request_data = build_request_data(source, bucket, object, offset, size, compression, filters, missing, dtype, shape, order, chunk_selection) + api_operation = "sum" if operation == "mean" else operation or "select" + url = f'{server}/v1/{api_operation}/' + response = request(url, username, password, request_data) + + if response.ok: + # FIXME: Return count from mean + result = decode_result(response) + if operation == "mean": + count = reduce_chunk(server, username, password, source, bucket, object, offset, size, compression, filters, missing, dtype, shape, order, chunk_selection, "count")[0] + else: + count = None + return result, count + else: + decode_and_raise_error(response) + + +def encode_selection(selection): + """Encode a chunk selection in a JSON-compatible format.""" + def encode_slice(s): + if isinstance(s, slice): + return [s.start, s.stop, s.step] + else: + # Integer - select single value + return [s, s + 1, 1] + + return [encode_slice(s) for s in selection] + + +def build_request_data(source: str, bucket: str, object: str, offset: int, + size: int, compression, filters, missing, dtype, shape, + order, selection) -> dict: + """Build request data for S3 active storage API.""" + # TODO: compression, filters, missing + request_data = { + 'source': source, + 'bucket': bucket, + 'object': object, + 'dtype': dtype.name, + 'offset': offset, + 'size': size, + 'order': order, + } + if shape: + request_data["shape"] = shape + if selection: + request_data["selection"] = encode_selection(selection) + return {k: v for k, v in request_data.items() if v is not None} + + +def request(url: str, username: str, password: str, request_data: dict): + """Make a request to an S3 active storage API.""" + response = requests.post( + url, + json=request_data, + auth=(username, password) + ) + return response + + +def decode_result(response): + """Decode a successful response, return as a numpy array or scalar.""" + dtype = response.headers['x-activestorage-dtype'] + shape = json.loads(response.headers['x-activestorage-shape']) + result = np.frombuffer(response.content, dtype=dtype) + result = result.reshape(shape) + return result + + +class S3ActiveStorageError(Exception): + """Exception for S3 Active Storage failures.""" + + def __init__(self, status_code, error): + super(S3ActiveStorageError, self).__init__(f"S3 Active Storage error: HTTP {status_code}: {error}") + + +def decode_and_raise_error(response): + """Decode an error response and raise S3ActiveStorageError.""" + try: + error = json.dumps(response.json()) + raise S3ActiveStorageError(response.status_code, error) + except requests.exceptions.JSONDecodeError as exc: + raise S3ActiveStorageError(response.status_code, "-") from exc diff --git a/environment.yml b/environment.yml index 3f239aeb..29e2e35a 100644 --- a/environment.yml +++ b/environment.yml @@ -13,6 +13,7 @@ dependencies: - netcdf4 - numpy - pip !=21.3 + - s3fs # pin Zarr to avoid using old KVStore interface # see github.com/zarr-developers/zarr-python/issues/1362 - zarr >=2.13.6 # KVStore to FSStore diff --git a/setup.py b/setup.py index 72c7e01b..f217d0b8 100644 --- a/setup.py +++ b/setup.py @@ -24,6 +24,7 @@ 'kerchunk', 'netcdf4', 'numpy', + 's3fs', # pin Zarr to use new FSStore instead of KVStore 'zarr>=2.13.3', # github.com/zarr-developers/zarr-python/issues/1362 # for testing diff --git a/tests/test_bigger_data.py b/tests/test_bigger_data.py index 4264225c..e48d89a8 100644 --- a/tests/test_bigger_data.py +++ b/tests/test_bigger_data.py @@ -5,8 +5,10 @@ import numpy as np from netCDF4 import Dataset from pathlib import Path +import s3fs from activestorage.active import Active +from activestorage.config import * @pytest.fixture @@ -85,6 +87,18 @@ def create_hyb_pres_file_with_a(dataset, short_name): 'p0: p0 a: a_bnds b: b_bnds ps: ps') +def upload_to_s3(server, username, password, bucket, object, rfile): + """Upload a file to an S3 object store.""" + s3_fs = s3fs.S3FileSystem(key=username, secret=password, client_kwargs={'endpoint_url': server}) + # Make sure s3 bucket exists + try: + s3_fs.mkdir(bucket) + except FileExistsError: + pass + + s3_fs.put_file(rfile, os.path.join(bucket, object)) + + def save_cl_file_with_a(tmp_path): """Create netcdf file for ``cl`` with ``a`` coordinate.""" save_path = tmp_path / 'common_cl_a.nc' @@ -92,6 +106,9 @@ def save_cl_file_with_a(tmp_path): dataset = Dataset(nc_path, mode='w') create_hyb_pres_file_with_a(dataset, 'cl') dataset.close() + if USE_S3: + object = os.path.basename(nc_path) + upload_to_s3(S3_URL, S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, object, nc_path) print(f"Saved {save_path}") return str(save_path)