Skip to content

Commit

Permalink
Track subworkflow invocations after main invocation is scheduled
Browse files Browse the repository at this point in the history
Otherwise we'd attempt to download results too early.
That's how I discovered
galaxyproject/galaxy#16705.
  • Loading branch information
mvdbeek committed Sep 17, 2023
1 parent 2f79069 commit 4a8d193
Showing 1 changed file with 60 additions and 24 deletions.
84 changes: 60 additions & 24 deletions planemo/galaxy/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,33 +208,26 @@ def _execute( # noqa C901

ctx.vlog("Waiting for invocation [%s]" % invocation_id)
polling_backoff = kwds.get("polling_backoff", 0)
final_invocation_state = "new"
error_message = ""
try:
final_invocation_state = _wait_for_invocation(
ctx, user_gi, history_id, workflow_id, invocation_id, polling_backoff
)
assert final_invocation_state == "scheduled"
except Exception:
ctx.vlog("Problem waiting on invocation...")
summarize_history(ctx, user_gi, history_id)
error_message = "Final invocation state is [%s]" % final_invocation_state
ctx.vlog("Final invocation state is [%s]" % final_invocation_state)
no_wait = kwds.get("no_wait", False)

if not kwds.get("no_wait"):
final_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff)
if final_state not in ("ok", "skipped"):
msg = "Failed to run workflow final history state is [%s]." % final_state
error_message = msg if not error_message else f"{error_message}. {msg}"
ctx.vlog(msg)
summarize_history(ctx, user_gi, history_id)
else:
ctx.vlog("Final history state is 'ok'")

final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
ctx,
invocation_id=invocation_id,
history_id=history_id,
user_gi=user_gi,
no_wait=no_wait,
polling_backoff=polling_backoff,
)
if final_invocation_state not in ("ok", "skipped"):
msg = f"Failed to run workflow, at least one job is in [{final_invocation_state}] state."
ctx.vlog(msg)
summarize_history(ctx, user_gi, history_id)

response_kwds = {
"workflow_id": workflow_id,
"invocation_id": invocation_id,
"history_state": final_state if not kwds.get("no_wait") else None,
"history_state": job_state if not no_wait else None,
"invocation_state": final_invocation_state,
"error_message": error_message,
}
Expand Down Expand Up @@ -722,9 +715,52 @@ def get_dict_from_workflow(gi, workflow_id):
return gi.workflows.export_workflow_dict(workflow_id)


def _wait_for_invocation(ctx, gi, history_id, workflow_id, invocation_id, polling_backoff=0):
def wait_for_invocation_and_jobs(
ctx, invocation_id: str, history_id: str, user_gi: GalaxyInstance, no_wait: bool, polling_backoff: int
):
ctx.vlog("Waiting for invocation [%s]" % invocation_id)
final_invocation_state = "new"

# TODO: hook in invocation["messages"]
error_message = ""
try:
final_invocation_state = _wait_for_invocation(ctx, user_gi, invocation_id, polling_backoff)
assert final_invocation_state == "scheduled"
except Exception as e:
ctx.vlog(f"Problem waiting on invocation: {str(e)}")
summarize_history(ctx, user_gi, history_id)
error_message = "Final invocation state is [%s]" % final_invocation_state

ctx.vlog("Final invocation state is [%s]" % final_invocation_state)

if not no_wait:
job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff)
if job_state not in ("ok", "skipped"):
msg = "Failed to run workflow, at least one job is in [%s] state." % job_state
error_message = msg if not error_message else f"{error_message}. {msg}"
else:
# wait for possible subworkflow invocations
invocation = user_gi.invocations.show_invocation(invocation_id)
for step in invocation["steps"]:
if step.get("subworkflow_invocation_id") is not None:
final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
ctx,
invocation_id=step["subworkflow_invocation_id"],
history_id=history_id,
user_gi=user_gi,
no_wait=no_wait,
polling_backoff=polling_backoff,
)
if final_invocation_state != "scheduled" or job_state not in ("ok", "skipped"):
return final_invocation_state, job_state, error_message

ctx.vlog("All invocation and subworkflow invocations states are 'ok'")
return final_invocation_state, job_state, error_message


def _wait_for_invocation(ctx, gi, invocation_id, polling_backoff=0):
def state_func():
return _retry_on_timeouts(ctx, gi, lambda gi: gi.workflows.show_invocation(workflow_id, invocation_id))
return _retry_on_timeouts(ctx, gi, lambda gi: gi.invocations.show_invocation(invocation_id))

return _wait_on_state(state_func, polling_backoff)

Expand Down

0 comments on commit 4a8d193

Please sign in to comment.