Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce support for paged job listing #333

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@ and start a new "In Progress" section above it.

## In progress


## 0.120.0

- mask: also apply at load time when resample_spatial is used
- NDVI process: correctly handle band dimension as part of dry run
- Introduce support for user job pagination ([#332](https://github.com/Open-EO/openeo-python-driver/issues/332))


## 0.119.0

Expand Down
2 changes: 1 addition & 1 deletion openeo_driver/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.119.0a1"
__version__ = "0.120.1a1"
61 changes: 58 additions & 3 deletions openeo_driver/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import json
import logging
import dataclasses

import inspect
import sys
from datetime import datetime, timedelta
from pathlib import Path
Expand All @@ -31,7 +31,7 @@
from openeo_driver.datastructs import SarBackscatterArgs
from openeo_driver.dry_run import SourceConstraint
from openeo_driver.errors import CollectionNotFoundException, ServiceUnsupportedException, FeatureUnsupportedException
from openeo_driver.jobregistry import JOB_STATUS
from openeo_driver.constants import JOB_STATUS
from openeo_driver.processes import ProcessRegistry
from openeo_driver.users import User
from openeo_driver.users.oidc import OidcProvider
Expand Down Expand Up @@ -382,6 +382,44 @@ class BatchJobResultMetadata:
# TODO: more fields


class JobListing:
"""
Basic container/interface for user job listings,
(e.g. including pagination)
"""

# TODO #332 just implement as frozen dataclass, or does that conflict with later need to subclass?
__slots__ = ["_jobs", "_next_parameters"]

def __init__(self, jobs: List[BatchJobMetadata], next_parameters: Optional[dict] = None):
"""

:param jobs: list of job metadata constructs
:param next_parameters: dictionary of URL parameters to add to `GET /jobs` URL to link to next page
"""
self._jobs = jobs
self._next_parameters = next_parameters

def to_response_dict(self, build_url: Callable[[dict], str], api_version: ComparableVersion = None) -> dict:
"""
Produce `GET /jobs` response data, to be JSONified.

:param build_url: function to generate a paginated" URL from given pagination related parameters,
e.g. `lambda params: flask.url_for(".list_jobs", **params, _external=True)`
"""
links = []
if self._next_parameters:
links.append({"rel": "next", "href": build_url(self._next_parameters)})

return {
"jobs": [m.to_api_dict(full=False, api_version=api_version) for m in self._jobs],
"links": links,
}

def __len__(self) -> int:
return len(self._jobs)


class BatchJobs(MicroService):
"""
Base contract/implementation for Batch Jobs "microservice"
Expand Down Expand Up @@ -415,10 +453,21 @@ def get_job_info(self, job_id: str, user_id: str) -> BatchJobMetadata:
"""
raise NotImplementedError

def get_user_jobs(self, user_id: str) -> Union[List[BatchJobMetadata], dict]:
def get_user_jobs(
self,
user_id: str,
limit: Optional[int] = None,
request_parameters: Optional[dict] = None,
# TODO #332 settle on returning just `JobListing` and eliminate other options/code paths.
) -> Union[List[BatchJobMetadata], dict, JobListing]:
"""
Get details about all batch jobs of a user
https://openeo.org/documentation/1.0/developers/api/reference.html#operation/list-jobs

:param limit: limit parameter in request (as defined by openEO API spec)
:param request_parameters: all request parameters.
Note that backend implementation has freedom here
how to leverage these to implement pagination
"""
raise NotImplementedError

Expand Down Expand Up @@ -810,3 +859,9 @@ def request_costs(
:param job_options: job options (if any provided in the sync processing request)
"""
return None


def function_has_argument(function: Callable, argument: str) -> bool:
"""Does function support given argument?"""
signature = inspect.signature(function)
return argument in signature.parameters
15 changes: 15 additions & 0 deletions openeo_driver/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,18 @@ class STAC_EXTENSION:
MLMODEL = "https://stac-extensions.github.io/ml-model/v1.0.0/schema.json"
CARD4LOPTICAL = "https://stac-extensions.github.io/card4l/v0.1.0/optical/schema.json"
CARD4LSAR = "https://stac-extensions.github.io/card4l/v0.1.0/sar/schema.json"


class JOB_STATUS:
"""
Container of batch job status constants.

Allows to easily find places where batch job status is checked/set/updated.
"""

CREATED = "created"
QUEUED = "queued"
RUNNING = "running"
CANCELED = "canceled"
FINISHED = "finished"
ERROR = "error"
96 changes: 69 additions & 27 deletions openeo_driver/dummy/dummy_backend.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,68 @@
import importlib.metadata
import json
import numbers
import unittest.mock
from datetime import datetime
from functools import lru_cache
from pathlib import Path
from typing import List, Dict, Union, Tuple, Optional, Iterable, Any, Sequence
import unittest.mock
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union
from unittest.mock import Mock
import importlib.metadata

import flask
import numpy
import openeo.udf
import xarray
from shapely.geometry import Polygon, MultiPolygon
from openeo.api.logs import normalize_log_level
from openeo.internal.process_graph_visitor import ProcessGraphVisitor
from openeo.metadata import (
Band,
BandDimension,
CollectionMetadata,
SpatialDimension,
TemporalDimension,
)
from openeo.util import rfc3339
from shapely.geometry import MultiPolygon, Polygon
from shapely.geometry.base import BaseGeometry, BaseMultipartGeometry
from shapely.geometry.collection import GeometryCollection

from openeo.api.logs import normalize_log_level
from openeo.internal.process_graph_visitor import ProcessGraphVisitor
from openeo.metadata import CollectionMetadata, Band, SpatialDimension, TemporalDimension, BandDimension
import openeo.udf
from openeo_driver.ProcessGraphDeserializer import ConcreteProcessing
import openeo_driver.util.changelog
from openeo_driver.backend import (
SecondaryServices,
OpenEoBackendImplementation,
CollectionCatalog,
ServiceMetadata,
BatchJobs,
BatchJobMetadata,
BatchJobResultMetadata,
BatchJobs,
CollectionCatalog,
JobListing,
LoadParameters,
OidcProvider,
OpenEoBackendImplementation,
Processing,
SecondaryServices,
ServiceMetadata,
UserDefinedProcesses,
UserDefinedProcessMetadata,
LoadParameters,
Processing,
BatchJobResultMetadata,
)
from openeo_driver.config import OpenEoBackendConfig
from openeo_driver.constants import STAC_EXTENSION
from openeo_driver.constants import JOB_STATUS, STAC_EXTENSION
from openeo_driver.datacube import DriverDataCube, DriverMlModel, DriverVectorCube
from openeo_driver.datastructs import StacAsset
from openeo_driver.delayed_vector import DelayedVector
from openeo_driver.dry_run import SourceConstraint
from openeo_driver.errors import (
JobNotFoundException,
FeatureUnsupportedException,
JobNotFinishedException,
ProcessGraphNotFoundException,
JobNotFoundException,
PermissionsInsufficientException,
FeatureUnsupportedException,
ProcessGraphNotFoundException,
)
from openeo_driver.ProcessGraphDeserializer import ConcreteProcessing
from openeo_driver.save_result import (
AggregatePolygonResult,
AggregatePolygonSpatialResult,
)
from openeo_driver.jobregistry import JOB_STATUS
from openeo_driver.save_result import AggregatePolygonResult, AggregatePolygonSpatialResult
from openeo_driver.users import User
import openeo_driver.util.changelog
from openeo_driver.utils import EvalEnv, generate_unique_id, WhiteListEvalEnv
from openeo_driver.util.http import UrlSafeStructCodec
from openeo_driver.utils import EvalEnv, WhiteListEvalEnv, generate_unique_id

DEFAULT_DATETIME = datetime(2020, 4, 23, 16, 20, 27)

Expand Down Expand Up @@ -671,8 +682,39 @@ def _get_job_info(self, job_id: str, user_id: str) -> BatchJobMetadata:
except KeyError:
raise JobNotFoundException(job_id)

def get_user_jobs(self, user_id: str) -> List[BatchJobMetadata]:
return [v for (k, v) in self._job_registry.items() if k[0] == user_id]
def get_user_jobs(
self,
user_id: str,
limit: Optional[int] = None,
request_parameters: Optional[dict] = None,
) -> JobListing:
jobs: List[BatchJobMetadata] = [v for (k, v) in self._job_registry.items() if k[0] == user_id]
next_parameters = None
if limit:
# Pagination: order by `created` and job id (as tiebreaker)
# Note that we order from more recent to older.
# As a result, "search_after" (blatantly based on ElasticSearch)
# practically means "search older"

def order_key(m: BatchJobMetadata) -> Tuple[str, str]:
return rfc3339.datetime(m.created), m.id

url_safe_codec = UrlSafeStructCodec(signature_field="_usc")

threshold = (request_parameters or {}).get("search_after")
if threshold:
threshold = url_safe_codec.decode(threshold)
jobs = [m for m in jobs if order_key(m) < threshold]
jobs = sorted(jobs, key=order_key, reverse=True)
add_next = len(jobs) > limit
jobs = jobs[:limit]
if add_next:
next_parameters = {
"limit": limit,
"search_after": url_safe_codec.encode(order_key(jobs[-1])),
}

return JobListing(jobs=jobs, next_parameters=next_parameters)

@classmethod
def _update_status(cls, job_id: str, user_id: str, status: str):
Expand Down
Loading