Address PR comments
Signed-off-by: Emmanuel Lodovice <[email protected]>
emanlodovice committed Oct 17, 2023
1 parent 665c1b0 commit 1335a7f
Showing 5 changed files with 62 additions and 51 deletions.
17 changes: 9 additions & 8 deletions alertobserver/alertobserver.go
@@ -18,14 +18,15 @@ import (
)

const (
EventAlertReceived string = "received"
EventAlertRejected string = "rejected"
EventAlertAddedToAggrGroup string = "addedAggrGroup"
EventAlertPipelineStart string = "pipelineStart"
EventAlertPipelinePassStage string = "pipelinePassStage"
EventAlertMuted string = "muted"
EventAlertSent string = "sent"
EventAlertSendFailed string = "sendFailed"
EventAlertReceived string = "received"
EventAlertRejected string = "rejected"
EventAlertAddedToAggrGroup string = "addedAggrGroup"
EventAlertFailedAddToAggrGroup string = "failedAddAggrGroup"
EventAlertPipelineStart string = "pipelineStart"
EventAlertPipelinePassStage string = "pipelinePassStage"
EventAlertMuted string = "muted"
EventAlertSent string = "sent"
EventAlertSendFailed string = "sendFailed"
)

type AlertEventMeta map[string]interface{}
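The new EventAlertFailedAddToAggrGroup constant gives observers a dedicated event for alerts that never make it into an aggregation group. As a minimal sketch, assuming only the Observe signature visible in this diff, a consumer might look like the following; failureLogger and its log message are illustrative, not part of the change.

package observerexample

import (
	"log"

	"github.com/prometheus/alertmanager/alertobserver"
	"github.com/prometheus/alertmanager/types"
)

// failureLogger is a hypothetical observer that only reacts to alerts
// that could not be placed into an aggregation group.
type failureLogger struct{}

func (failureLogger) Observe(event string, alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
	if event != alertobserver.EventAlertFailedAddToAggrGroup {
		return
	}
	// The dispatcher attaches its log message under the "msg" key in this commit.
	msg, _ := meta["msg"].(string)
	for _, a := range alerts {
		log.Printf("alert %s dropped before aggregation: %s", a.Name(), msg)
	}
}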
5 changes: 5 additions & 0 deletions alertobserver/testing.go
@@ -14,16 +14,21 @@
package alertobserver

import (
"sync"

"github.com/prometheus/alertmanager/types"
)

type FakeLifeCycleObserver struct {
AlertsPerEvent map[string][]*types.Alert
PipelineStageAlerts map[string][]*types.Alert
MetaPerEvent map[string][]AlertEventMeta
Mtx sync.RWMutex
}

func (o *FakeLifeCycleObserver) Observe(event string, alerts []*types.Alert, meta AlertEventMeta) {
o.Mtx.Lock()
defer o.Mtx.Unlock()
if event == EventAlertPipelinePassStage {
o.PipelineStageAlerts[meta["stageName"].(string)] = append(o.PipelineStageAlerts[meta["stageName"].(string)], alerts...)
} else {
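The added sync.RWMutex matters because FakeLifeCycleObserver is now written to from the dispatcher's goroutines while tests read it from their own goroutine. A small illustrative helper that respects the new lock; countEvents is not part of the change, just a sketch of the intended access pattern.

package observerexample

import "github.com/prometheus/alertmanager/alertobserver"

// countEvents reads the fake observer under its read lock so a test
// goroutine does not race with the dispatcher goroutine writing to it.
func countEvents(o *alertobserver.FakeLifeCycleObserver, event string) int {
	o.Mtx.RLock()
	defer o.Mtx.RUnlock()
	return len(o.AlertsPerEvent[event])
}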
6 changes: 5 additions & 1 deletion dispatch/dispatch.go
@@ -332,7 +332,11 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
// If the group does not exist, create it. But check the limit first.
if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit {
d.metrics.aggrGroupLimitReached.Inc()
level.Error(d.logger).Log("msg", "Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())
errMsg := "Too many aggregation groups, cannot create new group for alert"
level.Error(d.logger).Log("msg", errMsg, "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())
if d.alertLCObserver != nil {
d.alertLCObserver.Observe(alertobserver.EventAlertFailedAddToAggrGroup, []*types.Alert{alert}, alertobserver.AlertEventMeta{"msg": errMsg})
}
return
}

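Emitting EventAlertFailedAddToAggrGroup here means a dropped alert is no longer visible only in the log line and the aggrGroupLimitReached counter; any registered observer can surface it as it sees fit. One hypothetical consumer, sketched with client_golang; the metric name and metricObserver type are assumptions, and the counter would still need to be registered by the caller.

package observerexample

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/alertmanager/alertobserver"
	"github.com/prometheus/alertmanager/types"
)

// failedAggrGroupAdds counts alerts rejected because the aggregation
// group limit was reached.
var failedAggrGroupAdds = prometheus.NewCounter(prometheus.CounterOpts{
	Name: "example_alerts_failed_aggr_group_adds_total",
	Help: "Alerts that could not be placed into an aggregation group.",
})

// metricObserver turns the new lifecycle event into a counter increment.
type metricObserver struct{}

func (metricObserver) Observe(event string, alerts []*types.Alert, _ alertobserver.AlertEventMeta) {
	if event == alertobserver.EventAlertFailedAddToAggrGroup {
		failedAggrGroupAdds.Add(float64(len(alerts)))
	}
}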
27 changes: 20 additions & 7 deletions dispatch/dispatch_test.go
@@ -596,27 +596,40 @@ route:
recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
m := NewDispatcherMetrics(true, prometheus.NewRegistry())
observer := alertobserver.NewFakeLifeCycleObserver()
dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, m, observer)
lim := limits{groups: 1}
dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m, observer)
go dispatcher.Run()
defer dispatcher.Stop()

// Create alerts. the dispatcher will automatically create the groups.
inputAlerts := []*types.Alert{
newAlert(model.LabelSet{"alertname": "OtherAlert", "cluster": "cc", "service": "dd"}),
}
err = alerts.Put(inputAlerts...)
alert1 := newAlert(model.LabelSet{"alertname": "OtherAlert", "cluster": "cc", "service": "dd"})
alert2 := newAlert(model.LabelSet{"alertname": "YetAnotherAlert", "cluster": "cc", "service": "db"})
err = alerts.Put(alert1)
if err != nil {
t.Fatal(err)
}
// Let alerts get processed.
for i := 0; len(recorder.Alerts()) != 1 && i < 10; i++ {
time.Sleep(200 * time.Millisecond)
}
err = alerts.Put(alert2)
if err != nil {
t.Fatal(err)
}
// Let alert get processed.
for i := 0; testutil.ToFloat64(m.aggrGroupLimitReached) == 0 && i < 10; i++ {
time.Sleep(200 * time.Millisecond)
}
observer.Mtx.RLock()
defer observer.Mtx.RUnlock()
require.Equal(t, 1, len(recorder.Alerts()))
require.Equal(t, inputAlerts[0].Fingerprint(), observer.AlertsPerEvent[alertobserver.EventAlertAddedToAggrGroup][0].Fingerprint())
groupFp := getGroupLabels(inputAlerts[0], route).Fingerprint()
require.Equal(t, alert1.Fingerprint(), observer.AlertsPerEvent[alertobserver.EventAlertAddedToAggrGroup][0].Fingerprint())
groupFp := getGroupLabels(alert1, route).Fingerprint()
groupKey := dispatcher.aggrGroupsPerRoute[route][groupFp].GroupKey()
require.Equal(t, groupKey, observer.MetaPerEvent[alertobserver.EventAlertAddedToAggrGroup][0]["groupKey"].(string))

require.Equal(t, 1, len(observer.AlertsPerEvent[alertobserver.EventAlertFailedAddToAggrGroup]))
require.Equal(t, alert2.Fingerprint(), observer.AlertsPerEvent[alertobserver.EventAlertFailedAddToAggrGroup][0].Fingerprint())
}

type recordStage struct {
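The test pins the aggregation group limit to 1 so the second alert is guaranteed to hit the failure path. The limits value comes from a stub defined elsewhere in dispatch_test.go; a minimal stand-in compatible with the MaxNumberOfAggregationGroups call in processAlert could look like this, as a sketch rather than the exact stub in the file.

// limits is a sketch of the test stub assumed by lim := limits{groups: 1}.
type limits struct {
	groups int
}

func (l limits) MaxNumberOfAggregationGroups() int {
	return l.groups
}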
58 changes: 23 additions & 35 deletions notify/notify.go
@@ -757,19 +757,34 @@ func NewRetryStage(i Integration, groupName string, metrics *Metrics, o alertobs

func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
r.metrics.numNotifications.WithLabelValues(r.labelValues...).Inc()
ctx, alerts, err := r.exec(ctx, l, alerts...)
ctx, alerts, sent, err := r.exec(ctx, l, alerts...)

failureReason := DefaultReason.String()
if err != nil {
if e, ok := errors.Cause(err).(*ErrorWithReason); ok {
failureReason = e.Reason.String()
}
r.metrics.numTotalFailedNotifications.WithLabelValues(append(r.labelValues, failureReason)...).Inc()
if r.alertLCObserver != nil {
m := alertobserver.AlertEventMeta{
"ctx": ctx,
"integration": r.integration.Name(),
"stageName": "RetryStage",
}
r.alertLCObserver.Observe(alertobserver.EventAlertSendFailed, sent, m)
}
} else if r.alertLCObserver != nil {
m := alertobserver.AlertEventMeta{
"ctx": ctx,
"integration": r.integration.Name(),
"stageName": "RetryStage",
}
r.alertLCObserver.Observe(alertobserver.EventAlertSent, sent, m)
}
return ctx, alerts, err
}

func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, []*types.Alert, error) {
var sent []*types.Alert

// If we shouldn't send notifications for resolved alerts, but there are only
@@ -778,10 +793,10 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale
if !r.integration.SendResolved() {
firing, ok := FiringAlerts(ctx)
if !ok {
return ctx, nil, errors.New("firing alerts missing")
return ctx, nil, nil, errors.New("firing alerts missing")
}
if len(firing) == 0 {
return ctx, alerts, nil
return ctx, alerts, sent, nil
}
for _, a := range alerts {
if a.Status() != model.AlertResolved {
@@ -817,7 +832,7 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale
iErr = ctx.Err()
}

return ctx, nil, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i)
return ctx, nil, sent, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i)
default:
}

Expand All @@ -831,16 +846,7 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale
if err != nil {
r.metrics.numNotificationRequestsFailedTotal.WithLabelValues(r.labelValues...).Inc()
if !retry {
if r.alertLCObserver != nil {
m := alertobserver.AlertEventMeta{
"ctx": ctx,
"msg": "Unrecoverable error",
"integration": r.integration.Name(),
"stageName": "RetryStage",
}
r.alertLCObserver.Observe(alertobserver.EventAlertSendFailed, alerts, m)
}
return ctx, alerts, errors.Wrapf(err, "%s/%s: notify retry canceled due to unrecoverable error after %d attempts", r.groupName, r.integration.String(), i)
return ctx, alerts, sent, errors.Wrapf(err, "%s/%s: notify retry canceled due to unrecoverable error after %d attempts", r.groupName, r.integration.String(), i)
}
if ctx.Err() == nil && (iErr == nil || err.Error() != iErr.Error()) {
// Log the error if the context isn't done and the error isn't the same as before.
@@ -856,32 +862,14 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale
lvl = level.Debug(log.With(l, "alerts", fmt.Sprintf("%v", alerts)))
}

if r.alertLCObserver != nil {
m := alertobserver.AlertEventMeta{
"ctx": ctx,
"integration": r.integration.Name(),
"stageName": "RetryStage",
}
r.alertLCObserver.Observe(alertobserver.EventAlertSent, alerts, m)
}
lvl.Log("msg", "Notify success", "attempts", i, "duration", dur)

return ctx, alerts, nil
return ctx, alerts, sent, nil
}
case <-ctx.Done():
if iErr == nil {
iErr = ctx.Err()
}
if r.alertLCObserver != nil {
m := alertobserver.AlertEventMeta{
"ctx": ctx,
"msg": "Retry canceled",
"integration": r.integration.Name(),
"stageName": "RetryStage",
}
r.alertLCObserver.Observe(alertobserver.EventAlertSendFailed, alerts, m)
}
return ctx, nil, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i)
return ctx, nil, sent, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i)
}
}
}
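The overall shape of the notify.go change: the inner exec now also returns the alerts it actually handed to the integration, and Exec emits exactly one lifecycle event per attempt based on the outcome, instead of scattering Observe calls through the retry loop. A stripped-down sketch of that control flow follows; the observer interface, trySend, and notifyAndObserve are illustrative stand-ins, not the real RetryStage code.

package observerexample

import (
	"errors"

	"github.com/prometheus/alertmanager/alertobserver"
	"github.com/prometheus/alertmanager/types"
)

// observer mirrors the Observe method called throughout this diff; the
// interface name here is illustrative.
type observer interface {
	Observe(event string, alerts []*types.Alert, meta alertobserver.AlertEventMeta)
}

// trySend stands in for the retry loop in RetryStage.exec. It returns the
// alerts that were actually sent alongside any error.
func trySend(alerts []*types.Alert) ([]*types.Alert, error) {
	if len(alerts) == 0 {
		return nil, errors.New("nothing to send")
	}
	return alerts, nil
}

// notifyAndObserve mirrors the new Exec: pick one event from the outcome
// and report only the alerts that were actually sent.
func notifyAndObserve(o observer, alerts []*types.Alert) error {
	sent, err := trySend(alerts)
	event := alertobserver.EventAlertSent
	if err != nil {
		event = alertobserver.EventAlertSendFailed
	}
	if o != nil {
		o.Observe(event, sent, alertobserver.AlertEventMeta{"stageName": "RetryStage", "integration": "example"})
	}
	return err
}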
