Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make test reports more resilient to failing invocations #1400

Merged
merged 7 commits into from
Oct 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion planemo/cwl/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def invocation_details(self) -> None:
return None

@property
def outputs_dict(self) -> Optional[Dict[str, Any]]:
def outputs_dict(self):
return self._outputs


Expand Down
8 changes: 7 additions & 1 deletion planemo/engine/cwltool.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
"""Module containing the :class:`CwlToolEngine` implementation of :class:`Engine`."""

from typing import (
Callable,
List,
Optional,
)

from planemo import cwl
from planemo.runnable import RunnableType
from .interface import BaseEngine
Expand All @@ -13,7 +19,7 @@ class CwlToolEngine(BaseEngine):

handled_runnable_types = [RunnableType.cwl_tool, RunnableType.cwl_workflow]

def _run(self, runnables, job_paths):
def _run(self, runnables, job_paths, output_collectors: Optional[List[Callable]] = None):
"""Run CWL job using cwltool."""
results = []
for runnable, job_path in zip(runnables, job_paths):
Expand Down
15 changes: 12 additions & 3 deletions planemo/engine/galaxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@

import abc
import contextlib
from typing import TYPE_CHECKING
from typing import (
Callable,
List,
Optional,
TYPE_CHECKING,
)

from galaxy.tool_util.verify import interactor

Expand Down Expand Up @@ -44,17 +49,21 @@ class GalaxyEngine(BaseEngine, metaclass=abc.ABCMeta):
RunnableType.directory,
]

def _run(self, runnables, job_paths):
def _run(self, runnables, job_paths, output_collectors: Optional[List[Callable]] = None):
"""Run job in Galaxy."""
results = []
for runnable, job_path in zip(runnables, job_paths):
if not output_collectors:
output_collectors = [lambda x: None] * len(runnables)
for runnable, job_path, collect_output in zip(runnables, job_paths, output_collectors):
self._ctx.vlog(f"Serving artifact [{runnable}] with Galaxy.")
with self.ensure_runnables_served([runnable]) as config:
self._ctx.vlog(f"Running job path [{job_path}]")
if self._ctx.verbose:
self._ctx.log(f"Running Galaxy with API configuration [{config.user_api_config}]")
run_response = execute(self._ctx, config, runnable, job_path, **self._kwds)
results.append(run_response)
if collect_output is not None:
collect_output(run_response)

return results

Expand Down
16 changes: 11 additions & 5 deletions planemo/engine/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import json
import os
import tempfile
from typing import List
from typing import (
Callable,
List,
Optional,
)

from planemo.exit_codes import EXIT_CODE_UNSUPPORTED_FILE_TYPE
from planemo.io import error
Expand Down Expand Up @@ -48,14 +52,14 @@ def can_run(self, runnable):
def cleanup(self):
"""Default no-op cleanup method."""

def run(self, runnables, job_paths):
def run(self, runnables, job_paths, output_collectors: Optional[List[Callable]] = None):
"""Run a job using a compatible artifact (workflow or tool)."""
self._check_can_run_all(runnables)
run_responses = self._run(runnables, job_paths)
run_responses = self._run(runnables, job_paths, output_collectors)
return run_responses

@abc.abstractmethod
def _run(self, runnables, job_path):
def _run(self, runnables, job_path, output_collectors: Optional[List[Callable]] = None):
"""Run a job using a compatible artifact (workflow or tool) wrapped as a runnable."""

def _check_can_run(self, runnable):
Expand Down Expand Up @@ -94,6 +98,7 @@ def _run_test_cases(self, test_cases, test_timeout):
runnables = [test_case.runnable for test_case in test_cases]
job_paths = []
tmp_paths = []
output_collectors = []
for test_case in test_cases:
if test_case.job_path is None:
job = test_case.job
Expand All @@ -111,8 +116,9 @@ def _run_test_cases(self, test_cases, test_timeout):
job_paths.append(job_path)
else:
job_paths.append(test_case.job_path)
output_collectors.append(lambda run_response: test_case.structured_test_data(run_response))
try:
run_responses = self._run(runnables, job_paths)
run_responses = self._run(runnables, job_paths, output_collectors)
finally:
for tmp_path in tmp_paths:
os.remove(tmp_path)
Expand Down
7 changes: 6 additions & 1 deletion planemo/engine/toil.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
"""Module containing the :class:`ToilEngine` implementation of :class:`Engine`."""
from typing import (
Callable,
List,
Optional,
)

