Skip to content

Commit

Permalink
Merge pull request #41 from grafana/revert-30-fix-obsolete-tick-dedup
Browse files Browse the repository at this point in the history
Revert "Fix: DedupStage to stop pipeline if event in notification log is from the future"
  • Loading branch information
grobinson-grafana authored Mar 15, 2023
2 parents 78fedf8 + df95047 commit b399710
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 27 deletions.
13 changes: 1 addition & 12 deletions notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint
}

// Exec implements the Stage interface.
func (n *DedupStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
gkey, ok := GroupKey(ctx)
if !ok {
return ctx, nil, errors.New("group key missing")
Expand Down Expand Up @@ -657,17 +657,6 @@ func (n *DedupStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Al
}

if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
if entry != nil {
// get the tick time from the context.
timeNow, ok := Now(ctx)
// now make sure that the current state is from past
if ok && entry.Timestamp.After(timeNow) {
diff := entry.Timestamp.Sub(timeNow)
// this usually means that the WaitStage took longer than the group_wait, and the subsequent node in the cluster sees the event from the first node
_ = level.Info(l).Log("msg", "timestamp of notification log entry is after the current pipeline timestamp. Skip execution", "entry_timestamp", entry.Timestamp, "tick_timestamp", timeNow, "diff", diff)
return ctx, nil, nil
}
}
return ctx, alerts, nil
}
return ctx, nil, nil
Expand Down
16 changes: 1 addition & 15 deletions notify/notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func TestDedupStage(t *testing.T) {
rs: sendResolved(false),
}

ctx := WithNow(context.Background(), now)
ctx := context.Background()

_, _, err := s.Exec(ctx, log.NewNopLogger())
require.EqualError(t, err, "group key missing")
Expand Down Expand Up @@ -295,20 +295,6 @@ func TestDedupStage(t *testing.T) {
_, res, err = s.Exec(ctx, log.NewNopLogger(), alerts...)
require.NoError(t, err)
require.Equal(t, alerts, res, "unexpected alerts returned")

// Must return no error and no alerts if notification log entry is from the future
s.nflog = &testNflog{
qerr: nil,
qres: []*nflogpb.Entry{
{
FiringAlerts: []uint64{1, 2, 3, 4},
Timestamp: now.Add(1 * time.Millisecond),
},
},
}
_, res, err = s.Exec(ctx, log.NewNopLogger(), alerts...)
require.NoError(t, err)
require.Nil(t, res)
}

func TestMultiStage(t *testing.T) {
Expand Down

0 comments on commit b399710

Please sign in to comment.