From 6b6da745c067dda5ab0af0204c55418c565af3da Mon Sep 17 00:00:00 2001 From: Yash Bhardwaj Date: Tue, 24 Sep 2024 19:14:43 +0530 Subject: [PATCH 1/5] fix: replay create event --- core/scheduler/job.go | 4 ++++ core/scheduler/service/replay_service.go | 20 +++++++++++++------- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/core/scheduler/job.go b/core/scheduler/job.go index 0ca4ec4176..33b7214bc0 100644 --- a/core/scheduler/job.go +++ b/core/scheduler/job.go @@ -47,6 +47,10 @@ func (n JobName) String() string { return string(n) } +func (n JobName) GetConsoleURN(tnnt tenant.Tenant) string { + return fmt.Sprintf("urn:optimus:%s:job:%s.%s.%s", tnnt.ProjectName(), tnnt.ProjectName(), tnnt.NamespaceName(), n) +} + type Job struct { ID uuid.UUID Name JobName diff --git a/core/scheduler/service/replay_service.go b/core/scheduler/service/replay_service.go index bd7b5151f6..7ec6b7c214 100644 --- a/core/scheduler/service/replay_service.go +++ b/core/scheduler/service/replay_service.go @@ -74,21 +74,21 @@ type ReplayService struct { pluginToExecutionProjectKeyMap map[string]string } -func (r *ReplayService) CreateReplay(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) (replayID uuid.UUID, err error) { - jobCron, err := getJobCron(ctx, r.logger, r.jobRepo, tenant, jobName) +func (r *ReplayService) CreateReplay(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) (replayID uuid.UUID, err error) { + jobCron, err := getJobCron(ctx, r.logger, r.jobRepo, t, jobName) if err != nil { r.logger.Error("unable to get cron value for job [%s]: %s", jobName.String(), err.Error()) return uuid.Nil, err } - newConfig, err := r.injectJobConfigWithTenantConfigs(ctx, tenant, jobName, config) + newConfig, err := r.injectJobConfigWithTenantConfigs(ctx, t, jobName, config) if err != nil { r.logger.Error("unable to get namespace details for job %s: %s", jobName.String(), err) return uuid.Nil, err } config.JobConfig = newConfig - replayReq := scheduler.NewReplayRequest(jobName, tenant, config, scheduler.ReplayStateCreated) + replayReq := scheduler.NewReplayRequest(jobName, t, config, scheduler.ReplayStateCreated) if err := r.validator.Validate(ctx, replayReq, jobCron); err != nil { r.logger.Error("error validating replay request: %s", err) return uuid.Nil, err @@ -100,13 +100,19 @@ func (r *ReplayService) CreateReplay(ctx context.Context, tenant tenant.Tenant, return uuid.Nil, err } - jobReplayMetric.WithLabelValues(tenant.ProjectName().String(), - tenant.NamespaceName().String(), + jobReplayMetric.WithLabelValues(t.ProjectName().String(), + t.NamespaceName().String(), jobName.String(), replayReq.State().String(), ).Inc() - r.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{}) + r.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{ + JobName: jobName.String(), + ReplayID: replayID.String(), + Tenant: t, + JobURN: jobName.GetConsoleURN(t), + EventType: scheduler.ReplayCreated, + }) go r.executor.Execute(replayID, replayReq.Tenant(), jobName) From 66050c81b57608f678848d5f1500ff178932f314 Mon Sep 17 00:00:00 2001 From: Yash Bhardwaj Date: Mon, 30 Sep 2024 19:17:15 +0530 Subject: [PATCH 2/5] fix: emit replay lifecycle events --- core/scheduler/job_run.go | 10 +-- core/scheduler/replay.go | 10 +-- core/scheduler/service/replay_service.go | 10 +-- core/scheduler/service/replay_worker.go | 67 ++++++++++++++---- core/scheduler/service/replay_worker_test.go | 72 +++++++++++++++----- ext/notify/alertmanager/adapter.go | 11 ++- server/optimus.go | 2 +- 7 files changed, 130 insertions(+), 52 deletions(-) diff --git a/core/scheduler/job_run.go b/core/scheduler/job_run.go index 64de33a928..20646b872a 100644 --- a/core/scheduler/job_run.go +++ b/core/scheduler/job_run.go @@ -80,11 +80,11 @@ type AlertAttrs struct { } type ReplayNotificationAttrs struct { - JobName string - ReplayID string - Tenant tenant.Tenant - JobURN string - EventType ReplayEventType + JobName string + ReplayID string + Tenant tenant.Tenant + JobURN string + State ReplayState } type WebhookAttrs struct { diff --git a/core/scheduler/replay.go b/core/scheduler/replay.go index 2fdf3685cf..ca0cf2d0c6 100644 --- a/core/scheduler/replay.go +++ b/core/scheduler/replay.go @@ -11,9 +11,6 @@ import ( ) const ( - // ReplayCreated is an event which indicates the replay has been created but not picked up yet - ReplayCreated ReplayEventType = "replay_created" - // ReplayStateCreated is an initial state which indicates the replay has been created but not picked up yet ReplayStateCreated ReplayState = "created" @@ -23,6 +20,8 @@ const ( // ReplayStateSuccess is a terminal state which occurs when the replay execution finished with successful job runs ReplayStateSuccess ReplayState = "success" + ReplayStateTimeout ReplayState = "timeout" + // ReplayStateFailed is a terminal state which occurs when the replay execution failed, timed out, or finished with one of the run fails ReplayStateFailed ReplayState = "failed" @@ -38,7 +37,6 @@ var ( ) type ( - ReplayEventType string ReplayState string // contract status for business layer ReplayUserState string // contract status for presentation layer ) @@ -60,10 +58,6 @@ func ReplayStateFromString(state string) (ReplayState, error) { } } -func (j ReplayEventType) String() string { - return string(j) -} - func (j ReplayState) String() string { return string(j) } diff --git a/core/scheduler/service/replay_service.go b/core/scheduler/service/replay_service.go index 7ec6b7c214..8e9830c37a 100644 --- a/core/scheduler/service/replay_service.go +++ b/core/scheduler/service/replay_service.go @@ -107,11 +107,11 @@ func (r *ReplayService) CreateReplay(ctx context.Context, t tenant.Tenant, jobNa ).Inc() r.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{ - JobName: jobName.String(), - ReplayID: replayID.String(), - Tenant: t, - JobURN: jobName.GetConsoleURN(t), - EventType: scheduler.ReplayCreated, + JobName: jobName.String(), + ReplayID: replayID.String(), + Tenant: t, + JobURN: jobName.GetConsoleURN(t), + State: scheduler.ReplayStateCreated, }) go r.executor.Execute(replayID, replayReq.Tenant(), jobName) diff --git a/core/scheduler/service/replay_worker.go b/core/scheduler/service/replay_worker.go index 339b0e4995..d69a35f79f 100644 --- a/core/scheduler/service/replay_worker.go +++ b/core/scheduler/service/replay_worker.go @@ -21,11 +21,12 @@ const ( ) type ReplayWorker struct { - logger log.Logger - replayRepo ReplayRepository - jobRepo JobRepository - scheduler ReplayScheduler - config config.ReplayConfig + logger log.Logger + replayRepo ReplayRepository + jobRepo JobRepository + scheduler ReplayScheduler + config config.ReplayConfig + alertManager AlertManager } type ReplayScheduler interface { @@ -36,13 +37,14 @@ type ReplayScheduler interface { GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error) } -func NewReplayWorker(logger log.Logger, replayRepository ReplayRepository, jobRepo JobRepository, scheduler ReplayScheduler, cfg config.ReplayConfig) *ReplayWorker { +func NewReplayWorker(logger log.Logger, replayRepository ReplayRepository, jobRepo JobRepository, scheduler ReplayScheduler, cfg config.ReplayConfig, alertManager AlertManager) *ReplayWorker { return &ReplayWorker{ - logger: logger, - jobRepo: jobRepo, - replayRepo: replayRepository, - config: cfg, - scheduler: scheduler, + logger: logger, + jobRepo: jobRepo, + replayRepo: replayRepository, + config: cfg, + scheduler: scheduler, + alertManager: alertManager, } } @@ -71,6 +73,21 @@ func (w *ReplayWorker) Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobN errMessage := err.Error() if errors.Is(err, context.DeadlineExceeded) { errMessage = "replay execution timed out" + w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{ + JobName: jobName.String(), + ReplayID: replayID.String(), + Tenant: jobTenant, + JobURN: jobName.GetConsoleURN(jobTenant), + State: scheduler.ReplayStateTimeout, + }) + } else { + w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{ + JobName: jobName.String(), + ReplayID: replayID.String(), + Tenant: jobTenant, + JobURN: jobName.GetConsoleURN(jobTenant), + State: scheduler.ReplayStateFailed, + }) } w.logger.Error("[ReplayID: %s] unable to execute replay for job [%s]: %s", replayID.String(), jobName.String(), errMessage) @@ -100,20 +117,28 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI w.logger.Info("[ReplayID: %s] executing replay...", replayID) // sync run first - storedReplayWithRun, err := w.replayRepo.GetReplayByID(ctx, replayID) + replayWithRun, err := w.replayRepo.GetReplayByID(ctx, replayID) if err != nil { w.logger.Error("[ReplayID: %s] unable to get existing runs: %s", replayID.String(), err) return err } - replayWithRun := storedReplayWithRun - if storedReplayWithRun.Replay.IsTerminated() { - w.logger.Info("[ReplayID: %s] replay is externally terminated with status [%s]", replayWithRun.Replay.ID().String(), storedReplayWithRun.Replay.State().String()) + if replayWithRun.Replay.IsTerminated() { + t := replayWithRun.Replay.Tenant() + w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{ + JobName: replayWithRun.Replay.JobName().String(), + ReplayID: replayID.String(), + Tenant: t, + JobURN: replayWithRun.Replay.JobName().GetConsoleURN(t), + State: replayWithRun.Replay.State(), + }) + w.logger.Info("[ReplayID: %s] replay is externally terminated with status [%s]", replayWithRun.Replay.ID().String(), replayWithRun.Replay.State().String()) return nil } incomingRuns, err := w.fetchRuns(ctx, replayWithRun, jobCron) if err != nil { + // todo: lets not kill watchers on such errors w.logger.Error("[ReplayID: %s] unable to get incoming runs: %s", replayWithRun.Replay.ID().String(), err) return err } @@ -129,6 +154,18 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI // check if replay request is on termination state if syncedRunStatus.IsAllTerminated() { + t := replayWithRun.Replay.Tenant() + replayState := scheduler.ReplayStateSuccess + if syncedRunStatus.IsAnyFailure() { + replayState = scheduler.ReplayStateFailed + } + w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{ + JobName: replayWithRun.Replay.JobName().String(), + ReplayID: replayID.String(), + Tenant: t, + JobURN: replayWithRun.Replay.JobName().GetConsoleURN(t), + State: replayState, + }) return w.finishReplay(ctx, replayWithRun.Replay.ID(), syncedRunStatus, runStatusSummary) } diff --git a/core/scheduler/service/replay_worker_test.go b/core/scheduler/service/replay_worker_test.go index 24870c87a3..1359f6d40e 100644 --- a/core/scheduler/service/replay_worker_test.go +++ b/core/scheduler/service/replay_worker_test.go @@ -104,7 +104,11 @@ func TestReplayWorker(t *testing.T) { summaryMsg := "replay is finished with run status: success(1)" replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateSuccess, runsPhaseThree, summaryMsg).Return(nil).Once() - worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig) + alertManager := new(mockAlertManager) + alertManager.On("SendReplayEvent", mock.Anything).Return() + defer alertManager.AssertExpectations(t) + + worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) worker.Execute(replayID, tnnt, jobAName) }) t.Run("should able to process sequential replay request with multiple run", func(t *testing.T) { @@ -179,7 +183,11 @@ func TestReplayWorker(t *testing.T) { summaryMsg := "replay is finished with run status: success(2)" replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateSuccess, mock.Anything, summaryMsg).Return(nil).Once() - worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig) + alertManager := new(mockAlertManager) + alertManager.On("SendReplayEvent", mock.Anything).Return() + defer alertManager.AssertExpectations(t) + + worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) worker.Execute(replayID, tnnt, jobAName) }) t.Run("should able to process parallel replay request", func(t *testing.T) { @@ -252,7 +260,11 @@ func TestReplayWorker(t *testing.T) { summaryMsg := "replay is finished with run status: success(2)" replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateSuccess, mock.Anything, summaryMsg).Return(nil).Once() - worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig) + alertManager := new(mockAlertManager) + alertManager.On("SendReplayEvent", mock.Anything).Return() + defer alertManager.AssertExpectations(t) + + worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) worker.Execute(replayID, tnnt, jobAName) }) @@ -326,8 +338,10 @@ func TestReplayWorker(t *testing.T) { replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateInProgress, mock.Anything, "").Return(nil).Once() summaryMsg := "replay is finished with run status: success(2)" replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateSuccess, mock.Anything, summaryMsg).Return(nil).Once() - - worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig) + alertManager := new(mockAlertManager) + alertManager.On("SendReplayEvent", mock.Anything).Return() + defer alertManager.AssertExpectations(t) + worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) worker.Execute(replayID, tnnt, jobAName) }) t.Run("should able to process replay request with parallel mode and creating non existing runs", func(t *testing.T) { @@ -400,7 +414,11 @@ func TestReplayWorker(t *testing.T) { summaryMsg := "replay is finished with run status: success(2)" replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateSuccess, mock.Anything, summaryMsg).Return(nil).Once() - worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig) + alertManager := new(mockAlertManager) + alertManager.On("SendReplayEvent", mock.Anything).Return() + defer alertManager.AssertExpectations(t) + + worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) worker.Execute(replayID, tnnt, jobAName) }) @@ -429,8 +447,7 @@ func TestReplayWorker(t *testing.T) { jobRepository.On("GetJobDetails", mock.Anything, projName, jobAName).Return(nil, errors.New(errorMsg)).Once() errorMsgToStore := "internal error for entity replay: unable to get job details for jobName: job-a, project: proj: internal error" replayRepository.On("UpdateReplayStatus", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateFailed, errorMsgToStore).Return(nil).Once() - - worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig) + worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, nil) worker.Execute(replayID, tnnt, jobAName) }) t.Run("should able to update replay state as failed if unable to get replay by id", func(t *testing.T) { @@ -459,7 +476,10 @@ func TestReplayWorker(t *testing.T) { errorMsg := "internal error" replayRepository.On("GetReplayByID", mock.Anything, replayReq.Replay.ID()).Return(nil, errors.New(errorMsg)).Once() replayRepository.On("UpdateReplayStatus", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateFailed, errorMsg).Return(nil).Once() - worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig) + alertManager := new(mockAlertManager) + alertManager.On("SendReplayEvent", mock.Anything).Return() + defer alertManager.AssertExpectations(t) + worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) worker.Execute(replayID, tnnt, jobAName) }) t.Run("should able to update replay state as failed if unable to fetch job runs", func(t *testing.T) { @@ -489,8 +509,10 @@ func TestReplayWorker(t *testing.T) { errorMsg := "internal error" sch.On("GetJobRuns", mock.Anything, tnnt, mock.Anything, jobCron).Return(nil, errors.New(errorMsg)).Once() replayRepository.On("UpdateReplayStatus", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateFailed, errorMsg).Return(nil).Once() - - worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig) + alertManager := new(mockAlertManager) + alertManager.On("SendReplayEvent", mock.Anything).Return() + defer alertManager.AssertExpectations(t) + worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) worker.Execute(replayID, tnnt, jobAName) }) t.Run("should able to update replay state as failed if unable to update replay once it is synced", func(t *testing.T) { @@ -526,7 +548,11 @@ func TestReplayWorker(t *testing.T) { replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateInProgress, mock.Anything, "").Return(errors.New(errorMsg)).Once() replayRepository.On("UpdateReplayStatus", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateFailed, errorMsg).Return(nil).Once() - worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig) + alertManager := new(mockAlertManager) + alertManager.On("SendReplayEvent", mock.Anything).Return() + defer alertManager.AssertExpectations(t) + + worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) worker.Execute(replayID, tnnt, jobAName) }) t.Run("should able to update replay state as failed if unable to do clear batch of runs", func(t *testing.T) { @@ -562,8 +588,10 @@ func TestReplayWorker(t *testing.T) { errorMsg := "internal error" sch.On("ClearBatch", mock.Anything, tnnt, jobAName, scheduledTime2.Add(-24*time.Hour), scheduledTime2.Add(-24*time.Hour)).Return(errors.New(errorMsg)).Once() replayRepository.On("UpdateReplayStatus", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateFailed, errorMsg).Return(nil).Once() - - worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig) + alertManager := new(mockAlertManager) + alertManager.On("SendReplayEvent", mock.Anything).Return() + defer alertManager.AssertExpectations(t) + worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) worker.Execute(replayID, tnnt, jobAName) }) t.Run("should able to update replay state as failed if unable to create missing run", func(t *testing.T) { @@ -601,8 +629,11 @@ func TestReplayWorker(t *testing.T) { sch.On("CreateRun", mock.Anything, tnnt, jobAName, scheduledTime1.Add(-24*time.Hour), "replayed").Return(errors.New(errorMsg)).Once() errorMsgToStore := "create runs:\n internal error" replayRepository.On("UpdateReplayStatus", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateFailed, errorMsgToStore).Return(nil).Once() + alertManager := new(mockAlertManager) + alertManager.On("SendReplayEvent", mock.Anything).Return() + defer alertManager.AssertExpectations(t) - worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig) + worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) worker.Execute(replayID, tnnt, jobAName) }) @@ -677,7 +708,10 @@ func TestReplayWorker(t *testing.T) { replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateInProgress, mock.Anything, "").Return(nil).Once() replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateFailed, mock.Anything, mock.Anything).Return(nil).Once() - worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig) + alertManager := new(mockAlertManager) + alertManager.On("SendReplayEvent", mock.Anything).Return() + defer alertManager.AssertExpectations(t) + worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) worker.Execute(replayID, tnnt, jobAName) }) @@ -705,7 +739,11 @@ func TestReplayWorker(t *testing.T) { jobRepository.On("GetJobDetails", mock.Anything, projName, jobAName).Return(jobAWithDetails, nil).Once() replayRepository.On("GetReplayByID", mock.Anything, replayWithRun.Replay.ID()).Return(replayWithRun, nil).Once() - worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig) + alertManager := new(mockAlertManager) + alertManager.On("SendReplayEvent", mock.Anything).Return() + defer alertManager.AssertExpectations(t) + + worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) worker.Execute(replayID, tnnt, jobAName) }) }) diff --git a/ext/notify/alertmanager/adapter.go b/ext/notify/alertmanager/adapter.go index ee820f98ac..7091d22bf1 100644 --- a/ext/notify/alertmanager/adapter.go +++ b/ext/notify/alertmanager/adapter.go @@ -19,8 +19,16 @@ const ( failureAlertTemplate = "optimus-job-failure" slaAlertTemplate = "optimus-job-sla-miss" successNotificationTemplate = "optimus-job-success" + + ReplayLifeCycle ReplayEventType = "replay-lifecycle" ) +type ReplayEventType string + +func (j ReplayEventType) String() string { + return string(j) +} + func (a *AlertManager) getJobConsoleLink(project, job string) string { return fmt.Sprintf("%s/%s/%s:%s", a.dataConsole, "optimus", project, job) } @@ -106,12 +114,13 @@ func (a *AlertManager) SendReplayEvent(attr *scheduler.ReplayNotificationAttrs) "job_name": attr.JobName, "project": projectName, "namespace": attr.Tenant.NamespaceName().String(), + "state": attr.State.String(), "console_link": a.getJobConsoleLink(projectName, attr.JobName), }, Template: replayTemplate, Labels: map[string]string{ "identifier": attr.JobURN, - "event_type": strings.ToLower(attr.EventType.String()), + "event_type": strings.ToLower(ReplayLifeCycle.String()), }, }) } diff --git a/server/optimus.go b/server/optimus.go index c394252edb..9712d42ed4 100644 --- a/server/optimus.go +++ b/server/optimus.go @@ -336,7 +336,7 @@ func (s *OptimusServer) setupHandlers() error { } replayRepository := schedulerRepo.NewReplayRepository(s.dbPool) - replayWorker := schedulerService.NewReplayWorker(s.logger, replayRepository, jobProviderRepo, newScheduler, s.conf.Replay) + replayWorker := schedulerService.NewReplayWorker(s.logger, replayRepository, jobProviderRepo, newScheduler, s.conf.Replay, alertsHandler) replayValidator := schedulerService.NewValidator(replayRepository, newScheduler, jobProviderRepo) replayService := schedulerService.NewReplayService( replayRepository, jobProviderRepo, tenantService, From 64d8839f5a02964314338cd0bd50d86f57e492fb Mon Sep 17 00:00:00 2001 From: Yash Bhardwaj Date: Tue, 1 Oct 2024 13:52:38 +0530 Subject: [PATCH 3/5] fix: refactor optimus replay template --- ext/notify/alertmanager/adapter.go | 1 + 1 file changed, 1 insertion(+) diff --git a/ext/notify/alertmanager/adapter.go b/ext/notify/alertmanager/adapter.go index 7091d22bf1..9366a44b38 100644 --- a/ext/notify/alertmanager/adapter.go +++ b/ext/notify/alertmanager/adapter.go @@ -115,6 +115,7 @@ func (a *AlertManager) SendReplayEvent(attr *scheduler.ReplayNotificationAttrs) "project": projectName, "namespace": attr.Tenant.NamespaceName().String(), "state": attr.State.String(), + "replay_id": attr.ReplayID, "console_link": a.getJobConsoleLink(projectName, attr.JobName), }, Template: replayTemplate, From 8b34253a100eb519c82baf1fb3605347755331fd Mon Sep 17 00:00:00 2001 From: Yash Bhardwaj Date: Tue, 1 Oct 2024 14:40:00 +0530 Subject: [PATCH 4/5] fix: refactor optimus replay template --- core/scheduler/job.go | 2 +- core/scheduler/service/replay_service.go | 2 +- core/scheduler/service/replay_worker.go | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/scheduler/job.go b/core/scheduler/job.go index 33b7214bc0..1f3b782fd5 100644 --- a/core/scheduler/job.go +++ b/core/scheduler/job.go @@ -47,7 +47,7 @@ func (n JobName) String() string { return string(n) } -func (n JobName) GetConsoleURN(tnnt tenant.Tenant) string { +func (n JobName) GetJobURN(tnnt tenant.Tenant) string { return fmt.Sprintf("urn:optimus:%s:job:%s.%s.%s", tnnt.ProjectName(), tnnt.ProjectName(), tnnt.NamespaceName(), n) } diff --git a/core/scheduler/service/replay_service.go b/core/scheduler/service/replay_service.go index 8e9830c37a..e7770db56a 100644 --- a/core/scheduler/service/replay_service.go +++ b/core/scheduler/service/replay_service.go @@ -110,7 +110,7 @@ func (r *ReplayService) CreateReplay(ctx context.Context, t tenant.Tenant, jobNa JobName: jobName.String(), ReplayID: replayID.String(), Tenant: t, - JobURN: jobName.GetConsoleURN(t), + JobURN: jobName.GetJobURN(t), State: scheduler.ReplayStateCreated, }) diff --git a/core/scheduler/service/replay_worker.go b/core/scheduler/service/replay_worker.go index d69a35f79f..fb5aa8e0c4 100644 --- a/core/scheduler/service/replay_worker.go +++ b/core/scheduler/service/replay_worker.go @@ -77,7 +77,7 @@ func (w *ReplayWorker) Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobN JobName: jobName.String(), ReplayID: replayID.String(), Tenant: jobTenant, - JobURN: jobName.GetConsoleURN(jobTenant), + JobURN: jobName.GetJobURN(jobTenant), State: scheduler.ReplayStateTimeout, }) } else { @@ -85,7 +85,7 @@ func (w *ReplayWorker) Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobN JobName: jobName.String(), ReplayID: replayID.String(), Tenant: jobTenant, - JobURN: jobName.GetConsoleURN(jobTenant), + JobURN: jobName.GetJobURN(jobTenant), State: scheduler.ReplayStateFailed, }) } @@ -129,7 +129,7 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI JobName: replayWithRun.Replay.JobName().String(), ReplayID: replayID.String(), Tenant: t, - JobURN: replayWithRun.Replay.JobName().GetConsoleURN(t), + JobURN: replayWithRun.Replay.JobName().GetJobURN(t), State: replayWithRun.Replay.State(), }) w.logger.Info("[ReplayID: %s] replay is externally terminated with status [%s]", replayWithRun.Replay.ID().String(), replayWithRun.Replay.State().String()) @@ -163,7 +163,7 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI JobName: replayWithRun.Replay.JobName().String(), ReplayID: replayID.String(), Tenant: t, - JobURN: replayWithRun.Replay.JobName().GetConsoleURN(t), + JobURN: replayWithRun.Replay.JobName().GetJobURN(t), State: replayState, }) return w.finishReplay(ctx, replayWithRun.Replay.ID(), syncedRunStatus, runStatusSummary) From 72ecfdfe84a6181f90f8c9c7efbd432c9209060c Mon Sep 17 00:00:00 2001 From: Yash Bhardwaj Date: Fri, 4 Oct 2024 11:42:52 +0530 Subject: [PATCH 5/5] fix: refactor --- core/scheduler/service/replay_worker.go | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/core/scheduler/service/replay_worker.go b/core/scheduler/service/replay_worker.go index fb5aa8e0c4..508c939d0c 100644 --- a/core/scheduler/service/replay_worker.go +++ b/core/scheduler/service/replay_worker.go @@ -154,19 +154,7 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI // check if replay request is on termination state if syncedRunStatus.IsAllTerminated() { - t := replayWithRun.Replay.Tenant() - replayState := scheduler.ReplayStateSuccess - if syncedRunStatus.IsAnyFailure() { - replayState = scheduler.ReplayStateFailed - } - w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{ - JobName: replayWithRun.Replay.JobName().String(), - ReplayID: replayID.String(), - Tenant: t, - JobURN: replayWithRun.Replay.JobName().GetJobURN(t), - State: replayState, - }) - return w.finishReplay(ctx, replayWithRun.Replay.ID(), syncedRunStatus, runStatusSummary) + return w.finishReplay(ctx, replayWithRun.Replay, syncedRunStatus, runStatusSummary) } // pick runs to be triggered @@ -203,11 +191,20 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI } } -func (w *ReplayWorker) finishReplay(ctx context.Context, replayID uuid.UUID, syncedRunStatus scheduler.JobRunStatusList, runStatusSummary string) error { +func (w *ReplayWorker) finishReplay(ctx context.Context, replay *scheduler.Replay, syncedRunStatus scheduler.JobRunStatusList, runStatusSummary string) error { + replayID := replay.ID() replayState := scheduler.ReplayStateSuccess if syncedRunStatus.IsAnyFailure() { replayState = scheduler.ReplayStateFailed } + w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{ + JobName: replay.JobName().String(), + ReplayID: replayID.String(), + Tenant: replay.Tenant(), + JobURN: replay.JobName().GetJobURN(replay.Tenant()), + State: replayState, + }) + msg := fmt.Sprintf("replay is finished with run status: %s", runStatusSummary) w.logger.Info("[ReplayID: %s] replay finished with status %s", replayID, replayState)