Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial support for S3 active storage #77

Merged
merged 2 commits into from
May 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions activestorage/active.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import os
import numpy as np
import pathlib

#FIXME: Consider using h5py throughout, for more generality
from netCDF4 import Dataset
from zarr.indexing import (
OrthogonalIndexer,
)
from activestorage.config import *
markgoddard marked this conversation as resolved.
Show resolved Hide resolved
from activestorage.s3 import reduce_chunk as s3_reduce_chunk
from activestorage.storage import reduce_chunk
from activestorage import netcdf_to_zarr as nz

Expand Down Expand Up @@ -333,13 +336,24 @@ def _process_chunk(self, fsref, chunk_coords, chunk_selection, out, counts,
key = f"{self.ncvar}/{coord}"
rfile, offset, size = tuple(fsref[key])

# note there is an ongoing discussion about this interface, and what it returns
# see https://github.com/valeriupredoi/PyActiveStorage/issues/33
# so neither the returned data or the interface should be considered stable
# although we will version changes.
tmp, count = reduce_chunk(rfile, offset, size, compressor, filters, missing,
self.zds._dtype, self.zds._chunks, self.zds._order,
chunk_selection, method=self.method)
if USE_S3:
markgoddard marked this conversation as resolved.
Show resolved Hide resolved
object = os.path.basename(rfile)
tmp, count = s3_reduce_chunk(S3_ACTIVE_STORAGE_URL, S3_ACCESS_KEY,
S3_SECRET_KEY, S3_URL, S3_BUCKET,
object, offset, size,
compressor, filters, missing,
self.zds._dtype, self.zds._chunks,
self.zds._order, chunk_selection,
operation=self._method)
else:
# note there is an ongoing discussion about this interface, and what it returns
# see https://github.com/valeriupredoi/PyActiveStorage/issues/33
# so neither the returned data or the interface should be considered stable
# although we will version changes.
tmp, count = reduce_chunk(rfile, offset, size, compressor, filters,
missing, self.zds._dtype,
self.zds._chunks, self.zds._order,
chunk_selection, method=self.method)

if self.method is not None:
out.append(tmp)
Expand Down
19 changes: 19 additions & 0 deletions activestorage/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# This file contains configuration for PyActiveStorage.

# Whether to use the S3 Active Storage interface.
USE_S3 = False

# URL of S3 Active Storage server.
S3_ACTIVE_STORAGE_URL = "http://localhost:8080"

# URL of S3 object store.
S3_URL = "http://localhost:9000"

# S3 access key / username.
S3_ACCESS_KEY = "minioadmin"

# S3 secret key / password.
S3_SECRET_KEY = "minioadmin"

# S3 bucket.
S3_BUCKET = "pyactivestorage"
127 changes: 127 additions & 0 deletions activestorage/s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""S3 active storage module."""

import http.client
import json
import requests
import numpy as np
import sys


def reduce_chunk(server, username, password, source, bucket, object, offset,
size, compression, filters, missing, dtype, shape, order,
chunk_selection, operation):
"""Perform a reduction on a chunk using S3 Active Storage.

:param server: S3 active storage server URL
:param username: S3 username / access key
:param password: S3 password / secret key
:param source: S3 URL
:param bucket: S3 bucket
:param object: S3 object
:param offset: offset of data in object
:param size: size of data in object
:param compression: name of compression, unsupported
:param filters: name of filters, unsupported
:param missing: 4-tuple describing missing data, unsupported
:param dtype: data type name
:param shape: will be a tuple, something like (3,3,1), this is the
dimensionality of the chunk itself
:param order: typically 'C' for c-type ordering
:param chunk_selection: N-tuple where N is the length of `shape`, and each
item is an integer or slice. e.g. (slice(0, 2,
1), slice(1, 3, 1), slice(0, 1, 1))
this defines the part of the chunk which is to be
obtained or operated upon.
:param operation: name of operation to perform
:returns: the reduced data as a numpy array or scalar
:raises S3ActiveStorageError: if the request to S3 active storage fails
"""

if compression is not None:
raise NotImplementedError("Compression is not yet supported!")
if filters is not None:
raise NotImplementedError("Filters are not yet supported!")

request_data = build_request_data(source, bucket, object, offset, size, compression, filters, missing, dtype, shape, order, chunk_selection)
api_operation = "sum" if operation == "mean" else operation or "select"
url = f'{server}/v1/{api_operation}/'
response = request(url, username, password, request_data)

if response.ok:
# FIXME: Return count from mean
result = decode_result(response)
if operation == "mean":
count = reduce_chunk(server, username, password, source, bucket, object, offset, size, compression, filters, missing, dtype, shape, order, chunk_selection, "count")[0]
else:
count = None
return result, count
else:
decode_and_raise_error(response)


def encode_selection(selection):
"""Encode a chunk selection in a JSON-compatible format."""
def encode_slice(s):
if isinstance(s, slice):
return [s.start, s.stop, s.step]
else:
# Integer - select single value
return [s, s + 1, 1]

return [encode_slice(s) for s in selection]


def build_request_data(source: str, bucket: str, object: str, offset: int,
size: int, compression, filters, missing, dtype, shape,
order, selection) -> dict:
"""Build request data for S3 active storage API."""
# TODO: compression, filters, missing
request_data = {
'source': source,
'bucket': bucket,
'object': object,
'dtype': dtype.name,
'offset': offset,
'size': size,
'order': order,
}
if shape:
request_data["shape"] = shape
if selection:
request_data["selection"] = encode_selection(selection)
return {k: v for k, v in request_data.items() if v is not None}


def request(url: str, username: str, password: str, request_data: dict):
"""Make a request to an S3 active storage API."""
response = requests.post(
url,
json=request_data,
auth=(username, password)
)
return response


def decode_result(response):
"""Decode a successful response, return as a numpy array or scalar."""
dtype = response.headers['x-activestorage-dtype']
shape = json.loads(response.headers['x-activestorage-shape'])
result = np.frombuffer(response.content, dtype=dtype)
result = result.reshape(shape)
return result


class S3ActiveStorageError(Exception):
"""Exception for S3 Active Storage failures."""

def __init__(self, status_code, error):
super(S3ActiveStorageError, self).__init__(f"S3 Active Storage error: HTTP {status_code}: {error}")


def decode_and_raise_error(response):
"""Decode an error response and raise S3ActiveStorageError."""
try:
error = json.dumps(response.json())
raise S3ActiveStorageError(response.status_code, error)
markgoddard marked this conversation as resolved.
Show resolved Hide resolved
except requests.exceptions.JSONDecodeError as exc:
raise S3ActiveStorageError(response.status_code, "-") from exc
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies:
- netcdf4
- numpy
- pip !=21.3
- s3fs
# pin Zarr to avoid using old KVStore interface
# see github.com/zarr-developers/zarr-python/issues/1362
- zarr >=2.13.6 # KVStore to FSStore
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
'kerchunk',
'netcdf4',
'numpy',
's3fs',
# pin Zarr to use new FSStore instead of KVStore
'zarr>=2.13.3', # github.com/zarr-developers/zarr-python/issues/1362
# for testing
Expand Down
17 changes: 17 additions & 0 deletions tests/test_bigger_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import numpy as np
from netCDF4 import Dataset
from pathlib import Path
import s3fs

from activestorage.active import Active
from activestorage.config import *


@pytest.fixture
Expand Down Expand Up @@ -85,13 +87,28 @@ def create_hyb_pres_file_with_a(dataset, short_name):
'p0: p0 a: a_bnds b: b_bnds ps: ps')


def upload_to_s3(server, username, password, bucket, object, rfile):
"""Upload a file to an S3 object store."""
s3_fs = s3fs.S3FileSystem(key=username, secret=password, client_kwargs={'endpoint_url': server})
# Make sure s3 bucket exists
try:
s3_fs.mkdir(bucket)
except FileExistsError:
pass

s3_fs.put_file(rfile, os.path.join(bucket, object))


def save_cl_file_with_a(tmp_path):
"""Create netcdf file for ``cl`` with ``a`` coordinate."""
save_path = tmp_path / 'common_cl_a.nc'
nc_path = os.path.join(save_path)
dataset = Dataset(nc_path, mode='w')
create_hyb_pres_file_with_a(dataset, 'cl')
dataset.close()
if USE_S3:
object = os.path.basename(nc_path)
upload_to_s3(S3_URL, S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, object, nc_path)
print(f"Saved {save_path}")
return str(save_path)

Expand Down