Skip to content

Commit

Permalink
Merge pull request #746 from crim-ca/job-post-cli
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault authored Nov 13, 2024
2 parents d16ca30 + 28cb33c commit 0cebfa0
Show file tree
Hide file tree
Showing 4 changed files with 776 additions and 379 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Changes:
--------
- Add support of *OGC API - Processes - Part 4: Job Management* endpoints for `Job` creation and execution
(fixes `#716 <https://github.com/crim-ca/weaver/issues/716>`_).
- Add `CLI` operations ``update_job``, ``trigger_job`` and ``inputs`` corresponding to the required `Job` operations
defined by *OGC API - Processes - Part 4: Job Management*.
- Add ``headers``, ``mode`` and ``response`` parameters along the ``inputs`` and ``outputs`` returned by
the ``GET /jobs/{jobID}/inputs`` endpoint to better describe the expected resolution strategy of the
multiple `Job` execution options according to submitted request parameters.
Expand Down
115 changes: 105 additions & 10 deletions tests/functional/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import smtplib
import tempfile
import uuid
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, cast

import mock
import pytest
Expand All @@ -38,6 +38,7 @@
from weaver.base import classproperty
from weaver.cli import AuthHandler, BearerAuthHandler, WeaverClient, main as weaver_cli
from weaver.datatype import DockerAuthentication, Service
from weaver.execute import ExecuteReturnPreference
from weaver.formats import ContentType, OutputFormat, get_cwl_file_format, repr_json
from weaver.notify import decrypt_email
from weaver.processes.constants import CWL_REQUIREMENT_APP_DOCKER, ProcessSchema
Expand All @@ -48,9 +49,11 @@
from weaver.wps.utils import get_wps_output_url, map_wps_output_location

if TYPE_CHECKING:
from typing import Dict, Optional
from typing import Any, Callable, Dict, Optional, Union

from weaver.typedefs import AnyRequestType, AnyResponseType, CWL
from weaver.cli import OperationResult
from weaver.status import AnyStatusType
from weaver.typedefs import AnyRequestType, AnyResponseType, CWL, JSON


class FakeAuthHandler(object):
Expand Down Expand Up @@ -134,6 +137,7 @@ def setup_test_file(self, original_file, substitutions):
return test_file_path

def process_listing_op(self, operation, **op_kwargs):
# type: (Callable[[Any, ...], OperationResult], **Any) -> OperationResult
result = mocked_sub_requests(self.app, operation, only_local=True, **op_kwargs)
assert result.success
assert "processes" in result.body
Expand Down Expand Up @@ -452,9 +456,17 @@ def test_describe(self):
"Dummy process that simply echo's back the input message for testing purposes."
), "CLI should not have overridden the process description field."

def run_execute_inputs_schema_variant(self, inputs_param, process="Echo",
preload=False, location=False, expect_success=True,
mock_exec=True, **exec_kwargs):
def run_execute_inputs_schema_variant(
self,
inputs_param, # type: Union[JSON, str]
process="Echo", # type: str
preload=False, # type: bool
location=False, # type: Optional[str]
expect_success=True, # type: bool
expect_status=None, # type: Optional[AnyStatusType]
mock_exec=True, # type: bool
**exec_kwargs, # type: Any
): # type: (...) -> OperationResult
if isinstance(inputs_param, str):
ref = {"location": inputs_param} if location else {"ref_name": inputs_param}
if preload:
Expand All @@ -469,16 +481,19 @@ def run_execute_inputs_schema_variant(self, inputs_param, process="Echo",
mock_exec_func = None
for mock_exec_proc in mocked_execute_celery(func_execute_task=mock_exec_func):
stack_exec.enter_context(mock_exec_proc)
result = mocked_sub_requests(self.app, self.client.execute, self.test_process[process],
inputs=inputs_param, **exec_kwargs)
result = cast(
"OperationResult",
mocked_sub_requests(self.app, self.client.execute, self.test_process[process],
inputs=inputs_param, **exec_kwargs)
)
if expect_success:
assert result.success, result.message + (result.text if result.text else "")
assert "jobID" in result.body
assert "processID" in result.body
assert "status" in result.body
assert "location" in result.body
assert result.body["processID"] == self.test_process[process]
assert result.body["status"] == Status.ACCEPTED
assert result.body["status"] == expect_status or Status.ACCEPTED
assert result.body["location"] == result.headers["Location"]
assert "undefined" not in result.message
else:
Expand Down Expand Up @@ -722,6 +737,86 @@ def test_execute_inputs_representation_literal_schema_auto_resolve_vault(self):
]:
self.run_execute_inputs_with_vault_file(input_data, "CatFile", preload=False, embed=True)

