diff --git a/api/Dockerfile b/api/Dockerfile index a2cfea0..f0da2fc 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -1,4 +1,4 @@ -ARG REGISTRY=rg.nl-ams.scw.cloud/geodds-production +ARG REGISTRY=rg.nl-ams.scw.cloud/geogeolake-production ARG TAG=latest FROM $REGISTRY/geolake-datastore:$TAG WORKDIR /app diff --git a/api/app/auth/backend.py b/api/app/auth/backend.py index c172b58..885ee55 100644 --- a/api/app/auth/backend.py +++ b/api/app/auth/backend.py @@ -9,11 +9,11 @@ from dbmanager.dbmanager import DBManager import exceptions as exc -from auth.models import DDSUser +from auth.models import GeoLakeUser from auth import scopes -class DDSAuthenticationBackend(AuthenticationBackend): +class GeoLakeAuthenticationBackend(AuthenticationBackend): """Class managing authentication and authorization""" async def authenticate(self, conn): @@ -25,7 +25,7 @@ async def authenticate(self, conn): def _manage_user_token_auth(self, user_token: str): try: user_id, api_key = self.get_authorization_scheme_param(user_token) - except exc.BaseDDSException as err: + except exc.BaseGeoLakeException as err: raise err.wrap_around_http_exception() user_dto = DBManager().get_user_details(user_id) eligible_scopes = [scopes.AUTHENTICATED] + self._get_scopes_for_user( @@ -35,7 +35,7 @@ def _manage_user_token_auth(self, user_token: str): raise exc.AuthenticationFailed( user_dto ).wrap_around_http_exception() - return AuthCredentials(eligible_scopes), DDSUser(username=user_id) + return AuthCredentials(eligible_scopes), GeoLakeUser(username=user_id) def _get_scopes_for_user(self, user_dto) -> list[str]: if user_dto is None: diff --git a/api/app/auth/manager.py b/api/app/auth/manager.py index 02bf686..4d09791 100644 --- a/api/app/auth/manager.py +++ b/api/app/auth/manager.py @@ -1,10 +1,10 @@ """Module with access/authentication functions""" from typing import Optional -from utils.api_logging import get_dds_logger +from utils.api_logging import get_geolake_logger import exceptions as exc -log = get_dds_logger(__name__) +log = get_geolake_logger(__name__) def is_role_eligible_for_product( diff --git a/api/app/auth/models.py b/api/app/auth/models.py index bff896f..e4c9936 100644 --- a/api/app/auth/models.py +++ b/api/app/auth/models.py @@ -2,7 +2,7 @@ from starlette.authentication import SimpleUser -class DDSUser(SimpleUser): +class GeoLakeUser(SimpleUser): """Immutable class containing information about the authenticated user""" def __init__(self, username: str) -> None: @@ -13,7 +13,7 @@ def id(self): return self.username def __eq__(self, other) -> bool: - if not isinstance(other, DDSUser): + if not isinstance(other, GeoLakeUser): return False if self.username == other.username: return True @@ -23,7 +23,7 @@ def __ne__(self, other): return self != other def __repr__(self): - return f"" + return f"" def __delattr__(self, name): if getattr(self, name, None) is not None: diff --git a/api/app/callbacks/on_startup.py b/api/app/callbacks/on_startup.py index ec883d3..4e25072 100644 --- a/api/app/callbacks/on_startup.py +++ b/api/app/callbacks/on_startup.py @@ -1,9 +1,9 @@ """Module with functions call during API server startup""" -from utils.api_logging import get_dds_logger +from utils.api_logging import get_geolake_logger from datastore.datastore import Datastore -log = get_dds_logger(__name__) +log = get_geolake_logger(__name__) def _load_cache() -> None: diff --git a/api/app/endpoint_handlers/dataset.py b/api/app/endpoint_handlers/dataset.py index c03a54b..25e6afb 100644 --- a/api/app/endpoint_handlers/dataset.py +++ 
b/api/app/endpoint_handlers/dataset.py @@ -13,7 +13,7 @@ from datastore import exception as datastore_exception from utils.metrics import log_execution_time -from utils.api_logging import get_dds_logger +from utils.api_logging import get_geolake_logger from auth.manager import ( is_role_eligible_for_product, ) @@ -23,7 +23,7 @@ from . import request -log = get_dds_logger(__name__) +log = get_geolake_logger(__name__) data_store = Datastore() MESSAGE_SEPARATOR = os.environ["MESSAGE_SEPARATOR"] diff --git a/api/app/endpoint_handlers/file.py b/api/app/endpoint_handlers/file.py index 04cf562..140975e 100644 --- a/api/app/endpoint_handlers/file.py +++ b/api/app/endpoint_handlers/file.py @@ -4,11 +4,11 @@ from fastapi.responses import FileResponse from dbmanager.dbmanager import DBManager, RequestStatus -from utils.api_logging import get_dds_logger +from utils.api_logging import get_geolake_logger from utils.metrics import log_execution_time import exceptions as exc -log = get_dds_logger(__name__) +log = get_geolake_logger(__name__) @log_execution_time(log) @@ -33,7 +33,7 @@ def download_request_result(request_id: int): Raises ------- RequestNotYetAccomplished - If dds request was not yet finished + If geolake request was not yet finished FileNotFoundError If file was not found """ diff --git a/api/app/endpoint_handlers/request.py b/api/app/endpoint_handlers/request.py index 93a0636..b503f07 100644 --- a/api/app/endpoint_handlers/request.py +++ b/api/app/endpoint_handlers/request.py @@ -1,11 +1,11 @@ """Modules with functions realizing logic for requests-related endpoints""" from dbmanager.dbmanager import DBManager -from utils.api_logging import get_dds_logger +from utils.api_logging import get_geolake_logger from utils.metrics import log_execution_time import exceptions as exc -log = get_dds_logger(__name__) +log = get_geolake_logger(__name__) @log_execution_time(log) diff --git a/api/app/exceptions.py b/api/app/exceptions.py index af4d072..eb19ad7 100644 --- a/api/app/exceptions.py +++ b/api/app/exceptions.py @@ -1,11 +1,11 @@ -"""Module with DDS exceptions definitions""" +"""Module with GeoLake exceptions definitions""" from typing import Optional from fastapi import HTTPException -class BaseDDSException(BaseException): - """Base class for DDS.api exceptions""" +class BaseGeoLakeException(BaseException): + """Base class for GeoLake.api exceptions""" msg: str = "Bad request" code: int = 400 @@ -18,13 +18,13 @@ def wrap_around_http_exception(self) -> HTTPException: ) -class EmptyUserTokenError(BaseDDSException): +class EmptyUserTokenError(BaseGeoLakeException): """Raised if `User-Token` is empty""" msg: str = "User-Token cannot be empty!" 
-class ImproperUserTokenError(BaseDDSException): +class ImproperUserTokenError(BaseGeoLakeException): """Raised if `User-Token` format is wrong""" msg: str = ( @@ -33,7 +33,7 @@ class ImproperUserTokenError(BaseDDSException): ) -class NoEligibleProductInDatasetError(BaseDDSException): +class NoEligibleProductInDatasetError(BaseGeoLakeException): """No eligible products in the dataset Error""" msg: str = ( @@ -48,7 +48,7 @@ def __init__(self, dataset_id: str, user_roles_names: list[str]) -> None: super().__init__(self.msg) -class MissingKeyInCatalogEntryError(BaseDDSException): +class MissingKeyInCatalogEntryError(BaseGeoLakeException): """Missing key in the catalog entry""" msg: str = ( @@ -60,7 +60,7 @@ def __init__(self, key, dataset): super().__init__(self.msg) -class MaximumAllowedSizeExceededError(BaseDDSException): +class MaximumAllowedSizeExceededError(BaseGeoLakeException): """Estimated size is too big""" msg: str = ( @@ -81,8 +81,8 @@ def __init__( super().__init__(self.msg) -class RequestNotYetAccomplished(BaseDDSException): - """Raised if dds request was not finished yet""" +class RequestNotYetAccomplished(BaseGeoLakeException): + """Raised if geolake request was not finished yet""" msg: str = ( "Request with id: {request_id} does not exist or it is not" @@ -94,7 +94,7 @@ def __init__(self, request_id): super().__init__(self.msg) -class RequestNotFound(BaseDDSException): +class RequestNotFound(BaseGeoLakeException): """If the given request could not be found""" msg: str = "Request with ID '{request_id}' was not found" @@ -104,7 +104,7 @@ def __init__(self, request_id: int) -> None: super().__init__(self.msg) -class RequestStatusNotDone(BaseDDSException): +class RequestStatusNotDone(BaseGeoLakeException): """Raised when the submitted request failed""" msg: str = ( @@ -119,7 +119,7 @@ def __init__(self, request_id, request_status) -> None: super().__init__(self.msg) -class AuthorizationFailed(BaseDDSException): +class AuthorizationFailed(BaseGeoLakeException): """Raised when the user is not authorized for the given resource""" msg: str = "{user} is not authorized for the resource!" @@ -133,7 +133,7 @@ def __init__(self, user_id: Optional[str] = None): super().__init__(self.msg) -class AuthenticationFailed(BaseDDSException): +class AuthenticationFailed(BaseGeoLakeException): """Raised when the key of the provided user differs from the one s tored in the DB""" @@ -145,7 +145,7 @@ def __init__(self, user_id: str): super().__init__(self.msg) -class MissingDatasetError(BaseDDSException): +class MissingDatasetError(BaseGeoLakeException): """Raied if the queried dataset is not present in the catalog""" msg: str = "Dataset '{dataset_id}' does not exist in the catalog!" 
@@ -155,7 +155,7 @@ def __init__(self, dataset_id: str): super().__init__(self.msg) -class MissingProductError(BaseDDSException): +class MissingProductError(BaseGeoLakeException): """Raised if the requested product is not defined for the dataset""" msg: str = ( @@ -169,7 +169,7 @@ def __init__(self, dataset_id: str, product_id: str): super().__init__(self.msg) -class EmptyDatasetError(BaseDDSException): +class EmptyDatasetError(BaseGeoLakeException): """The size of the requested dataset is zero""" msg: str = "The resulting dataset '{dataset_id}.{product_id}' is empty" @@ -181,7 +181,7 @@ def __init__(self, dataset_id, product_id): ) super().__init__(self.msg) -class ProductRetrievingError(BaseDDSException): +class ProductRetrievingError(BaseGeoLakeException): """Retrieving of the product failed.""" msg: str = "Retrieving of the product '{dataset_id}.{product_id}' failed with the status {status}" diff --git a/api/app/main.py b/api/app/main.py index 2084394..75f310f 100644 --- a/api/app/main.py +++ b/api/app/main.py @@ -1,4 +1,4 @@ -"""Main module with dekube-dds API endpoints defined""" +"""Main module with dekube-geolake API endpoints defined""" __version__ = "2.0" import os from typing import Optional @@ -21,14 +21,14 @@ from intake_geokube.queries.workflow import Workflow from intake_geokube.queries.geoquery import GeoQuery -from utils.api_logging import get_dds_logger +from utils.api_logging import get_geolake_logger import exceptions as exc from endpoint_handlers import ( dataset_handler, file_handler, request_handler, ) -from auth.backend import DDSAuthenticationBackend +from auth.backend import GeoLakeAuthenticationBackend from callbacks import all_onstartup_callbacks from encoders import extend_json_encoders from const import venv, tags @@ -49,14 +49,14 @@ def map_to_geoquery( format_args=format_kwargs, format=format) return query -logger = get_dds_logger(__name__) +logger = get_geolake_logger(__name__) # ======== JSON encoders extension ========= # extend_json_encoders() app = FastAPI( - title="geokube-dds API", - description="REST API for geokube-dds", + title="geokube-geolake API", + description="REST API for geokube-geolake", version=__version__, contact={ "name": "geokube Contributors", @@ -72,7 +72,7 @@ def map_to_geoquery( # ======== Authentication backend ========= # app.add_middleware( - AuthenticationMiddleware, backend=DDSAuthenticationBackend() + AuthenticationMiddleware, backend=GeoLakeAuthenticationBackend() ) # ======== CORS ========= # @@ -107,9 +107,9 @@ def map_to_geoquery( # ======== Endpoints definitions ========= # @app.get("/", tags=[tags.BASIC]) -async def dds_info(): - """Return current version of the DDS API""" - return f"DDS API {__version__}" +async def geolake_info(): + """Return current version of the GeoLake API""" + return f"GeoLake API {__version__}" @app.get("/datasets", tags=[tags.DATASET]) @@ -123,7 +123,7 @@ async def get_datasets(request: Request): return dataset_handler.get_datasets( user_roles_names=request.auth.scopes ) - except exc.BaseDDSException as err: + except exc.BaseGeoLakeException as err: raise err.wrap_around_http_exception() from err @@ -145,7 +145,7 @@ async def get_first_product_details( user_roles_names=request.auth.scopes, dataset_id=dataset_id, ) - except exc.BaseDDSException as err: + except exc.BaseGeoLakeException as err: raise err.wrap_around_http_exception() from err @@ -169,7 +169,7 @@ async def get_product_details( dataset_id=dataset_id, product_id=product_id, ) - except exc.BaseDDSException as err: + except 
exc.BaseGeoLakeException as err: raise err.wrap_around_http_exception() from err @app.get("/datasets/{dataset_id}/{product_id}/map", tags=[tags.DATASET]) @@ -219,7 +219,7 @@ async def get_map( product_id=product_id, query=query ) - except exc.BaseDDSException as err: + except exc.BaseGeoLakeException as err: raise err.wrap_around_http_exception() from err @app.get("/datasets/{dataset_id}/{product_id}/items/{feature_id}", tags=[tags.DATASET]) @@ -264,7 +264,7 @@ async def get_feature( product_id=product_id, query=query ) - except exc.BaseDDSException as err: + except exc.BaseGeoLakeException as err: raise err.wrap_around_http_exception() from err @app.get("/datasets/{dataset_id}/{product_id}/metadata", tags=[tags.DATASET]) @@ -285,7 +285,7 @@ async def get_metadata( return dataset_handler.get_metadata( dataset_id=dataset_id, product_id=product_id ) - except exc.BaseDDSException as err: + except exc.BaseGeoLakeException as err: raise err.wrap_around_http_exception() from err @@ -312,7 +312,7 @@ async def estimate( query=query, unit=unit, ) - except exc.BaseDDSException as err: + except exc.BaseGeoLakeException as err: raise err.wrap_around_http_exception() from err @@ -339,7 +339,7 @@ async def query( product_id=product_id, query=query, ) - except exc.BaseDDSException as err: + except exc.BaseGeoLakeException as err: raise err.wrap_around_http_exception() from err @@ -360,7 +360,7 @@ async def workflow( user_id=request.user.id, workflow=tasks, ) - except exc.BaseDDSException as err: + except exc.BaseGeoLakeException as err: raise err.wrap_around_http_exception() from err @@ -376,7 +376,7 @@ async def get_requests( app.state.api_http_requests_total.inc({"route": "GET /requests"}) try: return request_handler.get_requests(request.user.id) - except exc.BaseDDSException as err: + except exc.BaseGeoLakeException as err: raise err.wrap_around_http_exception() from err @@ -398,7 +398,7 @@ async def get_request_status( return request_handler.get_request_status( user_id=request.user.id, request_id=request_id ) - except exc.BaseDDSException as err: + except exc.BaseGeoLakeException as err: raise err.wrap_around_http_exception() from err @@ -420,7 +420,7 @@ async def get_request_resulting_size( return request_handler.get_request_resulting_size( request_id=request_id ) - except exc.BaseDDSException as err: + except exc.BaseGeoLakeException as err: raise err.wrap_around_http_exception() from err @@ -440,7 +440,7 @@ async def get_request_uri( ) try: return request_handler.get_request_uri(request_id=request_id) - except exc.BaseDDSException as err: + except exc.BaseGeoLakeException as err: raise err.wrap_around_http_exception() from err @@ -460,7 +460,7 @@ async def download_request_result( ) try: return file_handler.download_request_result(request_id=request_id) - except exc.BaseDDSException as err: + except exc.BaseGeoLakeException as err: raise err.wrap_around_http_exception() from err except FileNotFoundError as err: raise HTTPException( diff --git a/api/app/validation.py b/api/app/validation.py index 51bdbc1..150b173 100644 --- a/api/app/validation.py +++ b/api/app/validation.py @@ -1,12 +1,12 @@ from datastore.datastore import Datastore -from utils.api_logging import get_dds_logger +from utils.api_logging import get_geolake_logger from decorators_factory import assert_parameters_are_defined, bind_arguments from functools import wraps from inspect import signature import exceptions as exc -log = get_dds_logger(__name__) +log = get_geolake_logger(__name__) def assert_product_exists(func): diff --git 
a/datastore/utils/api_logging.py b/datastore/utils/api_logging.py index 58d148d..ec49291 100644 --- a/datastore/utils/api_logging.py +++ b/datastore/utils/api_logging.py @@ -3,11 +3,11 @@ import logging as default_logging -def get_dds_logger( +def get_geolake_logger( name: str, level: Literal["debug", "info", "warning", "error", "critical"] = "info", ): - """Get DDS logger with the expected format, handlers and formatter. + """Get GeoLake logger with the expected format, handlers and formatter. Parameters ---------- diff --git a/docker-compose.yaml b/docker-compose.yaml index be85d41..01bc7b8 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -41,6 +41,6 @@ services: ports: - 5432:5432 environment: - POSTGRES_DB: dds - POSTGRES_USER: dds - POSTGRES_PASSWORD: dds \ No newline at end of file + POSTGRES_DB: geolake + POSTGRES_USER: geolake + POSTGRES_PASSWORD: geolake \ No newline at end of file diff --git a/docs/docs/about.md b/docs/about.md similarity index 100% rename from docs/docs/about.md rename to docs/about.md diff --git a/docs/api.md b/docs/api.md new file mode 100644 index 0000000..2619ae0 --- /dev/null +++ b/docs/api.md @@ -0,0 +1,9 @@ +::: api.app.main + +::: api.app.endpoint_handlers.dataset + +::: api.app.endpoint_handlers.file + +::: api.app.endpoint_handlers.request + + diff --git a/docs/base.md b/docs/base.md new file mode 100644 index 0000000..99fdf4c --- /dev/null +++ b/docs/base.md @@ -0,0 +1,2 @@ +::: drivers.intake_geokube.base + diff --git a/docs/docs/broker.md b/docs/broker.md similarity index 100% rename from docs/docs/broker.md rename to docs/broker.md diff --git a/docs/datastore.md b/docs/datastore.md new file mode 100644 index 0000000..3e30984 --- /dev/null +++ b/docs/datastore.md @@ -0,0 +1,3 @@ +::: datastore.datastore + +::: dbmanager.dbmanager diff --git a/docs/docs/api.md b/docs/docs/api.md deleted file mode 100644 index 836418a..0000000 --- a/docs/docs/api.md +++ /dev/null @@ -1,5 +0,0 @@ -# REST API - -## Description - -## Docstring \ No newline at end of file diff --git a/docs/docs/drivers.md b/docs/docs/drivers.md deleted file mode 100644 index a8a0052..0000000 --- a/docs/docs/drivers.md +++ /dev/null @@ -1,5 +0,0 @@ -# Drivers - -## Description - -## Docstring \ No newline at end of file diff --git a/docs/docs/executor.md b/docs/docs/executor.md deleted file mode 100644 index a003890..0000000 --- a/docs/docs/executor.md +++ /dev/null @@ -1,5 +0,0 @@ -# Executor - -## Description - -## Docstring \ No newline at end of file diff --git a/docs/docs/img/logo.svg b/docs/docs/img/logo.svg deleted file mode 100644 index 24aa2af..0000000 --- a/docs/docs/img/logo.svg +++ /dev/null @@ -1,3 +0,0 @@ - - -
[SVG markup of the deleted docs/docs/img/logo.svg is not preserved in this view; only its rendered text fragments ("eo", "L a k e") and the fallback note "Text is not SVG - cannot display" survive.]
\ No newline at end of file diff --git a/docs/executor.md b/docs/executor.md new file mode 100644 index 0000000..c14a3a4 --- /dev/null +++ b/docs/executor.md @@ -0,0 +1,5 @@ +::: executor.app.main + +::: executor.app.messaging + +::: executor.app.meta \ No newline at end of file diff --git a/docs/docs/img/favicon.svg b/docs/img/favicon.svg similarity index 100% rename from docs/docs/img/favicon.svg rename to docs/img/favicon.svg diff --git a/docs/img/logo.svg b/docs/img/logo.svg new file mode 100644 index 0000000..e7bc09c --- /dev/null +++ b/docs/img/logo.svg @@ -0,0 +1,3 @@ + + +
[SVG markup of the new docs/img/logo.svg is not preserved in this view; only its rendered text fragments ("eo", "L a k e") and the fallback note "Text is not SVG - cannot display" survive.]
\ No newline at end of file diff --git a/docs/docs/index.md b/docs/index.md similarity index 100% rename from docs/docs/index.md rename to docs/index.md diff --git a/docs/docs/installation.md b/docs/installation.md similarity index 100% rename from docs/docs/installation.md rename to docs/installation.md diff --git a/docs/iot.md b/docs/iot.md new file mode 100644 index 0000000..aaab170 --- /dev/null +++ b/docs/iot.md @@ -0,0 +1,2 @@ +::: drivers.intake_geokube.iot.driver + diff --git a/docs/netcdf.md b/docs/netcdf.md new file mode 100644 index 0000000..8aaf9a9 --- /dev/null +++ b/docs/netcdf.md @@ -0,0 +1 @@ +::: drivers.intake_geokube.netcdf.driver diff --git a/docs/queries.md b/docs/queries.md new file mode 100644 index 0000000..51f8ecb --- /dev/null +++ b/docs/queries.md @@ -0,0 +1,4 @@ +::: drivers.intake_geokube.queries.geoquery + +::: drivers.intake_geokube.queries.workflow + diff --git a/docs/sentinel.md b/docs/sentinel.md new file mode 100644 index 0000000..db04d7b --- /dev/null +++ b/docs/sentinel.md @@ -0,0 +1,7 @@ +::: drivers.intake_geokube.sentinel.driver + + +::: drivers.intake_geokube.sentinel.odata_builder + + +::: drivers.intake_geokube.sentinel.auth diff --git a/docs/wrf.md b/docs/wrf.md new file mode 100644 index 0000000..2c20d04 --- /dev/null +++ b/docs/wrf.md @@ -0,0 +1,2 @@ +::: drivers.intake_geokube.wrf.driver + diff --git a/drivers/intake_geokube/base.py b/drivers/intake_geokube/base.py index e070427..efbefd5 100644 --- a/drivers/intake_geokube/base.py +++ b/drivers/intake_geokube/base.py @@ -16,7 +16,7 @@ class AbstractBaseDriver(ABC, DataSourceBase): - """Abstract base class for all DDS-related drivers.""" + """Abstract base class for all GeoLake-related drivers.""" name: str = _NOT_SET version: str = _NOT_SET @@ -40,10 +40,10 @@ def __init__(self, *, metadata: dict) -> None: @classmethod def __configure_logger(cls) -> logging.Logger: - log = logging.getLogger(f"dds.intake.{cls.__name__}") - level = os.environ.get("DDS_LOG_LEVEL", "INFO") + log = logging.getLogger(f"geolake.intake.{cls.__name__}") + level = os.environ.get("GeoLake_LOG_LEVEL", "INFO") logformat = os.environ.get( - "DDS_LOG_FORMAT", + "GeoLake_LOG_FORMAT", "%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s", ) log.setLevel(level) # type: ignore[arg-type] @@ -76,13 +76,19 @@ def process(self, query: GeoQuery) -> Any: Parameters ---------- - query: GeoQuery + query: `queries.GeoQuery` A query to use for data processing Results ------- res: Any Result of `query` processing + + Examples + -------- + ```python + >>> data = catalog['dataset']['product'].process(query) + ``` """ data_ = self.read() return self._process_geokube_dataset(data_, query=query, compute=True) diff --git a/drivers/intake_geokube/iot/driver.py b/drivers/intake_geokube/iot/driver.py index 93c52cd..5d11dc5 100644 --- a/drivers/intake_geokube/iot/driver.py +++ b/drivers/intake_geokube/iot/driver.py @@ -101,7 +101,12 @@ def _get_schema(self): return {"stream": str(self.stream)} def read(self) -> streamz.dataframe.core.DataFrame: - """Read IoT data.""" + """Read IoT data. 
+ + Returns + ------- + stream : `streamz.dataframe.DataFrame` + """ self.log.info("reading stream...") self._get_schema() return self.stream @@ -123,7 +128,7 @@ def process(self, query: GeoQuery) -> streamz.dataframe.core.DataFrame: Returns ------- - stream : streamz.dataframe.core.DataFrame + stream : streamz.dataframe.DataFrame A DataFrame object with streamed content """ df = d[0] diff --git a/drivers/intake_geokube/netcdf/driver.py b/drivers/intake_geokube/netcdf/driver.py index e29cbfa..ac9e61f 100644 --- a/drivers/intake_geokube/netcdf/driver.py +++ b/drivers/intake_geokube/netcdf/driver.py @@ -1,4 +1,4 @@ -"""NetCDF driver for DDS.""" +"""NetCDF driver for GeoLake.""" from geokube import open_datacube, open_dataset from geokube.core.datacube import DataCube @@ -48,7 +48,22 @@ def _arguments(self) -> dict: } | self.xarray_kwargs def read(self) -> Dataset | DataCube: - """Read netCDF.""" + """Read netCDF into geokube.Dataset or geokube.DataCube. + + If `pattern` is set for a product, the method returns + a `geokube.Dataset` with `dask.Delayed` objects instead of + `geokube.DataCube`s. + + Returns + ------- + cube : `geokube.Dataset` or `geokube.DataCube` + + Examples + -------- + ```python + >>> data = catalog['era5']['reanalysis'].read() + ``` + """ if self.pattern: return open_dataset( pattern=self.pattern, delay_read_cubes=True, **self._arguments @@ -56,7 +71,20 @@ def read(self) -> Dataset | DataCube: return open_datacube(**self._arguments) def load(self) -> Dataset | DataCube: - """Load netCDF.""" + """Load netCDF into geokube.Dataset or geokube.DataCube. + + All cubes are computed on loading. + + Returns + ------- + cube : `geokube.Dataset` or `geokube.DataCube` + + Examples + -------- + ```python + >>> data = catalog['era5']['reanalysis'].load() + ``` + """ if self.pattern: return open_dataset( pattern=self.pattern, delay_read_cubes=False, **self._arguments diff --git a/drivers/intake_geokube/sentinel/driver.py b/drivers/intake_geokube/sentinel/driver.py index 4895103..88875c7 100644 --- a/drivers/intake_geokube/sentinel/driver.py +++ b/drivers/intake_geokube/sentinel/driver.py @@ -323,7 +323,18 @@ def load(self) -> NoReturn: ) def process(self, query: GeoQuery) -> Dataset: - """Process query for sentinel data.""" + """Process sentinel data according to the `query`. + + Returns + ------- + cube : `geokube.Dataset` + + Examples + -------- + ```python + >>> data = catalog['sentinel']['prod_name'].process(query) + ``` + """ self.log.info("builder odata request based on passed geoquery...") req = self._build_odata_from_geoquery(query) self.log.info("downloading data...") diff --git a/drivers/intake_geokube/sentinel/odata_builder.py b/drivers/intake_geokube/sentinel/odata_builder.py index 4036810..64bd0a7 100644 --- a/drivers/intake_geokube/sentinel/odata_builder.py +++ b/drivers/intake_geokube/sentinel/odata_builder.py @@ -26,7 +26,17 @@ def datetime_to_isoformat(date: str | datetime) -> str: - """Convert string of datetime object to ISO datetime string.""" + """Convert a string or datetime object to an ISO datetime string.
+ + Parameters + ---------- + date : `str` or `datetime` + + Returns + ------- + date_str : str + An ISO-compliant datetime string + """ if isinstance(date, str): try: value = pd.to_datetime([date]).item().isoformat() diff --git a/drivers/intake_geokube/wrf/driver.py b/drivers/intake_geokube/wrf/driver.py index d819760..018748e 100644 --- a/drivers/intake_geokube/wrf/driver.py +++ b/drivers/intake_geokube/wrf/driver.py @@ -1,4 +1,4 @@ -"""WRF driver for DDS.""" +"""WRF driver for GeoLake.""" from functools import partial from typing import Any diff --git a/drivers/pyproject.toml b/drivers/pyproject.toml index 2f0a6d5..ba82fc4 100644 --- a/drivers/pyproject.toml +++ b/drivers/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "geolake-drivers" -description = "opengeokube DDS driver." +description = "opengeokube GeoLake driver." requires-python = ">=3.10" readme = "README.md" license = {file = "LICENSE"} diff --git a/executor/Dockerfile b/executor/Dockerfile index db3cebb..5aed976 100644 --- a/executor/Dockerfile +++ b/executor/Dockerfile @@ -1,4 +1,4 @@ -ARG REGISTRY=rg.nl-ams.scw.cloud/geodds-production +ARG REGISTRY=rg.nl-ams.scw.cloud/geogeolake-production ARG TAG=latest ARG SENTINEL_USERNAME=... ARG SENTINEL_PASSWORD=... diff --git a/executor/app/main.py b/executor/app/main.py index 35b90fe..4bcd076 100644 --- a/executor/app/main.py +++ b/executor/app/main.py @@ -1,3 +1,4 @@ +"""Module with executor logic.""" import os import time import datetime @@ -81,7 +82,7 @@ def rcp85_filename_condition(kube: DataCube, message: Message) -> bool: def get_history_message(): return ( - f"Generated by CMCC DDS version 0.9.0 {str(datetime.datetime.now())}" + f"Generated by CMCC GeoLake version 0.9.0 {str(datetime.datetime.now())}" ) @@ -90,6 +91,17 @@ def persist_datacube( message: Message, base_path: str | os.PathLike, ) -> str | os.PathLike: + """Save a `geokube.DataCube` given the `message` and `base_path`. + + Parameters + ---------- + kube : `geokube.DataCube` + A data cube to save + message : `Message` + A message with details like dataset or product ID + base_path : `str` or `os.PathLike` + Base path under which to save the result + """ if rcp85_filename_condition(kube, message): path = get_file_name_for_climate_downscaled(kube, message) else: @@ -136,6 +148,19 @@ def persist_dataset( message: Message, base_path: str | os.PathLike, ): + """Save a `geokube.Dataset` given the `message` and `base_path`. + + Uses the `persist_datacube` function under the hood. + + Parameters + ---------- + kube : `geokube.Dataset` + A dataset to save + message : `Message` + A message with details like dataset or product ID + base_path : `str` or `os.PathLike` + Base path under which to save the result + """ def _get_attr_comb(dataframe_item, attrs): return "_".join([dataframe_item[attr_name] for attr_name in attrs]) @@ -214,6 +239,15 @@ def _persist_single_datacube(dataframe_item, base_path, format, format_args=None def process(message: Message, compute: bool): + """Process a message and compute (if needed).
+ + Parameters + ---------- + message : `Message` + A message to process + compute : bool + A flag indicating whether the result should be computed + """ res_path = os.path.join(_BASE_DOWNLOAD_PATH, message.request_id) os.makedirs(res_path, exist_ok=True) match message.type: @@ -247,6 +281,7 @@ def process(message: Message, compute: bool): class Executor(metaclass=LoggableMeta): + """Executor class definition""" _LOG = logging.getLogger("geokube.Executor") def __init__(self, broker, store_path): @@ -259,6 +294,13 @@ def __init__(self, broker, store_path): self._db = DBManager() def create_dask_cluster(self, dask_cluster_opts: dict = None): + """Create a Dask cluster with given options. + + Parameters + ---------- + dask_cluster_opts : `dict`, optional + A dictionary with cluster parameters. + """ if dask_cluster_opts is None: dask_cluster_opts = {} dask_cluster_opts["scheduler_port"] = int( @@ -292,6 +334,16 @@ def create_dask_cluster(self, dask_cluster_opts: dict = None): self._nanny = Nanny(self._dask_client.cluster.scheduler.address) def maybe_restart_cluster(self, status: RequestStatus): + """Restart the running Dask cluster when needed. + + Restart the cluster if the request status was set to `TIMEOUT` or + the cluster died. + + Parameters + ---------- + status : `RequestStatus` + The status of the request being processed by the cluster. + """ if status is RequestStatus.TIMEOUT: self._LOG.info("recreating the cluster due to timeout") self._dask_client.cluster.close() @@ -311,9 +363,7 @@ def maybe_restart_cluster(self, status: RequestStatus): self.create_dask_cluster() def ack_message(self, channel, delivery_tag): - """Note that `channel` must be the same pika channel instance via which - the message being ACKed was retrieved (AMQP protocol constraint). - """ + """Acknowledge the broker message.""" if channel.is_open: channel.basic_ack(delivery_tag) else: @@ -329,6 +379,23 @@ def retry_until_timeout( retries: int = 30, sleep_time: int = 10, ): + """Retry processing the `future` object. + + Parameters + ---------- + future : `Future` + A future object being computed + message : `Message` + A message object + retries : `int`, default `30` + Number of retries + sleep_time : `int`, default `10` + Number of seconds to sleep between retries + + Returns + ------- + result : `tuple` of (location_path, status, fail_reason) + """ assert retries is not None, "`retries` cannot be `None`" assert sleep_time is not None, "`sleep_time` cannot be `None`" status = fail_reason = location_path = None diff --git a/executor/app/messaging.py b/executor/app/messaging.py index 37ce25a..f135298 100644 --- a/executor/app/messaging.py +++ b/executor/app/messaging.py @@ -1,3 +1,4 @@ +"""Module containing definitions of messages in the executor.""" import os import logging from enum import Enum @@ -14,6 +15,7 @@ class MessageType(Enum): class Message: + """Message class definition.""" _LOG = logging.getLogger("geokube.Message") request_id: int @@ -23,6 +25,13 @@ class Message: content: GeoQuery | Workflow def __init__(self, load: bytes) -> None: + """Create a `Message` instance.
+ + Parameters + ---------- + load : `bytes` + Bytes containing message load + """ self.request_id, msg_type, *query = load.decode().split( MESSAGE_SEPARATOR ) diff --git a/docs/mkdocs.yaml b/mkdocs.yaml similarity index 80% rename from docs/mkdocs.yaml rename to mkdocs.yaml index b2a8154..c192543 100644 --- a/docs/mkdocs.yaml +++ b/mkdocs.yaml @@ -36,8 +36,21 @@ nav: - GeoLake: index.md - installation.md - References: - - drivers.md + - queries.md + - Drivers: + - base.md + - netcdf.md + - sentinel.md + - iot.md + - wrf.md + - datastore.md - broker.md - api.md - executor.md -- about.md \ No newline at end of file +- about.md + +plugins: +- mkdocstrings: + handlers: + python: + paths: [drivers, executor, api, datastore] \ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml deleted file mode 100644 index c97182f..0000000 --- a/mkdocs.yml +++ /dev/null @@ -1 +0,0 @@ -site_name: My Docs diff --git a/resources/catalogs/external/e-obs.yaml b/resources/catalogs/external/e-obs.yaml index 2dc4a49..a35d853 100644 --- a/resources/catalogs/external/e-obs.yaml +++ b/resources/catalogs/external/e-obs.yaml @@ -1,7 +1,7 @@ metadata: description: >- E-OBS is a daily gridded land-only observational dataset over Europe. The blended time series from the station network of the European Climate Assessment & Dataset (ECA&D) project form the basis for the E-OBS gridded dataset. All station data are sourced directly from the European National Meteorological and Hydrological Services (NMHSs) or other data holding institutions. For a considerable number of countries the number of stations used is the complete national network and therefore much more dense than the station network that is routinely shared among NMHSs (which is the basis of other gridded datasets). The density of stations gradually increases through collaborations with NMHSs within European research contracts. Initially, in 2008, this gridded dataset was developed to provide validation for the suite of Europe-wide climate model simulations produced as part of the European Union ENSEMBLES project. While E-OBS remains an important dataset for model validation, it is also used more generally for monitoring the climate across Europe, particularly with regard to the assessment of the magnitude and frequency of daily extremes. The position of E-OBS is unique in Europe because of the relatively high spatial horizontal grid spacing, the daily resolution of the dataset, the provision of multiple variables and the length of the dataset. Finally, the station data on which E-OBS is based are available through the ECA&D webpages (where the owner of the data has given permission to do so). In these respects it contrasts with other datasets. The dataset is daily, meaning the observations cover 24 hours per time step. The exact 24-hour period can be different per region. The reason for this is that some data providers measure between midnight to midnight while others might measure from morning to morning. Since E-OBS is an observational dataset, no attempts have been made to adjust time series for this 24-hour offset. It is made sure, where known, that the largest part of the measured 24-hour period corresponds to the day attached to the time step in E-OBS (and ECA&D). 
- contact: dds-support@cmcc.it + contact: geolake-support@cmcc.it label: E-OBS daily gridded meteorological data for Europe from 1950 to present image: https://diasfiles.cmccos.it/images/e-obs.png doi: https://doi.org/10.24381/cds.151d3ec6 @@ -19,7 +19,7 @@ sources: args: pattern: 'e-obs-ensemble-{ensemble}-var_{var}-resolution_{resolution}-version_{version}.0e.nc' path: '/code/app/resources/netcdfs/e-obs-ensemble-mean-var_*.nc' - id_pattern: '{__ddsapi_name}' + id_pattern: '{__geolakeapi_name}' delay_read_cubes: false metadata_caching: true metadata_cache_path: 'e-obs.cache'
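
For reviewers unfamiliar with the error-handling convention this rename touches throughout `api/app/main.py`, the sketch below illustrates the `wrap_around_http_exception` pattern with the renamed `BaseGeoLakeException`. It is a minimal, self-contained approximation rather than the repository's actual modules: the endpoint, the placeholder lookup, and the simplified class bodies are illustrative stand-ins; only the names follow the diff.

```python
# Minimal sketch of the exception-wrapping pattern used across api/app/main.py.
# Class bodies are simplified stand-ins; only the names follow the diff.
from fastapi import FastAPI, HTTPException


class BaseGeoLakeException(BaseException):
    """Simplified stand-in for api/app/exceptions.BaseGeoLakeException."""

    msg: str = "Bad request"
    code: int = 400

    def wrap_around_http_exception(self) -> HTTPException:
        # Translate the internal exception into an HTTP error response
        # (assumed to mirror the intent of the real implementation).
        return HTTPException(status_code=self.code, detail=self.msg)


class MissingDatasetError(BaseGeoLakeException):
    """Illustrative subclass for a dataset that is not in the catalog."""

    msg: str = "Dataset does not exist in the catalog!"


app = FastAPI()


@app.get("/datasets/{dataset_id}")
async def get_dataset(dataset_id: str):
    try:
        if dataset_id != "era5":  # placeholder lookup, not the real handler
            raise MissingDatasetError()
        return {"dataset_id": dataset_id}
    except BaseGeoLakeException as err:
        # Same wrapping applied throughout api/app/main.py after the rename.
        raise err.wrap_around_http_exception() from err
```

Under these assumptions, an internal `BaseGeoLakeException` subclass surfaces to the client as a regular HTTP error instead of propagating as an unhandled `BaseException`.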
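
The driver base class in `drivers/intake_geokube/base.py` now builds its logger from the renamed `GeoLake_LOG_LEVEL` and `GeoLake_LOG_FORMAT` environment variables. A minimal sketch of that configuration flow, assuming illustrative values and a hypothetical driver class name (`NetCdfDriver` is not taken from the diff):

```python
# Simplified mirror of the renamed logger configuration in AbstractBaseDriver;
# the environment values and the driver class name below are illustrative only.
import logging
import os

os.environ.setdefault("GeoLake_LOG_LEVEL", "DEBUG")
os.environ.setdefault(
    "GeoLake_LOG_FORMAT",
    "%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s",
)

log = logging.getLogger("geolake.intake.NetCdfDriver")  # hypothetical driver name
log.setLevel(os.environ.get("GeoLake_LOG_LEVEL", "INFO"))
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(os.environ.get("GeoLake_LOG_FORMAT")))
log.addHandler(handler)
log.info("driver logger configured")
```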