Skip to content

Commit

Permalink
Add job metrics per invocation
Browse files Browse the repository at this point in the history
  • Loading branch information
mvdbeek committed Oct 23, 2024
1 parent 051c5ab commit 5e45801
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 1 deletion.
61 changes: 61 additions & 0 deletions client/src/api/schema/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2594,6 +2594,23 @@ export interface paths {
patch?: never;
trace?: never;
};
"/api/invocations/{invocation_id}/metrics": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
/** Get Invocation Metrics */
get: operations["get_invocation_metrics_api_invocations__invocation_id__metrics_get"];
put?: never;
post?: never;
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/api/invocations/{invocation_id}/prepare_store_download": {
parameters: {
query?: never;
Expand Down Expand Up @@ -26928,6 +26945,50 @@ export interface operations {
};
};
};
get_invocation_metrics_api_invocations__invocation_id__metrics_get: {
parameters: {
query?: never;
header?: {
/** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */
"run-as"?: string | null;
};
path: {
/** @description The encoded database identifier of the Invocation. */
invocation_id: string;
};
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["JobMetric"][];
};
};
/** @description Request Error */
"4XX": {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["MessageExceptionModel"];
};
};
/** @description Server Error */
"5XX": {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["MessageExceptionModel"];
};
};
};
};
prepare_store_download_api_invocations__invocation_id__prepare_store_download_post: {
parameters: {
query?: never;
Expand Down
39 changes: 38 additions & 1 deletion lib/galaxy/managers/jobs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
from collections.abc import Sequence
from datetime import (
date,
datetime,
Expand All @@ -26,6 +27,7 @@
null,
or_,
true,
union,
)
from sqlalchemy.orm import aliased
from sqlalchemy.sql import select
Expand Down Expand Up @@ -54,6 +56,7 @@
ImplicitCollectionJobs,
ImplicitCollectionJobsJobAssociation,
Job,
JobMetricNumeric,
JobParameter,
User,
Workflow,
Expand Down Expand Up @@ -729,6 +732,36 @@ def invocation_job_source_iter(sa_session, invocation_id):
yield ("ImplicitCollectionJobs", row[1], row[2])


def get_job_metrics_for_invocation(sa_session: galaxy_scoped_session, invocation_id: int):
single_job_stmnt = (
select(JobMetricNumeric)
.join(Job, JobMetricNumeric.job_id == Job.id)
.join(
WorkflowInvocationStep,
and_(
WorkflowInvocationStep.workflow_invocation_id == invocation_id, WorkflowInvocationStep.job_id == Job.id
),
)
)
collection_job_stmnt = (
select(JobMetricNumeric)
.join(Job, JobMetricNumeric.job_id == Job.id)
.join(ImplicitCollectionJobsJobAssociation, Job.id == ImplicitCollectionJobsJobAssociation.job_id)
.join(
ImplicitCollectionJobs,
ImplicitCollectionJobs.id == ImplicitCollectionJobsJobAssociation.implicit_collection_jobs_id,
)
.join(
WorkflowInvocationStep,
and_(
WorkflowInvocationStep.workflow_invocation_id == invocation_id,
WorkflowInvocationStep.implicit_collection_jobs_id == ImplicitCollectionJobs.id,
),
)
)
return sa_session.execute(union(single_job_stmnt, collection_job_stmnt)).all()


def fetch_job_states(sa_session, job_source_ids, job_source_types):
assert len(job_source_ids) == len(job_source_types)
job_ids = set()
Expand Down Expand Up @@ -911,6 +944,10 @@ def summarize_job_metrics(trans, job):
Precondition: the caller has verified the job is accessible to the user
represented by the trans parameter.
"""
return summarize_metrics(trans, job.metrics)


def summarize_metrics(trans: ProvidesUserContext, job_metrics):
safety_level = Safety.SAFE
if trans.user_is_admin:
safety_level = Safety.UNSAFE
Expand All @@ -922,7 +959,7 @@ def summarize_job_metrics(trans, job):
m.metric_value,
m.plugin,
)
for m in job.metrics
for m in job_metrics
]
dictifiable_metrics = trans.app.job_metrics.dictifiable_metrics(raw_metrics, safety_level)
return [d.dict() for d in dictifiable_metrics]
Expand Down
9 changes: 9 additions & 0 deletions lib/galaxy/webapps/galaxy/api/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
CreateWorkflowLandingRequestPayload,
InvocationSortByEnum,
InvocationsStateCounts,
JobMetric,
SetSlugPayload,
ShareWithPayload,
ShareWithStatus,
Expand Down Expand Up @@ -1760,3 +1761,11 @@ def workflow_invocation_jobs_summary(
) -> InvocationJobsResponse:
"""An alias for `GET /api/invocations/{invocation_id}/jobs_summary`. `workflow_id` is ignored."""
return self.invocation_jobs_summary(trans=trans, invocation_id=invocation_id)

@router.get("/api/invocations/{invocation_id}/metrics")
def get_invocation_metrics(
self,
invocation_id: InvocationIDPathParam,
trans: ProvidesUserContext = DependsOnTrans,
) -> List[JobMetric]:
return self.invocations_service.show_invocation_metrics(trans=trans, invocation_id=invocation_id)
6 changes: 6 additions & 0 deletions lib/galaxy/webapps/galaxy/services/invocations.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
from galaxy.managers.histories import HistoryManager
from galaxy.managers.jobs import (
fetch_job_states,
get_job_metrics_for_invocation,
invocation_job_source_iter,
summarize_metrics,
)
from galaxy.managers.workflows import WorkflowsManager
from galaxy.model import (
Expand Down Expand Up @@ -147,6 +149,10 @@ def show_invocation_step(self, trans, step_id) -> InvocationStep:
)
return self.serialize_workflow_invocation_step(wfi_step)

def show_invocation_metrics(self, trans: ProvidesHistoryContext, invocation_id: int):
job_metrics = get_job_metrics_for_invocation(trans.sa_session, invocation_id)
return summarize_metrics(trans, job_metrics)

def update_invocation_step(self, trans, step_id, action):
wfi_step = self._workflows_manager.update_invocation_step(trans, step_id, action)
return self.serialize_workflow_invocation_step(wfi_step)
Expand Down
33 changes: 33 additions & 0 deletions lib/galaxy_test/api/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -3474,6 +3474,39 @@ def test_workflow_new_autocreated_history(self):
invocation_id = run_workflow_dict["id"]
self.workflow_populator.wait_for_invocation_and_jobs(new_history_id, workflow_id, invocation_id)

def test_invocation_job_metrics_simple(self):
with self.dataset_populator.test_history() as history_id:
summary = self._run_workflow(WORKFLOW_SIMPLE, test_data={"input1": "hello world"}, history_id=history_id)
self.workflow_populator.wait_for_invocation_and_jobs(
history_id=history_id, workflow_id=summary.workflow_id, invocation_id=summary.invocation_id
)
job_metrics = self._get(f"invocations/{summary.invocation_id}/metrics").json()
galaxy_slots = [m for m in job_metrics if m["name"] == "galaxy_slots"]
assert len(galaxy_slots) == 1

def test_invocation_job_metrics_map_over(self):
with self.dataset_populator.test_history() as history_id:
summary = self._run_workflow(
WORKFLOW_SIMPLE,
test_data={
"input1": {
"collection_type": "list",
"name": "the_dataset_list",
"elements": [
{"identifier": "el1", "value": "1.fastq", "type": "File"},
{"identifier": "el2", "value": "1.fastq", "type": "File"},
],
}
},
history_id=history_id,
)
self.workflow_populator.wait_for_invocation_and_jobs(
history_id=history_id, workflow_id=summary.workflow_id, invocation_id=summary.invocation_id
)
job_metrics = self._get(f"invocations/{summary.invocation_id}/metrics").json()
galaxy_slots = [m for m in job_metrics if m["name"] == "galaxy_slots"]
assert len(galaxy_slots) == 2

def test_workflow_output_dataset(self):
with self.dataset_populator.test_history() as history_id:
summary = self._run_workflow(WORKFLOW_SIMPLE, test_data={"input1": "hello world"}, history_id=history_id)
Expand Down

0 comments on commit 5e45801

Please sign in to comment.