Skip to content

Commit

Permalink
feat: Optimus spec based siren alerts (#299)
Browse files Browse the repository at this point in the history
* feat: optimus spec based siren alerts

* fix: pager alert label

* fix: support team overide for owner alerts

* fix: default severity warning

* fix : job success event names
  • Loading branch information
Mryashbhardwaj authored Nov 15, 2024
1 parent 13c6218 commit b4c2e42
Show file tree
Hide file tree
Showing 26 changed files with 840 additions and 688 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/goto/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "526e657b03d243a4c9f880e6c4ffbe15b116afd5"
PROTON_COMMIT := "04fa895e5b1a1c79f2bb9c0cad63617062c55495"


.PHONY: build test test-ci generate-proto unit-test-ci integration-test vet coverage clean install lint
Expand Down
6 changes: 6 additions & 0 deletions client/local/model/job_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type JobSpecBehaviorNotifier struct {
On string `yaml:"on"`
Config map[string]string `yaml:"config"`
Channels []string `yaml:"channels"`
Severity string `yaml:"severity"`
Team string `yaml:"team"`
}

type WebhookEndpoint struct {
Expand Down Expand Up @@ -215,6 +217,8 @@ func (j *JobSpec) getProtoJobSpecBehavior() *pb.JobSpecification_Behavior {
On: pb.JobEvent_Type(pb.JobEvent_Type_value[utils.ToEnumProto(notify.On, "type")]),
Channels: notify.Channels,
Config: notify.Config,
Severity: notify.Severity,
Team: notify.Team,
}
}
}
Expand Down Expand Up @@ -602,6 +606,8 @@ func toJobSpecBehavior(protoBehavior *pb.JobSpecification_Behavior, dependsOnPas
On: utils.FromEnumProto(protoNotifier.On.String(), "type"),
Config: protoNotifier.Config,
Channels: protoNotifier.Channels,
Severity: protoNotifier.Severity,
Team: protoNotifier.Team,
}
notifiers = append(notifiers, notifier)
}
Expand Down
8 changes: 5 additions & 3 deletions config/config_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ type TelemetryConfig struct {
}

type AlertingConfig struct {
EventManager EventManagerConfig `mapstructure:"alert_manager"`
Dashboard string `mapstructure:"dashboard"`
DataConsole string `mapstructure:"data_console"`
EventManager EventManagerConfig `mapstructure:"alert_manager"`
Dashboard string `mapstructure:"dashboard"`
DataConsole string `mapstructure:"data_console"`
EnableSlack bool `mapstructure:"enable_slack"`
EnablePagerDuty bool `mapstructure:"enable_pager_duty"`
}

type EventManagerConfig struct {
Expand Down
3 changes: 2 additions & 1 deletion core/job/handler/v1beta1/job_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func toAlerts(notifiers []*pb.JobSpecification_Behavior_Notifiers) ([]*job.Alert
if err != nil {
return nil, err
}
alertConfig, err := job.NewAlertSpec(alertOn, notify.Channels, config)
alertConfig, err := job.NewAlertSpec(alertOn, notify.Channels, config, notify.GetSeverity(), notify.GetTeam())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -350,6 +350,7 @@ func fromAlerts(jobAlerts []*job.AlertSpec) []*pb.JobSpecification_Behavior_Noti
On: pb.JobEvent_Type(pb.JobEvent_Type_value[utils.ToEnumProto(alert.On(), "type")]),
Channels: alert.Channels(),
Config: alert.Config(),
Severity: alert.Severity(),
})
}
return notifiers
Expand Down
1 change: 1 addition & 0 deletions core/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ type AlertAttrs struct {
Tenant tenant.Tenant
EventTime time.Time
ChangeType ChangeType
Job *Spec
}

