Skip to content

Commit

Permalink
Issue #332 introduce support for job listing pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Dec 3, 2024
1 parent b19848d commit f5d6eff
Show file tree
Hide file tree
Showing 9 changed files with 256 additions and 12 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ and start a new "In Progress" section above it.

## In progress


## 0.120.0

- NDVI process: correctly handle band dimension as part of dry run
- Introduce support for user job pagination ([#332](https://github.com/Open-EO/openeo-python-driver/issues/332))

## 0.119.0

Expand Down
2 changes: 1 addition & 1 deletion openeo_driver/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.119.0a1"
__version__ = "0.120.0a1"
41 changes: 39 additions & 2 deletions openeo_driver/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
import json
import logging
import dataclasses

import inspect
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Union, NamedTuple, Dict, Optional, Callable, Iterable

import flask
import werkzeug.datastructures

import openeo_driver.util.view_helpers
from openeo.capabilities import ComparableVersion
Expand Down Expand Up @@ -382,6 +383,31 @@ class BatchJobResultMetadata:
# TODO: more fields


class JobListing:
    """
    Basic container/interface for user job listings,
    (e.g. including pagination)
    """

    # TODO: just implement as frozen dataclass, or does that conflict with later need to subclass?
    __slots__ = ["_jobs", "_next_parameters"]

    def __init__(self, jobs: List[BatchJobMetadata], next_parameters: Optional[dict]):
        # Jobs of the current page, plus (optional) query parameters to build a "next" page link.
        self._jobs = jobs
        self._next_parameters = next_parameters

    def to_response_dict(self, url_for: Callable[[dict], str], api_version: ComparableVersion) -> dict:
        """Produce `GET /jobs` response data, to be JSONified."""
        job_dicts = [job.to_api_dict(full=False, api_version=api_version) for job in self._jobs]
        # Only advertise a "next" link when there are pagination parameters to follow.
        links = (
            [{"rel": "next", "href": url_for(self._next_parameters)}]
            if self._next_parameters
            else []
        )
        return {"jobs": job_dicts, "links": links}


class BatchJobs(MicroService):
"""
Base contract/implementation for Batch Jobs "microservice"
Expand Down Expand Up @@ -415,7 +441,12 @@ def get_job_info(self, job_id: str, user_id: str) -> BatchJobMetadata:
"""
raise NotImplementedError

def get_user_jobs(self, user_id: str) -> Union[List[BatchJobMetadata], dict]:
def get_user_jobs(
self,
user_id: str,
limit: Optional[int] = None,
request_parameters: Optional[werkzeug.datastructures.MultiDict] = None,
) -> Union[List[BatchJobMetadata], dict, JobListing]:
"""
Get details about all batch jobs of a user
https://openeo.org/documentation/1.0/developers/api/reference.html#operation/list-jobs
Expand Down Expand Up @@ -810,3 +841,9 @@ def request_costs(
:param job_options: job options (if any provided in the sync processing request)
"""
return None


def function_has_argument(function: Callable, argument: str) -> bool:
    """Does function support given argument?"""
    # Introspect the callable's signature and look the argument name up among its parameters.
    return argument in inspect.signature(function).parameters
39 changes: 37 additions & 2 deletions openeo_driver/dummy/dummy_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
from shapely.geometry import Polygon, MultiPolygon
from shapely.geometry.base import BaseGeometry, BaseMultipartGeometry
from shapely.geometry.collection import GeometryCollection
import werkzeug.datastructures

from openeo.api.logs import normalize_log_level
from openeo.internal.process_graph_visitor import ProcessGraphVisitor
from openeo.metadata import CollectionMetadata, Band, SpatialDimension, TemporalDimension, BandDimension
import openeo.udf
from openeo.util import rfc3339
from openeo_driver.ProcessGraphDeserializer import ConcreteProcessing
from openeo_driver.backend import (
SecondaryServices,
Expand All @@ -33,6 +35,7 @@
LoadParameters,
Processing,
BatchJobResultMetadata,
JobListing,
)
from openeo_driver.config import OpenEoBackendConfig
from openeo_driver.constants import STAC_EXTENSION
Expand All @@ -51,6 +54,7 @@
from openeo_driver.save_result import AggregatePolygonResult, AggregatePolygonSpatialResult
from openeo_driver.users import User
import openeo_driver.util.changelog
from openeo_driver.util.http import UrlSafeStructCodec
from openeo_driver.utils import EvalEnv, generate_unique_id, WhiteListEvalEnv

DEFAULT_DATETIME = datetime(2020, 4, 23, 16, 20, 27)
Expand Down Expand Up @@ -671,8 +675,39 @@ def _get_job_info(self, job_id: str, user_id: str) -> BatchJobMetadata:
except KeyError:
raise JobNotFoundException(job_id)

def get_user_jobs(self, user_id: str) -> List[BatchJobMetadata]:
return [v for (k, v) in self._job_registry.items() if k[0] == user_id]
def get_user_jobs(
    self,
    user_id: str,
    limit: Optional[int] = None,
    request_parameters: Optional[werkzeug.datastructures.MultiDict] = None,
) -> JobListing:
    """
    List batch jobs of given user, with optional pagination.

    :param user_id: user to list the jobs of
    :param limit: page size; when unset/zero, all jobs are returned in a single listing
    :param request_parameters: query parameters of current request,
        used to extract the ``search_after`` pagination cursor (if any)
    :return: :py:class:`JobListing` with the (page of) jobs and,
        when there are more results, the query parameters for the next page
    """
    jobs: List[BatchJobMetadata] = [v for (k, v) in self._job_registry.items() if k[0] == user_id]
    next_parameters = None
    if limit:
        # Pagination: order by `created` and job id (as tiebreaker)
        # Note that we order from more recent to older.
        # As a result, "search_after" (blatantly based on ElasticSearch)
        # practically means "search older"

        def order_key(m: BatchJobMetadata) -> Tuple[str, str]:
            return rfc3339.datetime(m.created), m.id

        url_safe_codec = UrlSafeStructCodec()

        # Fix: guard against `limit` being given without `request_parameters`
        # (the signature defaults `request_parameters` to None, so `.get()` would crash).
        threshold = request_parameters.get("search_after") if request_parameters else None
        if threshold:
            # Cursor decodes to an (rfc3339 created, job id) tuple: keep strictly older entries.
            threshold = url_safe_codec.decode(threshold)
            jobs = [m for m in jobs if order_key(m) < threshold]
        jobs = sorted(jobs, key=order_key, reverse=True)
        # Fetch one page worth of jobs; if more remained, build "next" page parameters.
        add_next = len(jobs) > limit
        jobs = jobs[:limit]
        if add_next:
            next_parameters = {
                "limit": limit,
                "search_after": url_safe_codec.encode(order_key(jobs[-1])),
            }

    return JobListing(jobs=jobs, next_parameters=next_parameters)

@classmethod
def _update_status(cls, job_id: str, user_id: str, status: str):
Expand Down
4 changes: 4 additions & 0 deletions openeo_driver/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ def url(self, path):
"""Build URL based on (possibly versioned) root URL."""
return re.sub("/+", "/", f"/{self.url_root}/{path}")

def extract_path(self, url: str) -> str:
    """Strip scheme, host and versioned URL root from given URL, returning the remaining path."""
    # Scheme+host prefix is optional; the (version-prefixed) url_root is mandatory.
    root = re.escape(self.url_root.strip("/"))
    pattern = rf"(https?://[\w.]+)?/{root}(?P<path>/.*)"
    match = re.fullmatch(pattern, url)
    return match.group("path")

def _request_headers(self, headers: dict = None) -> dict:
return {**self.default_request_headers, **(headers or {})}

Expand Down
56 changes: 54 additions & 2 deletions openeo_driver/util/http.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Set

import json
from typing import Set, Union
import base64
import requests
import requests.adapters

Expand Down Expand Up @@ -29,3 +30,54 @@ def requests_with_retry(
session.mount("http://", adapter)
session.mount("https://", adapter)
return session


class UrlSafeStructCodec:
    """
    Utility to encode common data structures (including tuples, bytes, sets)
    in a URL-safe way.
    """

    # TODO: add compression too?

    def __init__(self, sniff: str = "_customserde"):
        # Marker key used to tag containers that JSON cannot represent natively.
        self._sniff = sniff

    def encode(self, data) -> str:
        """Encode given data structure as a URL-safe string."""
        # JSON serialization (with extra preprocessing for tuples)
        serialized = json.dumps(self._json_prepare(data), separators=(",", ":"))
        # Use base64 to make get URL-safe string
        return base64.urlsafe_b64encode(serialized.encode("utf8")).decode("utf8")

    def _json_prepare(self, data):
        """Prepare data before passing to `json.dumps`"""
        # Note that we do it as preprocessing instead of using
        # the json.JSONEncoder `default` feature,
        # which does not allow to override the handling of tuples.
        if isinstance(data, (tuple, set)):
            kind = "tuple" if isinstance(data, tuple) else "set"
            return {self._sniff: kind, "d": [self._json_prepare(x) for x in data]}
        if isinstance(data, bytes):
            return {self._sniff: "bytes", "d": base64.b64encode(data).decode("utf8")}
        if isinstance(data, list):
            return [self._json_prepare(x) for x in data]
        if isinstance(data, dict):
            return {key: self._json_prepare(value) for key, value in data.items()}
        return data

    def decode(self, data: str) -> Union[dict, list, tuple, set]:
        """Decode a string produced by :py:meth:`encode` back to the original structure."""
        raw = base64.urlsafe_b64decode(data.encode("utf8")).decode("utf8")
        return json.loads(raw, object_hook=self._json_object_hook)

    def _json_object_hook(self, d: dict):
        """Implementation of `object_hook` of `json.JSONDecoder` API."""
        marker = d.get(self._sniff)
        if marker == "tuple":
            return tuple(d["d"])
        if marker == "set":
            return set(d["d"])
        if marker == "bytes":
            return base64.b64decode(d["d"].encode("utf8"))
        return d
37 changes: 33 additions & 4 deletions openeo_driver/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,17 @@
from openeo.capabilities import ComparableVersion
from openeo.util import dict_no_none, deep_get, Rfc3339, TimingLogger
from openeo_driver.urlsigning import UrlSigner
from openeo_driver.backend import ServiceMetadata, BatchJobMetadata, UserDefinedProcessMetadata, \
ErrorSummary, OpenEoBackendImplementation, BatchJobs, is_not_implemented
from openeo_driver.backend import (
ServiceMetadata,
BatchJobMetadata,
UserDefinedProcessMetadata,
ErrorSummary,
OpenEoBackendImplementation,
BatchJobs,
is_not_implemented,
function_has_argument,
JobListing,
)
from openeo_driver.config import get_backend_config, OpenEoBackendConfig
from openeo_driver.constants import STAC_EXTENSION
from openeo_driver.datacube import DriverMlModel
Expand Down Expand Up @@ -880,9 +889,23 @@ def create_job(user: User):
@blueprint.route('/jobs', methods=['GET'])
@auth_handler.requires_bearer_auth
def list_jobs(user: User):
# TODO: support for `limit` param and paging links?
# TODO: openEO API currently prescribes no pagination by default (unset limit)
# This is however not very scalable, so we might want to set a default limit here.
# Also see https://github.com/Open-EO/openeo-api/issues/550
limit = flask.request.args.get("limit", type=int)
request_parameters = flask.request.args

get_user_jobs = backend_implementation.batch_jobs.get_user_jobs
if function_has_argument(get_user_jobs, argument="limit") and function_has_argument(
get_user_jobs, argument="request_parameters"
):
# TODO: make this the one and only code path when all `get_user_jobs` implementations are migrated
listing = get_user_jobs(user_id=user.user_id, limit=limit, request_parameters=request_parameters)
else:
# TODO: remove support for this old API
listing = get_user_jobs(user_id=user.user_id)

listing = backend_implementation.batch_jobs.get_user_jobs(user.user_id)
# TODO: while eliminating old `get_user_jobs` API above, also just settle on dict based return
if isinstance(listing, list):
jobs = listing
links = []
Expand All @@ -893,6 +916,12 @@ def list_jobs(user: User):
# TODO: this "extra" whitelist is from experimental
# "federation extension API" https://github.com/Open-EO/openeo-api/pull/419
extra = {k: listing[k] for k in ["federation:missing"] if k in listing}
elif isinstance(listing, JobListing):
data = listing.to_response_dict(
url_for=lambda params: flask.url_for(".list_jobs", **params, _external=True),
api_version=requested_api_version(),
)
return flask.jsonify(data)
else:
raise InternalException(f"Invalid user jobs listing {type(listing)}")

Expand Down
55 changes: 55 additions & 0 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1440,6 +1440,61 @@ def get_user_jobs(self, user_id: str):
"federation:missing": ["b4"],
}

def test_list_user_jobs_paging(self, api100):
    """Job listing with `limit`: pages chained through "next" links carrying a `search_after` cursor."""
    # Setup some jobs in registry
    dummy_backend.DummyBatchJobs._job_registry = {
        (TEST_USER, f"j-{x}{x}"): BatchJobMetadata(
            id=f"j-{x}{x}",
            status="created",
            created=datetime(2024, 12, x),
        )
        for x in [1, 2, 3, 4, 5, 6, 7, 8]
    }

    # First job listing
    resp = api100.get("/jobs?limit=3", headers=self.AUTH_HEADER)
    data = resp.assert_status_code(200).json
    # "next" href: same endpoint + limit, with an opaque URL-safe base64 cursor
    # (trailing "(%3D)*" allows for URL-encoded "=" padding).
    expected_next_href = dirty_equals.IsStr(
        regex=re.escape("http://oeo.net/openeo/1.0.0/jobs?limit=3&search_after=") + "[a-zA-Z0-9_/-]+(%3D)*"
    )
    assert data == {
        "jobs": [
            # Listing is ordered from most recently created to oldest.
            dirty_equals.IsPartialDict(id="j-88", created="2024-12-08T00:00:00Z"),
            dirty_equals.IsPartialDict(id="j-77", created="2024-12-07T00:00:00Z"),
            dirty_equals.IsPartialDict(id="j-66", created="2024-12-06T00:00:00Z"),
        ],
        "links": [
            {"rel": "next", "href": expected_next_href},
        ],
    }

    # Follow link to next page (first, linking to second page)
    next_url = data["links"][0]["href"]
    resp = api100.get(path=api100.extract_path(next_url), headers=self.AUTH_HEADER)
    data = resp.assert_status_code(200).json
    assert data == {
        "jobs": [
            dirty_equals.IsPartialDict(id="j-55", created="2024-12-05T00:00:00Z"),
            dirty_equals.IsPartialDict(id="j-44", created="2024-12-04T00:00:00Z"),
            dirty_equals.IsPartialDict(id="j-33", created="2024-12-03T00:00:00Z"),
        ],
        "links": [
            {"rel": "next", "href": expected_next_href},
        ],
    }

    # Follow link to next (second, no next page)
    next_url = data["links"][0]["href"]
    resp = api100.get(path=api100.extract_path(next_url), headers=self.AUTH_HEADER)
    data = resp.assert_status_code(200).json
    # Last (partial) page: only 2 jobs left and no "next" link anymore.
    assert data == {
        "jobs": [
            dirty_equals.IsPartialDict(id="j-22", created="2024-12-02T00:00:00Z"),
            dirty_equals.IsPartialDict(id="j-11", created="2024-12-01T00:00:00Z"),
        ],
        "links": [],
    }

def test_get_job_results_unfinished(self, api):
with self._fresh_job_registry(next_job_id="job-345"):
resp = api.get('/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results', headers=self.AUTH_HEADER)
Expand Down
30 changes: 29 additions & 1 deletion tests/util/test_http.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging
import re

import pytest
import requests.exceptions
from re_assert import Matches

from openeo_driver.util.http import requests_with_retry
from openeo_driver.util.http import requests_with_retry, UrlSafeStructCodec


def test_requests_with_retry(caplog):
Expand Down Expand Up @@ -41,3 +42,30 @@ def test_requests_with_retry_zero(caplog):
assert caplog.messages == [
"Starting new HTTPS connection (1): example.test:443",
]


class TestUrlSafeStructCodec:
    # Round-trip tests: encode to a URL-safe string, then decode back to the original value.
    @pytest.mark.parametrize(
        "data",
        [
            "foo",
            "unsafe?str!n@:*&~%^&foo=bar&baz[]",  # characters that are not URL-safe
            1234,
            ["apple", "banana", "coconut"],
            {"type": "banana", "color": "blue"},
            b"bytes",
            bytes([0, 2, 128]),  # non-UTF8-decodable bytes
            ("tuple", 123),
            {"set", "set", "go"},  # duplicate literal collapses: set of 2 elements
            ("nest", (1, 2), [3, 4], {5, 6}),  # nested containers inside a tuple
            {"nest", (1, 2), 777},  # tuple nested inside a set (must stay hashable)
        ],
    )
    def test_basic(self, data):
        """Encoded form is a string restricted to URL-safe characters and decodes losslessly."""
        codec = UrlSafeStructCodec()
        encoded = codec.encode(data)
        assert isinstance(encoded, str)
        # urlsafe base64 alphabet (plus "=" padding)
        assert re.fullmatch("[a-zA-Z0-9_/=-]+", encoded)
        assert codec.decode(encoded) == data

import regex

0 comments on commit f5d6eff

Please sign in to comment.