From 8317e83a16aa7f7816a5f8224d2447224c5ffbce Mon Sep 17 00:00:00 2001 From: Santiago Date: Thu, 19 Jan 2023 15:36:35 -0300 Subject: [PATCH] Capture and expose notification delivery errors (#31) This PR makes it possible to store the last error for each receiver in case of notification delivery failure. These errors are exposed via the `/api/v2/receivers` endpoint. Co-authored-by: gotjosh --- .gitignore | 4 ++ api/api.go | 5 +- api/v2/api.go | 47 ++++++++++-- api/v2/models/integration.go | 122 ++++++++++++++++++++++++++++++++ api/v2/models/receiver.go | 51 +++++++++++++ api/v2/openapi.yaml | 28 ++++++++ api/v2/restapi/embedded_spec.go | 82 ++++++++++++++++++++- cmd/alertmanager/main.go | 30 +++++--- cmd/alertmanager/main_test.go | 4 +- notify/notify.go | 57 ++++++++++----- notify/notify_test.go | 63 ++++++++++------- notify/receiver.go | 42 +++++++++++ 12 files changed, 469 insertions(+), 66 deletions(-) create mode 100644 api/v2/models/integration.go create mode 100644 notify/receiver.go diff --git a/.gitignore b/.gitignore index aa4b780003..ccb1b22aeb 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,7 @@ !/.travis.yml !/.promu.yml !/api/v2/openapi.yaml + +# Editor +.vscode +.DS_Store diff --git a/api/api.go b/api/api.go index 59bfb76da6..14c7533e8f 100644 --- a/api/api.go +++ b/api/api.go @@ -30,6 +30,7 @@ import ( "github.com/prometheus/alertmanager/cluster" "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/dispatch" + "github.com/prometheus/alertmanager/notify" "github.com/prometheus/alertmanager/provider" "github.com/prometheus/alertmanager/silence" "github.com/prometheus/alertmanager/types" @@ -195,9 +196,9 @@ func (api *API) Register(r *route.Router, routePrefix string) *http.ServeMux { // Update config and resolve timeout of each API. APIv2 also needs // setAlertStatus to be updated. -func (api *API) Update(cfg *config.Config, setAlertStatus func(model.LabelSet)) { +func (api *API) Update(cfg *config.Config, receivers []*notify.Receiver, setAlertStatus func(model.LabelSet)) { api.v1.Update(cfg) - api.v2.Update(cfg, setAlertStatus) + api.v2.Update(cfg, setAlertStatus, receivers) } func (api *API) limitHandler(h http.Handler) http.Handler { diff --git a/api/v2/api.go b/api/v2/api.go index 1ddb2bcbae..1c6be58f84 100644 --- a/api/v2/api.go +++ b/api/v2/api.go @@ -32,7 +32,6 @@ import ( "github.com/prometheus/common/version" "github.com/rs/cors" - "github.com/prometheus/alertmanager/api/metrics" open_api_models "github.com/prometheus/alertmanager/api/v2/models" "github.com/prometheus/alertmanager/api/v2/restapi" "github.com/prometheus/alertmanager/api/v2/restapi/operations" @@ -41,9 +40,12 @@ import ( general_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/general" receiver_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/receiver" silence_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/silence" + + "github.com/prometheus/alertmanager/api/metrics" "github.com/prometheus/alertmanager/cluster" "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/dispatch" + "github.com/prometheus/alertmanager/notify" "github.com/prometheus/alertmanager/pkg/labels" "github.com/prometheus/alertmanager/provider" "github.com/prometheus/alertmanager/silence" @@ -71,7 +73,8 @@ type API struct { logger log.Logger m *metrics.Alerts - Handler http.Handler + Handler http.Handler + receivers []*notify.Receiver } type ( @@ -153,13 +156,14 @@ func (api *API) requestLogger(req *http.Request) log.Logger { } // Update sets the API struct members that may change between reloads of alertmanager. -func (api *API) Update(cfg *config.Config, setAlertStatus setAlertStatusFn) { +func (api *API) Update(cfg *config.Config, setAlertStatus setAlertStatusFn, receivers []*notify.Receiver) { api.mtx.Lock() defer api.mtx.Unlock() api.alertmanagerConfig = cfg api.route = dispatch.NewRoute(cfg.Route, nil) api.setAlertStatus = setAlertStatus + api.receivers = receivers } func (api *API) getStatusHandler(params general_ops.GetStatusParams) middleware.Responder { @@ -220,11 +224,40 @@ func (api *API) getStatusHandler(params general_ops.GetStatusParams) middleware. func (api *API) getReceiversHandler(params receiver_ops.GetReceiversParams) middleware.Responder { api.mtx.RLock() - defer api.mtx.RUnlock() + configReceivers := api.receivers + api.mtx.RUnlock() + + receivers := make([]*open_api_models.Receiver, 0, len(configReceivers)) + for _, r := range configReceivers { + integrations := make([]*open_api_models.Integration, 0, len(r.Integrations())) + + for _, integration := range r.Integrations() { + notify, duration, err := integration.GetReport() + iname := integration.String() + sendResolved := integration.SendResolved() + integrations = append(integrations, &open_api_models.Integration{ + Name: &iname, + SendResolved: &sendResolved, + LastNotifyAttempt: strfmt.DateTime(notify.UTC()), + LastNotifyAttemptDuration: duration.String(), + LastNotifyAttemptError: func() string { + if err != nil { + return err.Error() + } + return "" + }(), + }) + } + + rName := r.Name() + active := r.Active() + model := &open_api_models.Receiver{ + Name: &rName, + Active: &active, + Integrations: integrations, + } - receivers := make([]*open_api_models.Receiver, 0, len(api.alertmanagerConfig.Receivers)) - for i := range api.alertmanagerConfig.Receivers { - receivers = append(receivers, &open_api_models.Receiver{Name: &api.alertmanagerConfig.Receivers[i].Name}) + receivers = append(receivers, model) } return receiver_ops.NewGetReceiversOK().WithPayload(receivers) diff --git a/api/v2/models/integration.go b/api/v2/models/integration.go new file mode 100644 index 0000000000..b3f8f2a9a8 --- /dev/null +++ b/api/v2/models/integration.go @@ -0,0 +1,122 @@ +// Code generated by go-swagger; DO NOT EDIT. + +// Copyright Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "github.com/go-openapi/errors" + "github.com/go-openapi/strfmt" + "github.com/go-openapi/swag" + "github.com/go-openapi/validate" +) + +// Integration integration +// +// swagger:model integration +type Integration struct { + + // A timestamp indicating the last attempt to deliver a notification regardless of the outcome. + // Format: date-time + LastNotifyAttempt strfmt.DateTime `json:"lastNotifyAttempt,omitempty"` + + // Duration of the last attempt to deliver a notification in humanized format (`1s` or `15ms`, etc). + LastNotifyAttemptDuration string `json:"lastNotifyAttemptDuration,omitempty"` + + // Error string for the last attempt to deliver a notification. Empty if the last attempt was successful. + LastNotifyAttemptError string `json:"lastNotifyAttemptError,omitempty"` + + // name + // Required: true + Name *string `json:"name"` + + // send resolved + // Required: true + SendResolved *bool `json:"sendResolved"` +} + +// Validate validates this integration +func (m *Integration) Validate(formats strfmt.Registry) error { + var res []error + + if err := m.validateLastNotifyAttempt(formats); err != nil { + res = append(res, err) + } + + if err := m.validateName(formats); err != nil { + res = append(res, err) + } + + if err := m.validateSendResolved(formats); err != nil { + res = append(res, err) + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +func (m *Integration) validateLastNotifyAttempt(formats strfmt.Registry) error { + + if swag.IsZero(m.LastNotifyAttempt) { // not required + return nil + } + + if err := validate.FormatOf("lastNotifyAttempt", "body", "date-time", m.LastNotifyAttempt.String(), formats); err != nil { + return err + } + + return nil +} + +func (m *Integration) validateName(formats strfmt.Registry) error { + + if err := validate.Required("name", "body", m.Name); err != nil { + return err + } + + return nil +} + +func (m *Integration) validateSendResolved(formats strfmt.Registry) error { + + if err := validate.Required("sendResolved", "body", m.SendResolved); err != nil { + return err + } + + return nil +} + +// MarshalBinary interface implementation +func (m *Integration) MarshalBinary() ([]byte, error) { + if m == nil { + return nil, nil + } + return swag.WriteJSON(m) +} + +// UnmarshalBinary interface implementation +func (m *Integration) UnmarshalBinary(b []byte) error { + var res Integration + if err := swag.ReadJSON(b, &res); err != nil { + return err + } + *m = res + return nil +} diff --git a/api/v2/models/receiver.go b/api/v2/models/receiver.go index 8e1bf9ee45..bc089bd12f 100644 --- a/api/v2/models/receiver.go +++ b/api/v2/models/receiver.go @@ -20,6 +20,7 @@ package models // Editing this file might prove futile when you re-run the swagger generate command import ( + "strconv" "context" "github.com/go-openapi/errors" @@ -33,6 +34,14 @@ import ( // swagger:model receiver type Receiver struct { + // active + // Required: true + Active *bool `json:"active"` + + // integrations + // Required: true + Integrations []*Integration `json:"integrations"` + // name // Required: true Name *string `json:"name"` @@ -42,6 +51,14 @@ type Receiver struct { func (m *Receiver) Validate(formats strfmt.Registry) error { var res []error + if err := m.validateActive(formats); err != nil { + res = append(res, err) + } + + if err := m.validateIntegrations(formats); err != nil { + res = append(res, err) + } + if err := m.validateName(formats); err != nil { res = append(res, err) } @@ -52,6 +69,40 @@ func (m *Receiver) Validate(formats strfmt.Registry) error { return nil } +func (m *Receiver) validateActive(formats strfmt.Registry) error { + + if err := validate.Required("active", "body", m.Active); err != nil { + return err + } + + return nil +} + +func (m *Receiver) validateIntegrations(formats strfmt.Registry) error { + + if err := validate.Required("integrations", "body", m.Integrations); err != nil { + return err + } + + for i := 0; i < len(m.Integrations); i++ { + if swag.IsZero(m.Integrations[i]) { // not required + continue + } + + if m.Integrations[i] != nil { + if err := m.Integrations[i].Validate(formats); err != nil { + if ve, ok := err.(*errors.Validation); ok { + return ve.ValidateName("integrations" + "." + strconv.Itoa(i)) + } + return err + } + } + + } + + return nil +} + func (m *Receiver) validateName(formats strfmt.Registry) error { if err := validate.Required("name", "body", m.Name); err != nil { diff --git a/api/v2/openapi.yaml b/api/v2/openapi.yaml index 801edf00bb..90853c99fb 100644 --- a/api/v2/openapi.yaml +++ b/api/v2/openapi.yaml @@ -508,8 +508,36 @@ definitions: properties: name: type: string + active: + type: boolean + integrations: + type: array + items: + $ref: '#/definitions/integration' + required: + - name + - active + - integrations + integration: + type: object + properties: + name: + type: string + sendResolved: + type: boolean + lastNotifyAttempt: + description: A timestamp indicating the last attempt to deliver a notification regardless of the outcome. + type: string + format: date-time + lastNotifyAttemptDuration: + description: Duration of the last attempt to deliver a notification in humanized format (`1s` or `15ms`, etc). + type: string + lastNotifyAttemptError: + description: Error string for the last attempt to deliver a notification. Empty if the last attempt was successful. + type: string required: - name + - sendResolved labelSet: type: object additionalProperties: diff --git a/api/v2/restapi/embedded_spec.go b/api/v2/restapi/embedded_spec.go index caf371d884..91eb4343e4 100644 --- a/api/v2/restapi/embedded_spec.go +++ b/api/v2/restapi/embedded_spec.go @@ -602,6 +602,34 @@ func init() { "$ref": "#/definitions/gettableSilence" } }, + "integration": { + "type": "object", + "required": [ + "name", + "sendResolved" + ], + "properties": { + "lastNotifyAttempt": { + "description": "A timestamp indicating the last attempt to deliver a notification regardless of the outcome.", + "type": "string", + "format": "date-time" + }, + "lastNotifyAttemptDuration": { + "description": "Duration of the last attempt to deliver a notification in humanized format (` + "`" + `1s` + "`" + ` or ` + "`" + `15ms` + "`" + `, etc).", + "type": "string" + }, + "lastNotifyAttemptError": { + "description": "Error string for the last attempt to deliver a notification. Empty if the last attempt was successful.", + "type": "string" + }, + "name": { + "type": "string" + }, + "sendResolved": { + "type": "boolean" + } + } + }, "labelSet": { "type": "object", "additionalProperties": { @@ -700,9 +728,20 @@ func init() { "receiver": { "type": "object", "required": [ - "name" + "name", + "active", + "integrations" ], "properties": { + "active": { + "type": "boolean" + }, + "integrations": { + "type": "array", + "items": { + "$ref": "#/definitions/integration" + } + }, "name": { "type": "string" } @@ -1419,6 +1458,34 @@ func init() { "$ref": "#/definitions/gettableSilence" } }, + "integration": { + "type": "object", + "required": [ + "name", + "sendResolved" + ], + "properties": { + "lastNotifyAttempt": { + "description": "A timestamp indicating the last attempt to deliver a notification regardless of the outcome.", + "type": "string", + "format": "date-time" + }, + "lastNotifyAttemptDuration": { + "description": "Duration of the last attempt to deliver a notification in humanized format (` + "`" + `1s` + "`" + ` or ` + "`" + `15ms` + "`" + `, etc).", + "type": "string" + }, + "lastNotifyAttemptError": { + "description": "Error string for the last attempt to deliver a notification. Empty if the last attempt was successful.", + "type": "string" + }, + "name": { + "type": "string" + }, + "sendResolved": { + "type": "boolean" + } + } + }, "labelSet": { "type": "object", "additionalProperties": { @@ -1517,9 +1584,20 @@ func init() { "receiver": { "type": "object", "required": [ - "name" + "name", + "active", + "integrations" ], "properties": { + "active": { + "type": "boolean" + }, + "integrations": { + "type": "array", + "items": { + "$ref": "#/definitions/integration" + } + }, "name": { "type": "string" } diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index f58ec1d01b..a22bdbdaab 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -132,10 +132,10 @@ const defaultClusterAddr = "0.0.0.0:9094" // buildReceiverIntegrations builds a list of integration notifiers off of a // receiver config. -func buildReceiverIntegrations(nc config.Receiver, tmpl *template.Template, logger log.Logger) ([]notify.Integration, error) { +func buildReceiverIntegrations(nc config.Receiver, tmpl *template.Template, logger log.Logger) ([]*notify.Integration, error) { var ( errs types.MultiError - integrations []notify.Integration + integrations []*notify.Integration add = func(name string, i int, rs notify.ResolvedSender, f func(l log.Logger) (notify.Notifier, error)) { n, err := f(log.With(logger, "integration", name)) if err != nil { @@ -437,26 +437,26 @@ func run() int { // Build the routing tree and record which receivers are used. routes := dispatch.NewRoute(conf.Route, nil) - activeReceivers := make(map[string]struct{}) + activeReceiversMap := make(map[string]struct{}) routes.Walk(func(r *dispatch.Route) { - activeReceivers[r.RouteOpts.Receiver] = struct{}{} + activeReceiversMap[r.RouteOpts.Receiver] = struct{}{} }) // Build the map of receiver to integrations. - receivers := make(map[string][]notify.Integration, len(activeReceivers)) + receivers := make([]*notify.Receiver, 0, len(activeReceiversMap)) var integrationsNum int for _, rcv := range conf.Receivers { - if _, found := activeReceivers[rcv.Name]; !found { + if _, found := activeReceiversMap[rcv.Name]; !found { // No need to build a receiver if no route is using it. level.Info(configLogger).Log("msg", "skipping creation of receiver not referenced by any route", "receiver", rcv.Name) + receivers = append(receivers, notify.NewReceiver(rcv.Name, false, nil)) continue } integrations, err := buildReceiverIntegrations(rcv, tmpl, logger) if err != nil { return err } - // rcv.Name is guaranteed to be unique across all receivers. - receivers[rcv.Name] = integrations + receivers = append(receivers, notify.NewReceiver(rcv.Name, true, integrations)) integrationsNum += len(integrations) } @@ -484,8 +484,16 @@ func run() int { pipelinePeer = peer } + activeReceivers := make([]*notify.Receiver, 0, len(receivers)) + for i := range receivers { + if !receivers[i].Active() { + continue + } + activeReceivers = append(activeReceivers, receivers[i]) + } + pipeline := pipelineBuilder.New( - receivers, + activeReceivers, waitFunc, inhibitor, silencer, @@ -493,10 +501,10 @@ func run() int { notificationLog, pipelinePeer, ) - configuredReceivers.Set(float64(len(activeReceivers))) + configuredReceivers.Set(float64(len(activeReceiversMap))) configuredIntegrations.Set(float64(integrationsNum)) - api.Update(conf, func(labels model.LabelSet) { + api.Update(conf, receivers, func(labels model.LabelSet) { inhibitor.Mutes(labels) silencer.Mutes(labels) }) diff --git a/cmd/alertmanager/main_test.go b/cmd/alertmanager/main_test.go index bc2a450d6f..2bfc9d245f 100644 --- a/cmd/alertmanager/main_test.go +++ b/cmd/alertmanager/main_test.go @@ -33,7 +33,7 @@ func TestBuildReceiverIntegrations(t *testing.T) { for _, tc := range []struct { receiver config.Receiver err bool - exp []notify.Integration + exp []*notify.Integration }{ { receiver: config.Receiver{ @@ -50,7 +50,7 @@ func TestBuildReceiverIntegrations(t *testing.T) { }, }, }, - exp: []notify.Integration{ + exp: []*notify.Integration{ notify.NewIntegration(nil, sendResolved(false), "webhook", 0), notify.NewIntegration(nil, sendResolved(true), "webhook", 1), }, diff --git a/notify/notify.go b/notify/notify.go index b5df8796cf..9fa438557c 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -65,11 +65,16 @@ type Integration struct { rs ResolvedSender name string idx int + + mtx sync.RWMutex + lastNotifyAttempt time.Time + lastNotifyAttemptDuration model.Duration + lastNotifyAttemptError error } // NewIntegration returns a new integration. -func NewIntegration(notifier Notifier, rs ResolvedSender, name string, idx int) Integration { - return Integration{ +func NewIntegration(notifier Notifier, rs ResolvedSender, name string, idx int) *Integration { + return &Integration{ notifier: notifier, rs: rs, name: name, @@ -97,6 +102,22 @@ func (i *Integration) Index() int { return i.idx } +func (i *Integration) Report(start time.Time, duration model.Duration, notifyError error) { + i.mtx.Lock() + defer i.mtx.Unlock() + + i.lastNotifyAttempt = start + i.lastNotifyAttemptDuration = duration + i.lastNotifyAttemptError = notifyError +} + +func (i *Integration) GetReport() (time.Time, model.Duration, error) { + i.mtx.RLock() + defer i.mtx.RUnlock() + + return i.lastNotifyAttempt, i.lastNotifyAttemptDuration, i.lastNotifyAttemptError +} + // String implements the Stringer interface. func (i *Integration) String() string { return fmt.Sprintf("%s[%d]", i.name, i.idx) @@ -325,7 +346,7 @@ func NewPipelineBuilder(r prometheus.Registerer) *PipelineBuilder { // New returns a map of receivers to Stages. func (pb *PipelineBuilder) New( - receivers map[string][]Integration, + receivers []*Receiver, wait func() time.Duration, inhibitor *inhibit.Inhibitor, silencer *silence.Silencer, @@ -341,32 +362,31 @@ func (pb *PipelineBuilder) New( tms := NewTimeMuteStage(times) ss := NewMuteStage(silencer) - for name := range receivers { - st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics) - rs[name] = MultiStage{ms, is, tas, tms, ss, st} + for _, r := range receivers { + st := createReceiverStage(r, wait, notificationLog, pb.metrics) + rs[r.groupName] = MultiStage{ms, is, tas, tms, ss, st} } return rs } // createReceiverStage creates a pipeline of stages for a receiver. func createReceiverStage( - name string, - integrations []Integration, + receiver *Receiver, wait func() time.Duration, notificationLog NotificationLog, metrics *Metrics, ) Stage { var fs FanoutStage - for i := range integrations { + for i := range receiver.integrations { recv := &nflogpb.Receiver{ - GroupName: name, - Integration: integrations[i].Name(), - Idx: uint32(integrations[i].Index()), + GroupName: receiver.groupName, + Integration: receiver.integrations[i].Name(), + Idx: uint32(receiver.integrations[i].Index()), } var s MultiStage s = append(s, NewWaitStage(wait)) - s = append(s, NewDedupStage(&integrations[i], notificationLog, recv)) - s = append(s, NewRetryStage(integrations[i], name, metrics)) + s = append(s, NewDedupStage(receiver.integrations[i], notificationLog, recv)) + s = append(s, NewRetryStage(receiver.integrations[i], receiver.groupName, metrics)) s = append(s, NewSetNotifiesStage(notificationLog, recv)) fs = append(fs, s) @@ -652,13 +672,13 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al // RetryStage notifies via passed integration with exponential backoff until it // succeeds. It aborts if the context is canceled or timed out. type RetryStage struct { - integration Integration + integration *Integration groupName string metrics *Metrics } // NewRetryStage returns a new instance of a RetryStage. -func NewRetryStage(i Integration, groupName string, metrics *Metrics) *RetryStage { +func NewRetryStage(i *Integration, groupName string, metrics *Metrics) *RetryStage { return &RetryStage{ integration: i, groupName: groupName, @@ -736,8 +756,11 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale case <-tick.C: now := time.Now() retry, err := r.integration.Notify(ctx, sent...) - r.metrics.notificationLatencySeconds.WithLabelValues(r.integration.Name()).Observe(time.Since(now).Seconds()) + duration := time.Since(now) + + r.metrics.notificationLatencySeconds.WithLabelValues(r.integration.Name()).Observe(duration.Seconds()) r.metrics.numNotificationRequestsTotal.WithLabelValues(r.integration.Name()).Inc() + r.integration.Report(now, model.Duration(duration), err) if err != nil { r.metrics.numNotificationRequestsFailedTotal.WithLabelValues(r.integration.Name()).Inc() if !retry { diff --git a/notify/notify_test.go b/notify/notify_test.go index 996c132dd8..f8095a66aa 100644 --- a/notify/notify_test.go +++ b/notify/notify_test.go @@ -380,20 +380,19 @@ func TestRoutingStage(t *testing.T) { func TestRetryStageWithError(t *testing.T) { fail, retry := true, true sent := []*types.Alert{} - i := Integration{ - notifier: notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) { - if fail { - fail = false - return retry, errors.New("fail to deliver notification") - } - sent = append(sent, alerts...) - return false, nil - }), - rs: sendResolved(false), - } + notifier := notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) { + if fail { + fail = false + return retry, errors.New("fail to deliver notification") + } + sent = append(sent, alerts...) + return false, nil + }) + i := NewIntegration(notifier, sendResolved(false), "test integration", 1) r := RetryStage{ integration: i, metrics: NewMetrics(prometheus.NewRegistry()), + groupName: "test[0]", } alerts := []*types.Alert{ @@ -414,6 +413,10 @@ func TestRetryStageWithError(t *testing.T) { require.Equal(t, alerts, sent) require.NotNil(t, resctx) + // The integration should report no errors. + _, _, err = i.GetReport() + require.Nil(t, err) + // Notify with an unrecoverable error should fail. sent = sent[:0] fail = true @@ -421,6 +424,10 @@ func TestRetryStageWithError(t *testing.T) { resctx, _, err = r.Exec(ctx, log.NewNopLogger(), alerts...) require.NotNil(t, err) require.NotNil(t, resctx) + + // The integration should report an error. + _, _, err = i.GetReport() + require.EqualError(t, err, "fail to deliver notification") } func TestRetryStageWithErrorCode(t *testing.T) { @@ -476,16 +483,15 @@ func TestRetryStageWithErrorCode(t *testing.T) { func TestRetryStageNoResolved(t *testing.T) { sent := []*types.Alert{} - i := Integration{ - notifier: notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) { - sent = append(sent, alerts...) - return false, nil - }), - rs: sendResolved(false), - } + notifier := notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) { + sent = append(sent, alerts...) + return false, nil + }) + i := NewIntegration(notifier, sendResolved(false), "test integration", 1) r := RetryStage{ integration: i, metrics: NewMetrics(prometheus.NewRegistry()), + groupName: "test[0]", } alerts := []*types.Alert{ @@ -526,20 +532,23 @@ func TestRetryStageNoResolved(t *testing.T) { require.Equal(t, alerts, res) require.Equal(t, []*types.Alert{}, sent) require.NotNil(t, resctx) + + // The integration should report no errors. + _, _, err = i.GetReport() + require.Nil(t, err) } func TestRetryStageSendResolved(t *testing.T) { sent := []*types.Alert{} - i := Integration{ - notifier: notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) { - sent = append(sent, alerts...) - return false, nil - }), - rs: sendResolved(true), - } + notifier := notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) { + sent = append(sent, alerts...) + return false, nil + }) + i := NewIntegration(notifier, sendResolved(true), "test integration", 1) r := RetryStage{ integration: i, metrics: NewMetrics(prometheus.NewRegistry()), + groupName: "test[0]", } alerts := []*types.Alert{ @@ -574,6 +583,10 @@ func TestRetryStageSendResolved(t *testing.T) { require.Equal(t, alerts, res) require.Equal(t, alerts, sent) require.NotNil(t, resctx) + + // The integration should report no errors. + _, _, err = i.GetReport() + require.Nil(t, err) } func TestSetNotifiesStage(t *testing.T) { diff --git a/notify/receiver.go b/notify/receiver.go new file mode 100644 index 0000000000..d8bc1d0dfc --- /dev/null +++ b/notify/receiver.go @@ -0,0 +1,42 @@ +// Copyright 2022 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package notify + +type Receiver struct { + groupName string + integrations []*Integration + + // A receiver is considered active if a route is using it. + active bool +} + +func (r *Receiver) Name() string { + return r.groupName +} + +func (r *Receiver) Active() bool { + return r.active +} + +func (r *Receiver) Integrations() []*Integration { + return r.integrations +} + +func NewReceiver(name string, active bool, integrations []*Integration) *Receiver { + return &Receiver{ + groupName: name, + active: active, + integrations: integrations, + } +}