From b8c677370068aff997e8ab1d1a1a3492573cb536 Mon Sep 17 00:00:00 2001 From: Hamid Zare <12127420+hamidzr@users.noreply.github.com> Date: Tue, 30 Jul 2024 12:58:09 -0500 Subject: [PATCH] fix: fix missing task_stats start_time on restored allocation (#9745) fix a few issues around null and zero start_time on recorded queued tasks_stats --- docs/release-notes/fix-taskstats.rst | 6 +++ e2e_tests/tests/cluster/managed_cluster.py | 3 ++ .../tests/cluster/test_master_restart.py | 46 +++++++++++++++++++ master/internal/rm/db.go | 2 + master/internal/task/allocation.go | 6 ++- .../srv/update_aggregated_queued_time.sql | 1 + 6 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 docs/release-notes/fix-taskstats.rst diff --git a/docs/release-notes/fix-taskstats.rst b/docs/release-notes/fix-taskstats.rst new file mode 100644 index 00000000000..18cc8836681 --- /dev/null +++ b/docs/release-notes/fix-taskstats.rst @@ -0,0 +1,6 @@ +:orphan: + +**Bug Fixes** + +- Fix two places where aggregated queued stats could have shown inflated values. The total queued + aggregated time and today's queued aggregated time calculations were both affected. diff --git a/e2e_tests/tests/cluster/managed_cluster.py b/e2e_tests/tests/cluster/managed_cluster.py index 3aa0a22a6e0..ea7a99fa592 100644 --- a/e2e_tests/tests/cluster/managed_cluster.py +++ b/e2e_tests/tests/cluster/managed_cluster.py @@ -121,6 +121,9 @@ def ensure_agent_ok(self) -> None: assert agent_data[0]["draining"] is False def wait_for_agent_ok(self, ticks: int) -> None: + """ + Each tick is >= 1 second + """ sess = api_utils.user_session() for _i in range(ticks): agent_data = get_agent_data(sess) diff --git a/e2e_tests/tests/cluster/test_master_restart.py b/e2e_tests/tests/cluster/test_master_restart.py index cc20d4931e4..44c83d28631 100644 --- a/e2e_tests/tests/cluster/test_master_restart.py +++ b/e2e_tests/tests/cluster/test_master_restart.py @@ -25,6 +25,52 @@ def test_master_restart_ok(restartable_managed_cluster: managed_cluster.ManagedC restartable_managed_cluster.restart_agent(wait_for_amnesia=False) +@pytest.mark.managed_devcluster +def test_queued_time_restore(restartable_managed_cluster: managed_cluster.ManagedCluster) -> None: + sess = api_utils.user_session() + exp_id = exp.create_experiment( + sess, + conf.fixtures_path("no_op/single.yaml"), + conf.fixtures_path("no_op"), + None, + ) + start_time = time.time() + exp.wait_for_experiment_state( + sess, exp_id, bindings.experimentv1State.RUNNING, max_wait_secs=60 + ) + + def check_queued_aggregates_for_today() -> None: + exps = bindings.get_GetExperiments(sess).experiments + assert len(exps) == 1 + assert exps[0].id == exp_id + aggregates = [ + res.aggregates for res in bindings.get_GetJobQueueStats(sess).results if res.aggregates + ] + assert len(aggregates) == 1, "only a single rp of aggregates should be present" + assert len(aggregates[0]) == 1, "one day of aggregates should be present" + day_stats = aggregates[0][0] + assert day_stats.periodStart == time.strftime("%Y-%m-%d"), "day stats should be for today" + seconds_since_start = time.time() - start_time + # CM-404: checkpoint GC adds to the aggregation. + checkpoint_gc_factor = 5 + assert ( + day_stats.seconds <= seconds_since_start * checkpoint_gc_factor + ), "queued seconds should be less than when test started" + + check_queued_aggregates_for_today() + + restartable_managed_cluster.kill_agent() + restartable_managed_cluster.kill_master() + + restartable_managed_cluster.restart_master() + restartable_managed_cluster.restart_agent(True) + + exp.wait_for_experiment_state( + sess, exp_id, bindings.experimentv1State.COMPLETED, max_wait_secs=60 + ) + check_queued_aggregates_for_today() + + @pytest.mark.e2e_k8s def test_master_restart_ok_k8s(k8s_managed_cluster: managed_cluster_k8s.ManagedK8sCluster) -> None: _test_master_restart_ok(k8s_managed_cluster) diff --git a/master/internal/rm/db.go b/master/internal/rm/db.go index 76f5a9be330..0464b9ba238 100644 --- a/master/internal/rm/db.go +++ b/master/internal/rm/db.go @@ -38,6 +38,8 @@ func FetchAvgQueuedTime(pool string) ( err = db.Bun().NewSelect().TableExpr("task_stats").ColumnExpr( "avg(extract(epoch FROM end_time - start_time))", ).Where("event_type = ?", "QUEUED"). + // treat task stats with missing start time the same as aggregations bb7020a404b. + Where("start_time IS NOT NULL AND start_time != ?", "0001-01-01"). Where("end_time >= CURRENT_DATE AND allocation_id IN (?) ", subq). Scan(context.TODO(), &today) if err != nil { diff --git a/master/internal/task/allocation.go b/master/internal/task/allocation.go index 1027ce7fae2..006e11bcddc 100644 --- a/master/internal/task/allocation.go +++ b/master/internal/task/allocation.go @@ -626,10 +626,14 @@ func (a *allocation) resourcesAllocated(msg *sproto.ResourcesAllocated) error { } now := time.Now().UTC() + taskStatStartTime := msg.JobSubmissionTime + if msg.JobSubmissionTime.IsZero() && a.req.Restore { + taskStatStartTime = a.req.RequestTime + } err = db.RecordTaskStats(context.TODO(), &model.TaskStats{ AllocationID: msg.ID, EventType: "QUEUED", - StartTime: &msg.JobSubmissionTime, + StartTime: &taskStatStartTime, EndTime: &now, }) if err != nil { diff --git a/master/static/srv/update_aggregated_queued_time.sql b/master/static/srv/update_aggregated_queued_time.sql index 4cdf54043bc..99b6b4b3e57 100644 --- a/master/static/srv/update_aggregated_queued_time.sql +++ b/master/static/srv/update_aggregated_queued_time.sql @@ -41,6 +41,7 @@ total_agg AS ( end_time >= const.target_date AND end_time < (const.target_date + interval '1 day') AND event_type = 'QUEUED' + AND task_stats.start_time != '0001-01-01 00:00:00+00:00'::TIMESTAMPTZ ), all_aggs AS (