Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track subworkflow invocations after main invocation is scheduled #1389

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 60 additions & 24 deletions planemo/galaxy/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,33 +208,25 @@ def _execute( # noqa C901

ctx.vlog("Waiting for invocation [%s]" % invocation_id)
polling_backoff = kwds.get("polling_backoff", 0)
final_invocation_state = "new"
error_message = ""
try:
final_invocation_state = _wait_for_invocation(
ctx, user_gi, history_id, workflow_id, invocation_id, polling_backoff
)
assert final_invocation_state == "scheduled"
except Exception:
ctx.vlog("Problem waiting on invocation...")
summarize_history(ctx, user_gi, history_id)
error_message = "Final invocation state is [%s]" % final_invocation_state
ctx.vlog("Final invocation state is [%s]" % final_invocation_state)
no_wait = kwds.get("no_wait", False)

if not kwds.get("no_wait"):
final_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff)
if final_state not in ("ok", "skipped"):
msg = "Failed to run workflow final history state is [%s]." % final_state
error_message = msg if not error_message else f"{error_message}. {msg}"
ctx.vlog(msg)
summarize_history(ctx, user_gi, history_id)
else:
ctx.vlog("Final history state is 'ok'")
final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
ctx,
invocation_id=invocation_id,
history_id=history_id,
user_gi=user_gi,
no_wait=no_wait,
polling_backoff=polling_backoff,
)
if final_invocation_state not in ("ok", "skipped"):
msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state."
ctx.vlog(msg)
summarize_history(ctx, user_gi, history_id)

response_kwds = {
"workflow_id": workflow_id,
"invocation_id": invocation_id,
"history_state": final_state if not kwds.get("no_wait") else None,
"history_state": job_state if not no_wait else None,
"invocation_state": final_invocation_state,
"error_message": error_message,
}
Expand Down Expand Up @@ -722,9 +714,53 @@ def get_dict_from_workflow(gi, workflow_id):
return gi.workflows.export_workflow_dict(workflow_id)


def _wait_for_invocation(ctx, gi, history_id, workflow_id, invocation_id, polling_backoff=0):
def wait_for_invocation_and_jobs(
nsoranzo marked this conversation as resolved.
Show resolved Hide resolved
ctx, invocation_id: str, history_id: str, user_gi: GalaxyInstance, no_wait: bool, polling_backoff: int
):
ctx.vlog("Waiting for invocation [%s]" % invocation_id)
final_invocation_state = "new"

# TODO: hook in invocation["messages"]
error_message = ""
job_state = "ok"
try:
final_invocation_state = _wait_for_invocation(ctx, user_gi, invocation_id, polling_backoff)
assert final_invocation_state == "scheduled"
except Exception as e:
ctx.vlog(f"Problem waiting on invocation: {str(e)}")
summarize_history(ctx, user_gi, history_id)
error_message = f"Final state of invocation {invocation_id} is [{final_invocation_state}]"

ctx.vlog(f"Final state of invocation {invocation_id} is [{final_invocation_state}]")

if not no_wait:
job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff)
if job_state not in ("ok", "skipped"):
msg = f"Failed to run workflow, at least one job is in [{job_state}] state."
error_message = msg if not error_message else f"{error_message}. {msg}"
else:
# wait for possible subworkflow invocations
invocation = user_gi.invocations.show_invocation(invocation_id)
for step in invocation["steps"]:
if step.get("subworkflow_invocation_id") is not None:
final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
ctx,
invocation_id=step["subworkflow_invocation_id"],
history_id=history_id,
user_gi=user_gi,
no_wait=no_wait,
polling_backoff=polling_backoff,
)
if final_invocation_state != "scheduled" or job_state not in ("ok", "skipped"):
return final_invocation_state, job_state, error_message

ctx.vlog(f"The final state of all jobs and subworkflow invocations for invocation [{invocation_id}] is 'ok'")
return final_invocation_state, job_state, error_message


def _wait_for_invocation(ctx, gi, invocation_id, polling_backoff=0):
def state_func():
return _retry_on_timeouts(ctx, gi, lambda gi: gi.workflows.show_invocation(workflow_id, invocation_id))
return _retry_on_timeouts(ctx, gi, lambda gi: gi.invocations.show_invocation(invocation_id))

return _wait_on_state(state_func, polling_backoff)

Expand Down
Loading