From 069827c89d77d91fcc9ece362e98d17248c52399 Mon Sep 17 00:00:00 2001 From: Yash Bhardwaj Date: Wed, 23 Oct 2024 14:20:13 +0530 Subject: [PATCH] fix: do not pass context to run replay execute goroutine (#287) --- core/scheduler/service/replay_service.go | 4 +-- core/scheduler/service/replay_service_test.go | 8 +++--- core/scheduler/service/replay_worker.go | 6 ++--- core/scheduler/service/replay_worker_test.go | 27 +++++++++---------- 4 files changed, 22 insertions(+), 23 deletions(-) diff --git a/core/scheduler/service/replay_service.go b/core/scheduler/service/replay_service.go index e12324c613..2027089570 100644 --- a/core/scheduler/service/replay_service.go +++ b/core/scheduler/service/replay_service.go @@ -57,7 +57,7 @@ type ReplayValidator interface { } type ReplayExecutor interface { - Execute(ctx context.Context, replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName) + Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName) FetchAndSyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error) FetchRunsWithDetails(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec) (scheduler.JobRunDetailsList, error) CancelReplayRunsOnScheduler(ctx context.Context, replay *scheduler.Replay, jobCron *cron.ScheduleSpec, runs []*scheduler.JobRunWithDetails) []*scheduler.JobRunStatus @@ -123,7 +123,7 @@ func (r *ReplayService) CreateReplay(ctx context.Context, t tenant.Tenant, jobNa State: scheduler.ReplayStateCreated, }) - go r.executor.Execute(context.Background(), replayID, replayReq.Tenant(), jobName) + go r.executor.Execute(replayID, replayReq.Tenant(), jobName) //nolint:contextcheck return replayID, nil } diff --git a/core/scheduler/service/replay_service_test.go b/core/scheduler/service/replay_service_test.go index 7516c8c55d..af3a10957d 100644 --- a/core/scheduler/service/replay_service_test.go +++ b/core/scheduler/service/replay_service_test.go @@ -106,7 +106,7 @@ func TestReplayService(t *testing.T) { jobRepository.On("GetJob", ctx, projName, jobName).Return(&job, nil) replayValidator.On("Validate", ctx, replayReq, jobCron).Return(nil) replayRepository.On("RegisterReplay", ctx, replayReq, replayRuns).Return(replayID, nil) - replayWorker.On("Execute", ctx, replayID, tnnt, jobName).Return().Maybe() + replayWorker.On("Execute", replayID, tnnt, jobName).Return().Maybe() alertManager := new(mockAlertManager) alertManager.On("SendReplayEvent", mock.Anything).Return() @@ -151,7 +151,7 @@ func TestReplayService(t *testing.T) { jobRepository.On("GetJob", ctx, projName, jobName).Return(&job, nil) replayValidator.On("Validate", ctx, replayReq, jobCron).Return(nil) replayRepository.On("RegisterReplay", ctx, replayReq, replayRuns).Return(replayID, nil) - replayWorker.On("Execute", ctx, replayID, tnnt, jobName).Return().Maybe() + replayWorker.On("Execute", replayID, tnnt, jobName).Return().Maybe() alertManager := new(mockAlertManager) alertManager.On("SendReplayEvent", mock.Anything).Return() @@ -866,8 +866,8 @@ type ReplayExecutor struct { } // Execute provides a mock function with given fields: ctx, replayRequest -func (_m *ReplayExecutor) Execute(ctx context.Context, replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName) { - _m.Called(ctx, replayID, jobTenant, jobName) +func (_m *ReplayExecutor) Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName) { + _m.Called(replayID, jobTenant, jobName) } func (_m *ReplayExecutor) FetchAndSyncStatus(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec) (scheduler.JobRunStatusList, error) { diff --git a/core/scheduler/service/replay_worker.go b/core/scheduler/service/replay_worker.go index a709c291b8..062a0fd039 100644 --- a/core/scheduler/service/replay_worker.go +++ b/core/scheduler/service/replay_worker.go @@ -58,8 +58,8 @@ func NewReplayWorker(logger log.Logger, replayRepository ReplayRepository, jobRe } } -func (w *ReplayWorker) Execute(ctxBack context.Context, replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName) { - ctx, cancelFn := context.WithTimeout(ctxBack, time.Minute*time.Duration(w.config.ReplayTimeoutInMinutes)) +func (w *ReplayWorker) Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName) { + ctx, cancelFn := context.WithTimeout(context.Background(), time.Minute*time.Duration(w.config.ReplayTimeoutInMinutes)) defer cancelFn() w.logger.Info("[ReplayID: %s] starting to execute replay", replayID) @@ -384,7 +384,7 @@ func (w *ReplayWorker) ScanReplayRequest(ctx context.Context) { } requestsToProcess := w.getRequestsToProcess(ctx, replays) for _, req := range requestsToProcess { - go w.Execute(ctx, req.ID(), req.Tenant(), req.JobName()) + go w.Execute(req.ID(), req.Tenant(), req.JobName()) //nolint:contextcheck } } } diff --git a/core/scheduler/service/replay_worker_test.go b/core/scheduler/service/replay_worker_test.go index 925e669039..014fab8415 100644 --- a/core/scheduler/service/replay_worker_test.go +++ b/core/scheduler/service/replay_worker_test.go @@ -20,7 +20,6 @@ import ( func TestReplayWorker(t *testing.T) { logger := log.NewNoop() now := time.Now() - ctx := context.Background() replayServerConfig := config.ReplayConfig{ ExecutionIntervalInSeconds: 1, ReplayTimeoutInMinutes: 5, @@ -117,7 +116,7 @@ func TestReplayWorker(t *testing.T) { defer alertManager.AssertExpectations(t) worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) - worker.Execute(ctx, replayID, tnnt, jobAName) + worker.Execute(replayID, tnnt, jobAName) }) t.Run("should able to process sequential replay request with multiple run", func(t *testing.T) { replayRepository := new(ReplayRepository) @@ -202,7 +201,7 @@ func TestReplayWorker(t *testing.T) { defer alertManager.AssertExpectations(t) worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) - worker.Execute(ctx, replayID, tnnt, jobAName) + worker.Execute(replayID, tnnt, jobAName) }) t.Run("should able to process parallel replay request", func(t *testing.T) { replayRepository := new(ReplayRepository) @@ -285,7 +284,7 @@ func TestReplayWorker(t *testing.T) { defer alertManager.AssertExpectations(t) worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) - worker.Execute(ctx, replayID, tnnt, jobAName) + worker.Execute(replayID, tnnt, jobAName) }) t.Run("should able to process replay request with sequential mode and creating non existing runs", func(t *testing.T) { @@ -368,7 +367,7 @@ func TestReplayWorker(t *testing.T) { alertManager.On("SendReplayEvent", mock.Anything).Return() defer alertManager.AssertExpectations(t) worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) - worker.Execute(ctx, replayID, tnnt, jobAName) + worker.Execute(replayID, tnnt, jobAName) }) t.Run("should able to process replay request with parallel mode and creating non existing runs", func(t *testing.T) { replayRepository := new(ReplayRepository) @@ -451,7 +450,7 @@ func TestReplayWorker(t *testing.T) { defer alertManager.AssertExpectations(t) worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) - worker.Execute(ctx, replayID, tnnt, jobAName) + worker.Execute(replayID, tnnt, jobAName) }) t.Run("should able to update replay state as failed if unable to get job details", func(t *testing.T) { @@ -480,7 +479,7 @@ func TestReplayWorker(t *testing.T) { errorMsgToStore := "internal error for entity replay: unable to get job details for jobName: job-a, project: proj: internal error" replayRepository.On("UpdateReplayStatus", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateFailed, errorMsgToStore).Return(nil).Once() worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, nil) - worker.Execute(ctx, replayID, tnnt, jobAName) + worker.Execute(replayID, tnnt, jobAName) }) t.Run("should able to update replay state as failed if unable to get replay by id", func(t *testing.T) { replayRepository := new(ReplayRepository) @@ -514,7 +513,7 @@ func TestReplayWorker(t *testing.T) { alertManager.On("SendReplayEvent", mock.Anything).Return() defer alertManager.AssertExpectations(t) worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) - worker.Execute(ctx, replayID, tnnt, jobAName) + worker.Execute(replayID, tnnt, jobAName) }) t.Run("should able to update replay state as failed if unable to fetch job runs", func(t *testing.T) { replayRepository := new(ReplayRepository) @@ -549,7 +548,7 @@ func TestReplayWorker(t *testing.T) { alertManager.On("SendReplayEvent", mock.Anything).Return() defer alertManager.AssertExpectations(t) worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) - worker.Execute(ctx, replayID, tnnt, jobAName) + worker.Execute(replayID, tnnt, jobAName) }) t.Run("should able to update replay state as failed if unable to update replay once it is synced", func(t *testing.T) { replayRepository := new(ReplayRepository) @@ -591,7 +590,7 @@ func TestReplayWorker(t *testing.T) { defer alertManager.AssertExpectations(t) worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) - worker.Execute(ctx, replayID, tnnt, jobAName) + worker.Execute(replayID, tnnt, jobAName) }) t.Run("should able to update replay state as failed if unable to do clear batch of runs", func(t *testing.T) { replayRepository := new(ReplayRepository) @@ -635,7 +634,7 @@ func TestReplayWorker(t *testing.T) { alertManager.On("SendReplayEvent", mock.Anything).Return() defer alertManager.AssertExpectations(t) worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) - worker.Execute(ctx, replayID, tnnt, jobAName) + worker.Execute(replayID, tnnt, jobAName) }) t.Run("should able to update replay state as failed if unable to create missing run", func(t *testing.T) { replayRepository := new(ReplayRepository) @@ -681,7 +680,7 @@ func TestReplayWorker(t *testing.T) { defer alertManager.AssertExpectations(t) worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) - worker.Execute(ctx, replayID, tnnt, jobAName) + worker.Execute(replayID, tnnt, jobAName) }) t.Run("should able to still process replay if some of the runs are in failed state", func(t *testing.T) { @@ -765,7 +764,7 @@ func TestReplayWorker(t *testing.T) { alertManager.On("SendReplayEvent", mock.Anything).Return() defer alertManager.AssertExpectations(t) worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) - worker.Execute(ctx, replayID, tnnt, jobAName) + worker.Execute(replayID, tnnt, jobAName) }) t.Run("should able to stop replay process if it is being terminated externally", func(t *testing.T) { @@ -797,7 +796,7 @@ func TestReplayWorker(t *testing.T) { defer alertManager.AssertExpectations(t) worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager) - worker.Execute(ctx, replayID, tnnt, jobAName) + worker.Execute(replayID, tnnt, jobAName) }) }) }