Skip to content

Commit

Permalink
notify: Create a snapshot of the alerts that require notification
Browse files Browse the repository at this point in the history
Make sure that the alerts passed to notifyFunc are consistent regardless
of retries and other delays.

Signed-off-by: Holger Hans Peter Freyther <[email protected]>
  • Loading branch information
zecke committed May 8, 2024
1 parent 6775ea6 commit 3202af3
Show file tree
Hide file tree
Showing 32 changed files with 260 additions and 211 deletions.
30 changes: 11 additions & 19 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (d *Dispatcher) Stop() {
// notifyFunc is a function that performs notification for the alert
// with the given fingerprint. It aborts on context cancelation.
// Returns false iff notifying failed.
type notifyFunc func(context.Context, ...*types.Alert) bool
type notifyFunc func(context.Context, ...*types.AlertSnapshot) bool

// processAlert determines in which aggregation group the alert falls
// and inserts it.
Expand Down Expand Up @@ -340,7 +340,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
// alert is already there.
ag.insert(alert)

go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
go ag.run(func(ctx context.Context, alerts ...*types.AlertSnapshot) bool {
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
if err != nil {
lvl := level.Error(d.logger)
Expand Down Expand Up @@ -454,7 +454,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
ag.hasFlushed = true
ag.mtx.Unlock()

ag.flush(func(alerts ...*types.Alert) bool {
ag.flush(func(alerts ...*types.AlertSnapshot) bool {
return nf(ctx, alerts...)
})

Expand Down Expand Up @@ -493,28 +493,20 @@ func (ag *aggrGroup) empty() bool {
}

// flush sends notifications for all new alerts.
func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
func (ag *aggrGroup) flush(notify func(...*types.AlertSnapshot) bool) {
if ag.empty() {
return
}

var (
alerts = ag.alerts.List()
alertsSlice = make(types.AlertSlice, 0, len(alerts))
resolvedSlice = make(types.AlertSlice, 0, len(alerts))
now = time.Now()
)
for _, alert := range alerts {
a := *alert
// Ensure that alerts don't resolve as time move forwards.
if a.ResolvedAt(now) {
resolvedSlice = append(resolvedSlice, &a)
} else {
a.EndsAt = time.Time{}
alertsSlice := types.SnapshotAlerts(ag.alerts.List(), time.Now())
sort.Stable(alertsSlice)

resolvedSlice := make(types.AlertSlice, 0, len(alertsSlice))
for _, alert := range alertsSlice {
if alert.Resolved() {
resolvedSlice = append(resolvedSlice, &alert.Alert)
}
alertsSlice = append(alertsSlice, &a)
}
sort.Stable(alertsSlice)

level.Debug(ag.logger).Log("msg", "flushing", "alerts", fmt.Sprintf("%v", alertsSlice))

Expand Down
51 changes: 20 additions & 31 deletions dispatch/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ func TestAggrGroup(t *testing.T) {
last = time.Now()
current = time.Now()
lastCurMtx = &sync.Mutex{}
alertsCh = make(chan types.AlertSlice)
alertsCh = make(chan types.AlertsSnapshot)
)

ntfy := func(ctx context.Context, alerts ...*types.Alert) bool {
ntfy := func(ctx context.Context, alerts ...*types.AlertSnapshot) bool {
// Validate that the context is properly populated.
if _, ok := notify.Now(ctx); !ok {
t.Errorf("now missing")
Expand All @@ -123,20 +123,11 @@ func TestAggrGroup(t *testing.T) {
current = time.Now().Add(-time.Millisecond)
lastCurMtx.Unlock()

alertsCh <- types.AlertSlice(alerts)
alertsCh <- types.AlertsSnapshot(alerts)

return true
}

removeEndsAt := func(as types.AlertSlice) types.AlertSlice {
for i, a := range as {
ac := *a
ac.EndsAt = time.Time{}
as[i] = &ac
}
return as
}

// Test regular situation where we wait for group_wait to send out alerts.
ag := newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger())
go ag.run(ntfy)
Expand All @@ -154,12 +145,11 @@ func TestAggrGroup(t *testing.T) {
if s < opts.GroupWait {
t.Fatalf("received batch too early after %v", s)
}
exp := removeEndsAt(types.AlertSlice{a1})
exp := types.SnapshotAlerts(types.AlertSlice{a1}, batch[0].SnapshotTime())
sort.Sort(batch)

if !reflect.DeepEqual(batch, exp) {
t.Fatalf("expected alerts %v but got %v", exp, batch)
}
require.Equal(t, exp, batch)

}

for i := 0; i < 3; i++ {
Expand All @@ -177,7 +167,7 @@ func TestAggrGroup(t *testing.T) {
if s < opts.GroupInterval {
t.Fatalf("received batch too early after %v", s)
}
exp := removeEndsAt(types.AlertSlice{a1, a3})
exp := types.SnapshotAlerts(types.AlertSlice{a1, a3}, batch[0].SnapshotTime())
sort.Sort(batch)

if !reflect.DeepEqual(batch, exp) {
Expand All @@ -204,7 +194,7 @@ func TestAggrGroup(t *testing.T) {
t.Fatalf("expected immediate alert but received none")

case batch := <-alertsCh:
exp := removeEndsAt(types.AlertSlice{a1, a2})
exp := types.SnapshotAlerts(types.AlertSlice{a1, a2}, batch[0].SnapshotTime())
sort.Sort(batch)

if !reflect.DeepEqual(batch, exp) {
Expand All @@ -227,7 +217,7 @@ func TestAggrGroup(t *testing.T) {
if s < opts.GroupInterval {
t.Fatalf("received batch too early after %v", s)
}
exp := removeEndsAt(types.AlertSlice{a1, a2, a3})
exp := types.SnapshotAlerts(types.AlertSlice{a1, a2, a3}, batch[0].SnapshotTime())
sort.Sort(batch)

if !reflect.DeepEqual(batch, exp) {
Expand All @@ -240,7 +230,6 @@ func TestAggrGroup(t *testing.T) {
a1r := *a1
a1r.EndsAt = time.Now()
ag.insert(&a1r)
exp := append(types.AlertSlice{&a1r}, removeEndsAt(types.AlertSlice{a2, a3})...)

select {
case <-time.After(2 * opts.GroupInterval):
Expand All @@ -252,6 +241,7 @@ func TestAggrGroup(t *testing.T) {
if s < opts.GroupInterval {
t.Fatalf("received batch too early after %v", s)
}
exp := types.SnapshotAlerts(types.AlertSlice{&a1r, a2, a3}, batch[0].SnapshotTime())
sort.Sort(batch)

if !reflect.DeepEqual(batch, exp) {
Expand Down Expand Up @@ -281,10 +271,9 @@ func TestAggrGroup(t *testing.T) {
}
sort.Sort(batch)

if !reflect.DeepEqual(batch, resolved) {
t.Fatalf("expected alerts %v but got %v", resolved, batch)
}
exp := types.SnapshotAlerts(resolved, batch[0].SnapshotTime())

require.Equal(t, exp, batch)
if !ag.empty() {
t.Fatalf("Expected aggregation group to be empty after resolving alerts: %v", ag)
}
Expand Down Expand Up @@ -397,7 +386,7 @@ route:
defer alerts.Close()

timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.AlertSnapshot)}
dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()))
go dispatcher.Run()
defer dispatcher.Stop()
Expand Down Expand Up @@ -535,7 +524,7 @@ route:
defer alerts.Close()

timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.AlertSnapshot)}
lim := limits{groups: 6}
m := NewDispatcherMetrics(true, prometheus.NewRegistry())
dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m)
Expand Down Expand Up @@ -594,13 +583,13 @@ route:

type recordStage struct {
mtx sync.RWMutex
alerts map[string]map[model.Fingerprint]*types.Alert
alerts map[string]map[model.Fingerprint]*types.AlertSnapshot
}

func (r *recordStage) Alerts() []*types.Alert {
func (r *recordStage) Alerts() []*types.AlertSnapshot {
r.mtx.RLock()
defer r.mtx.RUnlock()
alerts := make([]*types.Alert, 0)
alerts := make([]*types.AlertSnapshot, 0)
for k := range r.alerts {
for _, a := range r.alerts[k] {
alerts = append(alerts, a)
Expand All @@ -609,15 +598,15 @@ func (r *recordStage) Alerts() []*types.Alert {
return alerts
}

func (r *recordStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
func (r *recordStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.AlertSnapshot) (context.Context, []*types.AlertSnapshot, error) {
r.mtx.Lock()
defer r.mtx.Unlock()
gk, ok := notify.GroupKey(ctx)
if !ok {
panic("GroupKey not present!")
}
if _, ok := r.alerts[gk]; !ok {
r.alerts[gk] = make(map[model.Fingerprint]*types.Alert)
r.alerts[gk] = make(map[model.Fingerprint]*types.AlertSnapshot)
}
for _, a := range alerts {
r.alerts[gk][a.Fingerprint()] = a
Expand Down Expand Up @@ -683,7 +672,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T)
}

timeout := func(d time.Duration) time.Duration { return d }
recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.AlertSnapshot)}
dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()))
go dispatcher.Run()
defer dispatcher.Stop()
Expand Down
4 changes: 2 additions & 2 deletions notify/discord/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,15 @@ type webhookEmbed struct {
}

// Notify implements the Notifier interface.
func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
func (n *Notifier) Notify(ctx context.Context, as ...*types.AlertSnapshot) (bool, error) {
key, err := notify.ExtractGroupKey(ctx)
if err != nil {
return false, err
}

level.Debug(n.logger).Log("incident", key)

alerts := types.Alerts(as...)
alerts := types.Snapshot(as...)
data := notify.GetTemplateData(ctx, n.tmpl, as, n.logger)
tmpl := notify.TmplText(n.tmpl, data, &err)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions notify/discord/discord_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,16 @@ func TestDiscordTemplating(t *testing.T) {
ctx := context.Background()
ctx = notify.WithGroupKey(ctx, "1")

ok, err := pd.Notify(ctx, []*types.Alert{
{
ok, err := pd.Notify(ctx, []*types.AlertSnapshot{
types.NewAlertSnapshot(&types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"lbl1": "val1",
},
StartsAt: time.Now(),
EndsAt: time.Now().Add(time.Hour),
},
},
}, time.Now()),
}...)
if tc.errMsg == "" {
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion notify/email/email.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (n *Email) auth(mechs string) (smtp.Auth, error) {
}

// Notify implements the Notifier interface.
func (n *Email) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
func (n *Email) Notify(ctx context.Context, as ...*types.AlertSnapshot) (bool, error) {
var (
c *smtp.Client
conn net.Conn
Expand Down
4 changes: 2 additions & 2 deletions notify/email/email_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,13 @@ func notifyEmailWithContext(ctx context.Context, cfg *config.EmailConfig, server
if cfg.Headers == nil {
cfg.Headers = make(map[string]string)
}
firingAlert := &types.Alert{
firingAlert := types.NewAlertSnapshot(&types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{},
StartsAt: time.Now(),
EndsAt: time.Now().Add(time.Hour),
},
}
}, time.Now())
err := server.deleteAllEmails()
if err != nil {
return nil, false, err
Expand Down
4 changes: 2 additions & 2 deletions notify/msteams/msteams.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func New(c *config.MSTeamsConfig, t *template.Template, l log.Logger, httpOpts .
return n, nil
}

func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
func (n *Notifier) Notify(ctx context.Context, as ...*types.AlertSnapshot) (bool, error) {
key, err := notify.ExtractGroupKey(ctx)
if err != nil {
return false, err
Expand All @@ -107,7 +107,7 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error)
return false, err
}

alerts := types.Alerts(as...)
alerts := types.Snapshot(as...)
color := colorGrey
switch alerts.Status() {
case model.AlertFiring:
Expand Down
10 changes: 5 additions & 5 deletions notify/msteams/msteams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,16 @@ func TestMSTeamsTemplating(t *testing.T) {
ctx := context.Background()
ctx = notify.WithGroupKey(ctx, "1")

ok, err := pd.Notify(ctx, []*types.Alert{
{
ok, err := pd.Notify(ctx, []*types.AlertSnapshot{
types.NewAlertSnapshot(&types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"lbl1": "val1",
},
StartsAt: time.Now(),
EndsAt: time.Now().Add(time.Hour),
},
},
}, time.Now()),
}...)
if tc.errMsg == "" {
require.NoError(t, err)
Expand Down Expand Up @@ -176,12 +176,12 @@ func TestNotifier_Notify_WithReason(t *testing.T) {
ctx := context.Background()
ctx = notify.WithGroupKey(ctx, "1")

alert1 := &types.Alert{
alert1 := types.NewAlertSnapshot(&types.Alert{
Alert: model.Alert{
StartsAt: time.Now(),
EndsAt: time.Now().Add(time.Hour),
},
}
}, time.Now())
_, err = notifier.Notify(ctx, alert1)
if tt.noError {
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 3202af3

Please sign in to comment.