Skip to content

Commit

Permalink
fix: fix missing task_stats start_time on restored allocation (#9745)
Browse files Browse the repository at this point in the history
Fix a few issues around null and zero start_time values on recorded queued task_stats.
  • Loading branch information
hamidzr authored Jul 30, 2024
1 parent a094ea1 commit b8c6773
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 1 deletion.
6 changes: 6 additions & 0 deletions docs/release-notes/fix-taskstats.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
:orphan:

**Bug Fixes**

- Fix two places where aggregated queued stats could have shown inflated values. The total queued
aggregated time and today's queued aggregated time calculations were both affected.
3 changes: 3 additions & 0 deletions e2e_tests/tests/cluster/managed_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ def ensure_agent_ok(self) -> None:
assert agent_data[0]["draining"] is False

def wait_for_agent_ok(self, ticks: int) -> None:
"""
Each tick is >= 1 second
"""
sess = api_utils.user_session()
for _i in range(ticks):
agent_data = get_agent_data(sess)
Expand Down
46 changes: 46 additions & 0 deletions e2e_tests/tests/cluster/test_master_restart.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,52 @@ def test_master_restart_ok(restartable_managed_cluster: managed_cluster.ManagedC
restartable_managed_cluster.restart_agent(wait_for_amnesia=False)


@pytest.mark.managed_devcluster
def test_queued_time_restore(restartable_managed_cluster: managed_cluster.ManagedCluster) -> None:
    """
    Verify today's aggregated queued time stays bounded across a master/agent restart.

    Submits a single no-op experiment, checks the queued-time aggregates once it is
    RUNNING, kills and restarts both the agent and the master, waits for the
    experiment to reach COMPLETED, and checks the aggregates again.
    """
    sess = api_utils.user_session()
    # Start the clock before submitting: the QUEUED stat is measured from job
    # submission, so the reference point must not be later than the submission itself.
    start_time = time.time()
    exp_id = exp.create_experiment(
        sess,
        conf.fixtures_path("no_op/single.yaml"),
        conf.fixtures_path("no_op"),
        None,
    )
    exp.wait_for_experiment_state(
        sess, exp_id, bindings.experimentv1State.RUNNING, max_wait_secs=60
    )

    def check_queued_aggregates_for_today() -> None:
        """Assert there is one day of aggregates, for today, bounded by elapsed time."""
        exps = bindings.get_GetExperiments(sess).experiments
        assert len(exps) == 1
        assert exps[0].id == exp_id
        # Keep only the results that actually carry aggregates.
        aggregates = [
            res.aggregates for res in bindings.get_GetJobQueueStats(sess).results if res.aggregates
        ]
        assert len(aggregates) == 1, "only a single rp of aggregates should be present"
        assert len(aggregates[0]) == 1, "one day of aggregates should be present"
        day_stats = aggregates[0][0]
        # NOTE(review): time.strftime formats the *local* date; confirm periodStart is
        # reported in the same timezone, otherwise this can flake around midnight.
        assert day_stats.periodStart == time.strftime("%Y-%m-%d"), "day stats should be for today"
        seconds_since_start = time.time() - start_time
        # CM-404: checkpoint GC adds to the aggregation.
        checkpoint_gc_factor = 5
        assert (
            day_stats.seconds <= seconds_since_start * checkpoint_gc_factor
        ), "queued seconds should not exceed the (GC-padded) time elapsed since the test started"

    check_queued_aggregates_for_today()

    restartable_managed_cluster.kill_agent()
    restartable_managed_cluster.kill_master()

    restartable_managed_cluster.restart_master()
    restartable_managed_cluster.restart_agent(wait_for_amnesia=True)

    exp.wait_for_experiment_state(
        sess, exp_id, bindings.experimentv1State.COMPLETED, max_wait_secs=60
    )
    # After restore, the queued time must still be bounded by real elapsed time.
    check_queued_aggregates_for_today()


@pytest.mark.e2e_k8s
def test_master_restart_ok_k8s(k8s_managed_cluster: managed_cluster_k8s.ManagedK8sCluster) -> None:
_test_master_restart_ok(k8s_managed_cluster)
Expand Down
2 changes: 2 additions & 0 deletions master/internal/rm/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ func FetchAvgQueuedTime(pool string) (
err = db.Bun().NewSelect().TableExpr("task_stats").ColumnExpr(
"avg(extract(epoch FROM end_time - start_time))",
).Where("event_type = ?", "QUEUED").
// exclude task stats with a missing start time, the same way the aggregations do (bb7020a404b).
Where("start_time IS NOT NULL AND start_time != ?", "0001-01-01").
Where("end_time >= CURRENT_DATE AND allocation_id IN (?) ", subq).
Scan(context.TODO(), &today)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion master/internal/task/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,10 +626,14 @@ func (a *allocation) resourcesAllocated(msg *sproto.ResourcesAllocated) error {
}

now := time.Now().UTC()
taskStatStartTime := msg.JobSubmissionTime
if msg.JobSubmissionTime.IsZero() && a.req.Restore {
taskStatStartTime = a.req.RequestTime
}
err = db.RecordTaskStats(context.TODO(), &model.TaskStats{
AllocationID: msg.ID,
EventType: "QUEUED",
StartTime: &msg.JobSubmissionTime,
StartTime: &taskStatStartTime,
EndTime: &now,
})
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions master/static/srv/update_aggregated_queued_time.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ total_agg AS (
end_time >= const.target_date
AND end_time < (const.target_date + interval '1 day')
AND event_type = 'QUEUED'
AND task_stats.start_time != '0001-01-01 00:00:00+00:00'::TIMESTAMPTZ
),

all_aggs AS (
Expand Down

0 comments on commit b8c6773

Please sign in to comment.