Feat: add replay lifecycle events #271

Merged · 5 commits · Oct 4, 2024
4 changes: 4 additions & 0 deletions core/scheduler/job.go
@@ -47,6 +47,10 @@ func (n JobName) String() string {
return string(n)
}

func (n JobName) GetJobURN(tnnt tenant.Tenant) string {
return fmt.Sprintf("urn:optimus:%s:job:%s.%s.%s", tnnt.ProjectName(), tnnt.ProjectName(), tnnt.NamespaceName(), n)
}

type Job struct {
ID uuid.UUID
Name JobName
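For context on the new helper above: GetJobURN builds the job URN from the tenant's project name (used twice, once as the URN scheme component and once as the job prefix), the namespace, and the job name. A minimal sketch of the resulting string, assuming the usual goto/optimus import paths and that tenant.NewTenant(project, namespace) is the standard constructor; the names below are made up for illustration:

```go
package main

import (
	"fmt"

	"github.com/goto/optimus/core/scheduler"
	"github.com/goto/optimus/core/tenant"
)

func main() {
	// Illustrative tenant; project/namespace names are placeholders.
	tnnt, err := tenant.NewTenant("my-project", "my-namespace")
	if err != nil {
		panic(err)
	}
	jobName := scheduler.JobName("daily_orders")

	// Per the format string in GetJobURN, this prints:
	// urn:optimus:my-project:job:my-project.my-namespace.daily_orders
	fmt.Println(jobName.GetJobURN(tnnt))
}
```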
10 changes: 5 additions & 5 deletions core/scheduler/job_run.go
@@ -80,11 +80,11 @@ type AlertAttrs struct {
}

type ReplayNotificationAttrs struct {
JobName string
ReplayID string
Tenant tenant.Tenant
JobURN string
EventType ReplayEventType
JobName string
ReplayID string
Tenant tenant.Tenant
JobURN string
State ReplayState
}

type WebhookAttrs struct {
10 changes: 2 additions & 8 deletions core/scheduler/replay.go
@@ -11,9 +11,6 @@ import (
)

const (
// ReplayCreated is an event which indicates the replay has been created but not picked up yet
ReplayCreated ReplayEventType = "replay_created"

// ReplayStateCreated is an initial state which indicates the replay has been created but not picked up yet
ReplayStateCreated ReplayState = "created"

@@ -23,6 +20,8 @@ const (
// ReplayStateSuccess is a terminal state which occurs when the replay execution finished with successful job runs
ReplayStateSuccess ReplayState = "success"

ReplayStateTimeout ReplayState = "timeout"

// ReplayStateFailed is a terminal state which occurs when the replay execution failed, timed out, or finished with one of the run fails
ReplayStateFailed ReplayState = "failed"

@@ -38,7 +37,6 @@
)

type (
ReplayEventType string
ReplayState string // contract status for business layer
ReplayUserState string // contract status for presentation layer
)
@@ -60,10 +58,6 @@ func ReplayStateFromString(state string) (ReplayState, error) {
}
}

func (j ReplayEventType) String() string {
return string(j)
}

func (j ReplayState) String() string {
return string(j)
}
20 changes: 13 additions & 7 deletions core/scheduler/service/replay_service.go
@@ -74,21 +74,21 @@ type ReplayService struct {
pluginToExecutionProjectKeyMap map[string]string
}

func (r *ReplayService) CreateReplay(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) (replayID uuid.UUID, err error) {
jobCron, err := getJobCron(ctx, r.logger, r.jobRepo, tenant, jobName)
func (r *ReplayService) CreateReplay(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) (replayID uuid.UUID, err error) {
jobCron, err := getJobCron(ctx, r.logger, r.jobRepo, t, jobName)
if err != nil {
r.logger.Error("unable to get cron value for job [%s]: %s", jobName.String(), err.Error())
return uuid.Nil, err
}

newConfig, err := r.injectJobConfigWithTenantConfigs(ctx, tenant, jobName, config)
newConfig, err := r.injectJobConfigWithTenantConfigs(ctx, t, jobName, config)
if err != nil {
r.logger.Error("unable to get namespace details for job %s: %s", jobName.String(), err)
return uuid.Nil, err
}
config.JobConfig = newConfig

replayReq := scheduler.NewReplayRequest(jobName, tenant, config, scheduler.ReplayStateCreated)
replayReq := scheduler.NewReplayRequest(jobName, t, config, scheduler.ReplayStateCreated)
if err := r.validator.Validate(ctx, replayReq, jobCron); err != nil {
r.logger.Error("error validating replay request: %s", err)
return uuid.Nil, err
@@ -100,13 +100,19 @@ func (r *ReplayService) CreateReplay(ctx context.Context, tenant tenant.Tenant,
return uuid.Nil, err
}

jobReplayMetric.WithLabelValues(tenant.ProjectName().String(),
tenant.NamespaceName().String(),
jobReplayMetric.WithLabelValues(t.ProjectName().String(),
t.NamespaceName().String(),
jobName.String(),
replayReq.State().String(),
).Inc()

r.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{})
r.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
JobName: jobName.String(),
ReplayID: replayID.String(),
Tenant: t,
JobURN: jobName.GetJobURN(t),
State: scheduler.ReplayStateCreated,
})

go r.executor.Execute(replayID, replayReq.Tenant(), jobName)

68 changes: 51 additions & 17 deletions core/scheduler/service/replay_worker.go
@@ -21,11 +21,12 @@ const (
)

type ReplayWorker struct {
logger log.Logger
replayRepo ReplayRepository
jobRepo JobRepository
scheduler ReplayScheduler
config config.ReplayConfig
logger log.Logger
replayRepo ReplayRepository
jobRepo JobRepository
scheduler ReplayScheduler
config config.ReplayConfig
alertManager AlertManager
}

type ReplayScheduler interface {
@@ -36,13 +37,14 @@ type ReplayScheduler interface {
GetJobRuns(ctx context.Context, t tenant.Tenant, criteria *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error)
}

func NewReplayWorker(logger log.Logger, replayRepository ReplayRepository, jobRepo JobRepository, scheduler ReplayScheduler, cfg config.ReplayConfig) *ReplayWorker {
func NewReplayWorker(logger log.Logger, replayRepository ReplayRepository, jobRepo JobRepository, scheduler ReplayScheduler, cfg config.ReplayConfig, alertManager AlertManager) *ReplayWorker {
return &ReplayWorker{
logger: logger,
jobRepo: jobRepo,
replayRepo: replayRepository,
config: cfg,
scheduler: scheduler,
logger: logger,
jobRepo: jobRepo,
replayRepo: replayRepository,
config: cfg,
scheduler: scheduler,
alertManager: alertManager,
}
}

@@ -71,6 +73,21 @@ func (w *ReplayWorker) Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobN
errMessage := err.Error()
if errors.Is(err, context.DeadlineExceeded) {
errMessage = "replay execution timed out"
w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
JobName: jobName.String(),
ReplayID: replayID.String(),
Tenant: jobTenant,
JobURN: jobName.GetJobURN(jobTenant),
State: scheduler.ReplayStateTimeout,
})
} else {
w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
JobName: jobName.String(),
ReplayID: replayID.String(),
Tenant: jobTenant,
JobURN: jobName.GetJobURN(jobTenant),
State: scheduler.ReplayStateFailed,
})
}
w.logger.Error("[ReplayID: %s] unable to execute replay for job [%s]: %s", replayID.String(), jobName.String(), errMessage)

@@ -100,20 +117,28 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI
w.logger.Info("[ReplayID: %s] executing replay...", replayID)

// sync run first
storedReplayWithRun, err := w.replayRepo.GetReplayByID(ctx, replayID)
replayWithRun, err := w.replayRepo.GetReplayByID(ctx, replayID)
if err != nil {
w.logger.Error("[ReplayID: %s] unable to get existing runs: %s", replayID.String(), err)
return err
}
replayWithRun := storedReplayWithRun

if storedReplayWithRun.Replay.IsTerminated() {
w.logger.Info("[ReplayID: %s] replay is externally terminated with status [%s]", replayWithRun.Replay.ID().String(), storedReplayWithRun.Replay.State().String())
if replayWithRun.Replay.IsTerminated() {
t := replayWithRun.Replay.Tenant()
w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
JobName: replayWithRun.Replay.JobName().String(),
ReplayID: replayID.String(),
Tenant: t,
JobURN: replayWithRun.Replay.JobName().GetJobURN(t),
State: replayWithRun.Replay.State(),
})
w.logger.Info("[ReplayID: %s] replay is externally terminated with status [%s]", replayWithRun.Replay.ID().String(), replayWithRun.Replay.State().String())
return nil
}

incomingRuns, err := w.fetchRuns(ctx, replayWithRun, jobCron)
if err != nil {
// todo: lets not kill watchers on such errors
w.logger.Error("[ReplayID: %s] unable to get incoming runs: %s", replayWithRun.Replay.ID().String(), err)
return err
}
@@ -129,7 +154,7 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI

// check if replay request is on termination state
if syncedRunStatus.IsAllTerminated() {
return w.finishReplay(ctx, replayWithRun.Replay.ID(), syncedRunStatus, runStatusSummary)
return w.finishReplay(ctx, replayWithRun.Replay, syncedRunStatus, runStatusSummary)
}

// pick runs to be triggered
@@ -166,11 +191,20 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI
}
}

func (w *ReplayWorker) finishReplay(ctx context.Context, replayID uuid.UUID, syncedRunStatus scheduler.JobRunStatusList, runStatusSummary string) error {
func (w *ReplayWorker) finishReplay(ctx context.Context, replay *scheduler.Replay, syncedRunStatus scheduler.JobRunStatusList, runStatusSummary string) error {
replayID := replay.ID()
replayState := scheduler.ReplayStateSuccess
if syncedRunStatus.IsAnyFailure() {
replayState = scheduler.ReplayStateFailed
}
w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
JobName: replay.JobName().String(),
ReplayID: replayID.String(),
Tenant: replay.Tenant(),
JobURN: replay.JobName().GetJobURN(replay.Tenant()),
State: replayState,
})

msg := fmt.Sprintf("replay is finished with run status: %s", runStatusSummary)
w.logger.Info("[ReplayID: %s] replay finished with status %s", replayID, replayState)

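For anyone wanting to exercise the new notifications in tests: below is a minimal sketch of a recording double for the AlertManager dependency that NewReplayWorker now takes as its last argument. Only SendReplayEvent is shown and the real AlertManager interface may declare additional methods; the test body is purely illustrative and does not wire up the worker's other dependencies (replay repository, job repository, scheduler, config), which the existing tests already mock.

```go
package service_test

import (
	"testing"

	"github.com/goto/optimus/core/scheduler"
)

// recordingAlertManager captures replay lifecycle events so a test can
// assert which states were notified.
type recordingAlertManager struct {
	events []*scheduler.ReplayNotificationAttrs
}

func (r *recordingAlertManager) SendReplayEvent(attrs *scheduler.ReplayNotificationAttrs) {
	r.events = append(r.events, attrs)
}

func TestRecordsReplayLifecycleEvents(t *testing.T) {
	alerts := &recordingAlertManager{}

	// Simulate the event the worker emits when a replay times out.
	alerts.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
		JobName:  "daily_orders",
		ReplayID: "00000000-0000-0000-0000-000000000000",
		State:    scheduler.ReplayStateTimeout,
	})

	if len(alerts.events) != 1 || alerts.events[0].State != scheduler.ReplayStateTimeout {
		t.Fatalf("expected one timeout event, got %+v", alerts.events)
	}
}
```

The same double could then be passed as the alertManager argument to NewReplayWorker alongside the existing mocks to verify the created, timeout, failed, terminated, and success events added in this PR.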