Skip to content

Commit

Permalink
fix: job run state change console event (#107)
Browse files Browse the repository at this point in the history
* fix: job run state change console event
  • Loading branch information
Mryashbhardwaj authored Aug 4, 2023
1 parent ccfa271 commit 9c336d9
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 14 deletions.
4 changes: 4 additions & 0 deletions core/scheduler/service/job_run_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,9 @@ func (s *JobRunService) raiseJobRunStateChangeEvent(jobRun *scheduler.JobRun) {
schedulerEvent, err = event.NewJobRunSuccessEvent(jobRun)
case scheduler.StateFailed:
schedulerEvent, err = event.NewJobRunFailedEvent(jobRun)
default:
s.l.Error("state [%s] is unrecognized, event is not published", jobRun.State)
return
}
if err != nil {
s.l.Error("error creating event for job run state change : %s", err)
Expand Down Expand Up @@ -379,6 +382,7 @@ func (s *JobRunService) createOperatorRun(ctx context.Context, event *scheduler.
s.l.Error("error updating state for job run id [%d] to [%s]: %s", jobRun.ID, jobState, err)
return err
}
jobRun.State = jobState
s.raiseJobRunStateChangeEvent(jobRun)
}

Expand Down
41 changes: 27 additions & 14 deletions core/scheduler/service/job_run_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,21 +481,13 @@ func TestJobRunService(t *testing.T) {
},
}

jobRun := scheduler.JobRun{
ID: uuid.New(),
JobName: jobName,
Tenant: tnnt,
StartTime: time.Now(),
}

operatorRun := scheduler.OperatorRun{
ID: uuid.New(),
Name: "task_bq2bq",
JobRunID: jobRun.ID,
OperatorType: scheduler.OperatorTask,
Status: scheduler.StateRunning,
}
t.Run("scenario OperatorRun not found and new operator creation fails", func(t *testing.T) {
jobRun := scheduler.JobRun{
ID: uuid.New(),
JobName: jobName,
Tenant: tnnt,
StartTime: time.Now(),
}
operatorRunRepository := new(mockOperatorRunRepository)
operatorRunRepository.On("GetOperatorRun", ctx, event.OperatorName, scheduler.OperatorTask, jobRun.ID).Return(nil, errors.NotFound(scheduler.EntityEvent, "operator not found in db")).Once()
operatorRunRepository.On("CreateOperatorRun", ctx, event.OperatorName, scheduler.OperatorTask, jobRun.ID, eventTime).Return(fmt.Errorf("some error in creating operator run"))
Expand All @@ -518,6 +510,13 @@ func TestJobRunService(t *testing.T) {
assert.EqualError(t, err, "some error in creating operator run")
})
t.Run("scenario OperatorRun not found even after successful new operator creation", func(t *testing.T) {
jobRun := scheduler.JobRun{
ID: uuid.New(),
JobName: jobName,
Tenant: tnnt,
StartTime: time.Now(),
}

operatorRunRepository := new(mockOperatorRunRepository)
operatorRunRepository.On("GetOperatorRun", ctx, event.OperatorName, scheduler.OperatorTask, jobRun.ID).Return(nil, errors.NotFound(scheduler.EntityEvent, "operator not found in db")).Once()
operatorRunRepository.On("CreateOperatorRun", ctx, event.OperatorName, scheduler.OperatorTask, jobRun.ID, eventTime).Return(nil)
Expand All @@ -541,6 +540,20 @@ func TestJobRunService(t *testing.T) {
assert.EqualError(t, err, "some error in getting operator run")
})
t.Run("scenario OperatorRun found", func(t *testing.T) {
jobRun := scheduler.JobRun{
ID: uuid.New(),
JobName: jobName,
Tenant: tnnt,
StartTime: time.Now(),
}

operatorRun := scheduler.OperatorRun{
ID: uuid.New(),
Name: "task_bq2bq",
JobRunID: jobRun.ID,
OperatorType: scheduler.OperatorTask,
Status: scheduler.StateRunning,
}
operatorRunRepository := new(mockOperatorRunRepository)
operatorRunRepository.On("GetOperatorRun", ctx, event.OperatorName, scheduler.OperatorTask, jobRun.ID).Return(&operatorRun, nil)
operatorRunRepository.On("UpdateOperatorRun", ctx, scheduler.OperatorTask, operatorRun.ID, eventTime, scheduler.StateSuccess).Return(nil)
Expand Down

0 comments on commit 9c336d9

Please sign in to comment.