diff --git a/CHANGELOG.md b/CHANGELOG.md index 5882e348..d9281787 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,7 +21,11 @@ and start a new "In Progress" section above it. ## In progress + +## 0.120.0 + - NDVI process: correctly handle band dimension as part of dry run +- Introduce support for user job pagination ([#332](https://github.com/Open-EO/openeo-python-driver/issues/332)) ## 0.119.0 diff --git a/openeo_driver/_version.py b/openeo_driver/_version.py index 2393ac8b..63914367 100644 --- a/openeo_driver/_version.py +++ b/openeo_driver/_version.py @@ -1 +1 @@ -__version__ = "0.119.0a1" +__version__ = "0.120.0a1" diff --git a/openeo_driver/backend.py b/openeo_driver/backend.py index 3a46255b..d96c0d2b 100644 --- a/openeo_driver/backend.py +++ b/openeo_driver/backend.py @@ -13,13 +13,14 @@ import json import logging import dataclasses - +import inspect import sys from datetime import datetime, timedelta from pathlib import Path from typing import List, Union, NamedTuple, Dict, Optional, Callable, Iterable import flask +import werkzeug.datastructures import openeo_driver.util.view_helpers from openeo.capabilities import ComparableVersion @@ -382,6 +383,31 @@ class BatchJobResultMetadata: # TODO: more fields +class JobListing: + """ + Basic container/interface for user job listings, + (e.g. including pagination) + """ + + # TODO: just implement as frozen dataclass, or does that conflict with later need to subclass? + __slots__ = ["_jobs", "_next_parameters"] + + def __init__(self, jobs: List[BatchJobMetadata], next_parameters: Optional[dict]): + self._jobs = jobs + self._next_parameters = next_parameters + + def to_response_dict(self, url_for: Callable[[dict], str], api_version: ComparableVersion) -> dict: + """Produce `GET /jobs` response data, to be JSONified.""" + links = [] + if self._next_parameters: + links.append({"rel": "next", "href": url_for(self._next_parameters)}) + + return { + "jobs": [m.to_api_dict(full=False, api_version=api_version) for m in self._jobs], + "links": links, + } + + class BatchJobs(MicroService): """ Base contract/implementation for Batch Jobs "microservice" @@ -415,7 +441,12 @@ def get_job_info(self, job_id: str, user_id: str) -> BatchJobMetadata: """ raise NotImplementedError - def get_user_jobs(self, user_id: str) -> Union[List[BatchJobMetadata], dict]: + def get_user_jobs( + self, + user_id: str, + limit: Optional[int] = None, + request_parameters: Optional[werkzeug.datastructures.MultiDict] = None, + ) -> Union[List[BatchJobMetadata], dict, JobListing]: """ Get details about all batch jobs of a user https://openeo.org/documentation/1.0/developers/api/reference.html#operation/list-jobs @@ -810,3 +841,9 @@ def request_costs( :param job_options: job options (if any provided in the sync processing request) """ return None + + +def function_has_argument(function: Callable, argument: str) -> bool: + """Does function support given argument?""" + signature = inspect.signature(function) + return argument in signature.parameters diff --git a/openeo_driver/dummy/dummy_backend.py b/openeo_driver/dummy/dummy_backend.py index fba08540..2ab0edf0 100644 --- a/openeo_driver/dummy/dummy_backend.py +++ b/openeo_driver/dummy/dummy_backend.py @@ -14,11 +14,13 @@ from shapely.geometry import Polygon, MultiPolygon from shapely.geometry.base import BaseGeometry, BaseMultipartGeometry from shapely.geometry.collection import GeometryCollection +import werkzeug.datastructures from openeo.api.logs import normalize_log_level from openeo.internal.process_graph_visitor import ProcessGraphVisitor from openeo.metadata import CollectionMetadata, Band, SpatialDimension, TemporalDimension, BandDimension import openeo.udf +from openeo.util import rfc3339 from openeo_driver.ProcessGraphDeserializer import ConcreteProcessing from openeo_driver.backend import ( SecondaryServices, @@ -33,6 +35,7 @@ LoadParameters, Processing, BatchJobResultMetadata, + JobListing, ) from openeo_driver.config import OpenEoBackendConfig from openeo_driver.constants import STAC_EXTENSION @@ -51,6 +54,7 @@ from openeo_driver.save_result import AggregatePolygonResult, AggregatePolygonSpatialResult from openeo_driver.users import User import openeo_driver.util.changelog +from openeo_driver.util.http import UrlSafeStructCodec from openeo_driver.utils import EvalEnv, generate_unique_id, WhiteListEvalEnv DEFAULT_DATETIME = datetime(2020, 4, 23, 16, 20, 27) @@ -671,8 +675,39 @@ def _get_job_info(self, job_id: str, user_id: str) -> BatchJobMetadata: except KeyError: raise JobNotFoundException(job_id) - def get_user_jobs(self, user_id: str) -> List[BatchJobMetadata]: - return [v for (k, v) in self._job_registry.items() if k[0] == user_id] + def get_user_jobs( + self, + user_id: str, + limit: Optional[int] = None, + request_parameters: Optional[werkzeug.datastructures.MultiDict] = None, + ) -> JobListing: + jobs: List[BatchJobMetadata] = [v for (k, v) in self._job_registry.items() if k[0] == user_id] + next_parameters = None + if limit: + # Pagination: order by `created` and job id (as tiebreaker) + # Note that we order from more recent to older. + # As a result, "search_after" (blatantly based on ElasticSearch) + # practically means "search older" + + def order_key(m: BatchJobMetadata) -> Tuple[str, str]: + return rfc3339.datetime(m.created), m.id + + url_safe_codec = UrlSafeStructCodec() + + threshold = request_parameters.get("search_after") + if threshold: + threshold = url_safe_codec.decode(threshold) + jobs = [m for m in jobs if order_key(m) < threshold] + jobs = sorted(jobs, key=order_key, reverse=True) + add_next = len(jobs) > limit + jobs = jobs[:limit] + if add_next: + next_parameters = { + "limit": limit, + "search_after": url_safe_codec.encode(order_key(jobs[-1])), + } + + return JobListing(jobs=jobs, next_parameters=next_parameters) @classmethod def _update_status(cls, job_id: str, user_id: str, status: str): diff --git a/openeo_driver/testing.py b/openeo_driver/testing.py index aefdebd8..9c1297e7 100644 --- a/openeo_driver/testing.py +++ b/openeo_driver/testing.py @@ -189,6 +189,10 @@ def url(self, path): """Build URL based on (possibly versioned) root URL.""" return re.sub("/+", "/", f"/{self.url_root}/{path}") + def extract_path(self, url: str) -> str: + m = re.fullmatch(r"(https?://[\w.]+)?/" + re.escape(self.url_root.strip("/")) + r"(?P/.*)", url) + return m.group("path") + def _request_headers(self, headers: dict = None) -> dict: return {**self.default_request_headers, **(headers or {})} diff --git a/openeo_driver/util/http.py b/openeo_driver/util/http.py index 590b8379..d0892be5 100644 --- a/openeo_driver/util/http.py +++ b/openeo_driver/util/http.py @@ -1,5 +1,6 @@ -from typing import Set - +import json +from typing import Set, Union +import base64 import requests import requests.adapters @@ -29,3 +30,54 @@ def requests_with_retry( session.mount("http://", adapter) session.mount("https://", adapter) return session + + +class UrlSafeStructCodec: + """ + Utility to encode common data structures (including tuples, bytes, sets) + in a URL-safe way. + """ + + # TODO: add compression too? + + def __init__(self, sniff: str = "_customserde"): + self._sniff = sniff + + def encode(self, data) -> str: + # JSON serialization (with extra preprocessing for tuples) + data = json.dumps(self._json_prepare(data), separators=(",", ":")) + # Use base64 to make get URL-safe string + data = base64.urlsafe_b64encode(data.encode("utf8")).decode("utf8") + return data + + def _json_prepare(self, data): + """Prepare data before passing to `json.dumps`""" + # Note that we do it as preprocessing instead of using + # the json.JSONEncoder `default` feature, + # which does not allow to override the handling of tuples. + if isinstance(data, tuple): + return {self._sniff: "tuple", "d": [self._json_prepare(x) for x in data]} + elif isinstance(data, set): + return {self._sniff: "set", "d": [self._json_prepare(x) for x in data]} + elif isinstance(data, bytes): + return {self._sniff: "bytes", "d": base64.b64encode(data).decode("utf8")} + elif isinstance(data, list): + return [self._json_prepare(x) for x in data] + elif isinstance(data, dict): + return {k: self._json_prepare(v) for k, v in data.items()} + return data + + def decode(self, data: str) -> Union[dict, list, tuple, set]: + data = base64.urlsafe_b64decode(data.encode("utf8")).decode("utf8") + data = json.loads(data, object_hook=self._json_object_hook) + return data + + def _json_object_hook(self, d: dict): + """Implementation of `object_hook` of `json.JSONDecoder` API.""" + if d.get(self._sniff) == "tuple": + return tuple(d["d"]) + elif d.get(self._sniff) == "set": + return set(d["d"]) + elif d.get(self._sniff) == "bytes": + return base64.b64decode(d["d"].encode("utf8")) + return d diff --git a/openeo_driver/views.py b/openeo_driver/views.py index 5efe8b8f..4b300374 100644 --- a/openeo_driver/views.py +++ b/openeo_driver/views.py @@ -27,8 +27,17 @@ from openeo.capabilities import ComparableVersion from openeo.util import dict_no_none, deep_get, Rfc3339, TimingLogger from openeo_driver.urlsigning import UrlSigner -from openeo_driver.backend import ServiceMetadata, BatchJobMetadata, UserDefinedProcessMetadata, \ - ErrorSummary, OpenEoBackendImplementation, BatchJobs, is_not_implemented +from openeo_driver.backend import ( + ServiceMetadata, + BatchJobMetadata, + UserDefinedProcessMetadata, + ErrorSummary, + OpenEoBackendImplementation, + BatchJobs, + is_not_implemented, + function_has_argument, + JobListing, +) from openeo_driver.config import get_backend_config, OpenEoBackendConfig from openeo_driver.constants import STAC_EXTENSION from openeo_driver.datacube import DriverMlModel @@ -880,9 +889,23 @@ def create_job(user: User): @blueprint.route('/jobs', methods=['GET']) @auth_handler.requires_bearer_auth def list_jobs(user: User): - # TODO: support for `limit` param and paging links? + # TODO: openEO API currently prescribes no pagination by default (unset limit) + # This is however not very scalable, so we might want to set a default limit here. + # Also see https://github.com/Open-EO/openeo-api/issues/550 + limit = flask.request.args.get("limit", type=int) + request_parameters = flask.request.args + + get_user_jobs = backend_implementation.batch_jobs.get_user_jobs + if function_has_argument(get_user_jobs, argument="limit") and function_has_argument( + get_user_jobs, argument="request_parameters" + ): + # TODO: make this the one and only code path when all `get_user_jobs` implementations are migrated + listing = get_user_jobs(user_id=user.user_id, limit=limit, request_parameters=request_parameters) + else: + # TODO: remove support for this old API + listing = get_user_jobs(user_id=user.user_id) - listing = backend_implementation.batch_jobs.get_user_jobs(user.user_id) + # TODO: while eliminating old `get_user_jobs` API above, also just settle on dict based return if isinstance(listing, list): jobs = listing links = [] @@ -893,6 +916,12 @@ def list_jobs(user: User): # TODO: this "extra" whitelist is from experimental # "federation extension API" https://github.com/Open-EO/openeo-api/pull/419 extra = {k: listing[k] for k in ["federation:missing"] if k in listing} + elif isinstance(listing, JobListing): + data = listing.to_response_dict( + url_for=lambda params: flask.url_for(".list_jobs", **params, _external=True), + api_version=requested_api_version(), + ) + return flask.jsonify(data) else: raise InternalException(f"Invalid user jobs listing {type(listing)}") diff --git a/tests/test_views.py b/tests/test_views.py index c2b21d0c..c5762eed 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -1440,6 +1440,61 @@ def get_user_jobs(self, user_id: str): "federation:missing": ["b4"], } + def test_list_user_jobs_paging(self, api100): + # Setup some jobs in registry + dummy_backend.DummyBatchJobs._job_registry = { + (TEST_USER, f"j-{x}{x}"): BatchJobMetadata( + id=f"j-{x}{x}", + status="created", + created=datetime(2024, 12, x), + ) + for x in [1, 2, 3, 4, 5, 6, 7, 8] + } + + # First job listing + resp = api100.get("/jobs?limit=3", headers=self.AUTH_HEADER) + data = resp.assert_status_code(200).json + expected_next_href = dirty_equals.IsStr( + regex=re.escape("http://oeo.net/openeo/1.0.0/jobs?limit=3&search_after=") + "[a-zA-Z0-9_/-]+(%3D)*" + ) + assert data == { + "jobs": [ + dirty_equals.IsPartialDict(id="j-88", created="2024-12-08T00:00:00Z"), + dirty_equals.IsPartialDict(id="j-77", created="2024-12-07T00:00:00Z"), + dirty_equals.IsPartialDict(id="j-66", created="2024-12-06T00:00:00Z"), + ], + "links": [ + {"rel": "next", "href": expected_next_href}, + ], + } + + # Follow link to next page (first, linking to second page) + next_url = data["links"][0]["href"] + resp = api100.get(path=api100.extract_path(next_url), headers=self.AUTH_HEADER) + data = resp.assert_status_code(200).json + assert data == { + "jobs": [ + dirty_equals.IsPartialDict(id="j-55", created="2024-12-05T00:00:00Z"), + dirty_equals.IsPartialDict(id="j-44", created="2024-12-04T00:00:00Z"), + dirty_equals.IsPartialDict(id="j-33", created="2024-12-03T00:00:00Z"), + ], + "links": [ + {"rel": "next", "href": expected_next_href}, + ], + } + + # Follow link to next (second, no next page) + next_url = data["links"][0]["href"] + resp = api100.get(path=api100.extract_path(next_url), headers=self.AUTH_HEADER) + data = resp.assert_status_code(200).json + assert data == { + "jobs": [ + dirty_equals.IsPartialDict(id="j-22", created="2024-12-02T00:00:00Z"), + dirty_equals.IsPartialDict(id="j-11", created="2024-12-01T00:00:00Z"), + ], + "links": [], + } + def test_get_job_results_unfinished(self, api): with self._fresh_job_registry(next_job_id="job-345"): resp = api.get('/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results', headers=self.AUTH_HEADER) diff --git a/tests/util/test_http.py b/tests/util/test_http.py index e987c77c..5add0a16 100644 --- a/tests/util/test_http.py +++ b/tests/util/test_http.py @@ -1,10 +1,11 @@ import logging +import re import pytest import requests.exceptions from re_assert import Matches -from openeo_driver.util.http import requests_with_retry +from openeo_driver.util.http import requests_with_retry, UrlSafeStructCodec def test_requests_with_retry(caplog): @@ -41,3 +42,30 @@ def test_requests_with_retry_zero(caplog): assert caplog.messages == [ "Starting new HTTPS connection (1): example.test:443", ] + + +class TestUrlSafeStructCodec: + @pytest.mark.parametrize( + "data", + [ + "foo", + "unsafe?str!n@:*&~%^&foo=bar&baz[]", + 1234, + ["apple", "banana", "coconut"], + {"type": "banana", "color": "blue"}, + b"bytes", + bytes([0, 2, 128]), + ("tuple", 123), + {"set", "set", "go"}, + ("nest", (1, 2), [3, 4], {5, 6}), + {"nest", (1, 2), 777}, + ], + ) + def test_basic(self, data): + codec = UrlSafeStructCodec() + encoded = codec.encode(data) + assert isinstance(encoded, str) + assert re.fullmatch("[a-zA-Z0-9_/=-]+", encoded) + assert codec.decode(encoded) == data + + import regex