Skip to content

Commit

Permalink
Merge pull request #1400 from mvdbeek/robust_error_reports
Browse files Browse the repository at this point in the history
Make test reports more resilient to failing invocations
  • Loading branch information
mvdbeek authored Oct 29, 2023
2 parents 81ee372 + 11a7084 commit 31b2f1b
Show file tree
Hide file tree
Showing 20 changed files with 376 additions and 68 deletions.
2 changes: 1 addition & 1 deletion planemo/cwl/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def invocation_details(self) -> None:
return None

@property
def outputs_dict(self) -> Optional[Dict[str, Any]]:
def outputs_dict(self):
return self._outputs


Expand Down
8 changes: 7 additions & 1 deletion planemo/engine/cwltool.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
"""Module contianing the :class:`CwlToolEngine` implementation of :class:`Engine`."""

from typing import (
Callable,
List,
Optional,
)

from planemo import cwl
from planemo.runnable import RunnableType
from .interface import BaseEngine
Expand All @@ -13,7 +19,7 @@ class CwlToolEngine(BaseEngine):

handled_runnable_types = [RunnableType.cwl_tool, RunnableType.cwl_workflow]

def _run(self, runnables, job_paths):
def _run(self, runnables, job_paths, output_collectors: Optional[List[Callable]] = None):
"""Run CWL job using cwltool."""
results = []
for runnable, job_path in zip(runnables, job_paths):
Expand Down
15 changes: 12 additions & 3 deletions planemo/engine/galaxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@

import abc
import contextlib
from typing import TYPE_CHECKING
from typing import (
Callable,
List,
Optional,
TYPE_CHECKING,
)

from galaxy.tool_util.verify import interactor

Expand Down Expand Up @@ -44,17 +49,21 @@ class GalaxyEngine(BaseEngine, metaclass=abc.ABCMeta):
RunnableType.directory,
]

def _run(self, runnables, job_paths):
def _run(self, runnables, job_paths, output_collectors: Optional[List[Callable]] = None):
"""Run job in Galaxy."""
results = []
for runnable, job_path in zip(runnables, job_paths):
if not output_collectors:
output_collectors = [lambda x: None] * len(runnables)
for runnable, job_path, collect_output in zip(runnables, job_paths, output_collectors):
self._ctx.vlog(f"Serving artifact [{runnable}] with Galaxy.")
with self.ensure_runnables_served([runnable]) as config:
self._ctx.vlog(f"Running job path [{job_path}]")
if self._ctx.verbose:
self._ctx.log(f"Running Galaxy with API configuration [{config.user_api_config}]")
run_response = execute(self._ctx, config, runnable, job_path, **self._kwds)
results.append(run_response)
if collect_output is not None:
collect_output(run_response)

return results

Expand Down
16 changes: 11 additions & 5 deletions planemo/engine/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import json
import os
import tempfile
from typing import List
from typing import (
Callable,
List,
Optional,
)

from planemo.exit_codes import EXIT_CODE_UNSUPPORTED_FILE_TYPE
from planemo.io import error
Expand Down Expand Up @@ -48,14 +52,14 @@ def can_run(self, runnable):
def cleanup(self):
"""Default no-op cleanup method."""

def run(self, runnables, job_paths):
def run(self, runnables, job_paths, output_collectors: Optional[List[Callable]] = None):
"""Run a job using a compatible artifact (workflow or tool)."""
self._check_can_run_all(runnables)
run_responses = self._run(runnables, job_paths)
run_responses = self._run(runnables, job_paths, output_collectors)
return run_responses

@abc.abstractmethod
def _run(self, runnables, job_path):
def _run(self, runnables, job_path, output_collectors: Optional[List[Callable]] = None):
"""Run a job using a compatible artifact (workflow or tool) wrapped as a runnable."""

def _check_can_run(self, runnable):
Expand Down Expand Up @@ -94,6 +98,7 @@ def _run_test_cases(self, test_cases, test_timeout):
runnables = [test_case.runnable for test_case in test_cases]
job_paths = []
tmp_paths = []
output_collectors = []
for test_case in test_cases:
if test_case.job_path is None:
job = test_case.job
Expand All @@ -111,8 +116,9 @@ def _run_test_cases(self, test_cases, test_timeout):
job_paths.append(job_path)
else:
job_paths.append(test_case.job_path)
output_collectors.append(lambda run_response: test_case.structured_test_data(run_response))
try:
run_responses = self._run(runnables, job_paths)
run_responses = self._run(runnables, job_paths, output_collectors)
finally:
for tmp_path in tmp_paths:
os.remove(tmp_path)
Expand Down
7 changes: 6 additions & 1 deletion planemo/engine/toil.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
"""Module contianing the :class:`ToilEngine` implementation of :class:`Engine`."""
from typing import (
Callable,
List,
Optional,
)