def test_execute_trigger(self):
result = self.run_execute_inputs_schema_variant(
"Execute_Echo_cwl_schema.yml",
preload=True,
pending=True, # this is the parameter of interest for this test
expect_status=Status.CREATED,
)
assert result.success
assert result.message == (
"Job successfully submitted for creation. "
"Waiting on trigger request to being execution."
)
job_id = result.body["jobID"]

# technically, trigger only need to submit the job to the execution queue
# however, because we do not have an actual celery worker queue configured in tests, mock the execution inline
# the response will be as if we only "accepted" the submission, but the job will be completed for next steps
with contextlib.ExitStack() as stack_exec:
for mock_exec_proc in mocked_execute_celery():
stack_exec.enter_context(mock_exec_proc)
result = mocked_sub_requests(self.app, self.client.trigger_job, job_id)

assert result.success
assert result.code == 202
result = mocked_sub_requests(self.app, self.client.monitor, job_id, timeout=5, interval=1)
assert result.success

result = mocked_sub_requests(self.app, self.client.results, job_id)
assert result.success

output = result.body["output"]["href"]
output = map_wps_output_location(output, self.settings, exists=True)
assert os.path.isfile(output)
with open(output, mode="r", encoding="utf-8") as out_file:
out_data = out_file.read().strip()
assert out_data == "Test message"

def test_update_job(self):
result = self.run_execute_inputs_schema_variant(
"Execute_Echo_cwl_schema.yml",
preload=True,
pending=True, # pre-requirement for updating job is that it must not already be queued/running
expect_status=Status.CREATED,
)
assert result.success
assert result.message == (
"Job successfully submitted for creation. "
"Waiting on trigger request to being execution."
)
job_id = result.body["jobID"]

result = mocked_sub_requests(self.app, self.client.status, job_id)
assert result.success
assert "title" not in result.body

result = mocked_sub_requests(
self.app,
self.client.update_job,
job_id,
title="Random Title",
headers={"Prefer": f"return={ExecuteReturnPreference.REPRESENTATION}"},
inputs={"message": "new message"},
output_filter={"output": {}},
output_context="test",
subscribers={"successUri": "https://example.com"},
)
assert result.success
assert result.code == 204
assert result.body is None

result = mocked_sub_requests(self.app, self.client.status, job_id)
assert result.success
assert result.body["title"] == "Random Title"

result = mocked_sub_requests(self.app, self.client.inputs, job_id)
assert result.success
assert result.body["inputs"] == {"message": "new message"}
assert result.body["outputs"] == {"output": {}}
assert result.body["headers"]["Prefer"] == f"return={ExecuteReturnPreference.REPRESENTATION}; respond-async"

@mocked_dismiss_process()
def test_dismiss(self):
for status in [Status.ACCEPTED, Status.FAILED, Status.RUNNING, Status.SUCCEEDED]:
Expand Down Expand Up @@ -2375,7 +2470,7 @@ def auth_view(request):

def proxy_view(request):
# type: (AnyRequestType) -> AnyResponseType
auth = request.headers.get("Authorization") # should be added by a auth-handler called inline of operation
auth = request.headers.get("Authorization") # should be added by an auth-handler called inline of operation
if not auth:
return HTTPUnauthorized()
token = auth.split(" ")[-1]
Expand Down
2 changes: 1 addition & 1 deletion tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ def read(self, chuck_size=None): # noqa # E811, parameter not used, but must b


def mocked_sub_requests(app, # type: TestApp
method_function, # type: Union[AnyRequestMethod, Callable[[Any], MockReturnType]]
method_function, # type: Union[AnyRequestMethod, Callable[[Any, ...], MockReturnType]]
*args, # type: Any
only_local=False, # type: bool
**kwargs, # type: Any
Expand Down
Loading

0 comments on commit 0cebfa0

Please sign in to comment.