Skip to content

Commit

Permalink
Issue #332 simplify "next" page handling
Browse files Browse the repository at this point in the history
Just pass next page number as-is, no need for fancy UrlSafeStructCodec at the moment
  • Loading branch information
soxofaan committed Dec 5, 2024
1 parent 95eebff commit df56fe8
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 44 deletions.
2 changes: 1 addition & 1 deletion openeo_driver/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ class JobListing:
(e.g. including pagination)
"""

# TODO: just implement as frozen dataclass, or does that conflict with later need to subclass?
# TODO #332 just implement as frozen dataclass, or does that conflict with later need to subclass?
__slots__ = ["_jobs", "_next_parameters"]

def __init__(self, jobs: List[BatchJobMetadata], next_parameters: Optional[dict] = None):
Expand Down
69 changes: 34 additions & 35 deletions openeo_driver/jobregistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from openeo_driver.constants import JOB_STATUS
from openeo_driver.errors import InternalException, JobNotFoundException
from openeo_driver.util.auth import ClientCredentials, ClientCredentialsAccessTokenHelper
from openeo_driver.util.http import UrlSafeStructCodec
from openeo_driver.util.logging import ExtraLoggingFilter
from openeo_driver.utils import generate_unique_id

Expand Down Expand Up @@ -359,7 +358,7 @@ def _do_request(
url = url_join(self._api_url, path)
self.logger.debug(f"Doing EJR request `{method} {url}` {params=} {headers.keys()=}")
if self._debug_show_curl:
# TODO: add params to curl command
# TODO #332 add params to curl command
curl_command = self._as_curl(method=method, url=url, data=json, headers=headers)
self.logger.debug(f"Equivalent curl command: {curl_command}")
try:
Expand Down Expand Up @@ -600,7 +599,7 @@ def _search(self, query: dict, fields: Optional[List[str]] = None) -> List[JobDi
@dataclasses.dataclass(frozen=True)
class PaginatedSearchResult:
    """Container for one page of job search results.

    Attributes:
        jobs: job documents returned for this page.
        next_page: page number of the next page,
            or None when there is no next page (end of listing).
    """

    jobs: List[JobDict]
    next_page: Union[int, None]

def _search_paginated(
self,
Expand All @@ -612,12 +611,12 @@ def _search_paginated(
) -> PaginatedSearchResult:
fields = set(fields or [])
# Make sure to include some basic fields by default
# TODO: avoid duplication of this default field set
# TODO #332 avoid duplication of this default field set
fields.update(["job_id", "user_id", "created", "status", "updated"])
params = {}
if page_size:
params["size"] = page_size
if page_number:
if page_number is not None and page_number >= 0:
params["page"] = page_number
body = {
"query": query,
Expand All @@ -630,11 +629,29 @@ def _search_paginated(
# "jobs": [list of job docs],
# "pagination': {"previous": "size=5&page=1", "next": "size=5&page=3"}
# }
next_page_number = self._parse_response_pagination(
pagination=response.get("pagination", {}).get("next"),
expected_size=page_size,
)
self.logger.debug(f"Parsed {next_page_number=} from {response.get('pagination')=}")
return self.PaginatedSearchResult(
jobs=response.get("jobs", []),
pagination=response.get("pagination", {}),
next_page=next_page_number,
)

@staticmethod
def _parse_response_pagination(pagination: dict, expected_size: int) -> Union[int, None]:
"""Extract page number from pagination construct"""
if isinstance(pagination, str):
# TODO #332 get rid of this legacy format translation code path
params = urllib.parse.parse_qs(pagination)
if "size" in params and "page" in params:
pagination = {"size": int(params["size"][0]), "page": int(params["page"][0])}
if "size" in pagination and "page" in pagination:
if int(pagination["size"]) != expected_size:
raise ValueError(f"Page size mismatch {expected_size=} vs {pagination=}")
return int(pagination["page"])

def list_user_jobs(
self,
user_id: Optional[str],
Expand All @@ -655,34 +672,20 @@ def list_user_jobs(

if limit:
# Do paginated search
# TODO: make this the one and only code path
url_safe_codec = UrlSafeStructCodec(signature_field="_usc")
page_number = None
page_params = (request_parameters or {}).get(self.PAGINATION_URL_PARAM)
if page_params:
# Extract page number
page_params = url_safe_codec.decode(page_params)
if isinstance(page_params, str):
# TODO: this is old code path where page params where encoded as URL query string
# parse as URL params
page_params = urllib.parse.parse_qs(page_params)
assert limit == int(page_params["size"][0])
page_number = int(page_params["page"][0])
elif isinstance(page_params, dict):
# TODO: this dict handling should be (is?) the only code path?
assert limit == page_params["size"]
page_number = page_params["page"]
else:
raise ValueError(page_params)

# TODO #332 make this the one and only code path
page_number = (request_parameters or {}).get(self.PAGINATION_URL_PARAM)
if page_number:
page_number = int(page_number)
else:
page_number = None
data = self._search_paginated(query=query, fields=fields, page_size=limit, page_number=page_number)
return JobListing(
jobs=[ejr_job_info_to_metadata(j, full=False) for j in data.jobs],
next_parameters={self.PAGINATION_URL_PARAM: url_safe_codec.encode(data.pagination.get("next"))},
next_parameters={"limit": limit, self.PAGINATION_URL_PARAM: data.next_page},
)
else:
# Deprecated non-paginated search
# TODO: eliminate this code path
# TODO #332 eliminate this code path
return self._search(query=query, fields=fields)

def list_active_jobs(
Expand Down Expand Up @@ -781,11 +784,7 @@ def _parse_cli(self) -> argparse.Namespace:
cli_list_user.add_argument("--backend-id", help="Backend id to filter on.")
cli_list_user.add_argument("user_id", help="User id to filter on.")
cli_list_user.add_argument("--limit", type=int, default=None, help="Page size of job listing")
cli_list_user.add_argument(
"--pagination",
default=None,
help=f"Pagination URL fragment from 'next' link. E.g. '/jobs?{ElasticJobRegistry.PAGINATION_URL_PARAM}=InNpemU9N...'",
)
cli_list_user.add_argument("--page", default=0)
cli_list_user.set_defaults(func=self.list_user_jobs)

cli_create = subparsers.add_parser("create", help="Create a new job.")
Expand Down Expand Up @@ -881,8 +880,8 @@ def list_user_jobs(self, args: argparse.Namespace):
user_id = args.user_id
ejr = self._get_job_registry(cli_args=args)
request_parameters = {}
if args.pagination:
request_parameters[ElasticJobRegistry.PAGINATION_URL_PARAM] = args.pagination.split("?")[-1]
if args.page:
request_parameters[ElasticJobRegistry.PAGINATION_URL_PARAM] = str(args.page)
jobs = ejr.list_user_jobs(
user_id=user_id,
# TODO: option to return more fields?
Expand Down
16 changes: 8 additions & 8 deletions tests/test_jobregistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,39 +545,39 @@ def _build_url(self, params: dict):
None,
{"size": ["3"]},
True,
"https://oeo.test/jobs?page=InNpemU9MyZwYWdlPTEi",
"https://oeo.test/jobs?limit=3&page=1",
["j-1", "j-2", "j-3"],
),
(
3,
None,
{"size": ["3"]},
False,
"https://oeo.test/jobs?page=eyJzaXplIjozLCJwYWdlIjoxfQ%3D%3D",
"https://oeo.test/jobs?limit=3&page=1",
["j-1", "j-2", "j-3"],
),
(
3,
{ElasticJobRegistry.PAGINATION_URL_PARAM: "InNpemU9MyZwYWdlPTEi"},
{ElasticJobRegistry.PAGINATION_URL_PARAM: "1"},
{"size": ["3"], "page": ["1"]},
True,
"https://oeo.test/jobs?page=InNpemU9MyZwYWdlPTIi",
"https://oeo.test/jobs?limit=3&page=2",
["j-4", "j-5", "j-6"],
),
(
3,
{ElasticJobRegistry.PAGINATION_URL_PARAM: "InNpemU9MyZwYWdlPTEi"},
{ElasticJobRegistry.PAGINATION_URL_PARAM: "1"},
{"size": ["3"], "page": ["1"]},
False,
"https://oeo.test/jobs?page=eyJzaXplIjozLCJwYWdlIjoyfQ%3D%3D",
"https://oeo.test/jobs?limit=3&page=2",
["j-4", "j-5", "j-6"],
),
(
5,
{ElasticJobRegistry.PAGINATION_URL_PARAM: "InNpemU9NSZwYWdlPTExMSI="},
{ElasticJobRegistry.PAGINATION_URL_PARAM: "111"},
{"size": ["5"], "page": ["111"]},
False,
"https://oeo.test/jobs?page=eyJzaXplIjo1LCJwYWdlIjoxMTJ9",
"https://oeo.test/jobs?limit=5&page=112",
["j-556", "j-557", "j-558", "j-559", "j-560"],
),
],
Expand Down

0 comments on commit df56fe8

Please sign in to comment.