Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add replay lifecycle events #271

Merged
merged 5 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/scheduler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (n JobName) String() string {
return string(n)
}

// GetJobURN builds the Optimus URN that identifies this job under the given
// tenant, in the form "urn:optimus:<project>:job:<project>.<namespace>.<job>".
// NOTE(review): the project name appears twice by design — once as the URN
// authority and once in the fully-qualified job name; confirm against the
// Optimus URN scheme.
func (n JobName) GetJobURN(tnnt tenant.Tenant) string {
	fullyQualifiedName := fmt.Sprintf("%s.%s.%s", tnnt.ProjectName(), tnnt.NamespaceName(), n)
	return fmt.Sprintf("urn:optimus:%s:job:%s", tnnt.ProjectName(), fullyQualifiedName)
}

type Job struct {
ID uuid.UUID
Name JobName
Expand Down
10 changes: 5 additions & 5 deletions core/scheduler/job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ type AlertAttrs struct {
}

type ReplayNotificationAttrs struct {
JobName string
ReplayID string
Tenant tenant.Tenant
JobURN string
EventType ReplayEventType
JobName string
ReplayID string
Tenant tenant.Tenant
JobURN string
State ReplayState
}

type WebhookAttrs struct {
Expand Down
10 changes: 2 additions & 8 deletions core/scheduler/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ import (
)

const (
// ReplayCreated is an event which indicates the replay has been created but not picked up yet
ReplayCreated ReplayEventType = "replay_created"

// ReplayStateCreated is an initial state which indicates the replay has been created but not picked up yet
ReplayStateCreated ReplayState = "created"

Expand All @@ -23,6 +20,8 @@ const (
// ReplayStateSuccess is a terminal state which occurs when the replay execution finished with successful job runs
ReplayStateSuccess ReplayState = "success"

ReplayStateTimeout ReplayState = "timeout"

// ReplayStateFailed is a terminal state which occurs when the replay execution failed, timed out, or finished with one of the run fails
ReplayStateFailed ReplayState = "failed"

Expand All @@ -38,7 +37,6 @@ var (
)

type (
ReplayEventType string
ReplayState string // contract status for business layer
ReplayUserState string // contract status for presentation layer
)
Expand All @@ -60,10 +58,6 @@ func ReplayStateFromString(state string) (ReplayState, error) {
}
}

// String returns the raw string value of the replay event type.
func (j ReplayEventType) String() string {
	return string(j)
}

// String returns the raw string value of the replay state.
func (j ReplayState) String() string {
	return string(j)
}
Expand Down
20 changes: 13 additions & 7 deletions core/scheduler/service/replay_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,21 @@ type ReplayService struct {
pluginToExecutionProjectKeyMap map[string]string
}

func (r *ReplayService) CreateReplay(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) (replayID uuid.UUID, err error) {
jobCron, err := getJobCron(ctx, r.logger, r.jobRepo, tenant, jobName)
func (r *ReplayService) CreateReplay(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) (replayID uuid.UUID, err error) {
jobCron, err := getJobCron(ctx, r.logger, r.jobRepo, t, jobName)
if err != nil {
r.logger.Error("unable to get cron value for job [%s]: %s", jobName.String(), err.Error())
return uuid.Nil, err
}

newConfig, err := r.injectJobConfigWithTenantConfigs(ctx, tenant, jobName, config)
newConfig, err := r.injectJobConfigWithTenantConfigs(ctx, t, jobName, config)
if err != nil {
r.logger.Error("unable to get namespace details for job %s: %s", jobName.String(), err)
return uuid.Nil, err
}
config.JobConfig = newConfig

replayReq := scheduler.NewReplayRequest(jobName, tenant, config, scheduler.ReplayStateCreated)
replayReq := scheduler.NewReplayRequest(jobName, t, config, scheduler.ReplayStateCreated)
if err := r.validator.Validate(ctx, replayReq, jobCron); err != nil {
r.logger.Error("error validating replay request: %s", err)
return uuid.Nil, err
Expand All @@ -100,13 +100,19 @@ func (r *ReplayService) CreateReplay(ctx context.Context, tenant tenant.Tenant,
return uuid.Nil, err
}

jobReplayMetric.WithLabelValues(tenant.ProjectName().String(),
tenant.NamespaceName().String(),
jobReplayMetric.WithLabelValues(t.ProjectName().String(),
t.NamespaceName().String(),
jobName.String(),
replayReq.State().String(),
).Inc()

r.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{})
r.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
JobName: jobName.String(),
ReplayID: replayID.String(),
Tenant: t,
JobURN: jobName.GetJobURN(t),
State: scheduler.ReplayStateCreated,
})

go r.executor.Execute(replayID, replayReq.Tenant(), jobName)

Expand Down
67 changes: 52 additions & 15 deletions core/scheduler/service/replay_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ const (
)

type ReplayWorker struct {
logger log.Logger
replayRepo ReplayRepository
jobRepo JobRepository
scheduler ReplayScheduler
config config.ReplayConfig
logger log.Logger
replayRepo ReplayRepository
jobRepo JobRepository
scheduler ReplayScheduler
config config.ReplayConfig
alertManager AlertManager
}

type ReplayScheduler interface {
Expand All @@ -36,13 +37,14 @@ type ReplayScheduler interface {
GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error)
}

func NewReplayWorker(logger log.Logger, replayRepository ReplayRepository, jobRepo JobRepository, scheduler ReplayScheduler, cfg config.ReplayConfig) *ReplayWorker {
func NewReplayWorker(logger log.Logger, replayRepository ReplayRepository, jobRepo JobRepository, scheduler ReplayScheduler, cfg config.ReplayConfig, alertManager AlertManager) *ReplayWorker {
return &ReplayWorker{
logger: logger,
jobRepo: jobRepo,
replayRepo: replayRepository,
config: cfg,
scheduler: scheduler,
logger: logger,
jobRepo: jobRepo,
replayRepo: replayRepository,
config: cfg,
scheduler: scheduler,
alertManager: alertManager,
}
}

Expand Down Expand Up @@ -71,6 +73,21 @@ func (w *ReplayWorker) Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobN
errMessage := err.Error()
if errors.Is(err, context.DeadlineExceeded) {
errMessage = "replay execution timed out"
w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
JobName: jobName.String(),
ReplayID: replayID.String(),
Tenant: jobTenant,
JobURN: jobName.GetJobURN(jobTenant),
State: scheduler.ReplayStateTimeout,
})
} else {
w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
JobName: jobName.String(),
ReplayID: replayID.String(),
Tenant: jobTenant,
JobURN: jobName.GetJobURN(jobTenant),
State: scheduler.ReplayStateFailed,
})
}
w.logger.Error("[ReplayID: %s] unable to execute replay for job [%s]: %s", replayID.String(), jobName.String(), errMessage)

Expand Down Expand Up @@ -100,20 +117,28 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI
w.logger.Info("[ReplayID: %s] executing replay...", replayID)

// sync run first
storedReplayWithRun, err := w.replayRepo.GetReplayByID(ctx, replayID)
replayWithRun, err := w.replayRepo.GetReplayByID(ctx, replayID)
if err != nil {
w.logger.Error("[ReplayID: %s] unable to get existing runs: %s", replayID.String(), err)
return err
}
replayWithRun := storedReplayWithRun

if storedReplayWithRun.Replay.IsTerminated() {
w.logger.Info("[ReplayID: %s] replay is externally terminated with status [%s]", replayWithRun.Replay.ID().String(), storedReplayWithRun.Replay.State().String())
if replayWithRun.Replay.IsTerminated() {
t := replayWithRun.Replay.Tenant()
w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
JobName: replayWithRun.Replay.JobName().String(),
ReplayID: replayID.String(),
Tenant: t,
JobURN: replayWithRun.Replay.JobName().GetJobURN(t),
State: replayWithRun.Replay.State(),
})
w.logger.Info("[ReplayID: %s] replay is externally terminated with status [%s]", replayWithRun.Replay.ID().String(), replayWithRun.Replay.State().String())
return nil
}

