Skip to content

Commit

Permalink
stop processing pipeline if dedup stage sees event from the future (#30)
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Tseretyan <[email protected]>
  • Loading branch information
Yuri Tseretyan authored Mar 8, 2023
1 parent ec19b0a commit 78fedf8
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
13 changes: 12 additions & 1 deletion notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint
}

// Exec implements the Stage interface.
func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
func (n *DedupStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
gkey, ok := GroupKey(ctx)
if !ok {
return ctx, nil, errors.New("group key missing")
Expand Down Expand Up @@ -657,6 +657,17 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al
}

if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
if entry != nil {
// get the tick time from the context.
timeNow, ok := Now(ctx)
// now make sure that the current state is from past
if ok && entry.Timestamp.After(timeNow) {
diff := entry.Timestamp.Sub(timeNow)
// this usually means that the WaitStage took longer than the group_wait, and the subsequent node in the cluster sees the event from the first node
_ = level.Info(l).Log("msg", "timestamp of notification log entry is after the current pipeline timestamp. Skip execution", "entry_timestamp", entry.Timestamp, "tick_timestamp", timeNow, "diff", diff)
return ctx, nil, nil
}
}
return ctx, alerts, nil
}
return ctx, nil, nil
Expand Down
16 changes: 15 additions & 1 deletion notify/notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func TestDedupStage(t *testing.T) {
rs: sendResolved(false),
}

ctx := context.Background()
ctx := WithNow(context.Background(), now)

_, _, err := s.Exec(ctx, log.NewNopLogger())
require.EqualError(t, err, "group key missing")
Expand Down Expand Up @@ -295,6 +295,20 @@ func TestDedupStage(t *testing.T) {
_, res, err = s.Exec(ctx, log.NewNopLogger(), alerts...)
require.NoError(t, err)
require.Equal(t, alerts, res, "unexpected alerts returned")

// Must return no error and no alerts if notification log entry is from the future
s.nflog = &testNflog{
qerr: nil,
qres: []*nflogpb.Entry{
{
FiringAlerts: []uint64{1, 2, 3, 4},
Timestamp: now.Add(1 * time.Millisecond),
},
},
}
_, res, err = s.Exec(ctx, log.NewNopLogger(), alerts...)
require.NoError(t, err)
require.Nil(t, res)
}

func TestMultiStage(t *testing.T) {
Expand Down

0 comments on commit 78fedf8

Please sign in to comment.