Skip to content

Commit

Permalink
Issue #332 introduce support for job listing pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Dec 5, 2024
1 parent c8b9076 commit bafcd9b
Show file tree
Hide file tree
Showing 10 changed files with 352 additions and 49 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@ and start a new "In Progress" section above it.

## In progress


## 0.120.0

- mask: also apply at load time when resample_spatial is used
- NDVI process: correctly handle band dimension as part of dry run
- Introduce support for user job pagination ([#332](https://github.com/Open-EO/openeo-python-driver/issues/332))


## 0.119.0

Expand Down
2 changes: 1 addition & 1 deletion openeo_driver/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.119.0a1"
__version__ = "0.120.0a1"
47 changes: 45 additions & 2 deletions openeo_driver/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
import json
import logging
import dataclasses

import inspect
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Union, NamedTuple, Dict, Optional, Callable, Iterable

import flask
import werkzeug.datastructures

import openeo_driver.util.view_helpers
from openeo.capabilities import ComparableVersion
Expand Down Expand Up @@ -382,6 +383,31 @@ class BatchJobResultMetadata:
# TODO: more fields


class JobListing:
"""
Basic container/interface for user job listings,
(e.g. including pagination)
"""

# TODO: just implement as frozen dataclass, or does that conflict with later need to subclass?
__slots__ = ["_jobs", "_next_parameters"]

def __init__(self, jobs: List[BatchJobMetadata], next_parameters: Optional[dict] = None):
self._jobs = jobs
self._next_parameters = next_parameters

def to_response_dict(self, url_for: Callable[[dict], str], api_version: ComparableVersion) -> dict:
"""Produce `GET /jobs` response data, to be JSONified."""
links = []
if self._next_parameters:
links.append({"rel": "next", "href": url_for(self._next_parameters)})

return {
"jobs": [m.to_api_dict(full=False, api_version=api_version) for m in self._jobs],
"links": links,
}


class BatchJobs(MicroService):
"""
Base contract/implementation for Batch Jobs "microservice"
Expand Down Expand Up @@ -415,10 +441,21 @@ def get_job_info(self, job_id: str, user_id: str) -> BatchJobMetadata:
"""
raise NotImplementedError

def get_user_jobs(self, user_id: str) -> Union[List[BatchJobMetadata], dict]:
def get_user_jobs(
self,
user_id: str,
limit: Optional[int] = None,
request_parameters: Optional[werkzeug.datastructures.MultiDict] = None,
# TODO #332 settle on returning just `JobListing` and eliminate other options/code paths.
) -> Union[List[BatchJobMetadata], dict, JobListing]:
"""
Get details about all batch jobs of a user
https://openeo.org/documentation/1.0/developers/api/reference.html#operation/list-jobs
:param limit: limit parameter in request (as defined by openEO API spec)
:param request_parameters: all request parameters.
Note that backend implementation has freedom here
how to leverage these to implement pagination
"""
raise NotImplementedError

Expand Down Expand Up @@ -810,3 +847,9 @@ def request_costs(
:param job_options: job options (if any provided in the sync processing request)
"""
return None


def function_has_argument(function: Callable, argument: str) -> bool:
"""Does function support given argument?"""
signature = inspect.signature(function)
return argument in signature.parameters
94 changes: 69 additions & 25 deletions openeo_driver/dummy/dummy_backend.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,47 @@
import importlib.metadata
import json
import numbers
import unittest.mock
from datetime import datetime
from functools import lru_cache
from pathlib import Path
from typing import List, Dict, Union, Tuple, Optional, Iterable, Any, Sequence
import unittest.mock
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union
from unittest.mock import Mock
import importlib.metadata

import flask
import numpy
import openeo.udf
import werkzeug.datastructures
import xarray
from shapely.geometry import Polygon, MultiPolygon
from openeo.api.logs import normalize_log_level
from openeo.internal.process_graph_visitor import ProcessGraphVisitor
from openeo.metadata import (
Band,
BandDimension,
CollectionMetadata,
SpatialDimension,
TemporalDimension,
)
from openeo.util import rfc3339
from shapely.geometry import MultiPolygon, Polygon
from shapely.geometry.base import BaseGeometry, BaseMultipartGeometry
from shapely.geometry.collection import GeometryCollection

from openeo.api.logs import normalize_log_level
from openeo.internal.process_graph_visitor import ProcessGraphVisitor
from openeo.metadata import CollectionMetadata, Band, SpatialDimension, TemporalDimension, BandDimension
import openeo.udf
from openeo_driver.ProcessGraphDeserializer import ConcreteProcessing
import openeo_driver.util.changelog
from openeo_driver.backend import (
SecondaryServices,
OpenEoBackendImplementation,
CollectionCatalog,
ServiceMetadata,
BatchJobs,
BatchJobMetadata,
BatchJobResultMetadata,
BatchJobs,
CollectionCatalog,
JobListing,
LoadParameters,
OidcProvider,
OpenEoBackendImplementation,
Processing,
SecondaryServices,
ServiceMetadata,
UserDefinedProcesses,
UserDefinedProcessMetadata,
LoadParameters,
Processing,
BatchJobResultMetadata,
)
from openeo_driver.config import OpenEoBackendConfig
from openeo_driver.constants import STAC_EXTENSION
Expand All @@ -41,17 +50,21 @@
from openeo_driver.delayed_vector import DelayedVector
from openeo_driver.dry_run import SourceConstraint
from openeo_driver.errors import (
JobNotFoundException,
FeatureUnsupportedException,
JobNotFinishedException,
ProcessGraphNotFoundException,
JobNotFoundException,
PermissionsInsufficientException,
FeatureUnsupportedException,
ProcessGraphNotFoundException,
)
from openeo_driver.jobregistry import JOB_STATUS
from openeo_driver.save_result import AggregatePolygonResult, AggregatePolygonSpatialResult
from openeo_driver.ProcessGraphDeserializer import ConcreteProcessing
from openeo_driver.save_result import (
AggregatePolygonResult,
AggregatePolygonSpatialResult,
)
from openeo_driver.users import User
import openeo_driver.util.changelog
from openeo_driver.utils import EvalEnv, generate_unique_id, WhiteListEvalEnv
from openeo_driver.util.http import UrlSafeStructCodec
from openeo_driver.utils import EvalEnv, WhiteListEvalEnv, generate_unique_id

DEFAULT_DATETIME = datetime(2020, 4, 23, 16, 20, 27)

Expand Down Expand Up @@ -671,8 +684,39 @@ def _get_job_info(self, job_id: str, user_id: str) -> BatchJobMetadata:
except KeyError:
raise JobNotFoundException(job_id)

def get_user_jobs(self, user_id: str) -> List[BatchJobMetadata]:
return [v for (k, v) in self._job_registry.items() if k[0] == user_id]
def get_user_jobs(
self,
user_id: str,
limit: Optional[int] = None,
request_parameters: Optional[werkzeug.datastructures.MultiDict] = None,
) -> JobListing:
jobs: List[BatchJobMetadata] = [v for (k, v) in self._job_registry.items() if k[0] == user_id]
next_parameters = None
if limit:
# Pagination: order by `created` and job id (as tiebreaker)
# Note that we order from more recent to older.
# As a result, "search_after" (blatantly based on ElasticSearch)
# practically means "search older"

def order_key(m: BatchJobMetadata) -> Tuple[str, str]:
return rfc3339.datetime(m.created), m.id

url_safe_codec = UrlSafeStructCodec(signature_field="_usc")

threshold = (request_parameters or {}).get("search_after")
if threshold:
threshold = url_safe_codec.decode(threshold)
jobs = [m for m in jobs if order_key(m) < threshold]
jobs = sorted(jobs, key=order_key, reverse=True)
add_next = len(jobs) > limit
jobs = jobs[:limit]
if add_next:
next_parameters = {
"limit": limit,
"search_after": url_safe_codec.encode(order_key(jobs[-1])),
}

return JobListing(jobs=jobs, next_parameters=next_parameters)

@classmethod
def _update_status(cls, job_id: str, user_id: str, status: str):
Expand Down
5 changes: 5 additions & 0 deletions openeo_driver/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ def url(self, path):
"""Build URL based on (possibly versioned) root URL."""
return re.sub("/+", "/", f"/{self.url_root}/{path}")

def extract_path(self, url: str) -> str:
"""Strip host and root_url from url to get openEO-style path"""
m = re.fullmatch(r"(https?://[\w.]+)?/" + re.escape(self.url_root.strip("/")) + r"(?P<path>/.*)", url)
return m.group("path")

def _request_headers(self, headers: dict = None) -> dict:
return {**self.default_request_headers, **(headers or {})}

Expand Down
65 changes: 64 additions & 1 deletion openeo_driver/util/http.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from typing import Set
import base64
import json
from typing import Set, Union

import requests
import requests.adapters

from openeo.util import repr_truncate


def requests_with_retry(
total: int = 3, # the number of retries, not attempts (which is retries + 1)
Expand All @@ -29,3 +33,62 @@ def requests_with_retry(
session.mount("http://", adapter)
session.mount("https://", adapter)
return session


class UrlSafeStructCodec:
"""
Utility to encode common data structures (including tuples, bytes, sets)
in a URL-safe way, based on enriched JSON serialization and base64 encoding
"""

# TODO: add compression too?
# TODO: support dicts with tuple keys

def __init__(self, signature_field: str = "_custjson"):
"""
:param signature_field: field to use as indicator for custom object constructs
"""
self._signature_field = signature_field

def encode(self, data) -> str:
# JSON serialization (with extra preprocessing for tuples)
data = json.dumps(self._json_prepare(data), separators=(",", ":"))
# Use base64 to make get URL-safe string
data = base64.urlsafe_b64encode(data.encode("utf8")).decode("utf8")
return data

def _json_prepare(self, data):
"""Prepare data before passing to `json.dumps`"""
# Note that we do it as preprocessing instead of using
# the json.JSONEncoder `default` feature,
# which does not allow to override the handling of tuples.
if isinstance(data, tuple):
return {self._signature_field: "tuple", "d": [self._json_prepare(x) for x in data]}
elif isinstance(data, set):
return {self._signature_field: "set", "d": [self._json_prepare(x) for x in data]}
elif isinstance(data, bytes):
return {self._signature_field: "bytes", "d": base64.b64encode(data).decode("utf8")}
elif isinstance(data, list):
return [self._json_prepare(x) for x in data]
elif isinstance(data, dict):
return {k: self._json_prepare(v) for k, v in data.items()}
return data

def decode(self, data: str) -> Union[dict, list, tuple, set]:
try:
data = base64.urlsafe_b64decode(data.encode("utf8")).decode("utf8")
data = json.loads(data, object_hook=self._json_object_hook)
return data
except Exception as e:
raise ValueError(f"Failed to decode {repr_truncate(data)}") from e


def _json_object_hook(self, d: dict):
"""Implementation of `object_hook` of `json.JSONDecoder` API."""
if d.get(self._signature_field) == "tuple":
return tuple(d["d"])
elif d.get(self._signature_field) == "set":
return set(d["d"])
elif d.get(self._signature_field) == "bytes":
return base64.b64decode(d["d"].encode("utf8"))
return d
37 changes: 33 additions & 4 deletions openeo_driver/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,17 @@
from openeo.capabilities import ComparableVersion
from openeo.util import dict_no_none, deep_get, Rfc3339, TimingLogger
from openeo_driver.urlsigning import UrlSigner
from openeo_driver.backend import ServiceMetadata, BatchJobMetadata, UserDefinedProcessMetadata, \
ErrorSummary, OpenEoBackendImplementation, BatchJobs, is_not_implemented
from openeo_driver.backend import (
ServiceMetadata,
BatchJobMetadata,
UserDefinedProcessMetadata,
ErrorSummary,
OpenEoBackendImplementation,
BatchJobs,
is_not_implemented,
function_has_argument,
JobListing,
)
from openeo_driver.config import get_backend_config, OpenEoBackendConfig
from openeo_driver.constants import STAC_EXTENSION
from openeo_driver.datacube import DriverMlModel
Expand Down Expand Up @@ -880,9 +889,23 @@ def create_job(user: User):
@blueprint.route('/jobs', methods=['GET'])
@auth_handler.requires_bearer_auth
def list_jobs(user: User):
# TODO: support for `limit` param and paging links?
# TODO: openEO API currently prescribes no pagination by default (unset limit)
# This is however not very scalable, so we might want to set a default limit here.
# Also see https://github.com/Open-EO/openeo-api/issues/550
limit = flask.request.args.get("limit", type=int)
request_parameters = flask.request.args

# TODO #332 settle on receiving just `JobListing` here and eliminate other options/code paths.
get_user_jobs = backend_implementation.batch_jobs.get_user_jobs
if function_has_argument(get_user_jobs, argument="request_parameters"):
# TODO #332 make this the one and only code path when all `get_user_jobs` implementations are migrated
listing = get_user_jobs(user_id=user.user_id, limit=limit, request_parameters=request_parameters)
else:
# TODO #332 remove support for this old API
listing = get_user_jobs(user_id=user.user_id)

listing = backend_implementation.batch_jobs.get_user_jobs(user.user_id)
# TODO #332 while eliminating old `get_user_jobs` API above, also just settle on JobListing based return,
# and drop all this legacy cruft
if isinstance(listing, list):
jobs = listing
links = []
Expand All @@ -893,6 +916,12 @@ def list_jobs(user: User):
# TODO: this "extra" whitelist is from experimental
# "federation extension API" https://github.com/Open-EO/openeo-api/pull/419
extra = {k: listing[k] for k in ["federation:missing"] if k in listing}
elif isinstance(listing, JobListing):
data = listing.to_response_dict(
url_for=lambda params: flask.url_for(".list_jobs", **params, _external=True),
api_version=requested_api_version(),
)
return flask.jsonify(data)
else:
raise InternalException(f"Invalid user jobs listing {type(listing)}")

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"moto[s3]>=5.0.0",
"time-machine>=2.8.0",
"netCDF4>=1.5.4",
"re-assert",
"re-assert", # TODO: drop this dependency in favor of dirty-equals
"pyarrow>=10.0.0",
"pystac",
"jsonschema",
Expand Down
Loading

0 comments on commit bafcd9b

Please sign in to comment.