incomingRuns, err := w.fetchRuns(ctx, replayWithRun, jobCron)
if err != nil {
// TODO: let's not kill watchers on such errors
w.logger.Error("[ReplayID: %s] unable to get incoming runs: %s", replayWithRun.Replay.ID().String(), err)
return err
}
Expand All @@ -129,6 +154,18 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI

// check if replay request is on termination state
if syncedRunStatus.IsAllTerminated() {
t := replayWithRun.Replay.Tenant()
replayState := scheduler.ReplayStateSuccess
if syncedRunStatus.IsAnyFailure() {
arinda-arif marked this conversation as resolved.
Show resolved Hide resolved
replayState = scheduler.ReplayStateFailed
}
w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
JobName: replayWithRun.Replay.JobName().String(),
ReplayID: replayID.String(),
Tenant: t,
JobURN: replayWithRun.Replay.JobName().GetJobURN(t),
State: replayState,
})
return w.finishReplay(ctx, replayWithRun.Replay.ID(), syncedRunStatus, runStatusSummary)
}

Expand Down
72 changes: 55 additions & 17 deletions core/scheduler/service/replay_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ func TestReplayWorker(t *testing.T) {
summaryMsg := "replay is finished with run status: success(1)"
replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateSuccess, runsPhaseThree, summaryMsg).Return(nil).Once()

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig)
alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
defer alertManager.AssertExpectations(t)

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(replayID, tnnt, jobAName)
})
t.Run("should able to process sequential replay request with multiple run", func(t *testing.T) {
Expand Down Expand Up @@ -179,7 +183,11 @@ func TestReplayWorker(t *testing.T) {
summaryMsg := "replay is finished with run status: success(2)"
replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateSuccess, mock.Anything, summaryMsg).Return(nil).Once()

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig)
alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
defer alertManager.AssertExpectations(t)

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(replayID, tnnt, jobAName)
})
t.Run("should able to process parallel replay request", func(t *testing.T) {
Expand Down Expand Up @@ -252,7 +260,11 @@ func TestReplayWorker(t *testing.T) {
summaryMsg := "replay is finished with run status: success(2)"
replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateSuccess, mock.Anything, summaryMsg).Return(nil).Once()

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig)
alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
defer alertManager.AssertExpectations(t)

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(replayID, tnnt, jobAName)
})

