Skip to content

Commit

Permalink
Merge branch 'issue163-ejr-job-deletion'
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Oct 4, 2023
2 parents 25ee566 + f7c53bb commit d3542e4
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 80 deletions.
2 changes: 1 addition & 1 deletion openeo_driver/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.69.0a1"
__version__ = "0.69.1a1"
138 changes: 83 additions & 55 deletions openeo_driver/jobregistry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import argparse
import contextlib
import datetime as dt
import json
import logging
Expand All @@ -8,7 +9,7 @@
import time
import typing
from decimal import Decimal
from typing import Any, Dict, List, NamedTuple, Optional, Union
from typing import Any, Dict, List, NamedTuple, Optional, Union, Sequence

import requests
from openeo.rest.auth.oidc import OidcClientCredentialsAuthenticator, OidcClientInfo, OidcProviderInfo
Expand Down Expand Up @@ -241,6 +242,19 @@ def set_user_agent(self):
user_agent += f"/{self._backend_id}"
self._session.headers["User-Agent"] = user_agent

@contextlib.contextmanager
def _with_extra_logging(self, **kwargs):
    """
    Context manager to temporarily add extra logging fields in a context.

    While the context is active, ``self.logger`` is replaced with a
    ``logging.LoggerAdapter`` that attaches the given keyword arguments as
    ``extra`` fields to each log record. The original ``self.logger`` is
    restored when the context exits, also when the body raises.

    Contexts can be nested: fields of an already active adapter are kept,
    and fields given here win on key collisions.

    :param kwargs: extra logging fields to add (e.g. ``job_id="j-123"``)
    """
    orig = self.logger
    target = orig
    extra = dict(kwargs)
    if isinstance(orig, logging.LoggerAdapter):
        # An adapter can be constructed with `extra=None`.
        extra = {**(orig.extra or {}), **extra}
        if type(orig) is logging.LoggerAdapter:
            # The stock `LoggerAdapter.process()` replaces the `extra` it is
            # handed with its own, so wrapping adapter-in-adapter would drop
            # the fields added here. Attach the merged fields directly to
            # the adapter's underlying logger instead.
            # Adapter subclasses are still wrapped, to preserve whatever
            # their custom `process()` does.
            target = orig.logger
    self.logger = logging.LoggerAdapter(logger=target, extra=extra)
    try:
        yield
    finally:
        self.logger = orig

