From 0076152f12df2fe5ec80268955e92d05f170181b Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Tue, 10 Dec 2024 15:27:03 +0100 Subject: [PATCH] Issue #683/#681 align additional/job_options argument in create_job, download, ... --- openeo/rest/connection.py | 57 +++++++++++++++++++++++++++++++++----- openeo/rest/datacube.py | 35 +++++++++++++++++++---- openeo/rest/mlmodel.py | 29 ++++++++++++++++--- openeo/rest/multiresult.py | 13 +++++++-- openeo/rest/vectorcube.py | 17 ++++++++++-- 5 files changed, 130 insertions(+), 21 deletions(-) diff --git a/openeo/rest/connection.py b/openeo/rest/connection.py index 15e25d539..8254bd0e6 100644 --- a/openeo/rest/connection.py +++ b/openeo/rest/connection.py @@ -1644,6 +1644,8 @@ def upload_file( def _build_request_with_process_graph( self, process_graph: Union[dict, FlatGraphableMixin, str, Path, List[FlatGraphableMixin]], + additional: Optional[dict] = None, + job_options: Optional[dict] = None, **kwargs, ) -> dict: """ @@ -1655,6 +1657,13 @@ def _build_request_with_process_graph( if any(c != self for c in connections): raise OpenEoClientException(f"Mixing different connections: {self} and {connections}.") result = kwargs + + if additional: + result.update(additional) + if job_options is not None: + assert "job_options" not in result + result["job_options"] = job_options + process_graph = as_flat_graph(process_graph) if "process_graph" not in process_graph: process_graph = {"process_graph": process_graph} @@ -1702,6 +1711,8 @@ def download( timeout: Optional[int] = None, validate: Optional[bool] = None, chunk_size: int = DEFAULT_DOWNLOAD_CHUNK_SIZE, + additional: Optional[dict] = None, + job_options: Optional[dict] = None, ) -> Union[None, bytes]: """ Downloads the result of a process graph synchronously, @@ -1715,8 +1726,16 @@ def download( :param validate: Optional toggle to enable/prevent validation of the process graphs before execution (overruling the connection's ``auto_validate`` setting). 
:param chunk_size: chunk size for streaming response. + :param additional: additional (top-level) properties to set in the request body + :param job_options: dictionary of job options to pass to the backend + (under top-level property "job_options") + + .. versionadded:: 0.36.0 + Added arguments ``additional`` and ``job_options``. """ - pg_with_metadata = self._build_request_with_process_graph(process_graph=graph) + pg_with_metadata = self._build_request_with_process_graph( + process_graph=graph, additional=additional, job_options=job_options + ) self._preflight_validation(pg_with_metadata=pg_with_metadata, validate=validate) response = self.post( path="/result", @@ -1740,6 +1759,8 @@ def execute( timeout: Optional[int] = None, validate: Optional[bool] = None, auto_decode: bool = True, + additional: Optional[dict] = None, + job_options: Optional[dict] = None, ) -> Union[dict, requests.Response]: """ Execute a process graph synchronously and return the result. If the result is a JSON object, it will be parsed. @@ -1749,10 +1770,18 @@ def execute( :param validate: Optional toggle to enable/prevent validation of the process graphs before execution (overruling the connection's ``auto_validate`` setting). :param auto_decode: Boolean flag to enable/disable automatic JSON decoding of the response. Defaults to True. + :param additional: additional (top-level) properties to set in the request body + :param job_options: dictionary of job options to pass to the backend + (under top-level property "job_options") :return: parsed JSON response as a dict if auto_decode is True, otherwise response object + + .. versionadded:: 0.36.0 + Added arguments ``additional`` and ``job_options``. 
""" - pg_with_metadata = self._build_request_with_process_graph(process_graph=process_graph) + pg_with_metadata = self._build_request_with_process_graph( + process_graph=process_graph, additional=additional, job_options=job_options + ) self._preflight_validation(pg_with_metadata=pg_with_metadata, validate=validate) response = self.post( path="/result", @@ -1779,6 +1808,7 @@ def create_job( plan: Optional[str] = None, budget: Optional[float] = None, additional: Optional[dict] = None, + job_options: Optional[dict] = None, validate: Optional[bool] = None, ) -> BatchJob: """ @@ -1795,23 +1825,27 @@ def create_job( :param plan: The billing plan to process and charge the job with :param budget: Maximum budget to be spent on executing the job. Note that some backends do not honor this limit. - :param additional: additional job options to pass to the backend + :param additional: additional (top-level) properties to set in the request body + :param job_options: dictionary of job options to pass to the backend + (under top-level property "job_options") :param validate: Optional toggle to enable/prevent validation of the process graphs before execution (overruling the connection's ``auto_validate`` setting). :return: Created job .. versionchanged:: 0.35.0 Add :ref:`multi-result support `. + + .. versionadded:: 0.36.0 + Added argument ``job_options``. """ # TODO move all this (BatchJob factory) logic to BatchJob? pg_with_metadata = self._build_request_with_process_graph( process_graph=process_graph, + additional=additional, + job_options=job_options, **dict_no_none(title=title, description=description, plan=plan, budget=budget) ) - if additional: - # TODO: get rid of this non-standard field? 
https://github.com/Open-EO/openeo-api/issues/276 - pg_with_metadata["job_options"] = additional self._preflight_validation(pg_with_metadata=pg_with_metadata, validate=validate) response = self.post("/jobs", json=pg_with_metadata, expected_status=201) @@ -1871,9 +1905,12 @@ def load_disk_collection( def as_curl( self, data: Union[dict, DataCube, FlatGraphableMixin], + *, path="/result", method="POST", obfuscate_auth: bool = False, + additional: Optional[dict] = None, + job_options: Optional[dict] = None, ) -> str: """ Build curl command to evaluate given process graph or data cube @@ -1891,14 +1928,20 @@ def as_curl( or ``"/jobs"`` for batch jobs :param method: HTTP method to use (typically ``"POST"``) :param obfuscate_auth: don't show actual bearer token + :param additional: additional (top-level) properties to set in the request body + :param job_options: dictionary of job options to pass to the backend + (under top-level property "job_options") :return: curl command as a string + + .. versionadded:: 0.36.0 + Added arguments ``additional`` and ``job_options``. """ cmd = ["curl", "-i", "-X", method] cmd += ["-H", "Content-Type: application/json"] if isinstance(self.auth, BearerAuth): cmd += ["-H", f"Authorization: Bearer {'...' 
if obfuscate_auth else self.auth.bearer}"] - pg_with_metadata = self._build_request_with_process_graph(data) + pg_with_metadata = self._build_request_with_process_graph(data, additional=additional, job_options=job_options) if path == "/validation": pg_with_metadata = pg_with_metadata["process"] post_json = json.dumps(pg_with_metadata, separators=(",", ":")) diff --git a/openeo/rest/datacube.py b/openeo/rest/datacube.py index d5874b727..db087f854 100644 --- a/openeo/rest/datacube.py +++ b/openeo/rest/datacube.py @@ -2329,6 +2329,8 @@ def download( *, validate: Optional[bool] = None, auto_add_save_result: bool = True, + additional: Optional[dict] = None, + job_options: Optional[dict] = None, ) -> Union[None, bytes]: """ Execute synchronously and download the raster data cube, e.g. as GeoTIFF. @@ -2342,11 +2344,17 @@ def download( :param validate: Optional toggle to enable/prevent validation of the process graphs before execution (overruling the connection's ``auto_validate`` setting). :param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet. + :param additional: additional (top-level) properties to set in the request body + :param job_options: dictionary of job options to pass to the backend + (under top-level property "job_options") :return: None if the result is stored to disk, or a bytes object returned by the backend. .. versionchanged:: 0.32.0 Added ``auto_add_save_result`` option + + .. versionadded:: 0.36.0 + Added arguments ``additional`` and ``job_options``. """ # TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ... 
cube = self @@ -2359,7 +2367,9 @@ def download( default_format=self._DEFAULT_RASTER_FORMAT, method="DataCube.download()", ) - return self._connection.download(cube.flat_graph(), outputfile, validate=validate) + return self._connection.download( + cube.flat_graph(), outputfile, validate=validate, additional=additional, job_options=job_options + ) def validate(self) -> List[dict]: """ @@ -2463,6 +2473,7 @@ def execute_batch( print: typing.Callable[[str], None] = print, max_poll_interval: float = 60, connection_retry_interval: float = 30, + additional: Optional[dict] = None, job_options: Optional[dict] = None, validate: Optional[bool] = None, auto_add_save_result: bool = True, @@ -2477,13 +2488,18 @@ def execute_batch( :param outputfile: The path of a file to which a result can be written :param out_format: (optional) File format to use for the job result. - :param job_options: + :param additional: additional (top-level) properties to set in the request body + :param job_options: dictionary of job options to pass to the backend + (under top-level property "job_options") :param validate: Optional toggle to enable/prevent validation of the process graphs before execution (overruling the connection's ``auto_validate`` setting). :param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet. .. versionchanged:: 0.32.0 Added ``auto_add_save_result`` option + + .. versionadded:: 0.36.0 + Added argument ``additional``. 
""" # TODO: start showing deprecation warnings about these inconsistent argument names if "format" in format_options and not out_format: @@ -2506,6 +2522,7 @@ def execute_batch( description=description, plan=plan, budget=budget, + additional=additional, job_options=job_options, validate=validate, auto_add_save_result=False, @@ -2523,6 +2540,7 @@ def create_job( description: Optional[str] = None, plan: Optional[str] = None, budget: Optional[float] = None, + additional: Optional[dict] = None, job_options: Optional[dict] = None, validate: Optional[bool] = None, auto_add_save_result: bool = True, @@ -2543,15 +2561,20 @@ def create_job( :param plan: The billing plan to process and charge the job with :param budget: Maximum budget to be spent on executing the job. Note that some backends do not honor this limit. - :param job_options: custom job options. + :param additional: additional (top-level) properties to set in the request body + :param job_options: dictionary of job options to pass to the backend + (under top-level property "job_options") :param validate: Optional toggle to enable/prevent validation of the process graphs before execution (overruling the connection's ``auto_validate`` setting). :param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet. :return: Created job. - .. versionchanged:: 0.32.0 + .. versionadded:: 0.32.0 Added ``auto_add_save_result`` option + + .. versionadded:: 0.36.0 + Added ``additional`` argument. """ # TODO: add option to also automatically start the job? 
 # TODO: avoid using all kwargs as format_options @@ -2572,7 +2595,8 @@ def create_job( plan=plan, budget=budget, validate=validate, - additional=job_options, + additional=additional, + job_options=job_options, ) send_job = legacy_alias(create_job, name="send_job", since="0.10.0") @@ -2617,6 +2641,7 @@ def execute(self, *, validate: Optional[bool] = None, auto_decode: bool = True) :return: parsed JSON response as a dict if auto_decode is True, otherwise response object """ + # TODO: deprecate this. It's ill-defined how to "execute" a data cube without downloading it. return self._connection.execute(self.flat_graph(), validate=validate, auto_decode=auto_decode) @staticmethod diff --git a/openeo/rest/mlmodel.py b/openeo/rest/mlmodel.py index b6214569c..1220a7701 100644 --- a/openeo/rest/mlmodel.py +++ b/openeo/rest/mlmodel.py @@ -69,7 +69,8 @@ def execute_batch( print=print, max_poll_interval=60, connection_retry_interval=30, - job_options=None, + additional: Optional[dict] = None, + job_options: Optional[dict] = None, ) -> BatchJob: """ Evaluate the process graph by creating a batch job, and retrieving the results when it is finished. @@ -81,8 +82,21 @@ def execute_batch( :param outputfile: The path of a file to which a result can be written :param out_format: (optional) Format of the job result. :param format_options: String Parameters for the job result format + :param additional: additional (top-level) properties to set in the request body + :param job_options: dictionary of job options to pass to the backend + (under top-level property "job_options") + + .. versionadded:: 0.36.0 + Added argument ``additional``. 
""" - job = self.create_job(title=title, description=description, plan=plan, budget=budget, job_options=job_options) + job = self.create_job( + title=title, + description=description, + plan=plan, + budget=budget, + additional=additional, + job_options=job_options, + ) return job.run_synchronous( # TODO #135 support multi file result sets too outputfile=outputfile, @@ -96,6 +110,7 @@ def create_job( description: Optional[str] = None, plan: Optional[str] = None, budget: Optional[float] = None, + additional: Optional[dict] = None, job_options: Optional[dict] = None, ) -> BatchJob: """ @@ -106,9 +121,14 @@ def create_job( :param plan: The billing plan to process and charge the job with :param budget: Maximum budget to be spent on executing the job. Note that some backends do not honor this limit. - :param job_options: A dictionary containing (custom) job options + :param additional: additional (top-level) properties to set in the request body + :param job_options: dictionary of job options to pass to the backend + (under top-level property "job_options") :param format_options: String Parameters for the job result format :return: Created job. + + .. versionadded:: 0.36.0 + Added argument ``additional``. """ # TODO: centralize `create_job` for `DataCube`, `VectorCube`, `MlModel`, ... 
pg = self @@ -121,5 +141,6 @@ def create_job( description=description, plan=plan, budget=budget, - additional=job_options, + additional=additional, + job_options=job_options, ) diff --git a/openeo/rest/multiresult.py b/openeo/rest/multiresult.py index 20733b68d..82f10fb5c 100644 --- a/openeo/rest/multiresult.py +++ b/openeo/rest/multiresult.py @@ -79,6 +79,7 @@ def create_job( *, title: Optional[str] = None, description: Optional[str] = None, + additional: Optional[dict] = None, job_options: Optional[dict] = None, validate: Optional[bool] = None, ) -> BatchJob: @@ -86,7 +87,8 @@ def create_job( process_graph=self._multi_leaf_graph, title=title, description=description, - additional=job_options, + additional=additional, + job_options=job_options, validate=validate, ) @@ -95,8 +97,15 @@ def execute_batch( *, title: Optional[str] = None, description: Optional[str] = None, + additional: Optional[dict] = None, job_options: Optional[dict] = None, validate: Optional[bool] = None, ) -> BatchJob: - job = self.create_job(title=title, description=description, job_options=job_options, validate=validate) + job = self.create_job( + title=title, + description=description, + additional=additional, + job_options=job_options, + validate=validate, + ) return job.run_synchronous() diff --git a/openeo/rest/vectorcube.py b/openeo/rest/vectorcube.py index 09ed77537..51c2bf69e 100644 --- a/openeo/rest/vectorcube.py +++ b/openeo/rest/vectorcube.py @@ -255,6 +255,7 @@ def execute_batch( print=print, max_poll_interval: float = 60, connection_retry_interval: float = 30, + additional: Optional[dict] = None, job_options: Optional[dict] = None, validate: Optional[bool] = None, auto_add_save_result: bool = True, @@ -267,7 +268,9 @@ def execute_batch( For very long running jobs, you probably do not want to keep the client running. 
- :param job_options: + :param additional: additional (top-level) properties to set in the request body + :param job_options: dictionary of job options to pass to the backend + (under top-level property "job_options") :param outputfile: The path of a file to which a result can be written :param out_format: (optional) output format to use. :param format_options: (optional) additional output format options @@ -280,6 +283,9 @@ def execute_batch( .. versionchanged:: 0.32.0 Added ``auto_add_save_result`` option + + .. versionadded:: 0.36.0 + Added argument ``additional``. """ cube = self if auto_add_save_result: @@ -296,6 +302,7 @@ def execute_batch( description=description, plan=plan, budget=budget, + additional=additional, job_options=job_options, validate=validate, auto_add_save_result=False, @@ -314,6 +321,7 @@ def create_job( description: Optional[str] = None, plan: Optional[str] = None, budget: Optional[float] = None, + additional: Optional[dict] = None, job_options: Optional[dict] = None, validate: Optional[bool] = None, auto_add_save_result: bool = True, @@ -328,7 +336,9 @@ def create_job( :param plan: The billing plan to process and charge the job with :param budget: Maximum budget to be spent on executing the job. Note that some backends do not honor this limit. - :param job_options: A dictionary containing (custom) job options + :param additional: additional (top-level) properties to set in the request body + :param job_options: dictionary of job options to pass to the backend + (under top-level property "job_options") :param format_options: String Parameters for the job result format :param validate: Optional toggle to enable/prevent validation of the process graphs before execution (overruling the connection's ``auto_validate`` setting). @@ -356,7 +366,8 @@ def create_job( description=description, plan=plan, budget=budget, - additional=job_options, + additional=additional, + job_options=job_options, validate=validate, )