from planemo import cwl
from planemo.runnable import RunnableType
Expand All @@ -13,7 +18,7 @@ class ToilEngine(BaseEngine):

handled_runnable_types = [RunnableType.cwl_tool, RunnableType.cwl_workflow]

def _run(self, runnables, job_paths):
def _run(self, runnables, job_paths, output_collectors: Optional[List[Callable]] = None):
"""Run CWL job using Toil."""
results = []
for runnable, job_path in zip(runnables, job_paths):
Expand Down
74 changes: 51 additions & 23 deletions planemo/galaxy/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
if TYPE_CHECKING:
from planemo.cli import PlanemoCliContext
from planemo.galaxy.config import BaseGalaxyConfig
from planemo.runnable import RunnableOutput

DEFAULT_HISTORY_NAME = "CWL Target History"
ERR_NO_SUCH_TOOL = (
Expand Down Expand Up @@ -244,10 +245,10 @@ def _execute( # noqa C901
end_datetime=datetime.now(),
**response_kwds,
)
if kwds.get("download_outputs", True):
if kwds.get("download_outputs"):
output_directory = kwds.get("output_directory", None)
ctx.vlog("collecting outputs from run...")
run_response.collect_outputs(ctx, output_directory)
run_response.collect_outputs(output_directory)
ctx.vlog("collecting outputs complete")
return run_response

Expand Down Expand Up @@ -359,7 +360,7 @@ def __init__(

self._job_info = None

self._outputs_dict = None
self._outputs_dict: Dict[str, Optional[str]] = {}
self._start_datetime = start_datetime
self._end_datetime = end_datetime
self._successful = successful
Expand All @@ -385,6 +386,13 @@ def end_datetime(self):
"""End datetime of run."""
return self._end_datetime

@property
def outputs_dict(self):
    """Mapping of output id to collected output value (``None`` for missing/optional outputs)."""
    return self._outputs_dict

def output_src(self, output: "RunnableOutput", ignore_missing_outputs: Optional[bool] = False) -> Dict[str, str]:
    # Base implementation: no source available; subclasses override this to
    # locate the dataset/collection backing a runnable output.
    return {}

def _get_extra_files(self, dataset_details):
extra_files_url = (
f"{self._user_gi.url}/histories/{self._history_id}/contents/{dataset_details['id']}/extra_files"
Expand All @@ -406,32 +414,44 @@ def _get_metadata(self, history_content_type, content_id):
else:
raise Exception("Unknown history content type encountered [%s]" % history_content_type)

def collect_outputs(self, ctx, output_directory):
outputs_dict = {}
def collect_outputs(
self,
output_directory: Optional[str] = None,
ignore_missing_output: Optional[bool] = False,
output_id: Optional[str] = None,
):
outputs_dict: Dict[str, Optional[str]] = {}
# TODO: rather than creating a directory just use
# Galaxy paths if they are available in this
# configuration.
output_directory = output_directory or tempfile.mkdtemp()

ctx.log("collecting outputs to directory %s" % output_directory)
self._ctx.log("collecting outputs to directory %s" % output_directory)

for runnable_output in get_outputs(self._runnable, gi=self._user_gi):
output_id = runnable_output.get_id()
if not output_id:
ctx.log("Workflow output identified without an ID (label), skipping")
runnable_output_id = runnable_output.get_id()
if not runnable_output_id:
self._ctx.log("Workflow output identified without an ID (label), skipping")
continue

if output_id and runnable_output_id != output_id:
continue

def get_dataset(dataset_details, filename=None):
parent_basename = sanitize_filename(dataset_details.get("cwl_file_name") or output_id)
parent_basename = sanitize_filename(dataset_details.get("cwl_file_name") or runnable_output_id)
file_ext = dataset_details["file_ext"]
if file_ext == "directory":
# TODO: rename output_directory to outputs_directory because we can have output directories
# and this is confusing...
the_output_directory = os.path.join(output_directory, parent_basename)
safe_makedirs(the_output_directory)
destination = self.download_output_to(ctx, dataset_details, the_output_directory, filename=filename)
destination = self.download_output_to(
self._ctx, dataset_details, the_output_directory, filename=filename
)
else:
destination = self.download_output_to(ctx, dataset_details, output_directory, filename=filename)
destination = self.download_output_to(
self._ctx, dataset_details, output_directory, filename=filename
)
if filename is None:
basename = parent_basename
else:
Expand All @@ -440,11 +460,11 @@ def get_dataset(dataset_details, filename=None):
return {"path": destination, "basename": basename}

is_cwl = self._runnable.type in [RunnableType.cwl_workflow, RunnableType.cwl_tool]
output_src = self.output_src(runnable_output)
output_src = self.output_src(runnable_output, ignore_missing_output)
if not output_src:
# Optional workflow output
ctx.vlog(f"Optional workflow output '{output_id}' not created, skipping")
outputs_dict[output_id] = None
# Optional workflow output or invocation failed
self._ctx.vlog(f"workflow output '{runnable_output_id}' not created, skipping")
outputs_dict[runnable_output_id] = None
continue
output_dataset_id = output_src["id"]
galaxy_output = self.to_galaxy_output(runnable_output)
Expand Down Expand Up @@ -473,10 +493,12 @@ def attach_file_properties(collection, cwl_output):
attach_file_properties(output_metadata, cwl_output)
output_dict_value = output_metadata

outputs_dict[output_id] = output_dict_value
if output_id:
return output_dict_value
outputs_dict[runnable_output_id] = output_dict_value

self._outputs_dict = outputs_dict
ctx.vlog("collected outputs [%s]" % self._outputs_dict)
self._ctx.vlog("collected outputs [%s]" % self._outputs_dict)

@property
def log(self):
Expand All @@ -497,9 +519,10 @@ def job_info(self):
def invocation_details(self):
return None

@property
def outputs_dict(self):
return self._outputs_dict
def get_output(self, output_id):
    """Return the collected value for *output_id*, collecting it lazily on first access.

    Missing outputs are tolerated (``ignore_missing_output=True``) since a failed
    invocation will surface its own errors elsewhere; the (possibly ``None``) result
    is cached in ``self._outputs_dict`` so collection happens at most once per id.
    """
    if output_id not in self._outputs_dict:
        self._outputs_dict[output_id] = self.collect_outputs(ignore_missing_output=True, output_id=output_id)
    return self._outputs_dict[output_id]

def download_output_to(self, ctx, dataset_details, output_directory, filename=None):
if filename is None:
Expand Down Expand Up @@ -578,7 +601,7 @@ def to_galaxy_output(self, runnable_output):
output_id = runnable_output.get_id()
return tool_response_to_output(self.api_run_response, self._history_id, output_id)

def output_src(self, output):
def output_src(self, output, ignore_missing_outputs: Optional[bool] = False):
outputs = self.api_run_response["outputs"]
output_collections = self.api_run_response["output_collections"]
output_id = output.get_id()
Expand Down Expand Up @@ -632,7 +655,7 @@ def to_galaxy_output(self, runnable_output):
self._ctx.vlog("checking for output in invocation [%s]" % self._invocation)
return invocation_to_output(self._invocation, self._history_id, output_id)

def output_src(self, output):
def output_src(self, output, ignore_missing_outputs: Optional[bool] = False):
invocation = self._invocation
# Use newer workflow outputs API.

Expand All @@ -643,6 +666,9 @@ def output_src(self, output):
return invocation["output_collections"][output.get_id()]
elif output.is_optional():
return None
elif ignore_missing_outputs:
# We don't need to check this in testing mode, we'll get an error through failed invocation and failed history anyway
return None
else:
raise Exception(f"Failed to find output [{output_name}] in invocation outputs [{invocation['outputs']}]")

Expand Down Expand Up @@ -671,6 +697,8 @@ def collect_invocation_details(self, invocation_id=None):
"invocation_state": self.invocation_state,
"history_state": self.history_state,
"error_message": self.error_message,
# Messages are only present from 23.0 onward
"messages": invocation.get("messages", []),
},
}
return invocation_details
Expand Down
43 changes: 42 additions & 1 deletion planemo/reports/build_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,46 @@

TITLE = "Results (powered by Planemo)"

cancel_fragment = "Invocation scheduling cancelled because"
fail_fragment = "Invocation scheduling failed because"


def render_message_to_string(invocation_message):  # noqa: C901
    # Translate a structured invocation message dict (Galaxy 23.0+ invocation
    # "messages" entries) into a single human-readable sentence for reports.
    # ChatGPT did a reasonable job of translating this from https://github.com/galaxyproject/galaxy/blob/d92bbb144ffcda7e17368cf43dd25c8a9a3a7dd6/client/src/components/WorkflowInvocationState/InvocationMessage.vue#L93-L172
    msg = invocation_message
    reason = msg["reason"]

    def step_no(key="workflow_step_id"):
        # Step ids are 0-based in the API; reports display them 1-based.
        return msg[key] + 1

    if reason == "user_request":
        return f"{cancel_fragment} user requested cancellation."
    elif reason == "history_deleted":
        return f"{cancel_fragment} the history of the invocation was deleted."
    elif reason == "cancelled_on_review":
        return f"{cancel_fragment} the invocation was paused at step {step_no()} and not approved."
    elif reason == "collection_failed":
        return f"{fail_fragment} step {step_no()} requires a dataset collection created by step {step_no('dependent_workflow_step_id')}, but dataset collection entered a failed state."
    elif reason == "dataset_failed":
        if msg.get("dependent_workflow_step_id") is not None:
            return f"{fail_fragment} step {step_no()} requires a dataset created by step {step_no('dependent_workflow_step_id')}, but dataset entered a failed state."
        else:
            return f"{fail_fragment} step {step_no()} requires a dataset, but dataset entered a failed state."
    elif reason == "job_failed":
        return f"{fail_fragment} step {step_no()} depends on job(s) created in step {step_no('dependent_workflow_step_id')}, but a job for that step failed."
    elif reason == "output_not_found":
        return f"{fail_fragment} step {step_no()} depends on output '{msg['output_name']}' of step {step_no('dependent_workflow_step_id')}, but this step did not produce an output of that name."
    elif reason == "expression_evaluation_failed":
        return f"{fail_fragment} step {step_no()} contains an expression that could not be evaluated."
    elif reason == "when_not_boolean":
        return f"{fail_fragment} step {step_no()} is a conditional step and the result of the when expression is not a boolean type."
    elif reason == "unexpected_failure":
        at_step = ""
        if msg.get("workflow_step_id") is not None:
            at_step = f" at step {step_no()}"
        if "details" in msg and msg["details"]:
            return f"{fail_fragment} an unexpected failure occurred{at_step}: '{msg['details']}'"
        return f"{fail_fragment} an unexpected failure occurred{at_step}."
    elif reason == "workflow_output_not_found":
        return f"Defined workflow output '{msg['output_name']}' was not found in step {step_no()}."
    else:
        # Unknown reason codes are passed through verbatim.
        return reason


def build_report(structured_data, report_type="html", execution_type="Test", **kwds):
"""Use report_{report_type}.tpl to build page for report."""
Expand All @@ -19,12 +59,12 @@ def build_report(structured_data, report_type="html", execution_type="Test", **k

__fix_test_ids(environment)
environment = __inject_summary(environment)
environment["execution_type"] = execution_type

if report_type == "html":
# The HTML report format needs a lot of extra, custom data.
# IMO, this seems to suggest it should be embedded.
environment["title"] = None
environment["execution_type"] = execution_type
markdown = template_data(environment, "report_markdown.tpl")
environment["title"] = " ".join((environment["execution_type"], TITLE))
environment["raw_data"] = base64.b64encode(markdown.encode("utf-8")).decode("utf-8")
Expand All @@ -50,6 +90,7 @@ def template_data(environment, template_name, **kwds):
env_kwargs["trim_blocks"] = True
env = Environment(loader=PackageLoader("planemo", "reports"), **env_kwargs)
env.filters["strip_control_characters"] = lambda x: strip_control_characters(x) if x else x
env.globals["render_message_to_string"] = render_message_to_string
template = env.get_template(template_name)
return template.render(**environment)

Expand Down
Loading