Expand Down Expand Up @@ -326,8 +338,10 @@ func TestReplayWorker(t *testing.T) {
replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateInProgress, mock.Anything, "").Return(nil).Once()
summaryMsg := "replay is finished with run status: success(2)"
replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateSuccess, mock.Anything, summaryMsg).Return(nil).Once()

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig)
alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
defer alertManager.AssertExpectations(t)
worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(replayID, tnnt, jobAName)
})
t.Run("should able to process replay request with parallel mode and creating non existing runs", func(t *testing.T) {
Expand Down Expand Up @@ -400,7 +414,11 @@ func TestReplayWorker(t *testing.T) {
summaryMsg := "replay is finished with run status: success(2)"
replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateSuccess, mock.Anything, summaryMsg).Return(nil).Once()

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig)
alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
defer alertManager.AssertExpectations(t)

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(replayID, tnnt, jobAName)
})

Expand Down Expand Up @@ -429,8 +447,7 @@ func TestReplayWorker(t *testing.T) {
jobRepository.On("GetJobDetails", mock.Anything, projName, jobAName).Return(nil, errors.New(errorMsg)).Once()
errorMsgToStore := "internal error for entity replay: unable to get job details for jobName: job-a, project: proj: internal error"
replayRepository.On("UpdateReplayStatus", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateFailed, errorMsgToStore).Return(nil).Once()

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig)
worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, nil)
worker.Execute(replayID, tnnt, jobAName)
})
t.Run("should able to update replay state as failed if unable to get replay by id", func(t *testing.T) {
Expand Down Expand Up @@ -459,7 +476,10 @@ func TestReplayWorker(t *testing.T) {
errorMsg := "internal error"
replayRepository.On("GetReplayByID", mock.Anything, replayReq.Replay.ID()).Return(nil, errors.New(errorMsg)).Once()
replayRepository.On("UpdateReplayStatus", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateFailed, errorMsg).Return(nil).Once()
worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig)
alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
defer alertManager.AssertExpectations(t)
worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(replayID, tnnt, jobAName)
})
t.Run("should able to update replay state as failed if unable to fetch job runs", func(t *testing.T) {
Expand Down Expand Up @@ -489,8 +509,10 @@ func TestReplayWorker(t *testing.T) {
errorMsg := "internal error"
sch.On("GetJobRuns", mock.Anything, tnnt, mock.Anything, jobCron).Return(nil, errors.New(errorMsg)).Once()
replayRepository.On("UpdateReplayStatus", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateFailed, errorMsg).Return(nil).Once()

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig)
alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
defer alertManager.AssertExpectations(t)
worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(replayID, tnnt, jobAName)
})
t.Run("should able to update replay state as failed if unable to update replay once it is synced", func(t *testing.T) {
Expand Down Expand Up @@ -526,7 +548,11 @@ func TestReplayWorker(t *testing.T) {
replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateInProgress, mock.Anything, "").Return(errors.New(errorMsg)).Once()
replayRepository.On("UpdateReplayStatus", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateFailed, errorMsg).Return(nil).Once()

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig)
alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
defer alertManager.AssertExpectations(t)

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(replayID, tnnt, jobAName)
})
t.Run("should able to update replay state as failed if unable to do clear batch of runs", func(t *testing.T) {
Expand Down Expand Up @@ -562,8 +588,10 @@ func TestReplayWorker(t *testing.T) {
errorMsg := "internal error"
sch.On("ClearBatch", mock.Anything, tnnt, jobAName, scheduledTime2.Add(-24*time.Hour), scheduledTime2.Add(-24*time.Hour)).Return(errors.New(errorMsg)).Once()
replayRepository.On("UpdateReplayStatus", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateFailed, errorMsg).Return(nil).Once()

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig)
alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
defer alertManager.AssertExpectations(t)
worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(replayID, tnnt, jobAName)
})
t.Run("should able to update replay state as failed if unable to create missing run", func(t *testing.T) {
Expand Down Expand Up @@ -601,8 +629,11 @@ func TestReplayWorker(t *testing.T) {
sch.On("CreateRun", mock.Anything, tnnt, jobAName, scheduledTime1.Add(-24*time.Hour), "replayed").Return(errors.New(errorMsg)).Once()
errorMsgToStore := "create runs:\n internal error"
replayRepository.On("UpdateReplayStatus", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateFailed, errorMsgToStore).Return(nil).Once()
alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
defer alertManager.AssertExpectations(t)

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig)
worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(replayID, tnnt, jobAName)
})

Expand Down Expand Up @@ -677,7 +708,10 @@ func TestReplayWorker(t *testing.T) {
replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateInProgress, mock.Anything, "").Return(nil).Once()
replayRepository.On("UpdateReplay", mock.Anything, replayReq.Replay.ID(), scheduler.ReplayStateFailed, mock.Anything, mock.Anything).Return(nil).Once()

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig)
alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
defer alertManager.AssertExpectations(t)
worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(replayID, tnnt, jobAName)
})

Expand Down Expand Up @@ -705,7 +739,11 @@ func TestReplayWorker(t *testing.T) {
jobRepository.On("GetJobDetails", mock.Anything, projName, jobAName).Return(jobAWithDetails, nil).Once()
replayRepository.On("GetReplayByID", mock.Anything, replayWithRun.Replay.ID()).Return(replayWithRun, nil).Once()

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig)
alertManager := new(mockAlertManager)
alertManager.On("SendReplayEvent", mock.Anything).Return()
defer alertManager.AssertExpectations(t)

worker := service.NewReplayWorker(logger, replayRepository, jobRepository, sch, replayServerConfig, alertManager)
worker.Execute(replayID, tnnt, jobAName)
})
})
Expand Down
Loading
Loading