diff --git a/CHANGELOG.md b/CHANGELOG.md index cfb88802..17027b00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,8 +21,13 @@ and start a new "In Progress" section above it. ## In progress + +## 0.120.0 + - mask: also apply at load time when resample_spatial is used - NDVI process: correctly handle band dimension as part of dry run +- Introduce support for user job pagination ([#332](https://github.com/Open-EO/openeo-python-driver/issues/332)) + ## 0.119.0 diff --git a/openeo_driver/_version.py b/openeo_driver/_version.py index 2393ac8b..63914367 100644 --- a/openeo_driver/_version.py +++ b/openeo_driver/_version.py @@ -1 +1 @@ -__version__ = "0.119.0a1" +__version__ = "0.120.0a1" diff --git a/openeo_driver/backend.py b/openeo_driver/backend.py index 3a46255b..e0585df1 100644 --- a/openeo_driver/backend.py +++ b/openeo_driver/backend.py @@ -13,13 +13,14 @@ import json import logging import dataclasses - +import inspect import sys from datetime import datetime, timedelta from pathlib import Path from typing import List, Union, NamedTuple, Dict, Optional, Callable, Iterable import flask +import werkzeug.datastructures import openeo_driver.util.view_helpers from openeo.capabilities import ComparableVersion @@ -382,6 +383,31 @@ class BatchJobResultMetadata: # TODO: more fields +class JobListing: + """ + Basic container/interface for user job listings, + (e.g. including pagination) + """ + + # TODO: just implement as frozen dataclass, or does that conflict with later need to subclass? 
+ __slots__ = ["_jobs", "_next_parameters"] + + def __init__(self, jobs: List[BatchJobMetadata], next_parameters: Optional[dict] = None): + self._jobs = jobs + self._next_parameters = next_parameters + + def to_response_dict(self, url_for: Callable[[dict], str], api_version: ComparableVersion) -> dict: + """Produce `GET /jobs` response data, to be JSONified.""" + links = [] + if self._next_parameters: + links.append({"rel": "next", "href": url_for(self._next_parameters)}) + + return { + "jobs": [m.to_api_dict(full=False, api_version=api_version) for m in self._jobs], + "links": links, + } + + class BatchJobs(MicroService): """ Base contract/implementation for Batch Jobs "microservice" @@ -415,10 +441,21 @@ def get_job_info(self, job_id: str, user_id: str) -> BatchJobMetadata: """ raise NotImplementedError - def get_user_jobs(self, user_id: str) -> Union[List[BatchJobMetadata], dict]: + def get_user_jobs( + self, + user_id: str, + limit: Optional[int] = None, + request_parameters: Optional[werkzeug.datastructures.MultiDict] = None, + # TODO #332 settle on returning just `JobListing` and eliminate other options/code paths. + ) -> Union[List[BatchJobMetadata], dict, JobListing]: """ Get details about all batch jobs of a user https://openeo.org/documentation/1.0/developers/api/reference.html#operation/list-jobs + + :param limit: limit parameter in request (as defined by openEO API spec) + :param request_parameters: all request parameters. 
+ Note that backend implementation has freedom here + how to leverage these to implement pagination """ raise NotImplementedError @@ -810,3 +847,9 @@ def request_costs( :param job_options: job options (if any provided in the sync processing request) """ return None + + +def function_has_argument(function: Callable, argument: str) -> bool: + """Does function support given argument?""" + signature = inspect.signature(function) + return argument in signature.parameters diff --git a/openeo_driver/dummy/dummy_backend.py b/openeo_driver/dummy/dummy_backend.py index fba08540..59d685e9 100644 --- a/openeo_driver/dummy/dummy_backend.py +++ b/openeo_driver/dummy/dummy_backend.py @@ -1,38 +1,47 @@ +import importlib.metadata import json import numbers +import unittest.mock from datetime import datetime from functools import lru_cache from pathlib import Path -from typing import List, Dict, Union, Tuple, Optional, Iterable, Any, Sequence -import unittest.mock +from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union from unittest.mock import Mock -import importlib.metadata import flask import numpy +import openeo.udf +import werkzeug.datastructures import xarray -from shapely.geometry import Polygon, MultiPolygon +from openeo.api.logs import normalize_log_level +from openeo.internal.process_graph_visitor import ProcessGraphVisitor +from openeo.metadata import ( + Band, + BandDimension, + CollectionMetadata, + SpatialDimension, + TemporalDimension, +) +from openeo.util import rfc3339 +from shapely.geometry import MultiPolygon, Polygon from shapely.geometry.base import BaseGeometry, BaseMultipartGeometry from shapely.geometry.collection import GeometryCollection -from openeo.api.logs import normalize_log_level -from openeo.internal.process_graph_visitor import ProcessGraphVisitor -from openeo.metadata import CollectionMetadata, Band, SpatialDimension, TemporalDimension, BandDimension -import openeo.udf -from openeo_driver.ProcessGraphDeserializer import 
ConcreteProcessing +import openeo_driver.util.changelog from openeo_driver.backend import ( - SecondaryServices, - OpenEoBackendImplementation, - CollectionCatalog, - ServiceMetadata, - BatchJobs, BatchJobMetadata, + BatchJobResultMetadata, + BatchJobs, + CollectionCatalog, + JobListing, + LoadParameters, OidcProvider, + OpenEoBackendImplementation, + Processing, + SecondaryServices, + ServiceMetadata, UserDefinedProcesses, UserDefinedProcessMetadata, - LoadParameters, - Processing, - BatchJobResultMetadata, ) from openeo_driver.config import OpenEoBackendConfig from openeo_driver.constants import STAC_EXTENSION @@ -41,17 +50,21 @@ from openeo_driver.delayed_vector import DelayedVector from openeo_driver.dry_run import SourceConstraint from openeo_driver.errors import ( - JobNotFoundException, + FeatureUnsupportedException, JobNotFinishedException, - ProcessGraphNotFoundException, + JobNotFoundException, PermissionsInsufficientException, - FeatureUnsupportedException, + ProcessGraphNotFoundException, ) from openeo_driver.jobregistry import JOB_STATUS -from openeo_driver.save_result import AggregatePolygonResult, AggregatePolygonSpatialResult +from openeo_driver.ProcessGraphDeserializer import ConcreteProcessing +from openeo_driver.save_result import ( + AggregatePolygonResult, + AggregatePolygonSpatialResult, +) from openeo_driver.users import User -import openeo_driver.util.changelog -from openeo_driver.utils import EvalEnv, generate_unique_id, WhiteListEvalEnv +from openeo_driver.util.http import UrlSafeStructCodec +from openeo_driver.utils import EvalEnv, WhiteListEvalEnv, generate_unique_id DEFAULT_DATETIME = datetime(2020, 4, 23, 16, 20, 27) @@ -671,8 +684,39 @@ def _get_job_info(self, job_id: str, user_id: str) -> BatchJobMetadata: except KeyError: raise JobNotFoundException(job_id) - def get_user_jobs(self, user_id: str) -> List[BatchJobMetadata]: - return [v for (k, v) in self._job_registry.items() if k[0] == user_id] + def get_user_jobs( + self, + user_id: 
str, + limit: Optional[int] = None, + request_parameters: Optional[werkzeug.datastructures.MultiDict] = None, + ) -> JobListing: + jobs: List[BatchJobMetadata] = [v for (k, v) in self._job_registry.items() if k[0] == user_id] + next_parameters = None + if limit: + # Pagination: order by `created` and job id (as tiebreaker) + # Note that we order from more recent to older. + # As a result, "search_after" (blatantly based on ElasticSearch) + # practically means "search older" + + def order_key(m: BatchJobMetadata) -> Tuple[str, str]: + return rfc3339.datetime(m.created), m.id + + url_safe_codec = UrlSafeStructCodec(signature_field="_usc") + + threshold = (request_parameters or {}).get("search_after") + if threshold: + threshold = url_safe_codec.decode(threshold) + jobs = [m for m in jobs if order_key(m) < threshold] + jobs = sorted(jobs, key=order_key, reverse=True) + add_next = len(jobs) > limit + jobs = jobs[:limit] + if add_next: + next_parameters = { + "limit": limit, + "search_after": url_safe_codec.encode(order_key(jobs[-1])), + } + + return JobListing(jobs=jobs, next_parameters=next_parameters) @classmethod def _update_status(cls, job_id: str, user_id: str, status: str): diff --git a/openeo_driver/testing.py b/openeo_driver/testing.py index aefdebd8..f58d4c5b 100644 --- a/openeo_driver/testing.py +++ b/openeo_driver/testing.py @@ -189,6 +189,11 @@ def url(self, path): """Build URL based on (possibly versioned) root URL.""" return re.sub("/+", "/", f"/{self.url_root}/{path}") + def extract_path(self, url: str) -> str: + """Strip host and root_url from url to get openEO-style path""" + m = re.fullmatch(r"(https?://[\w.]+)?/" + re.escape(self.url_root.strip("/")) + r"(?P<path>/.*)", url) + return m.group("path") + def _request_headers(self, headers: dict = None) -> dict: return {**self.default_request_headers, **(headers or {})} diff --git a/openeo_driver/util/http.py b/openeo_driver/util/http.py index 590b8379..d98b05ea 100644 --- a/openeo_driver/util/http.py +++ 
b/openeo_driver/util/http.py @@ -1,8 +1,12 @@ -from typing import Set +import base64 +import json +from typing import Set, Union import requests import requests.adapters +from openeo.util import repr_truncate + def requests_with_retry( total: int = 3, # the number of retries, not attempts (which is retries + 1) @@ -29,3 +33,62 @@ def requests_with_retry( session.mount("http://", adapter) session.mount("https://", adapter) return session + + +class UrlSafeStructCodec: + """ + Utility to encode common data structures (including tuples, bytes, sets) + in a URL-safe way, based on enriched JSON serialization and base64 encoding + """ + + # TODO: add compression too? + # TODO: support dicts with tuple keys + + def __init__(self, signature_field: str = "_custjson"): + """ + :param signature_field: field to use as indicator for custom object constructs + """ + self._signature_field = signature_field + + def encode(self, data) -> str: + # JSON serialization (with extra preprocessing for tuples) + data = json.dumps(self._json_prepare(data), separators=(",", ":")) + # Use base64 to make get URL-safe string + data = base64.urlsafe_b64encode(data.encode("utf8")).decode("utf8") + return data + + def _json_prepare(self, data): + """Prepare data before passing to `json.dumps`""" + # Note that we do it as preprocessing instead of using + # the json.JSONEncoder `default` feature, + # which does not allow to override the handling of tuples. 
+ if isinstance(data, tuple): + return {self._signature_field: "tuple", "d": [self._json_prepare(x) for x in data]} + elif isinstance(data, set): + return {self._signature_field: "set", "d": [self._json_prepare(x) for x in data]} + elif isinstance(data, bytes): + return {self._signature_field: "bytes", "d": base64.b64encode(data).decode("utf8")} + elif isinstance(data, list): + return [self._json_prepare(x) for x in data] + elif isinstance(data, dict): + return {k: self._json_prepare(v) for k, v in data.items()} + return data + + def decode(self, data: str) -> Union[dict, list, tuple, set]: + try: + data = base64.urlsafe_b64decode(data.encode("utf8")).decode("utf8") + data = json.loads(data, object_hook=self._json_object_hook) + return data + except Exception as e: + raise ValueError(f"Failed to decode {repr_truncate(data)}") from e + + + def _json_object_hook(self, d: dict): + """Implementation of `object_hook` of `json.JSONDecoder` API.""" + if d.get(self._signature_field) == "tuple": + return tuple(d["d"]) + elif d.get(self._signature_field) == "set": + return set(d["d"]) + elif d.get(self._signature_field) == "bytes": + return base64.b64decode(d["d"].encode("utf8")) + return d diff --git a/openeo_driver/views.py b/openeo_driver/views.py index 5efe8b8f..06d0a2dc 100644 --- a/openeo_driver/views.py +++ b/openeo_driver/views.py @@ -27,8 +27,17 @@ from openeo.capabilities import ComparableVersion from openeo.util import dict_no_none, deep_get, Rfc3339, TimingLogger from openeo_driver.urlsigning import UrlSigner -from openeo_driver.backend import ServiceMetadata, BatchJobMetadata, UserDefinedProcessMetadata, \ - ErrorSummary, OpenEoBackendImplementation, BatchJobs, is_not_implemented +from openeo_driver.backend import ( + ServiceMetadata, + BatchJobMetadata, + UserDefinedProcessMetadata, + ErrorSummary, + OpenEoBackendImplementation, + BatchJobs, + is_not_implemented, + function_has_argument, + JobListing, +) from openeo_driver.config import get_backend_config, 
OpenEoBackendConfig from openeo_driver.constants import STAC_EXTENSION from openeo_driver.datacube import DriverMlModel @@ -880,9 +889,23 @@ def create_job(user: User): @blueprint.route('/jobs', methods=['GET']) @auth_handler.requires_bearer_auth def list_jobs(user: User): - # TODO: support for `limit` param and paging links? + # TODO: openEO API currently prescribes no pagination by default (unset limit) + # This is however not very scalable, so we might want to set a default limit here. + # Also see https://github.com/Open-EO/openeo-api/issues/550 + limit = flask.request.args.get("limit", type=int) + request_parameters = flask.request.args + + # TODO #332 settle on receiving just `JobListing` here and eliminate other options/code paths. + get_user_jobs = backend_implementation.batch_jobs.get_user_jobs + if function_has_argument(get_user_jobs, argument="request_parameters"): + # TODO #332 make this the one and only code path when all `get_user_jobs` implementations are migrated + listing = get_user_jobs(user_id=user.user_id, limit=limit, request_parameters=request_parameters) + else: + # TODO #332 remove support for this old API + listing = get_user_jobs(user_id=user.user_id) - listing = backend_implementation.batch_jobs.get_user_jobs(user.user_id) + # TODO #332 while eliminating old `get_user_jobs` API above, also just settle on JobListing based return, + # and drop all this legacy cruft if isinstance(listing, list): jobs = listing links = [] @@ -893,6 +916,12 @@ def list_jobs(user: User): # TODO: this "extra" whitelist is from experimental # "federation extension API" https://github.com/Open-EO/openeo-api/pull/419 extra = {k: listing[k] for k in ["federation:missing"] if k in listing} + elif isinstance(listing, JobListing): + data = listing.to_response_dict( + url_for=lambda params: flask.url_for(".list_jobs", **params, _external=True), + api_version=requested_api_version(), + ) + return flask.jsonify(data) else: raise InternalException(f"Invalid user jobs 
listing {type(listing)}") diff --git a/setup.py b/setup.py index 643213f7..8ee0ed64 100644 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ "moto[s3]>=5.0.0", "time-machine>=2.8.0", "netCDF4>=1.5.4", - "re-assert", + "re-assert", # TODO: drop this dependency in favor of dirty-equals "pyarrow>=10.0.0", "pystac", "jsonschema", diff --git a/tests/test_views.py b/tests/test_views.py index c2b21d0c..37e8b66d 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -1,4 +1,3 @@ -import dirty_equals import json import logging import re @@ -10,43 +9,61 @@ from unittest import mock import boto3 +import dirty_equals import flask +import geopandas as gpd +import moto import pystac.validation.stac_validator import pytest import re_assert import werkzeug.exceptions -import moto -import geopandas as gpd - from openeo.capabilities import ComparableVersion -from openeo_driver.ProcessGraphDeserializer import custom_process_from_process_graph + from openeo_driver.backend import ( BatchJobMetadata, - UserDefinedProcessMetadata, + BatchJobResultMetadata, BatchJobs, OpenEoBackendImplementation, Processing, + UserDefinedProcessMetadata, not_implemented, - BatchJobResultMetadata, ) from openeo_driver.config import OpenEoBackendConfig from openeo_driver.datacube import DriverVectorCube from openeo_driver.dummy import dummy_backend, dummy_config from openeo_driver.dummy.dummy_backend import DummyBackendImplementation from openeo_driver.errors import OpenEOApiException +from openeo_driver.ProcessGraphDeserializer import custom_process_from_process_graph from openeo_driver.save_result import VectorCubeResult -from openeo_driver.testing import ApiTester, TEST_USER, ApiResponse, TEST_USER_AUTH_HEADER, \ - generate_unique_test_process_id, build_basic_http_auth_header, ListSubSet, DictSubSet, RegexMatcher +from openeo_driver.testing import ( + TEST_USER, + TEST_USER_AUTH_HEADER, + ApiResponse, + ApiTester, + DictSubSet, + ListSubSet, + RegexMatcher, + build_basic_http_auth_header, + 
generate_unique_test_process_id, +) from openeo_driver.urlsigning import UrlSigner from openeo_driver.users import User -from openeo_driver.users.auth import HttpAuthHandler, AccessTokenException +from openeo_driver.users.auth import AccessTokenException, HttpAuthHandler from openeo_driver.users.oidc import OidcProvider -from openeo_driver.util.logging import LOGGING_CONTEXT_FLASK, FlaskRequestCorrelationIdLogging -from openeo_driver.views import EndpointRegistry, _normalize_collection_metadata, build_app, STREAM_CHUNK_SIZE_DEFAULT +from openeo_driver.util.logging import ( + LOGGING_CONTEXT_FLASK, + FlaskRequestCorrelationIdLogging, +) +from openeo_driver.views import ( + STREAM_CHUNK_SIZE_DEFAULT, + EndpointRegistry, + _normalize_collection_metadata, + build_app, +) + from .conftest import TEST_APP_CONFIG, enhanced_logging from .data import TEST_DATA_ROOT, get_path - EXPECTED_PROCESSING_EXPRESSION = {"expression": {"process_graph": {"foo": {"process_id": "foo", "arguments": {}}}}, "format": "openeo"} @@ -1440,6 +1457,61 @@ def get_user_jobs(self, user_id: str): "federation:missing": ["b4"], } + def test_list_user_jobs_paging(self, api100): + # Setup some jobs in registry + dummy_backend.DummyBatchJobs._job_registry = { + (TEST_USER, f"j-{x}{x}"): BatchJobMetadata( + id=f"j-{x}{x}", + status="created", + created=datetime(2024, 12, x), + ) + for x in [1, 2, 3, 4, 5, 6, 7, 8] + } + + # Initial job listing with explicit limit (first page) + resp = api100.get("/jobs?limit=3", headers=self.AUTH_HEADER) + data = resp.assert_status_code(200).json + expected_next_href = dirty_equals.IsStr( + regex=re.escape("http://oeo.net/openeo/1.0.0/jobs?limit=3&search_after=") + "[a-zA-Z0-9_/-]+(%3D)*" + ) + assert data == { + "jobs": [ + dirty_equals.IsPartialDict(id="j-88", created="2024-12-08T00:00:00Z"), + dirty_equals.IsPartialDict(id="j-77", created="2024-12-07T00:00:00Z"), + dirty_equals.IsPartialDict(id="j-66", created="2024-12-06T00:00:00Z"), + ], + "links": [ + {"rel": 
"next", "href": expected_next_href}, + ], + } + + # Follow link to second page (which links to third page) + next_url = data["links"][0]["href"] + resp = api100.get(path=api100.extract_path(next_url), headers=self.AUTH_HEADER) + data = resp.assert_status_code(200).json + assert data == { + "jobs": [ + dirty_equals.IsPartialDict(id="j-55", created="2024-12-05T00:00:00Z"), + dirty_equals.IsPartialDict(id="j-44", created="2024-12-04T00:00:00Z"), + dirty_equals.IsPartialDict(id="j-33", created="2024-12-03T00:00:00Z"), + ], + "links": [ + {"rel": "next", "href": expected_next_href}, + ], + } + + # Follow link to third page (which has no next link) + next_url = data["links"][0]["href"] + resp = api100.get(path=api100.extract_path(next_url), headers=self.AUTH_HEADER) + data = resp.assert_status_code(200).json + assert data == { + "jobs": [ + dirty_equals.IsPartialDict(id="j-22", created="2024-12-02T00:00:00Z"), + dirty_equals.IsPartialDict(id="j-11", created="2024-12-01T00:00:00Z"), + ], + "links": [], + } + def test_get_job_results_unfinished(self, api): with self._fresh_job_registry(next_job_id="job-345"): resp = api.get('/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results', headers=self.AUTH_HEADER) diff --git a/tests/util/test_http.py b/tests/util/test_http.py index e987c77c..1f100b14 100644 --- a/tests/util/test_http.py +++ b/tests/util/test_http.py @@ -1,10 +1,11 @@ import logging - +import re +import base64 import pytest import requests.exceptions from re_assert import Matches -from openeo_driver.util.http import requests_with_retry +from openeo_driver.util.http import UrlSafeStructCodec, requests_with_retry def test_requests_with_retry(caplog): @@ -41,3 +42,44 @@ def test_requests_with_retry_zero(caplog): assert caplog.messages == [ "Starting new HTTPS connection (1): example.test:443", ] + + +class TestUrlSafeStructCodec: + def test_simple(self): + codec = UrlSafeStructCodec(signature_field="_usc") + assert codec.encode("foo") == "ImZvbyI=" + assert 
codec.encode([1, 2, "three"]) == "WzEsMiwidGhyZWUiXQ==" + assert codec.encode((1, 2, "three")) == "eyJfdXNjIjoidHVwbGUiLCJkIjpbMSwyLCJ0aHJlZSJdfQ==" + + @pytest.mark.parametrize( + "data", + [ + "foo", + "unsafe?str!n@:*&~%^&foo=bar&baz[]", + 1234, + ["apple", "banana", "coconut"], + {"type": "banana", "color": "blue"}, + b"bytes", + bytes([0, 2, 128]), + ("tuple", 123), + {"set", "set", "go"}, + ("nest", (1, 2), [3, 4], {5, 6}), + {"nest", (1, 2), 777}, + {"deep": ["nested", ("data", [1, 2, (3, 4), {"foo": ("bar", ["x", "y"])}])]}, + ], + ) + def test_basic(self, data): + codec = UrlSafeStructCodec() + encoded = codec.encode(data) + assert isinstance(encoded, str) + assert re.fullmatch("[a-zA-Z0-9_/=-]+", encoded) + assert codec.decode(encoded) == data + + def test_decode_b64_garbage(self): + with pytest.raises(ValueError, match="Failed to decode"): + _ = UrlSafeStructCodec().decode(data="g@rbag%?$$!") + + def test_decode_json_garbage(self): + garbage = base64.b64encode(b"nope, n0t J$ON here").decode("utf8") + with pytest.raises(ValueError, match="Failed to decode"): + _ = UrlSafeStructCodec().decode(data=garbage)