Skip to content

Commit

Permalink
Issue #959 job listing pagination in DoubleJobRegistry
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Dec 5, 2024
1 parent 69f85bb commit 716bec1
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 57 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ without compromising stable operations.

## Unreleased

## 0.52.0

- Throw error when trying to use unsupported `target_dimension` in `aggregate_spatial` ([#951](https://github.com/Open-EO/openeo-geopyspark-driver/issues/951))
- Allow specifying region name for an ObjectStorageWorkspace ([#955](https://github.com/Open-EO/openeo-geopyspark-driver/pull/955))
- Better print ApiException. ([#962](https://github.com/Open-EO/openeo-geopyspark-driver/pull/962))
- Include job title by default in user job listings ([#963](https://github.com/Open-EO/openeo-geopyspark-driver/issues/963))
- Support pagination of user job listings ([#959](https://github.com/Open-EO/openeo-geopyspark-driver/issues/959)/[Open-EO/openeo-python-driver#332](https://github.com/Open-EO/openeo-python-driver/issues/332))

## 0.51.0

Expand Down
2 changes: 1 addition & 1 deletion openeogeotrellis/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.51.0a1"
__version__ = "0.52.0a1"
18 changes: 16 additions & 2 deletions openeogeotrellis/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,14 @@
from openeo.metadata import Band, BandDimension, Dimension, SpatialDimension, TemporalDimension
from openeo.util import TimingLogger, deep_get, dict_no_none, repr_truncate, rfc3339, str_truncate
from openeo_driver import backend
from openeo_driver.backend import BatchJobMetadata, ErrorSummary, LoadParameters, OidcProvider, ServiceMetadata
from openeo_driver.backend import (
BatchJobMetadata,
ErrorSummary,
LoadParameters,
OidcProvider,
ServiceMetadata,
JobListing,
)
from openeo_driver.config.load import ConfigGetter
from openeo_driver.datacube import DriverDataCube, DriverVectorCube
from openeo_driver.datastructs import SarBackscatterArgs
Expand Down Expand Up @@ -1587,12 +1594,19 @@ def processing_units_spent(value_estimate: Decimal, temporal_step: Optional[str]
else: # still some running: continue polling
pass

def get_user_jobs(
    self,
    user_id: str,
    limit: Optional[int] = None,
    request_parameters: Optional[dict] = None,
) -> Union[List[BatchJobMetadata], JobListing]:
    """
    List the batch jobs of a given user.

    :param user_id: id of the user whose jobs to list.
    :param limit: optional page size; passed through to the underlying
        registry, which then returns a paginated ``JobListing``.
    :param request_parameters: raw request query parameters, passed through
        so the registry can resolve the current pagination position.
    :return: plain list of job metadata (legacy code path) or a
        ``JobListing`` supporting pagination.
    """
    with self._double_job_registry as registry:
        return registry.get_user_jobs(
            user_id=user_id,
            # Make sure to include title (in addition to the basic fields)
            fields=["title"],
            limit=limit,
            request_parameters=request_parameters,
        )

# TODO: issue #232 we should get this from S3 but should there still be an output dir then?
Expand Down
96 changes: 44 additions & 52 deletions openeogeotrellis/job_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import json
import logging
import random
import threading
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Any, List, Dict, Callable, Union, Optional, Iterator, Tuple
Expand All @@ -16,13 +15,15 @@
from kazoo.protocol.states import ZnodeStat

from openeo.util import rfc3339, dict_no_none, TimingLogger
from openeo_driver.backend import BatchJobMetadata
from openeo_driver.backend import BatchJobMetadata, JobListing
from openeo_driver.errors import JobNotFoundException
from openeo_driver.jobregistry import (
JOB_STATUS,
JobRegistryInterface,
JobDict,
ejr_job_info_to_metadata,
)
from openeo_driver.util.http import UrlSafeStructCodec
from openeogeotrellis import sentinel_hub
from openeogeotrellis.config import get_backend_config
from openeogeotrellis.configparams import ConfigParams
Expand Down Expand Up @@ -581,47 +582,6 @@ def map_safe(prop: str, f):
)


def ejr_job_info_to_metadata(job_info: JobDict, full: bool = True) -> BatchJobMetadata:
    """
    Convert job info dict (from JobRegistryInterface) to BatchJobMetadata.

    :param job_info: job info dict as produced by a ``JobRegistryInterface``.
    :param full: when False, heavier fields ("process", "job_options") are
        omitted (set to None), e.g. for compact job listings.
    :return: corresponding ``BatchJobMetadata``.
    """
    # TODO: eliminate zk_job_info_to_metadata/ejr_job_info_to_metadata duplication?

    def map_safe(prop: str, f):
        # Apply `f` to job_info[prop], but only when the value is truthy
        # (missing/None/empty values map to None).
        value = job_info.get(prop)
        return f(value) if value else None

    def get_results_metadata(result_metadata_prop: str):
        # Look up a property nested under the "results_metadata" sub-dict.
        return job_info.get("results_metadata", {}).get(result_metadata_prop)

    def map_results_metadata_safe(result_metadata_prop: str, f):
        # Like map_safe, but for "results_metadata" entries, and only skipping
        # None (so falsy-but-valid values like 0 are still mapped).
        value = get_results_metadata(result_metadata_prop)
        return f(value) if value is not None else None

    return BatchJobMetadata(
        id=job_info["job_id"],
        status=job_info["status"],
        created=map_safe("created", rfc3339.parse_datetime),
        # Heavy fields: only included in `full` mode.
        process=job_info.get("process") if full else None,
        job_options=job_info.get("job_options") if full else None,
        title=job_info.get("title"),
        description=job_info.get("description"),
        updated=map_safe("updated", rfc3339.parse_datetime),
        started=map_safe("started", rfc3339.parse_datetime),
        finished=map_safe("finished", rfc3339.parse_datetime),
        # Resource usage is stored as plain seconds; expose as timedelta.
        memory_time_megabyte=map_safe("memory_time_megabyte_seconds", lambda seconds: timedelta(seconds=seconds)),
        cpu_time=map_safe("cpu_time_seconds", lambda seconds: timedelta(seconds=seconds)),
        geometry=get_results_metadata("geometry"),
        bbox=get_results_metadata("bbox"),
        start_datetime=map_results_metadata_safe("start_datetime", rfc3339.parse_datetime),
        end_datetime=map_results_metadata_safe("end_datetime", rfc3339.parse_datetime),
        instruments=get_results_metadata("instruments"),
        epsg=get_results_metadata("epsg"),
        links=get_results_metadata("links"),
        usage=job_info.get("usage"),
        costs=job_info.get("costs"),
        proj_shape=get_results_metadata("proj:shape"),
        proj_bbox=get_results_metadata("proj:bbox"),
    )


class InMemoryJobRegistry(JobRegistryInterface):
"""
Expand Down Expand Up @@ -726,9 +686,29 @@ def set_results_metadata(self, job_id: str, costs: Optional[float], usage: dict,
return self._update(job_id=job_id, costs=costs, usage=usage, results_metadata=results_metadata)

def list_user_jobs(
    self,
    user_id: str,
    *,
    fields: Optional[List[str]] = None,
    limit: Optional[int] = None,
    request_parameters: Optional[dict] = None,
    # TODO #959 settle on returning just `JobListing` and eliminate other options/code paths.
) -> Union[JobListing, List[JobDict]]:
    """
    List the jobs of a given user.

    :param user_id: id of the user whose jobs to list.
    :param fields: accepted for interface compatibility, but not used here
        (this in-memory registry just returns full job dicts).
    :param limit: page size; when set, a paginated ``JobListing`` is returned.
    :param request_parameters: request query parameters; the "page" parameter
        (0-based page index, defaults to 0) selects which page to return.
    :return: a ``JobListing`` when ``limit`` is given,
        otherwise (legacy path) a plain list of job dicts.
    """
    jobs = [job for job in self.db.values() if job["user_id"] == user_id]
    if limit:
        pagination_param = "page"
        page_number = int((request_parameters or {}).get(pagination_param, 0))
        if page_number < 0 or limit < 1:
            raise ValueError(f"Invalid pagination: {limit=} {page_number=}")
        # Only advertise a "next" page when there are more jobs beyond this page.
        if len(jobs) > (page_number + 1) * limit:
            next_parameters = {"limit": limit, pagination_param: page_number + 1}
        else:
            next_parameters = None
        jobs = jobs[page_number * limit : (page_number + 1) * limit]
        jobs = JobListing(
            jobs=[ejr_job_info_to_metadata(j, full=False) for j in jobs],
            next_parameters=next_parameters,
        )
    return jobs

def list_active_jobs(
self,
Expand Down Expand Up @@ -962,17 +942,29 @@ def mark_ongoing(self, job_id: str, user_id: str) -> None:
self.zk_job_registry.mark_ongoing(job_id=job_id, user_id=user_id)

def get_user_jobs(
self, user_id: str, fields: Optional[List[str]] = None
) -> List[BatchJobMetadata]:
self,
user_id: str,
*,
fields: Optional[List[str]] = None,
limit: Optional[int] = None,
request_parameters: Optional[dict] = None,
# TODO #959 settle on returning just `JobListing` and eliminate other options/code paths.
) -> Union[List[BatchJobMetadata], JobListing]:
zk_jobs = None
ejr_jobs = None
if self.zk_job_registry:
zk_jobs = [zk_job_info_to_metadata(j) for j in self.zk_job_registry.get_user_jobs(user_id)]
if self.elastic_job_registry:
ejr_jobs = [
ejr_job_info_to_metadata(j, full=False)
for j in self.elastic_job_registry.list_user_jobs(user_id=user_id, fields=fields)
]
ejr_jobs = self.elastic_job_registry.list_user_jobs(
user_id=user_id,
fields=fields,
limit=limit,
request_parameters=request_parameters,
)
# TODO #959 Settle on just handling JobListing and drop other legacy code path
if isinstance(ejr_jobs, list):
ejr_jobs = JobListing(jobs=[ejr_job_info_to_metadata(j, full=False) for j in ejr_jobs])
assert isinstance(ejr_jobs, JobListing)

# TODO: more insightful comparison? (e.g. only consider recent jobs)
self._log.log(
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
tests_require=tests_require,
install_requires=[
"openeo>=0.33.0",
"openeo_driver>=0.119.0.dev",
"openeo_driver>=0.120.1.dev",
'pyspark==3.4.2; python_version>="3.8"',
'pyspark>=2.3.1,<2.4.0; python_version<"3.8"',
'geopyspark==0.4.7+openeo',
Expand Down
82 changes: 81 additions & 1 deletion tests/test_job_registry.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import datetime
import logging
from unittest import mock
import urllib.parse

import kazoo.exceptions
import pytest
from kazoo.handlers.threading import KazooTimeoutError
from openeo_driver.backend import BatchJobMetadata
from openeo_driver.backend import BatchJobMetadata, JobListing
from openeo_driver.errors import JobNotFoundException
from openeo_driver.jobregistry import JOB_STATUS
from openeo_driver.testing import DictSubSet
import dirty_equals

from openeogeotrellis.config import get_backend_config
from openeogeotrellis.job_registry import (
Expand Down Expand Up @@ -260,6 +262,48 @@ def test_list_user_jobs(self):
]
assert jr.list_user_jobs(user_id="charlie") == []

def _build_url(self, params: dict):
    # Fake URL builder to pass as `build_url` to JobListing.to_response_dict.
    query_string = urllib.parse.urlencode(params)
    return f"https://oeo.test/jobs?{query_string}"

def test_list_user_jobs_paginated(self):
    """Paginated job listing in InMemoryJobRegistry: 7 jobs, page size 3."""
    jr = InMemoryJobRegistry()
    # Create 7 jobs (j-1 ... j-7) for user "alice".
    for j in range(1, 8):
        jr.create_job(job_id=f"j-{j}", user_id="alice", process={"foo": "bar"})

    # First page
    listing = jr.list_user_jobs(user_id="alice", limit=3)
    assert isinstance(listing, JobListing)
    assert listing.to_response_dict(build_url=self._build_url) == {
        "jobs": [
            dirty_equals.IsPartialDict(id="j-1"),
            dirty_equals.IsPartialDict(id="j-2"),
            dirty_equals.IsPartialDict(id="j-3"),
        ],
        # Full page, more jobs remain: "next" link points to page 1.
        "links": [{"href": "https://oeo.test/jobs?limit=3&page=1", "rel": "next"}],
    }

    # Second page
    listing = jr.list_user_jobs(user_id="alice", limit=3, request_parameters={"limit": 3, "page": 1})
    assert isinstance(listing, JobListing)
    assert listing.to_response_dict(build_url=self._build_url) == {
        "jobs": [
            dirty_equals.IsPartialDict(id="j-4"),
            dirty_equals.IsPartialDict(id="j-5"),
            dirty_equals.IsPartialDict(id="j-6"),
        ],
        "links": [{"href": "https://oeo.test/jobs?limit=3&page=2", "rel": "next"}],
    }

    # Third page
    listing = jr.list_user_jobs(user_id="alice", limit=3, request_parameters={"limit": 3, "page": 2})
    assert isinstance(listing, JobListing)
    assert listing.to_response_dict(build_url=self._build_url) == {
        "jobs": [
            dirty_equals.IsPartialDict(id="j-7"),
        ],
        # Last (partial) page: no "next" link.
        "links": [],
    }


class TestDoubleJobRegistry:
DUMMY_PROCESS = {
Expand Down Expand Up @@ -715,6 +759,42 @@ def test_get_user_jobs(self, double_jr, caplog):
"DoubleJobRegistry.get_user_jobs(user_id='alice') zk_jobs=[] ejr_jobs=[]",
]

def _build_url(self, params: dict):
    # Fake URL builder to pass as `build_url` to JobListing.to_response_dict.
    encoded = urllib.parse.urlencode(params)
    return "https://oeo.test/jobs?" + encoded

def test_get_user_jobs_paginated(self, memory_jr, caplog):
    """Paginated job listing through DoubleJobRegistry (EJR-only): 5 jobs, page size 3."""
    # No ZooKeeper registry: listing comes purely from the (in-memory) elastic job registry.
    double_jr = DoubleJobRegistry(
        zk_job_registry_factory=None,
        elastic_job_registry=memory_jr,
    )

    with double_jr:
        # Create 5 jobs (j-1 ... j-5) for user "alice".
        for j in range(1, 6):
            double_jr.create_job(job_id=f"j-{j}", user_id="alice", process=self.DUMMY_PROCESS)

        # First page
        listing = double_jr.get_user_jobs(user_id="alice", limit=3)
        assert isinstance(listing, JobListing)
        assert listing.to_response_dict(build_url=self._build_url) == {
            "jobs": [
                dirty_equals.IsPartialDict(id="j-1"),
                dirty_equals.IsPartialDict(id="j-2"),
                dirty_equals.IsPartialDict(id="j-3"),
            ],
            # Full page, more jobs remain: "next" link points to page 1.
            "links": [{"href": "https://oeo.test/jobs?limit=3&page=1", "rel": "next"}],
        }

        # Second page
        listing = double_jr.get_user_jobs(user_id="alice", limit=3, request_parameters={"limit": 3, "page": 1})
        assert isinstance(listing, JobListing)
        assert listing.to_response_dict(build_url=self._build_url) == {
            "jobs": [
                dirty_equals.IsPartialDict(id="j-4"),
                dirty_equals.IsPartialDict(id="j-5"),
            ],
            # Last (partial) page: no "next" link.
            "links": [],
        }

def test_get_user_jobs_mismatch(self, double_jr, memory_jr, caplog):
with double_jr:
double_jr.create_job(
Expand Down

0 comments on commit 716bec1

Please sign in to comment.