Skip to content

Commit

Permalink
Issue #959 add tests for DoubleJobRegistry with ElasticJobRegistry
Browse files Browse the repository at this point in the history
Instead of InMemoryJobRegistry stand-in
  • Loading branch information
soxofaan committed Dec 6, 2024
1 parent ba2bf1e commit 3b01118
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 4 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
tests_require=tests_require,
install_requires=[
"openeo>=0.33.0",
"openeo_driver>=0.120.1.dev",
"openeo_driver>=0.120.2.dev",
'pyspark==3.4.2; python_version>="3.8"',
'pyspark>=2.3.1,<2.4.0; python_version<"3.8"',
'geopyspark==0.4.7+openeo',
Expand Down
95 changes: 92 additions & 3 deletions tests/test_job_registry.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import datetime
import logging
from unittest import mock
import urllib.parse
from unittest import mock

import dirty_equals
import kazoo.exceptions
import pytest
from kazoo.handlers.threading import KazooTimeoutError
from openeo.rest.auth.testing import OidcMock
from openeo_driver.backend import BatchJobMetadata, JobListing
from openeo_driver.errors import JobNotFoundException
from openeo_driver.jobregistry import JOB_STATUS
from openeo_driver.jobregistry import JOB_STATUS, ElasticJobRegistry
from openeo_driver.testing import DictSubSet
import dirty_equals
from openeo_driver.util.auth import ClientCredentials

from openeogeotrellis.config import get_backend_config
from openeogeotrellis.job_registry import (
Expand Down Expand Up @@ -1022,3 +1024,90 @@ def test_get_active_jobs(self, double_jr, zk_client, memory_jr, with_zk, with_ej

active_job_ids = set(job["job_id"] for job in active_jobs)
assert active_job_ids == {"j-456"}


class TestDoubleJobRegistryWithEjr:
EJR_API_URL = "https://ejr.test"
OIDC_ISSUER = "https://oidc.test"
OIDC_CLIENT_ID = "the_client_id"
OIDC_CLIENT_SECRET = "the_client_secret"

@pytest.fixture
def oidc_mock(self, requests_mock) -> OidcMock:
oidc_mock = OidcMock(
requests_mock=requests_mock,
oidc_issuer=self.OIDC_ISSUER,
expected_grant_type="client_credentials",
expected_client_id=self.OIDC_CLIENT_ID,
expected_fields={"client_secret": self.OIDC_CLIENT_SECRET, "scope": "openid"},
)
return oidc_mock

@pytest.fixture
def ejr(self, oidc_mock) -> ElasticJobRegistry:
"""ElasticJobRegistry set up with authentication"""
ejr = ElasticJobRegistry(api_url=self.EJR_API_URL, backend_id="unittests")
credentials = ClientCredentials(
oidc_issuer=self.OIDC_ISSUER,
client_id=self.OIDC_CLIENT_ID,
client_secret=self.OIDC_CLIENT_SECRET,
)
ejr.setup_auth_oidc_client_credentials(credentials)
return ejr

@pytest.fixture
def double_jr(self, ejr) -> DoubleJobRegistry:
return DoubleJobRegistry(
zk_job_registry_factory=None,
elastic_job_registry=ejr,
)

def _build_url(self, params: dict):
return "https://oeo.test/jobs?" + urllib.parse.urlencode(query=params)

def test_get_user_jobs_legacy(self, double_jr, requests_mock):
def post_jobs_search(request, context):
"""Handler of `POST /jobs/search"""
return [
{"job_id": "j-123", "status": "created"},
{"job_id": "j-456", "status": "created"},
]

requests_mock.post(f"{self.EJR_API_URL}/jobs/search", json=post_jobs_search)

jobs = double_jr.get_user_jobs(user_id="john")
assert len(jobs) == 2
assert jobs[0].id == "j-123"
assert jobs[1].id == "j-456"

@pytest.mark.parametrize(
["limit", "request_parameters", "expected_url"],
[
(5, None, "https://oeo.test/jobs?limit=5&page=1"),
(5, {"page": 3}, "https://oeo.test/jobs?limit=5&page=4"),
(5, {"page": 8}, None),
],
)
def test_get_user_jobs_paginated(self, double_jr, requests_mock, limit, request_parameters, expected_url):
def post_jobs_search_paginated(request, context):
"""Handler of `POST /jobs/search"""
this_page = int(request.qs.get("page", ["0"])[0])
return {
"jobs": [
{"job_id": "j-123", "status": "created"},
{"job_id": "j-456", "status": "created"},
],
"pagination": {"next": {"size": limit, "page": this_page + 1}} if this_page < 8 else {},
}

requests_mock.post(f"{self.EJR_API_URL}/jobs/search/paginated", json=post_jobs_search_paginated)

jobs = double_jr.get_user_jobs(user_id="john", limit=limit, request_parameters=request_parameters)
assert isinstance(jobs, JobListing)
assert jobs.to_response_dict(build_url=self._build_url) == {
"jobs": [
{"id": "j-123", "status": "created", "progress": 0},
{"id": "j-456", "status": "created", "progress": 0},
],
"links": [{"rel": "next", "href": expected_url}] if expected_url else [],
}

0 comments on commit 3b01118

Please sign in to comment.