diff --git a/tests/functional/test_cli.py b/tests/functional/test_cli.py index 2a3d0801d..9ca78e655 100644 --- a/tests/functional/test_cli.py +++ b/tests/functional/test_cli.py @@ -42,7 +42,7 @@ from weaver.status import Status, StatusCategory from weaver.utils import fully_qualified_name from weaver.visibility import Visibility -from weaver.wps.utils import map_wps_output_location +from weaver.wps.utils import get_wps_output_url, map_wps_output_location if TYPE_CHECKING: from typing import Dict, Optional @@ -1559,6 +1559,66 @@ def test_execute_result_by_reference(self): data = file.read() assert msg in data # technically, output is log of echoed input message, so not exactly equal + def test_execute_output_context(self): + proc = self.test_process["Echo"] + with contextlib.ExitStack() as stack_exec: + for mock_exec_proc in mocked_execute_celery(): + stack_exec.enter_context(mock_exec_proc) + + lines = mocked_sub_requests( + self.app, run_command, + [ + # "weaver", + "execute", + "-u", self.url, + "-p", proc, + "-I", "message='TEST MESSAGE!'", + "-M", + "-T", 10, + "-W", 1, + "-oP", # request public WPS output context + ], + trim=False, + entrypoint=weaver_cli, + only_local=True, + ) + assert any(f"\"status\": \"{Status.SUCCEEDED}\"" in line for line in lines) + job_id = None + for line in lines: + if line.startswith("jobID: "): + job_id = line.split(":")[-1].strip() + break + assert job_id + + lines = mocked_sub_requests( + self.app, run_command, + [ + # "weaver", + "results", + "-u", self.url, + "-j", job_id, + "-wH", # must display header to get 'Link' + "-F", OutputFormat.YAML, + ], + trim=False, + entrypoint=weaver_cli, + only_local=True, + ) + sep = lines.index("---") + headers = lines[:sep] + content = lines[sep + 1:] + assert content + link = None + for header in headers: + if "Link:" in header: + link = header.split(":", 1)[-1].strip() + break + assert link + link = link.split(";")[0].strip("<>") + wps_url = get_wps_output_url(self.settings) + wps_path = link.split(wps_url)[-1] + assert wps_path == f"/public/{job_id}/output/output.txt" + def test_execute_help_details(self): """ Verify that formatting of the execute operation help provides multiple paragraphs with more details. diff --git a/weaver/cli.py b/weaver/cli.py index 62520c602..9f9e48537 100644 --- a/weaver/cli.py +++ b/weaver/cli.py @@ -1059,6 +1059,7 @@ def execute(self, request_retries=None, # type: Optional[int] output_format=None, # type: Optional[AnyOutputFormat] output_refs=None, # type: Optional[Iterable[str]] + output_context=None, # type: Optional[str] ): # type: (...) -> OperationResult """ Execute a :term:`Job` for the specified :term:`Process` with provided inputs. @@ -1072,7 +1073,8 @@ def execute(self, include details as the ``class: File`` and ``path`` with location. .. seealso:: - :ref:`proc_op_execute` + - :ref:`proc_op_execute` + - :ref:`exec_output_location` .. note:: Execution requests are always accomplished asynchronously. To obtain the final :term:`Job` status as if @@ -1110,6 +1112,10 @@ def execute(self, containing the data. outputs that refer to a file reference will simply contain that URL reference as link. With value transmission mode (default behavior when outputs are not specified in this list), outputs are returned as direct values (literal or href) within the response content body. + :param output_context: + Specify an output context for which the `Weaver` instance should attempt storing the :term:`Job` results + under the nested location of its configured :term:`WPS` outputs. Note that the instance is not required + to fulfill that preference, and can ignore this value if it deems that the provided context is inadequate. :returns: Results of the operation. """ base = self._get_url(url) # raise before inputs parsing if not available @@ -1156,6 +1162,8 @@ def execute(self, exec_headers = {"Prefer": "respond-async"} # for more recent servers, OGC-API compliant async request exec_headers.update(self._headers) exec_headers.update(auth_headers) + if output_context: + exec_headers["X-WPS-Output-Context"] = str(output_context) resp = self._request("POST", exec_url, json=data, headers=exec_headers, x_headers=headers, settings=self._settings, auth=auth, request_timeout=request_timeout, request_retries=request_retries) @@ -2521,6 +2529,23 @@ def make_parser(): Example: ``-R output-one -R output-two`` """) ) + op_execute_output_context = op_execute.add_mutually_exclusive_group() + op_execute_output_context.add_argument( + "-oP", "--output-public", dest="output_context", const="public", + help=( + "Set header 'X-WPS-Output-Context: public' to indicate preference of job output context to be " + "located under the public WPS output location of the server. The server is not mandated to fulfill this " + "preference, but will apply it if supported and considered a valid value." + ) + ) + op_execute_output_context.add_argument( + "-oC", "--output-context", dest="output_context", type=str, + help=( + "Set header 'X-WPS-Output-Context' with the specified value to indicate preference of job output context " + "located under the requested WPS output location of the server. The server is not mandated to fulfill this " + "preference, but will apply it if supported and considered a valid value." + ) + ) op_execute.add_argument( "-M", "--monitor", dest="monitor", action="store_true", help="Automatically perform the monitoring operation following job submission to retrieve final results. "