type UpdateImpact string
Expand Down
1 change: 1 addition & 0 deletions core/job/service/job_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,7 @@ func (j *JobService) raiseUpdateEvent(incomingJob *job.Job, impactType job.Updat
Tenant: incomingJob.Tenant(),
EventTime: time.Now(),
ChangeType: job.ChangeTypeUpdate,
Job: incomingJob.Spec(),
})
jobEvent, err := event.NewJobUpdateEvent(incomingJob, impactType)
if err != nil {
Expand Down
30 changes: 29 additions & 1 deletion core/job/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,8 @@ type AlertSpec struct {

channels []string
config Config
severity string
team string
}

type WebhookEndPoint struct {
Expand All @@ -560,14 +562,32 @@ type WebhookSpec struct {
Endpoints []WebhookEndPoint
}

func NewAlertSpec(on string, channels []string, config Config) (*AlertSpec, error) {
const (
WarningSeverity = "WARNING"
CriticalSeverity = "CRITICAL"
InfoSeverity = "INFO"
DefaultSeverity = WarningSeverity
)

func getSeverity(severity string) string {
switch strings.ToUpper(severity) {
case WarningSeverity, InfoSeverity, CriticalSeverity:
return strings.ToUpper(severity)
default:
return DefaultSeverity
}
}

func NewAlertSpec(on string, channels []string, config Config, severity, team string) (*AlertSpec, error) {
if err := validateMap(config); err != nil {
return nil, err
}
return &AlertSpec{
on: on,
channels: channels,
config: config,
severity: getSeverity(severity),
team: team,
}, nil
}

Expand All @@ -583,6 +603,14 @@ func (a AlertSpec) Config() Config {
return a.config
}

func (a AlertSpec) Severity() string {
return a.severity
}

func (a AlertSpec) Team() string {
return a.team
}

// TODO: reconsider whether we still need it or not
type SpecHTTPUpstream struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion core/job/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestEntitySpec(t *testing.T) {
httpUpstreamHeader := map[string]string{"header-key": "sample-header-val"}
httpUpstream, _ := job.NewSpecHTTPUpstreamBuilder("sample-name", "sample-url").WithParams(httpUpstreamConfig).WithHeaders(httpUpstreamHeader).Build()
specUpstream, _ := job.NewSpecUpstreamBuilder().WithUpstreamNames([]job.SpecUpstreamName{"job-d"}).WithSpecHTTPUpstream([]*job.SpecHTTPUpstream{httpUpstream}).Build()
alert, _ := job.NewAlertSpec("sla_miss", []string{"sample-channel"}, jobAlertConfig)
alert, _ := job.NewAlertSpec("sla_miss", []string{"sample-channel"}, jobAlertConfig, "CRITICAL", "")
assetMap := map[string]string{"key": "value"}
asset, _ := job.AssetFrom(assetMap)
resourceRequestConfig := job.NewMetadataResourceConfig("250m", "128Mi")
Expand Down
21 changes: 13 additions & 8 deletions core/scheduler/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,21 @@ import (
)

type (
EventName string
EventStatus string
JobEventType string
JobEventCategory string
EventName string
EventStatus string
JobEventType string
EventCategory string
)

const (
EntityEvent = "event"

ISODateFormat = "2006-01-02T15:04:05Z"

EventCategorySLAMiss JobEventCategory = "sla_miss"
EventCategoryJobFailure JobEventCategory = "failure"
EventCategoryJobSuccess JobEventCategory = "success"
EventCategorySLAMiss EventCategory = "sla_miss"
EventCategoryJobFailure EventCategory = "failure"
EventCategoryJobSuccess EventCategory = "job_success"
EventCategoryReplay EventCategory = "replay_lifecycle"

SLAMissEvent JobEventType = "sla_miss"
JobFailureEvent JobEventType = "failure"
Expand Down Expand Up @@ -111,7 +112,11 @@ type Event struct {
SLAObjectList []*SLAObject
}

func (event JobEventType) IsOfType(category JobEventCategory) bool {
func (e EventCategory) String() string {
return string(e)
}

func (event JobEventType) IsOfType(category EventCategory) bool {
switch category {
case EventCategoryJobFailure:
if event == JobFailureEvent {
Expand Down
6 changes: 3 additions & 3 deletions core/scheduler/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,15 @@ func TestFromStringToEventType(t *testing.T) {
assert.Equal(t, &outputObj, output)
})
})
t.Run("IsOfType JobEventCategory", func(t *testing.T) {
positiveExpectationMap := map[scheduler.JobEventType]scheduler.JobEventCategory{
t.Run("IsOfType EventCategory", func(t *testing.T) {
positiveExpectationMap := map[scheduler.JobEventType]scheduler.EventCategory{
scheduler.JobFailureEvent: scheduler.EventCategoryJobFailure,
scheduler.SLAMissEvent: scheduler.EventCategorySLAMiss,
}
for eventType, category := range positiveExpectationMap {
assert.True(t, eventType.IsOfType(category))
}
negativeExpectationMap := map[scheduler.JobEventType]scheduler.JobEventCategory{
negativeExpectationMap := map[scheduler.JobEventType]scheduler.EventCategory{
scheduler.SLAMissEvent: scheduler.EventCategoryJobFailure,
scheduler.SensorRetryEvent: scheduler.EventCategoryJobFailure,
scheduler.SensorSuccessEvent: scheduler.EventCategorySLAMiss,
Expand Down
6 changes: 4 additions & 2 deletions core/scheduler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,11 @@ type Retry struct {
}

type Alert struct {
On JobEventCategory
On EventCategory
Channels []string
Config map[string]string
Severity string
Team string
}

type WebhookEndPoint struct {
Expand All @@ -202,7 +204,7 @@ type WebhookEndPoint struct {
}

type Webhook struct {
On JobEventCategory
On EventCategory
Endpoints []WebhookEndPoint
}

Expand Down
4 changes: 4 additions & 0 deletions core/scheduler/job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ type AlertAttrs struct {
SchedulerHost string
Status EventStatus
JobEvent *Event

JobWithDetails *JobWithDetails
}

type ReplayNotificationAttrs struct {
Expand All @@ -85,6 +87,8 @@ type ReplayNotificationAttrs struct {
Tenant tenant.Tenant
JobURN string
State ReplayState

JobWithDetails *JobWithDetails
}

type WebhookAttrs struct {
Expand Down
22 changes: 16 additions & 6 deletions core/scheduler/service/events_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,15 @@ func (e *EventsService) Relay(ctx context.Context, event *scheduler.Event) error
} else {
status = scheduler.StatusResolved
}

e.alertManager.SendJobRunEvent(&scheduler.AlertAttrs{
Owner: jobDetails.JobMetadata.Owner,
JobURN: jobDetails.Job.URN(),
Title: "Optimus Job Alert",
SchedulerHost: schedulerHost,
Status: status,
JobEvent: event,
Owner: jobDetails.JobMetadata.Owner,
JobURN: jobDetails.Job.URN(),
Title: "Optimus Job Alert",
SchedulerHost: schedulerHost,
Status: status,
JobEvent: event,
JobWithDetails: jobDetails,
})
return nil
}
Expand Down Expand Up @@ -131,6 +133,10 @@ func (e *EventsService) Webhook(ctx context.Context, event *scheduler.Event) err
}

func (e *EventsService) Push(ctx context.Context, event *scheduler.Event) error {
if !(event.Type.IsOfType(scheduler.EventCategoryJobFailure) || event.Type.IsOfType(scheduler.EventCategorySLAMiss)) {
return nil
}

jobDetails, err := e.jobRepo.GetJobDetails(ctx, event.Tenant.ProjectName(), event.JobName)
if err != nil {
e.l.Error("error getting detail for job [%s]: %s", event.JobName, err)
Expand All @@ -146,6 +152,10 @@ func (e *EventsService) Push(ctx context.Context, event *scheduler.Event) error
for _, channel := range notify.Channels {
chanParts := strings.Split(channel, "://")
scheme := chanParts[0]
if _, ok := e.notifyChannels[scheme]; !ok {
e.l.Warn("Scheme: %s, is not enabled", scheme)
continue
}
route := chanParts[1]

e.l.Debug("notification event for job: %s , event: %+v", event.JobName, event)
Expand Down
4 changes: 3 additions & 1 deletion core/scheduler/service/events_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestNotificationService(t *testing.T) {
event := &scheduler.Event{
JobName: jobName,
Tenant: tnnt,
Type: scheduler.TaskStartEvent,
Type: scheduler.JobFailureEvent,
Values: map[string]any{},
}
err := notifyService.Push(ctx, event)
Expand Down Expand Up @@ -142,6 +142,8 @@ func TestNotificationService(t *testing.T) {
SchedulerHost: "localhost",
Status: scheduler.StatusFiring,
JobEvent: event,

JobWithDetails: &jobWithDetails,
})
tenantService := new(mockTenantService)
tenantWithDetails, _ := tenant.NewTenantDetails(project, namespace, []*tenant.PlainTextSecret{})
Expand Down
16 changes: 10 additions & 6 deletions core/scheduler/service/replay_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,17 @@ func (r *ReplayService) CreateReplay(ctx context.Context, t tenant.Tenant, jobNa
jobName.String(),
replayReq.State().String(),
).Inc()

jobWithDetails, err := r.jobRepo.GetJobDetails(ctx, t.ProjectName(), jobName)
if err != nil {
return uuid.Nil, err
}
r.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
JobName: jobName.String(),
ReplayID: replayID.String(),
Tenant: t,
JobURN: jobName.GetJobURN(t),
State: scheduler.ReplayStateCreated,
JobName: jobName.String(),
ReplayID: replayID.String(),
Tenant: t,
JobURN: jobName.GetJobURN(t),
State: scheduler.ReplayStateCreated,
JobWithDetails: jobWithDetails,
})

go r.executor.Execute(replayID, replayReq.Tenant(), jobName) //nolint:contextcheck
Expand Down
25 changes: 21 additions & 4 deletions core/scheduler/service/replay_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,15 @@ func (w *ReplayWorker) Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobN
errMessage := err.Error()
if errors.Is(err, context.DeadlineExceeded) {
errMessage = "replay execution timed out"
w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
w.sendReplayEvent(ctx, scheduler.ReplayNotificationAttrs{
JobName: jobName.String(),
ReplayID: replayID.String(),
Tenant: jobTenant,
JobURN: jobName.GetJobURN(jobTenant),
State: scheduler.ReplayStateTimeout,
})
} else {
w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
w.sendReplayEvent(ctx, scheduler.ReplayNotificationAttrs{
JobName: jobName.String(),
ReplayID: replayID.String(),
Tenant: jobTenant,
Expand Down Expand Up @@ -159,7 +159,7 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI

if replayWithRun.Replay.IsTerminated() {
t := replayWithRun.Replay.Tenant()
w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{
w.sendReplayEvent(ctx, scheduler.ReplayNotificationAttrs{
JobName: replayWithRun.Replay.JobName().String(),
ReplayID: replayID.String(),
Tenant: t,
Expand Down Expand Up @@ -254,7 +254,8 @@ func (w *ReplayWorker) finishReplay(ctx context.Context, replay *scheduler.Repla
if syncedRunStatus.IsAnyFailure() {
replayState = scheduler.ReplayStateFailed
}
w.alertManager.SendReplayEvent(&scheduler.ReplayNotificationAttrs{

w.sendReplayEvent(ctx, scheduler.ReplayNotificationAttrs{
JobName: replay.JobName().String(),
ReplayID: replayID.String(),
Tenant: replay.Tenant(),
Expand Down Expand Up @@ -432,3 +433,19 @@ func (w *ReplayWorker) getRequestsToProcess(ctx context.Context, replays []*sche
replayReqLag.Set(maxLag)
return requestsToProcess
}

func (w *ReplayWorker) sendReplayEvent(ctx context.Context, attr scheduler.ReplayNotificationAttrs) error {
jobName, err := scheduler.JobNameFrom(attr.JobName)
if err != nil {
w.logger.Error("[ReplayID: %s] unable adapt job name from %s", attr.ReplayID, attr.JobName)
return err
}
jobWithDetails, err := w.jobRepo.GetJobDetails(ctx, attr.Tenant.ProjectName(), jobName)
if err != nil {
w.logger.Error("[ReplayID: %s] unable get jobWithDetails for %s", attr.ReplayID, jobName)
return err
}
attr.JobWithDetails = jobWithDetails
w.alertManager.SendReplayEvent(&attr)
return nil
}
Loading

0 comments on commit b4c2e42

Please sign in to comment.