fixup! Issue #332 introduce support for job listing pagination
soxofaan committed Dec 4, 2024
1 parent 9ba267b commit f92af81
Showing 8 changed files with 51 additions and 20 deletions.
5 changes: 5 additions & 0 deletions openeo_driver/backend.py
@@ -450,6 +450,11 @@ def get_user_jobs(
"""
Get details about all batch jobs of a user
https://openeo.org/documentation/1.0/developers/api/reference.html#operation/list-jobs
:param limit: limit parameter in request (as defined by openEO API spec)
:param request_parameters: all request parameters.
Note that backend implementation has freedom here
how to leverage these to implement pagination
"""
raise NotImplementedError

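For context: a possible shape of a migrated implementation of this extended signature. Everything below is an illustrative sketch, not part of this commit: `JobListing` is the return construct mentioned in views.py further down, while `_load_all_jobs`, `_decode_cursor` and `_encode_cursor` are hypothetical helpers.

    from typing import Optional

    def get_user_jobs(self, user_id: str, limit: Optional[int] = None, request_parameters: Optional[dict] = None):
        # Keyset pagination sketch: stable (created, id) ordering, as in the dummy backend below.
        # rfc3339 is the openeo.util helper used there too.
        jobs = sorted(self._load_all_jobs(user_id=user_id), key=lambda m: (rfc3339.datetime(m.created), m.id))
        cursor = (request_parameters or {}).get("search_after")
        if cursor:
            # Resume strictly after the order key of the previous page's last job.
            threshold = self._decode_cursor(cursor)
            jobs = [m for m in jobs if (rfc3339.datetime(m.created), m.id) > threshold]
        next_parameters = None
        if limit and len(jobs) > limit:
            jobs = jobs[:limit]
            next_parameters = {"limit": limit, "search_after": self._encode_cursor(jobs[-1])}
        return JobListing(jobs=jobs, next_parameters=next_parameters)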
2 changes: 1 addition & 1 deletion openeo_driver/dummy/dummy_backend.py
@@ -701,7 +701,7 @@ def get_user_jobs(
         def order_key(m: BatchJobMetadata) -> Tuple[str, str]:
             return rfc3339.datetime(m.created), m.id

-        url_safe_codec = UrlSafeStructCodec()
+        url_safe_codec = UrlSafeStructCodec(signature_field="_usc")

         threshold = request_parameters.get("search_after")
         if threshold:
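The dummy backend thus implements keyset pagination: `order_key` defines a stable (created, id) ordering, and the `search_after` cursor carries the order key of the last job on the previous page, encoded URL-safely (a bare tuple would not survive plain JSON). A sketch of how the thresholding truncated above presumably continues, purely for illustration:

    # Illustrative only; the actual continuation is not shown in this diff.
    if threshold:
        # The codec restores the (created_str, job_id) tuple from the cursor.
        threshold = url_safe_codec.decode(threshold)
        jobs = [m for m in jobs if order_key(m) > threshold]
    jobs = sorted(jobs, key=order_key)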
1 change: 1 addition & 0 deletions openeo_driver/testing.py
@@ -190,6 +190,7 @@ def url(self, path):
         return re.sub("/+", "/", f"/{self.url_root}/{path}")

     def extract_path(self, url: str) -> str:
+        """Strip host and url_root from url to get openEO-style path"""
         m = re.fullmatch(r"(https?://[\w.]+)?/" + re.escape(self.url_root.strip("/")) + r"(?P<path>/.*)", url)
         return m.group("path")

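For illustration, a hypothetical round trip through these two helpers:

    # Assuming a tester built with url_root="/openeo/1.0.0" (illustrative values):
    tester.url("jobs/j-123")                                        # -> "/openeo/1.0.0/jobs/j-123"
    tester.extract_path("http://oeo.test/openeo/1.0.0/jobs/j-123")  # -> "/jobs/j-123"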
34 changes: 22 additions & 12 deletions openeo_driver/util/http.py
@@ -5,6 +5,8 @@
 import requests
 import requests.adapters

+from openeo.util import repr_truncate
+

 def requests_with_retry(
     total: int = 3,  # the number of retries, not attempts (which is retries + 1)
@@ -36,13 +38,17 @@ class UrlSafeStructCodec:
 class UrlSafeStructCodec:
     """
     Utility to encode common data structures (including tuples, bytes, sets)
-    in a URL-safe way.
+    in a URL-safe way, based on enriched JSON serialization and base64 encoding
     """

     # TODO: add compression too?
     # TODO: support dicts with tuple keys

-    def __init__(self, sniff: str = "_customserde"):
-        self._sniff = sniff
+    def __init__(self, signature_field: str = "_custjson"):
+        """
+        :param signature_field: field to use as indicator for custom object constructs
+        """
+        self._signature_field = signature_field

     def encode(self, data) -> str:
         # JSON serialization (with extra preprocessing for tuples)
@@ -57,28 +63,32 @@ def _json_prepare(self, data):
         # the json.JSONEncoder `default` feature,
         # which does not allow to override the handling of tuples.
         if isinstance(data, tuple):
-            return {self._sniff: "tuple", "d": [self._json_prepare(x) for x in data]}
+            return {self._signature_field: "tuple", "d": [self._json_prepare(x) for x in data]}
         elif isinstance(data, set):
-            return {self._sniff: "set", "d": [self._json_prepare(x) for x in data]}
+            return {self._signature_field: "set", "d": [self._json_prepare(x) for x in data]}
         elif isinstance(data, bytes):
-            return {self._sniff: "bytes", "d": base64.b64encode(data).decode("utf8")}
+            return {self._signature_field: "bytes", "d": base64.b64encode(data).decode("utf8")}
         elif isinstance(data, list):
             return [self._json_prepare(x) for x in data]
         elif isinstance(data, dict):
             return {k: self._json_prepare(v) for k, v in data.items()}
         return data

     def decode(self, data: str) -> Union[dict, list, tuple, set]:
-        data = base64.urlsafe_b64decode(data.encode("utf8")).decode("utf8")
-        data = json.loads(data, object_hook=self._json_object_hook)
-        return data
+        try:
+            data = base64.urlsafe_b64decode(data.encode("utf8")).decode("utf8")
+            data = json.loads(data, object_hook=self._json_object_hook)
+            return data
+        except Exception as e:
+            raise ValueError(f"Failed to decode {repr_truncate(data)}") from e


     def _json_object_hook(self, d: dict):
         """Implementation of `object_hook` of `json.JSONDecoder` API."""
-        if d.get(self._sniff) == "tuple":
+        if d.get(self._signature_field) == "tuple":
             return tuple(d["d"])
-        elif d.get(self._sniff) == "set":
+        elif d.get(self._signature_field) == "set":
             return set(d["d"])
-        elif d.get(self._sniff) == "bytes":
+        elif d.get(self._signature_field) == "bytes":
             return base64.b64decode(d["d"].encode("utf8"))
         return d
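Taken together: `encode` is URL-safe base64 over JSON enriched with the signature field, and `decode` is its exact inverse. A small usage sketch (illustrative values) of the codec as a pagination cursor, as the dummy backend above uses it:

    codec = UrlSafeStructCodec(signature_field="_usc")

    # Tuples survive the round trip that plain JSON would flatten to lists:
    cursor = codec.encode(("2024-12-04T10:00:00Z", "job-0123"))
    assert codec.decode(cursor) == ("2024-12-04T10:00:00Z", "job-0123")

    codec.decode("not-base64!?")  # raises ValueError: Failed to decode ...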
7 changes: 3 additions & 4 deletions openeo_driver/views.py
@@ -896,16 +896,15 @@ def list_jobs(user: User):
     request_parameters = flask.request.args

     get_user_jobs = backend_implementation.batch_jobs.get_user_jobs
-    if function_has_argument(get_user_jobs, argument="limit") and function_has_argument(
-        get_user_jobs, argument="request_parameters"
-    ):
+    if function_has_argument(get_user_jobs, argument="request_parameters"):
         # TODO: make this the one and only code path when all `get_user_jobs` implementations are migrated
         listing = get_user_jobs(user_id=user.user_id, limit=limit, request_parameters=request_parameters)
     else:
         # TODO: remove support for this old API
         listing = get_user_jobs(user_id=user.user_id)

-    # TODO: while eliminating old `get_user_jobs` API above, also just settle on JobListing based return
+    # TODO: while eliminating old `get_user_jobs` API above, also just settle on JobListing based return,
+    #   and drop all this legacy cruft
     if isinstance(listing, list):
         jobs = listing
         links = []
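`function_has_argument` is the openeo_driver helper used here to feature-detect the new signature; a minimal sketch of such introspection (assuming a thin wrapper around the standard `inspect` module):

    import inspect

    def function_has_argument(function, argument: str) -> bool:
        # True if the callable's signature declares a parameter with this name.
        return argument in inspect.signature(function).parameters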
2 changes: 1 addition & 1 deletion setup.py
@@ -26,7 +26,7 @@
"moto[s3]>=5.0.0",
"time-machine>=2.8.0",
"netCDF4>=1.5.4",
"re-assert",
"re-assert", # TODO: drop this dependency in favor of dirty-equals
"pyarrow>=10.0.0",
"pystac",
"jsonschema",
2 changes: 1 addition & 1 deletion tests/test_views.py
@@ -1468,7 +1468,7 @@ def test_list_user_jobs_paging(self, api100):
             for x in [1, 2, 3, 4, 5, 6, 7, 8]
         }

-        # First job listing
+        # Initial job listing with explicit limit (first page)
         resp = api100.get("/jobs?limit=3", headers=self.AUTH_HEADER)
         data = resp.assert_status_code(200).json
         expected_next_href = dirty_equals.IsStr(
18 changes: 17 additions & 1 deletion tests/util/test_http.py
@@ -1,6 +1,6 @@
 import logging
 import re
-
+import base64
 import pytest
 import requests.exceptions
 from re_assert import Matches
@@ -45,6 +45,12 @@ def test_requests_with_retry_zero(caplog):


 class TestUrlSafeStructCodec:
+    def test_simple(self):
+        codec = UrlSafeStructCodec(signature_field="_usc")
+        assert codec.encode("foo") == "ImZvbyI="
+        assert codec.encode([1, 2, "three"]) == "WzEsMiwidGhyZWUiXQ=="
+        assert codec.encode((1, 2, "three")) == "eyJfdXNjIjoidHVwbGUiLCJkIjpbMSwyLCJ0aHJlZSJdfQ=="
+
     @pytest.mark.parametrize(
         "data",
         [
@@ -59,6 +65,7 @@ class TestUrlSafeStructCodec:
{"set", "set", "go"},
("nest", (1, 2), [3, 4], {5, 6}),
{"nest", (1, 2), 777},
{"deep": ["nested", ("data", [1, 2, (3, 4), {"foo": ("bar", ["x", "y"])}])]},
],
)
def test_basic(self, data):
@@ -67,3 +74,12 @@ def test_basic(self, data):
         assert isinstance(encoded, str)
         assert re.fullmatch("[a-zA-Z0-9_/=-]+", encoded)
         assert codec.decode(encoded) == data
+
+    def test_decode_b64_garbage(self):
+        with pytest.raises(ValueError, match="Failed to decode"):
+            _ = UrlSafeStructCodec().decode(data="g@rbag%?$$!")
+
+    def test_decode_json_garbage(self):
+        garbage = base64.b64encode(b"nope, n0t J$ON here").decode("utf8")
+        with pytest.raises(ValueError, match="Failed to decode"):
+            _ = UrlSafeStructCodec().decode(data=garbage)