@property
def backend_id(self) -> str:
assert self._backend_id
Expand Down Expand Up @@ -282,13 +296,9 @@ def _do_request(
json: Union[dict, list, None] = None,
use_auth: bool = True,
expected_status: int = 200,
logging_extra: Optional[dict] = None,
) -> Union[dict, list, None]:
"""Do an HTTP request to Elastic Job Tracker service."""
with TimingLogger(
logger=(lambda m: self.logger.debug(m, extra=logging_extra)),
title=f"EJR Request `{method} {path}`",
):
with TimingLogger(logger=self.logger.debug, title=f"EJR Request `{method} {path}`"):
headers = {}
if use_auth:
access_token = self._cache.get_or_call(
Expand All @@ -300,10 +310,7 @@ def _do_request(
headers["Authorization"] = f"Bearer {access_token}"

url = url_join(self._api_url, path)
self.logger.debug(
f"Doing EJR request `{method} {url}` {headers.keys()=}",
extra=logging_extra,
)
self.logger.debug(f"Doing EJR request `{method} {url}` {headers.keys()=}")
if self._debug_show_curl:
curl_command = self._as_curl(method=method, url=url, data=json, headers=headers)
self.logger.debug(f"Equivalent curl command: {curl_command}")
Expand All @@ -315,10 +322,7 @@ def _do_request(
headers=headers,
timeout=self._REQUEST_TIMEOUT,
)
self.logger.debug(
f"EJR response on `{method} {path}`: {response.status_code!r}",
extra=logging_extra,
)
self.logger.debug(f"EJR response on `{method} {path}`: {response.status_code!r}")
if expected_status and response.status_code != expected_status:
raise EjrHttpError.from_response(response=response)
else:
Expand Down Expand Up @@ -382,48 +386,74 @@ def create_job(
"api_version": api_version,
# TODO: additional technical metadata, see https://github.com/Open-EO/openeo-api/issues/472
}
logging_extra = {"job_id": job_id}
self.logger.info(f"EJR creating {job_id=} {created=}", extra=logging_extra)
return self._do_request(
"POST",
"/jobs",
json=job_data,
expected_status=201,
logging_extra=logging_extra,
)
with self._with_extra_logging(job_id=job_id):
self.logger.info(f"EJR creating {job_id=} {created=}")
result = self._do_request("POST", "/jobs", json=job_data, expected_status=201)
return result

def get_job(self, job_id: str, fields: Optional[List[str]] = None) -> JobDict:
query = {
"bool": {
"filter": [
{"term": {"backend_id": self.backend_id}},
{"term": {"job_id": job_id}},
]
with self._with_extra_logging(job_id=job_id):
self.logger.info(f"EJR get job data {job_id=}")
query = {
"bool": {
"filter": [
{"term": {"backend_id": self.backend_id}},
{"term": {"job_id": job_id}},
]
}
}
}

# Return full document, by default
jobs = self._search(query=query, fields=fields or ["*"])
if len(jobs) == 1:
job = jobs[0]
assert job["job_id"] == job_id, f"{job['job_id']=} != {job_id=}"
return job
elif len(jobs) == 0:
raise JobNotFoundException(job_id=job_id)
else:
summary = [{k: j.get(k) for k in ["user_id", "created"]} for j in jobs]
self.logger.error(f"Found multiple ({len(jobs)}) jobs for {job_id=}: {repr_truncate(summary, width=200)}")
raise InternalException(message=f"Found {len(jobs)} jobs for {job_id=}")
# Return full document, by default
jobs = self._search(query=query, fields=fields or ["*"])
if len(jobs) == 1:
job = jobs[0]
assert job["job_id"] == job_id, f"{job['job_id']=} != {job_id=}"
return job
elif len(jobs) == 0:
raise JobNotFoundException(job_id=job_id)
else:
summary = [{k: j.get(k) for k in ["user_id", "created"]} for j in jobs]
self.logger.error(
f"Found multiple ({len(jobs)}) jobs for {job_id=}: {repr_truncate(summary, width=200)}"
)
raise InternalException(message=f"Found {len(jobs)} jobs for {job_id=}")

def delete_job(self, job_id: str) -> None:
try:
self._do_request(method="DELETE", path=f"/jobs/{job_id}")
logging_extra = {"job_id": job_id}
self.logger.info(f"EJR deleted {job_id=}", extra=logging_extra)
except EjrHttpError as e:
if e.status_code == 404:
raise JobNotFoundException(job_id=job_id) from e
raise e
with self._with_extra_logging(job_id=job_id):
try:
self._do_request(method="DELETE", path=f"/jobs/{job_id}")
self.logger.info(f"EJR deleted {job_id=}")
except EjrHttpError as e:
if e.status_code == 404:
raise JobNotFoundException(job_id=job_id) from e
raise e
self._verify_job_existence(job_id=job_id, exists=False)

def _verify_job_existence(self, job_id: str, exists: bool = True, backoffs: Sequence[float] = (0, 0.1, 1.0)):
    """
    Verify that EJR committed the job creation/deletion.

    Looks up the job once per entry in `backoffs`, sleeping that many seconds
    before each attempt, and stops at the first lookup that matches the
    expectation. This is best-effort: a failed verification or an unexpected
    lookup error is only logged, nothing is raised.

    :param job_id: job id
    :param exists: whether the job should exist (after creation) or not exist (after deletion)
    :param backoffs: sleep durations (in seconds), one per lookup attempt.
        An empty sequence skips the verification altogether.
    :return: None
    """
    if not backoffs:
        return
    for backoff in backoffs:
        self.logger.debug(f"_verify_job_existence {job_id=} {exists=} {backoff=}")
        time.sleep(backoff)
        try:
            self.get_job(job_id=job_id, fields=["job_id"])
        except JobNotFoundException:
            found = False
        except Exception as e:
            # TODO: fail hard instead of just logging?
            self.logger.exception(f"Unexpected error while verifying {job_id=} {exists=}: {e=}")
            return
        else:
            found = True
        if found == bool(exists):
            # Observed state matches the expected one: done.
            return
    # All attempts exhausted without seeing the expected state.
    # TODO: fail hard instead of just logging?
    self.logger.error(f"Failed to verify {job_id=} {exists=}")

def set_status(
self,
Expand All @@ -446,11 +476,9 @@ def set_status(

def _update(self, job_id: str, data: dict) -> JobDict:
"""Generic update method"""
logging_extra = {"job_id": job_id}
self.logger.info(f"EJR update {job_id=} {data=}", extra=logging_extra)
return self._do_request(
"PATCH", f"/jobs/{job_id}", json=data, logging_extra=logging_extra
)
with self._with_extra_logging(job_id=job_id):
self.logger.info(f"EJR update {job_id=} {data=}")
return self._do_request("PATCH", f"/jobs/{job_id}", json=data)

def set_dependencies(
self, job_id: str, dependencies: List[Dict[str, str]]
Expand Down
Loading

0 comments on commit d3542e4

Please sign in to comment.