diff --git a/core/scheduler/service/job_run_service.go b/core/scheduler/service/job_run_service.go index c39509a8a9..0c581d05ad 100644 --- a/core/scheduler/service/job_run_service.go +++ b/core/scheduler/service/job_run_service.go @@ -348,6 +348,9 @@ func (s *JobRunService) raiseJobRunStateChangeEvent(jobRun *scheduler.JobRun) { schedulerEvent, err = event.NewJobRunSuccessEvent(jobRun) case scheduler.StateFailed: schedulerEvent, err = event.NewJobRunFailedEvent(jobRun) + default: + s.l.Error("state [%s] is unrecognized, event is not published", jobRun.State) + return } if err != nil { s.l.Error("error creating event for job run state change : %s", err) @@ -379,6 +382,7 @@ func (s *JobRunService) createOperatorRun(ctx context.Context, event *scheduler. s.l.Error("error updating state for job run id [%d] to [%s]: %s", jobRun.ID, jobState, err) return err } + jobRun.State = jobState s.raiseJobRunStateChangeEvent(jobRun) } diff --git a/core/scheduler/service/job_run_service_test.go b/core/scheduler/service/job_run_service_test.go index 97b97b6ccb..bef2d232d4 100644 --- a/core/scheduler/service/job_run_service_test.go +++ b/core/scheduler/service/job_run_service_test.go @@ -481,21 +481,13 @@ func TestJobRunService(t *testing.T) { }, } - jobRun := scheduler.JobRun{ - ID: uuid.New(), - JobName: jobName, - Tenant: tnnt, - StartTime: time.Now(), - } - - operatorRun := scheduler.OperatorRun{ - ID: uuid.New(), - Name: "task_bq2bq", - JobRunID: jobRun.ID, - OperatorType: scheduler.OperatorTask, - Status: scheduler.StateRunning, - } t.Run("scenario OperatorRun not found and new operator creation fails", func(t *testing.T) { + jobRun := scheduler.JobRun{ + ID: uuid.New(), + JobName: jobName, + Tenant: tnnt, + StartTime: time.Now(), + } operatorRunRepository := new(mockOperatorRunRepository) operatorRunRepository.On("GetOperatorRun", ctx, event.OperatorName, scheduler.OperatorTask, jobRun.ID).Return(nil, errors.NotFound(scheduler.EntityEvent, "operator not found in db")).Once() operatorRunRepository.On("CreateOperatorRun", ctx, event.OperatorName, scheduler.OperatorTask, jobRun.ID, eventTime).Return(fmt.Errorf("some error in creating operator run")) @@ -518,6 +510,13 @@ func TestJobRunService(t *testing.T) { assert.EqualError(t, err, "some error in creating operator run") }) t.Run("scenario OperatorRun not found even after successful new operator creation", func(t *testing.T) { + jobRun := scheduler.JobRun{ + ID: uuid.New(), + JobName: jobName, + Tenant: tnnt, + StartTime: time.Now(), + } + operatorRunRepository := new(mockOperatorRunRepository) operatorRunRepository.On("GetOperatorRun", ctx, event.OperatorName, scheduler.OperatorTask, jobRun.ID).Return(nil, errors.NotFound(scheduler.EntityEvent, "operator not found in db")).Once() operatorRunRepository.On("CreateOperatorRun", ctx, event.OperatorName, scheduler.OperatorTask, jobRun.ID, eventTime).Return(nil) @@ -541,6 +540,20 @@ func TestJobRunService(t *testing.T) { assert.EqualError(t, err, "some error in getting operator run") }) t.Run("scenario OperatorRun found", func(t *testing.T) { + jobRun := scheduler.JobRun{ + ID: uuid.New(), + JobName: jobName, + Tenant: tnnt, + StartTime: time.Now(), + } + + operatorRun := scheduler.OperatorRun{ + ID: uuid.New(), + Name: "task_bq2bq", + JobRunID: jobRun.ID, + OperatorType: scheduler.OperatorTask, + Status: scheduler.StateRunning, + } operatorRunRepository := new(mockOperatorRunRepository) operatorRunRepository.On("GetOperatorRun", ctx, event.OperatorName, scheduler.OperatorTask, jobRun.ID).Return(&operatorRun, nil) operatorRunRepository.On("UpdateOperatorRun", ctx, scheduler.OperatorTask, operatorRun.ID, eventTime, scheduler.StateSuccess).Return(nil)