Skip to content

Commit

Permalink
Feat: add replay lifecycle events (#271) (#275)
Browse files Browse the repository at this point in the history
* fix: emit replay lifecycle events
  • Loading branch information
Mryashbhardwaj authored Oct 8, 2024
1 parent 36c2207 commit a7bb2c5
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 56 deletions.
4 changes: 4 additions & 0 deletions core/scheduler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (n JobName) String() string {
return string(n)
}

// GetJobURN builds the Optimus URN identifying this job under the given
// tenant, in the form urn:optimus:<project>:job:<project>.<namespace>.<job>.
// Note the project name intentionally appears twice per the URN scheme.
func (n JobName) GetJobURN(tnnt tenant.Tenant) string {
	project := tnnt.ProjectName()
	return fmt.Sprintf("urn:optimus:%s:job:%s.%s.%s", project, project, tnnt.NamespaceName(), n)
}

type Job struct {
ID uuid.UUID
Name JobName
Expand Down
10 changes: 5 additions & 5 deletions core/scheduler/job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ type AlertAttrs struct {
}

// ReplayNotificationAttrs carries the payload of a replay lifecycle
// notification delivered through the alert manager.
type ReplayNotificationAttrs struct {
	JobName  string        // name of the job being replayed
	ReplayID string        // UUID of the replay request
	Tenant   tenant.Tenant // project/namespace that owns the job
	JobURN   string        // URN produced by JobName.GetJobURN
	State    ReplayState   // replay lifecycle state being reported
}

type WebhookAttrs struct {
Expand Down
10 changes: 2 additions & 8 deletions core/scheduler/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ import (
)

const (
// ReplayCreated is an event which indicates the replay has been created but not picked up yet
ReplayCreated ReplayEventType = "replay_created"

// ReplayStateCreated is an initial state which indicates the replay has been created but not picked up yet
ReplayStateCreated ReplayState = "created"

Expand All @@ -23,6 +20,8 @@ const (
// ReplayStateSuccess is a terminal state which occurs when the replay execution finished with successful job runs
ReplayStateSuccess ReplayState = "success"

ReplayStateTimeout ReplayState = "timeout"

// ReplayStateFailed is a terminal state which occurs when the replay execution failed, timed out, or finished with one of the run fails
ReplayStateFailed ReplayState = "failed"

Expand All @@ -38,7 +37,6 @@ var (
)

type (
ReplayEventType string
ReplayState string // contract status for business layer
ReplayUserState string // contract status for presentation layer
)
Expand All @@ -60,10 +58,6 @@ func ReplayStateFromString(state string) (ReplayState, error) {
}
}

// String returns the raw string value of the replay event type.
func (j ReplayEventType) String() string {
return string(j)
}

// String returns the raw string value of the replay state.
func (j ReplayState) String() string {
return string(j)
}
Expand Down
20 changes: 13 additions & 7 deletions core/scheduler/service/replay_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,21 @@ type ReplayService struct {
pluginToExecutionProjectKeyMap map[string]string
}

func (r *ReplayService) CreateReplay(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) (replayID uuid.UUID, err error) {
jobCron, err := getJobCron(ctx, r.logger, r.jobRepo, tenant, jobName)
func (r *ReplayService) CreateReplay(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) (replayID uuid.UUID, err error) {
jobCron, err := getJobCron(ctx, r.logger, r.jobRepo, t, jobName)
if err != nil {
r.logger.Error("unable to get cron value for job [%s]: %s", jobName.String(), err.Error())
return uuid.Nil, err
}

newConfig, err := r.injectJobConfigWithTenantConfigs(ctx, tenant, jobName, config)
newConfig, err := r.injectJobConfigWithTenantConfigs(ctx, t, jobName, config)
if err != nil {
r.logger.Error("unable to get namespace details for job %s: %s", jobName.String(), err)
return uuid.Nil, err
}
config.JobConfig = newConfig

replayReq := scheduler.NewReplayRequest(jobName, tenant, config, scheduler.ReplayStateCreated)
replayReq := scheduler.NewReplayRequest(jobName, t, config, scheduler.ReplayStateCreated)
if err := r.validator.Validate(ctx, replayReq, jobCron); err != nil {
r.logger.Error("error validating replay request: %s", err)
return uuid.Nil, err
Expand All @@ -100,13 +100,19 @@ func (r *ReplayService) CreateReplay(ctx context.Context, tenant tenant.Tenant,
return uuid.Nil, err
}

jobReplayMetric.WithLabelValues(tenant.ProjectName().String(),
tenant.NamespaceName().String(),
jobReplayMetric.WithLabelValues(t.ProjectName().String(),
t.NamespaceName().String(),
jobName.String(),
replayReq.State().String(),
).Inc()

r.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{})
r.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
JobName: jobName.String(),
ReplayID: replayID.String(),
Tenant: t,
JobURN: jobName.GetJobURN(t),
State: scheduler.ReplayStateCreated,
})

go r.executor.Execute(replayID, replayReq.Tenant(), jobName)

Expand Down
68 changes: 51 additions & 17 deletions core/scheduler/service/replay_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ const (
)

// ReplayWorker drives the execution lifecycle of replay requests and
// emits lifecycle events through the alert manager.
type ReplayWorker struct {
	logger       log.Logger
	replayRepo   ReplayRepository
	jobRepo      JobRepository
	scheduler    ReplayScheduler
	config       config.ReplayConfig
	alertManager AlertManager
}

type ReplayScheduler interface {
Expand All @@ -36,13 +37,14 @@ type ReplayScheduler interface {
GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error)
}

func NewReplayWorker(logger log.Logger, replayRepository ReplayRepository, jobRepo JobRepository, scheduler ReplayScheduler, cfg config.ReplayConfig) *ReplayWorker {
func NewReplayWorker(logger log.Logger, replayRepository ReplayRepository, jobRepo JobRepository, scheduler ReplayScheduler, cfg config.ReplayConfig, alertManager AlertManager) *ReplayWorker {
return &ReplayWorker{
logger: logger,
jobRepo: jobRepo,
replayRepo: replayRepository,
config: cfg,
scheduler: scheduler,
logger: logger,
jobRepo: jobRepo,
replayRepo: replayRepository,
config: cfg,
scheduler: scheduler,
alertManager: alertManager,
}
}

Expand Down Expand Up @@ -71,6 +73,21 @@ func (w *ReplayWorker) Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobN
errMessage := err.Error()
if errors.Is(err, context.DeadlineExceeded) {
errMessage = "replay execution timed out"
w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
JobName: jobName.String(),
ReplayID: replayID.String(),
Tenant: jobTenant,
JobURN: jobName.GetJobURN(jobTenant),
State: scheduler.ReplayStateTimeout,
})
} else {
w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
JobName: jobName.String(),
ReplayID: replayID.String(),
Tenant: jobTenant,
JobURN: jobName.GetJobURN(jobTenant),
State: scheduler.ReplayStateFailed,
})
}
w.logger.Error("[ReplayID: %s] unable to execute replay for job [%s]: %s", replayID.String(), jobName.String(), errMessage)

Expand Down Expand Up @@ -100,20 +117,28 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI
w.logger.Info("[ReplayID: %s] executing replay...", replayID)

// sync run first
storedReplayWithRun, err := w.replayRepo.GetReplayByID(ctx, replayID)
replayWithRun, err := w.replayRepo.GetReplayByID(ctx, replayID)
if err != nil {
w.logger.Error("[ReplayID: %s] unable to get existing runs: %s", replayID.String(), err)
return err
}
replayWithRun := storedReplayWithRun

if storedReplayWithRun.Replay.IsTerminated() {
w.logger.Info("[ReplayID: %s] replay is externally terminated with status [%s]", replayWithRun.Replay.ID().String(), storedReplayWithRun.Replay.State().String())
if replayWithRun.Replay.IsTerminated() {
t := replayWithRun.Replay.Tenant()
w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
JobName: replayWithRun.Replay.JobName().String(),
ReplayID: replayID.String(),
Tenant: t,
JobURN: replayWithRun.Replay.JobName().GetJobURN(t),
State: replayWithRun.Replay.State(),
})
w.logger.Info("[ReplayID: %s] replay is externally terminated with status [%s]", replayWithRun.Replay.ID().String(), replayWithRun.Replay.State().String())
return nil
}

incomingRuns, err := w.fetchRuns(ctx, replayWithRun, jobCron)
if err != nil {
// todo: lets not kill watchers on such errors
w.logger.Error("[ReplayID: %s] unable to get incoming runs: %s", replayWithRun.Replay.ID().String(), err)
return err
}
Expand All @@ -129,7 +154,7 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI

// check if replay request is on termination state
if syncedRunStatus.IsAllTerminated() {
return w.finishReplay(ctx, replayWithRun.Replay.ID(), syncedRunStatus, runStatusSummary)
return w.finishReplay(ctx, replayWithRun.Replay, syncedRunStatus, runStatusSummary)
}

// pick runs to be triggered
Expand Down Expand Up @@ -166,11 +191,20 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI
}
}

func (w *ReplayWorker) finishReplay(ctx context.Context, replayID uuid.UUID, syncedRunStatus scheduler.JobRunStatusList, runStatusSummary string) error {
func (w *ReplayWorker) finishReplay(ctx context.Context, replay *scheduler.Replay, syncedRunStatus scheduler.JobRunStatusList, runStatusSummary string) error {
replayID := replay.ID()
replayState := scheduler.ReplayStateSuccess
if syncedRunStatus.IsAnyFailure() {
replayState = scheduler.ReplayStateFailed
}
w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
JobName: replay.JobName().String(),
ReplayID: replayID.String(),
Tenant: replay.Tenant(),
JobURN: replay.JobName().GetJobURN(replay.Tenant()),
State: replayState,
})

msg := fmt.Sprintf("replay is finished with run status: %s", runStatusSummary)
w.logger.Info("[ReplayID: %s] replay finished with status %s", replayID, replayState)

Expand Down
Loading

0 comments on commit a7bb2c5

Please sign in to comment.