from planemo import cwl
from planemo.runnable import RunnableType
Expand All @@ -13,7 +18,7 @@ class ToilEngine(BaseEngine):

handled_runnable_types = [RunnableType.cwl_tool, RunnableType.cwl_workflow]

def _run(self, runnables, job_paths):
def _run(self, runnables, job_paths, output_collectors: Optional[List[Callable]] = None):
"""Run CWL job using Toil."""
results = []
for runnable, job_path in zip(runnables, job_paths):
Expand Down
74 changes: 51 additions & 23 deletions planemo/galaxy/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
if TYPE_CHECKING:
from planemo.cli import PlanemoCliContext
from planemo.galaxy.config import BaseGalaxyConfig
from planemo.runnable import RunnableOutput

DEFAULT_HISTORY_NAME = "CWL Target History"
ERR_NO_SUCH_TOOL = (
Expand Down Expand Up @@ -244,10 +245,10 @@ def _execute( # noqa C901
end_datetime=datetime.now(),
**response_kwds,
)
if kwds.get("download_outputs", True):
if kwds.get("download_outputs"):
output_directory = kwds.get("output_directory", None)
ctx.vlog("collecting outputs from run...")
run_response.collect_outputs(ctx, output_directory)
run_response.collect_outputs(output_directory)
ctx.vlog("collecting outputs complete")
return run_response

Expand Down Expand Up @@ -359,7 +360,7 @@ def __init__(

self._job_info = None

self._outputs_dict = None
self._outputs_dict: Dict[str, Optional[str]] = {}
self._start_datetime = start_datetime
self._end_datetime = end_datetime
self._successful = successful
Expand All @@ -385,6 +386,13 @@ def end_datetime(self):
"""End datetime of run."""
return self._end_datetime

@property
def outputs_dict(self):
    """Return the mapping of runnable output id to collected output value.

    Populated by ``collect_outputs``; a ``None`` value marks an output that
    was not created (optional output, or the invocation failed).
    """
    return self._outputs_dict

def output_src(self, output: "RunnableOutput", ignore_missing_outputs: Optional[bool] = False) -> Dict[str, str]:
    """Return the Galaxy source descriptor (e.g. id/src) for *output*.

    Base implementation finds nothing and returns an empty dict; subclasses
    override this to look the output up in tool or invocation responses.
    """
    return {}

def _get_extra_files(self, dataset_details):
extra_files_url = (
f"{self._user_gi.url}/histories/{self._history_id}/contents/{dataset_details['id']}/extra_files"
Expand All @@ -406,32 +414,44 @@ def _get_metadata(self, history_content_type, content_id):
else:
raise Exception("Unknown history content type encountered [%s]" % history_content_type)

def collect_outputs(self, ctx, output_directory):
outputs_dict = {}
def collect_outputs(
self,
output_directory: Optional[str] = None,
ignore_missing_output: Optional[bool] = False,
output_id: Optional[str] = None,
):
outputs_dict: Dict[str, Optional[str]] = {}
# TODO: rather than creating a directory just use
# Galaxy paths if they are available in this
# configuration.
output_directory = output_directory or tempfile.mkdtemp()

ctx.log("collecting outputs to directory %s" % output_directory)
self._ctx.log("collecting outputs to directory %s" % output_directory)

for runnable_output in get_outputs(self._runnable, gi=self._user_gi):
output_id = runnable_output.get_id()
if not output_id:
ctx.log("Workflow output identified without an ID (label), skipping")
runnable_output_id = runnable_output.get_id()
if not runnable_output_id:
self._ctx.log("Workflow output identified without an ID (label), skipping")
continue

if output_id and runnable_output_id != output_id:
continue

def get_dataset(dataset_details, filename=None):
parent_basename = sanitize_filename(dataset_details.get("cwl_file_name") or output_id)
parent_basename = sanitize_filename(dataset_details.get("cwl_file_name") or runnable_output_id)
file_ext = dataset_details["file_ext"]
if file_ext == "directory":
# TODO: rename output_directory to outputs_directory because we can have output directories
# and this is confusing...
the_output_directory = os.path.join(output_directory, parent_basename)
safe_makedirs(the_output_directory)
destination = self.download_output_to(ctx, dataset_details, the_output_directory, filename=filename)
destination = self.download_output_to(
self._ctx, dataset_details, the_output_directory, filename=filename
)
else:
destination = self.download_output_to(ctx, dataset_details, output_directory, filename=filename)
destination = self.download_output_to(
self._ctx, dataset_details, output_directory, filename=filename
)
if filename is None:
basename = parent_basename
else:
Expand All @@ -440,11 +460,11 @@ def get_dataset(dataset_details, filename=None):
return {"path": destination, "basename": basename}

is_cwl = self._runnable.type in [RunnableType.cwl_workflow, RunnableType.cwl_tool]
output_src = self.output_src(runnable_output)
output_src = self.output_src(runnable_output, ignore_missing_output)
if not output_src:
# Optional workflow output
ctx.vlog(f"Optional workflow output '{output_id}' not created, skipping")
outputs_dict[output_id] = None
# Optional workflow output or invocation failed
self._ctx.vlog(f"workflow output '{runnable_output_id}' not created, skipping")
outputs_dict[runnable_output_id] = None
continue
output_dataset_id = output_src["id"]
galaxy_output = self.to_galaxy_output(runnable_output)
Expand Down Expand Up @@ -473,10 +493,12 @@ def attach_file_properties(collection, cwl_output):
attach_file_properties(output_metadata, cwl_output)
output_dict_value = output_metadata

outputs_dict[output_id] = output_dict_value
if output_id:
return output_dict_value
outputs_dict[runnable_output_id] = output_dict_value

self._outputs_dict = outputs_dict
ctx.vlog("collected outputs [%s]" % self._outputs_dict)
self._ctx.vlog("collected outputs [%s]" % self._outputs_dict)

@property
def log(self):
Expand All @@ -497,9 +519,10 @@ def job_info(self):
def invocation_details(self):
return None

@property
def outputs_dict(self):
return self._outputs_dict
def get_output(self, output_id):
if output_id not in self._outputs_dict:
self._outputs_dict[output_id] = self.collect_outputs(ignore_missing_output=True, output_id=output_id)
return self._outputs_dict[output_id]

def download_output_to(self, ctx, dataset_details, output_directory, filename=None):
if filename is None:
Expand Down Expand Up @@ -578,7 +601,7 @@ def to_galaxy_output(self, runnable_output):
output_id = runnable_output.get_id()
return tool_response_to_output(self.api_run_response, self._history_id, output_id)

def output_src(self, output):
def output_src(self, output, ignore_missing_outputs: Optional[bool] = False):
outputs = self.api_run_response["outputs"]
output_collections = self.api_run_response["output_collections"]
output_id = output.get_id()
Expand Down Expand Up @@ -632,7 +655,7 @@ def to_galaxy_output(self, runnable_output):
self._ctx.vlog("checking for output in invocation [%s]" % self._invocation)
return invocation_to_output(self._invocation, self._history_id, output_id)

def output_src(self, output):
def output_src(self, output, ignore_missing_outputs: Optional[bool] = False):
invocation = self._invocation
# Use newer workflow outputs API.

Expand All @@ -643,6 +666,9 @@ def output_src(self, output):
return invocation["output_collections"][output.get_id()]
elif output.is_optional():
return None
elif ignore_missing_outputs:
# We don't need to check this in testing mode, we'll get an error through failed invocation and failed history anyway
return None
else:
raise Exception(f"Failed to find output [{output_name}] in invocation outputs [{invocation['outputs']}]")

Expand Down Expand Up @@ -671,6 +697,8 @@ def collect_invocation_details(self, invocation_id=None):
"invocation_state": self.invocation_state,
"history_state": self.history_state,
"error_message": self.error_message,
# Messages are only present from 23.0 onward
"messages": invocation.get("messages", []),
},
}
return invocation_details
Expand Down
43 changes: 42 additions & 1 deletion planemo/reports/build_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,46 @@

TITLE = "Results (powered by Planemo)"

cancel_fragment = "Invocation scheduling cancelled because"
fail_fragment = "Invocation scheduling failed because"


def render_message_to_string(invocation_message):  # noqa: C901
    """Translate a Galaxy invocation message dictionary into a readable sentence.

    Port of the client-side rendering in Galaxy's InvocationMessage component.
    Step ids in the message are zero-based; rendered messages show one-based
    step numbers. Unknown reasons are returned verbatim.
    """
    reason = invocation_message["reason"]
    if reason == "user_request":
        return f"{cancel_fragment} user requested cancellation."
    if reason == "history_deleted":
        return f"{cancel_fragment} the history of the invocation was deleted."
    if reason == "cancelled_on_review":
        paused_step = invocation_message["workflow_step_id"] + 1
        return f"{cancel_fragment} the invocation was paused at step {paused_step} and not approved."
    if reason == "collection_failed":
        failed_step = invocation_message["workflow_step_id"] + 1
        source_step = invocation_message["dependent_workflow_step_id"] + 1
        return (
            f"{fail_fragment} step {failed_step} requires a dataset collection created by step {source_step},"
            " but dataset collection entered a failed state."
        )
    if reason == "dataset_failed":
        failed_step = invocation_message["workflow_step_id"] + 1
        if invocation_message.get("dependent_workflow_step_id") is not None:
            source_step = invocation_message["dependent_workflow_step_id"] + 1
            return (
                f"{fail_fragment} step {failed_step} requires a dataset created by step {source_step},"
                " but dataset entered a failed state."
            )
        return f"{fail_fragment} step {failed_step} requires a dataset, but dataset entered a failed state."
    if reason == "job_failed":
        failed_step = invocation_message["workflow_step_id"] + 1
        source_step = invocation_message["dependent_workflow_step_id"] + 1
        return (
            f"{fail_fragment} step {failed_step} depends on job(s) created in step {source_step},"
            " but a job for that step failed."
        )
    if reason == "output_not_found":
        failed_step = invocation_message["workflow_step_id"] + 1
        source_step = invocation_message["dependent_workflow_step_id"] + 1
        output_name = invocation_message["output_name"]
        return (
            f"{fail_fragment} step {failed_step} depends on output '{output_name}' of step {source_step},"
            " but this step did not produce an output of that name."
        )
    if reason == "expression_evaluation_failed":
        failed_step = invocation_message["workflow_step_id"] + 1
        return f"{fail_fragment} step {failed_step} contains an expression that could not be evaluated."
    if reason == "when_not_boolean":
        failed_step = invocation_message["workflow_step_id"] + 1
        return (
            f"{fail_fragment} step {failed_step} is a conditional step and the result of the when expression"
            " is not a boolean type."
        )
    if reason == "unexpected_failure":
        at_step = ""
        if invocation_message.get("workflow_step_id") is not None:
            at_step = f" at step {invocation_message['workflow_step_id'] + 1}"
        details = invocation_message.get("details")
        if details:
            return f"{fail_fragment} an unexpected failure occurred{at_step}: '{details}'"
        return f"{fail_fragment} an unexpected failure occurred{at_step}."
    if reason == "workflow_output_not_found":
        missing_step = invocation_message["workflow_step_id"] + 1
        output_name = invocation_message["output_name"]
        return f"Defined workflow output '{output_name}' was not found in step {missing_step}."
    # Unknown reason: surface the raw value rather than failing.
    return reason


def build_report(structured_data, report_type="html", execution_type="Test", **kwds):
"""Use report_{report_type}.tpl to build page for report."""
Expand All @@ -19,12 +59,12 @@ def build_report(structured_data, report_type="html", execution_type="Test", **k

__fix_test_ids(environment)
environment = __inject_summary(environment)
environment["execution_type"] = execution_type

if report_type == "html":
# The HTML report format needs a lot of extra, custom data.
# IMO, this seems to suggest it should be embedded.
environment["title"] = None
environment["execution_type"] = execution_type
markdown = template_data(environment, "report_markdown.tpl")
environment["title"] = " ".join((environment["execution_type"], TITLE))
environment["raw_data"] = base64.b64encode(markdown.encode("utf-8")).decode("utf-8")
Expand All @@ -50,6 +90,7 @@ def template_data(environment, template_name, **kwds):
env_kwargs["trim_blocks"] = True
env = Environment(loader=PackageLoader("planemo", "reports"), **env_kwargs)
env.filters["strip_control_characters"] = lambda x: strip_control_characters(x) if x else x
env.globals["render_message_to_string"] = render_message_to_string
template = env.get_template(template_name)
return template.render(**environment)

Expand Down
Loading

0 comments on commit 31b2f1b

Please sign in to comment.