diff --git a/planemo/cwl/run.py b/planemo/cwl/run.py index 7a2b8f472..bf22db886 100644 --- a/planemo/cwl/run.py +++ b/planemo/cwl/run.py @@ -62,7 +62,7 @@ def invocation_details(self) -> None: return None @property - def outputs_dict(self) -> Optional[Dict[str, Any]]: + def outputs_dict(self): return self._outputs diff --git a/planemo/engine/galaxy.py b/planemo/engine/galaxy.py index 90f76530d..1c2c6e97c 100644 --- a/planemo/engine/galaxy.py +++ b/planemo/engine/galaxy.py @@ -53,7 +53,7 @@ def _run(self, runnables, job_paths, output_collectors: Optional[List[Callable]] """Run job in Galaxy.""" results = [] if not output_collectors: - output_collectors = [None] * len(runnables) + output_collectors = [lambda x: None] * len(runnables) for runnable, job_path, collect_output in zip(runnables, job_paths, output_collectors): self._ctx.vlog(f"Serving artifact [{runnable}] with Galaxy.") with self.ensure_runnables_served([runnable]) as config: @@ -62,7 +62,7 @@ def _run(self, runnables, job_paths, output_collectors: Optional[List[Callable]] self._ctx.log(f"Running Galaxy with API configuration [{config.user_api_config}]") run_response = execute(self._ctx, config, runnable, job_path, **self._kwds) results.append(run_response) - if collect_output: + if collect_output is not None: collect_output(run_response) return results diff --git a/planemo/galaxy/activity.py b/planemo/galaxy/activity.py index c3a5fa5a6..6dead53da 100644 --- a/planemo/galaxy/activity.py +++ b/planemo/galaxy/activity.py @@ -360,7 +360,7 @@ def __init__( self._job_info = None - self._outputs_dict: Optional[Dict[str, Optional[str]]] = None + self._outputs_dict: Dict[str, Optional[str]] = {} self._start_datetime = start_datetime self._end_datetime = end_datetime self._successful = successful @@ -386,7 +386,11 @@ def end_datetime(self): """End datetime of run.""" return self._end_datetime - def output_src(self, output: "RunnableOutput", ignore_missing_outputs: bool = False) -> Dict[str, str]: + @property + def outputs_dict(self): + return self._outputs_dict + + def output_src(self, output: "RunnableOutput", ignore_missing_outputs: Optional[bool] = False) -> Dict[str, str]: return {} def _get_extra_files(self, dataset_details): @@ -410,7 +414,12 @@ def _get_metadata(self, history_content_type, content_id): else: raise Exception("Unknown history content type encountered [%s]" % history_content_type) - def collect_outputs(self, output_directory: Optional[str] = None, ignore_missing_output: Optional[bool] = False): + def collect_outputs( + self, + output_directory: Optional[str] = None, + ignore_missing_output: Optional[bool] = False, + output_id: Optional[str] = None, + ): outputs_dict: Dict[str, Optional[str]] = {} # TODO: rather than creating a directory just use # Galaxy paths if they are available in this @@ -420,13 +429,16 @@ def collect_outputs(self, output_directory: Optional[str] = None, ignore_missing self._ctx.log("collecting outputs to directory %s" % output_directory) for runnable_output in get_outputs(self._runnable, gi=self._user_gi): - output_id = runnable_output.get_id() - if not output_id: + runnable_output_id = runnable_output.get_id() + if not runnable_output_id: self._ctx.log("Workflow output identified without an ID (label), skipping") continue + if output_id and runnable_output_id != output_id: + continue + def get_dataset(dataset_details, filename=None): - parent_basename = sanitize_filename(dataset_details.get("cwl_file_name") or output_id) + parent_basename = sanitize_filename(dataset_details.get("cwl_file_name") or runnable_output_id) file_ext = dataset_details["file_ext"] if file_ext == "directory": # TODO: rename output_directory to outputs_directory because we can have output directories @@ -451,8 +463,8 @@ def get_dataset(dataset_details, filename=None): output_src = self.output_src(runnable_output, ignore_missing_output) if not output_src: # Optional workflow output or invocation failed - self._ctx.vlog(f"workflow output '{output_id}' not created, skipping") - outputs_dict[output_id] = None + self._ctx.vlog(f"workflow output '{runnable_output_id}' not created, skipping") + outputs_dict[runnable_output_id] = None continue output_dataset_id = output_src["id"] galaxy_output = self.to_galaxy_output(runnable_output) @@ -481,7 +493,9 @@ def attach_file_properties(collection, cwl_output): attach_file_properties(output_metadata, cwl_output) output_dict_value = output_metadata - outputs_dict[output_id] = output_dict_value + if output_id: + return output_dict_value + outputs_dict[runnable_output_id] = output_dict_value self._outputs_dict = outputs_dict self._ctx.vlog("collected outputs [%s]" % self._outputs_dict) @@ -505,11 +519,10 @@ def job_info(self): def invocation_details(self): return None - @property - def outputs_dict(self): - if self._outputs_dict is None: - self.collect_outputs(ignore_missing_output=True) - return self._outputs_dict + def get_output(self, output_id): + if output_id not in self._outputs_dict: + self._outputs_dict[output_id] = self.collect_outputs(ignore_missing_output=True, output_id=output_id) + return self._outputs_dict[output_id] def download_output_to(self, ctx, dataset_details, output_directory, filename=None): if filename is None: @@ -588,7 +601,7 @@ def to_galaxy_output(self, runnable_output): output_id = runnable_output.get_id() return tool_response_to_output(self.api_run_response, self._history_id, output_id) - def output_src(self, output, ignore_missing_outputs=False): + def output_src(self, output, ignore_missing_outputs: Optional[bool] = False): outputs = self.api_run_response["outputs"] output_collections = self.api_run_response["output_collections"] output_id = output.get_id() @@ -642,7 +655,7 @@ def to_galaxy_output(self, runnable_output): self._ctx.vlog("checking for output in invocation [%s]" % self._invocation) return invocation_to_output(self._invocation, self._history_id, output_id) - def output_src(self, output, ignore_missing_outputs=False): + def output_src(self, output, ignore_missing_outputs: Optional[bool] = False): invocation = self._invocation # Use newer workflow outputs API. diff --git a/planemo/runnable.py b/planemo/runnable.py index 19e6027e4..9e69d7bec 100644 --- a/planemo/runnable.py +++ b/planemo/runnable.py @@ -587,19 +587,26 @@ def invocation_details(self): def log(self): """If engine related log is available, return as text data.""" + @abc.abstractproperty + def outputs_dict(self): + """Return a dict of output descriptions.""" + + def get_output(self, output_id): + """Fetch output from engine.""" + return self.outputs_dict.get(output_id) + def structured_data(self, test_case: Optional[TestCase] = None) -> Dict[str, Any]: output_problems = [] if isinstance(self, SuccessfulRunResponse) and self.was_successful: - outputs_dict = self.outputs_dict execution_problem = None if test_case: for output_id, output_test in test_case.output_expectations.items(): - if output_id not in outputs_dict: + output_value = self.get_output(output_id) + if not output_value: message = f"Expected output [{output_id}] not found in results." output_problems.append(message) continue - output_value = outputs_dict[output_id] output_problems.extend(test_case._check_output(output_id, output_value, output_test)) if output_problems: status = "failure" @@ -656,10 +663,6 @@ def was_successful(self): """Return `True` to indicate this run was successful.""" return True - @abc.abstractproperty - def outputs_dict(self): - """Return a dict of output descriptions.""" - class ErrorRunResponse(RunResponse): """Description of an error while attempting to execute a Runnable.""" @@ -709,6 +712,10 @@ def log(self): """Return potentially null stored `log` text.""" return self._log + @property + def outputs_dict(self): + return {} + def __str__(self): """Print a helpful error description of run.""" message = f"Run failed with message [{self.error_message}]"