Skip to content

Commit

Permalink
fix: filter false SLA alerts honor nil job end times instead of lon… (#…
Browse files Browse the repository at this point in the history
…137)

* fix: filter false SLA alerts, honor nil job end times instead of a long future date

* fix: handle nil end time for operator runs

* fix: add testcase
  • Loading branch information
Mryashbhardwaj authored Sep 6, 2023
1 parent 0c94da5 commit e3a56ad
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 52 deletions.
24 changes: 16 additions & 8 deletions core/scheduler/job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,33 @@ func (i JobRunID) IsEmpty() bool {
// JobRun is the scheduler's record of one run of a job.
type JobRun struct {
	ID uuid.UUID

	JobName     JobName
	Tenant      tenant.Tenant
	State       State
	ScheduledAt time.Time
	SLAAlert    bool
	StartTime   time.Time
	// EndTime is nil when no end time has been recorded for the run;
	// HasSLABreached then measures the run against the current time.
	EndTime *time.Time
	// SLADefinition is the allowed run duration in seconds, counted from StartTime.
	SLADefinition int64

	Monitoring map[string]any
}

// HasSLABreached reports whether the run has outlived its SLA window. The
// window opens at StartTime and lasts SLADefinition seconds. A recorded
// EndTime is compared against the end of that window; when EndTime is nil the
// current time is used instead.
func (j *JobRun) HasSLABreached() bool {
	deadline := j.StartTime.Add(time.Duration(j.SLADefinition) * time.Second)
	if j.EndTime == nil {
		return time.Now().After(deadline)
	}
	return j.EndTime.After(deadline)
}

// OperatorRun describes one run of an operator that belongs to a job run.
type OperatorRun struct {
	ID           uuid.UUID
	Name         string
	JobRunID     uuid.UUID
	OperatorType OperatorType
	Status       State
	StartTime    time.Time
	// EndTime is nil when no end time has been recorded for the operator run
	// (presumably while it is still in progress; confirm against the repository).
	EndTime *time.Time
}

type NotifyAttrs struct {
Expand Down
63 changes: 63 additions & 0 deletions core/scheduler/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scheduler_test

import (
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -73,6 +74,68 @@ func TestJob(t *testing.T) {
assert.Equal(t, "jobName", jobWithDetails.GetName())
})
t.Run("SLADuration", func(t *testing.T) {
t.Run("has job breached SLA", func(t *testing.T) {
t.Run("duration 1.5 hr", func(t *testing.T) {
jobEndTime := time.Now().Add(-1*time.Hour - 30*time.Minute)
jobRun := scheduler.JobRun{
ID: uuid.UUID{},
JobName: "",
Tenant: tenant.Tenant{},
State: "",
ScheduledAt: time.Time{},
SLAAlert: false,
StartTime: time.Now().Add(-3 * time.Hour),
EndTime: &jobEndTime,
SLADefinition: 3600,
}

assert.True(t, jobRun.HasSLABreached())
})
t.Run("duration .5 hr", func(t *testing.T) {
jobEndTime := time.Now().Add(-2*time.Hour - 30*time.Minute)
jobRun := scheduler.JobRun{
ID: uuid.UUID{},
JobName: "",
Tenant: tenant.Tenant{},
State: "",
ScheduledAt: time.Time{},
SLAAlert: false,
StartTime: time.Now().Add(-3 * time.Hour),
EndTime: &jobEndTime,
SLADefinition: 3600,
}
assert.False(t, jobRun.HasSLABreached())
})
t.Run("should breach sla based on current time if job end time is nil", func(t *testing.T) {
jobRun := scheduler.JobRun{
ID: uuid.UUID{},
JobName: "",
Tenant: tenant.Tenant{},
State: "",
ScheduledAt: time.Time{},
SLAAlert: false,
StartTime: time.Now().Add(-3 * time.Hour),
EndTime: nil,
SLADefinition: 3600,
}
assert.True(t, jobRun.HasSLABreached())
})
t.Run("should 'not report SLA breach' based on current time if job end time is nil", func(t *testing.T) {
jobRun := scheduler.JobRun{
ID: uuid.UUID{},
JobName: "",
Tenant: tenant.Tenant{},
State: "",
ScheduledAt: time.Time{},
SLAAlert: false,
StartTime: time.Now().Add(-30 * time.Minute), // job started 30 min ago
EndTime: nil,
SLADefinition: 3600,
}
assert.False(t, jobRun.HasSLABreached())
})
})

t.Run("should return 0 and error if duration is incorrect format", func(t *testing.T) {
jobWithDetails := scheduler.JobWithDetails{
Name: "jobName",
Expand Down
41 changes: 37 additions & 4 deletions core/scheduler/service/job_run_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ type JobRepository interface {
// JobRunRepository lists the job-run storage operations used by this package.
type JobRunRepository interface {
	GetByID(ctx context.Context, id scheduler.JobRunID) (*scheduler.JobRun, error)
	GetByScheduledAt(ctx context.Context, tenant tenant.Tenant, name scheduler.JobName, scheduledAt time.Time) (*scheduler.JobRun, error)
	// GetByScheduledTimes returns the runs of jobName under tenant for the
	// given scheduled times.
	GetByScheduledTimes(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, scheduledTimes []time.Time) ([]*scheduler.JobRun, error)
	Create(ctx context.Context, tenant tenant.Tenant, name scheduler.JobName, scheduledAt time.Time, slaDefinitionInSec int64) error
	Update(ctx context.Context, jobRunID uuid.UUID, endTime time.Time, jobRunStatus scheduler.State) error
	UpdateState(ctx context.Context, jobRunID uuid.UUID, jobRunStatus scheduler.State) error
	// UpdateSLA updates the SLA status of the runs of jobName in project that
	// were scheduled at scheduledTimes.
	// NOTE(review): presumably this sets the run's SLA alert flag; confirm
	// against the repository implementation.
	UpdateSLA(ctx context.Context, jobName scheduler.JobName, project tenant.ProjectName, scheduledTimes []time.Time) error
	UpdateMonitoring(ctx context.Context, jobRunID uuid.UUID, monitoring map[string]any) error
}

Expand Down Expand Up @@ -311,15 +312,47 @@ func (*JobRunService) getMonitoringValues(event *scheduler.Event) map[string]any
}

// updateJobRunSLA processes the SLA objects carried by event.
//
// It loads the job runs matching the reported scheduled times, drops every
// run for which HasSLABreached is false (logging each one), replaces
// event.SLAObjectList with the remaining entries, and passes the remaining
// scheduled times to the repository's UpdateSLA. The SLA-miss counter is
// incremented only after the repository update succeeds. An event without SLA
// objects is a no-op. Repository errors are logged and returned unchanged.
func (s *JobRunService) updateJobRunSLA(ctx context.Context, event *scheduler.Event) error {
	if len(event.SLAObjectList) < 1 {
		return nil
	}

	scheduleTimesList := make([]time.Time, 0, len(event.SLAObjectList))
	for _, slaObject := range event.SLAObjectList {
		scheduleTimesList = append(scheduleTimesList, slaObject.JobScheduledAt)
	}
	jobRuns, err := s.repo.GetByScheduledTimes(ctx, event.Tenant, event.JobName, scheduleTimesList)
	if err != nil {
		s.l.Error("error getting job runs by schedule time", err)
		return err
	}

	// Both slices are left nil (not empty) when every run is filtered out, so
	// callers and mocks observe the same values as before.
	var slaBreachedJobRunScheduleTimes []time.Time
	var filteredSLAObject []*scheduler.SLAObject
	for _, jobRun := range jobRuns {
		if !jobRun.HasSLABreached() {
			s.l.Error("received sla miss callback for job run that has not breached SLA, jobName: %s, scheduled_at: %s, start_time: %s, end_time: %s, SLA definition: %s",
				jobRun.JobName, jobRun.ScheduledAt.String(), jobRun.StartTime, jobRun.EndTime, time.Second*time.Duration(jobRun.SLADefinition))
			continue
		}
		filteredSLAObject = append(filteredSLAObject, &scheduler.SLAObject{
			JobName:        jobRun.JobName,
			JobScheduledAt: jobRun.ScheduledAt,
		})
		slaBreachedJobRunScheduleTimes = append(slaBreachedJobRunScheduleTimes, jobRun.ScheduledAt)
	}

	// Only the runs that really breached their SLA stay on the event.
	event.SLAObjectList = filteredSLAObject

	err = s.repo.UpdateSLA(ctx, event.JobName, event.Tenant.ProjectName(), slaBreachedJobRunScheduleTimes)
	if err != nil {
		s.l.Error("error updating job run sla status", err)
		return err
	}
	telemetry.NewCounter(metricJobRunEvents, map[string]string{
		"project":   event.Tenant.ProjectName().String(),
		"namespace": event.Tenant.NamespaceName().String(),
		"name":      event.JobName.String(),
		"status":    scheduler.SLAMissEvent.String(),
	}).Inc()
	return nil
}

Expand Down
83 changes: 81 additions & 2 deletions core/scheduler/service/job_run_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,80 @@ func TestJobRunService(t *testing.T) {
assert.EqualError(t, err, "error in getting operator run")
})
})

t.Run("updateJobRunSLA", func(t *testing.T) {
t.Run("scenario false sla notification", func(t *testing.T) {
scheduledAtTimeStamp, _ := time.Parse(scheduler.ISODateFormat, "2022-01-02T15:04:05Z")
eventTime := time.Now()
// example of an hourly job
slaBreachedJobRunScheduleTimes := []time.Time{
time.Now().Add(time.Hour * time.Duration(-3)),
time.Now().Add(time.Hour * time.Duration(-2)),
time.Now().Add(time.Hour * time.Duration(-1)),
}
var slaObjectList []*scheduler.SLAObject
for _, scheduleTime := range slaBreachedJobRunScheduleTimes {
slaObjectList = append(slaObjectList, &scheduler.SLAObject{
JobName: jobName,
JobScheduledAt: scheduleTime,
})
}

event := &scheduler.Event{
JobName: jobName,
Tenant: tnnt,
Type: scheduler.SLAMissEvent,
EventTime: eventTime,
OperatorName: "task_bq2bq",
Status: scheduler.StateSuccess,
JobScheduledAt: scheduledAtTimeStamp,
Values: map[string]any{
"status": "success",
},
SLAObjectList: slaObjectList,
}

var jobRuns []*scheduler.JobRun
for _, slaBreachedJobRunScheduleTime := range slaBreachedJobRunScheduleTimes {
jobRuns = append(jobRuns, &scheduler.JobRun{
JobName: jobName,
Tenant: tnnt,
ScheduledAt: slaBreachedJobRunScheduleTime,
SLAAlert: false,
StartTime: slaBreachedJobRunScheduleTime.Add(time.Second * time.Duration(1)),
SLADefinition: 100,
})
}

endTime0 := slaBreachedJobRunScheduleTimes[0].Add(time.Second * time.Duration(40)) // duration 40-1 = 39 Sec (Not an SLA breach)
jobRuns[0].EndTime = &endTime0
endTime1 := slaBreachedJobRunScheduleTimes[1].Add(time.Second * time.Duration(120)) // duration 120-1 = 119 Sec
jobRuns[1].EndTime = &endTime1
endTime2 := slaBreachedJobRunScheduleTimes[2].Add(time.Second * time.Duration(200)) // duration 200-1 = 199 Sec
jobRuns[2].EndTime = &endTime2

jobRunRepo := new(mockJobRunRepository)
jobRunRepo.On("GetByScheduledTimes", ctx, tnnt, jobName, slaBreachedJobRunScheduleTimes).Return(jobRuns, nil).Once()
jobRunRepo.On("UpdateSLA", ctx, event.JobName, event.Tenant.ProjectName(), []time.Time{
slaBreachedJobRunScheduleTimes[1], slaBreachedJobRunScheduleTimes[2],
}).Return(nil).Once()
defer jobRunRepo.AssertExpectations(t)

runService := service.NewJobRunService(logger,
nil, jobRunRepo, nil, nil, nil, nil, nil, nil)

err := runService.UpdateJobState(ctx, event)
assert.Nil(t, err)

t.Run("scenario false sla notification, filter the false sla alert", func(t *testing.T) {
assert.Equal(t, 2, len(event.SLAObjectList))
for _, slaObject := range event.SLAObjectList {
// slaBreachedJobRunScheduleTimes [0] should not be in the list as that is a false alert
assert.False(t, slaObject.JobScheduledAt.Equal(slaBreachedJobRunScheduleTimes[0]))
}
})
})
})
})

t.Run("JobRunInput", func(t *testing.T) {
Expand Down Expand Up @@ -1324,8 +1398,13 @@ func (m *mockJobRunRepository) UpdateState(ctx context.Context, jobRunID uuid.UU
return args.Error(0)
}

func (m *mockJobRunRepository) UpdateSLA(ctx context.Context, slaObjects []*scheduler.SLAObject) error {
args := m.Called(ctx, slaObjects)
func (m *mockJobRunRepository) GetByScheduledTimes(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, scheduledTimes []time.Time) ([]*scheduler.JobRun, error) {
args := m.Called(ctx, tenant, jobName, scheduledTimes)
return args.Get(0).([]*scheduler.JobRun), args.Error(1)
}

// UpdateSLA forwards the call to the mock and returns the error configured
// for it.
func (m *mockJobRunRepository) UpdateSLA(ctx context.Context, jobName scheduler.JobName, project tenant.ProjectName, scheduledTimes []time.Time) error {
	return m.Called(ctx, jobName, project, scheduledTimes).Error(0)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/store/postgres/scheduler/job_operator_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type operatorRun struct {
Status string

StartTime time.Time
EndTime time.Time
EndTime *time.Time

CreatedAt time.Time
UpdatedAt time.Time
Expand Down Expand Up @@ -96,7 +96,7 @@ func (o *OperatorRunRepository) CreateOperatorRun(ctx context.Context, name stri
if err != nil {
return err
}
insertOperatorRun := "INSERT INTO " + operatorTableName + " ( " + jobOperatorColumnsToStore + ", created_at, updated_at) values ( $1, $2, $3, $4, TIMESTAMP '3000-01-01 00:00:00', NOW(), NOW())"
insertOperatorRun := "INSERT INTO " + operatorTableName + " ( " + jobOperatorColumnsToStore + ", created_at, updated_at) values ( $1, $2, $3, $4, null, NOW(), NOW())"
_, err = o.db.Exec(ctx, insertOperatorRun, name, jobRunID, scheduler.StateRunning, startTime)
return errors.WrapIfErr(scheduler.EntityJobRun, "error while inserting the run", err)
}
Expand Down
Loading

0 comments on commit e3a56ad

Please sign in to comment.