diff --git a/internal/scraper/scraper.go b/internal/scraper/scraper.go
index c491e61b..b56c47b3 100644
--- a/internal/scraper/scraper.go
+++ b/internal/scraper/scraper.go
@@ -216,14 +216,8 @@ func (s *Scraper) Run(ctx context.Context) {
 	// TODO(mem): keep count of the number of successive errors and
 	// collect logs if threshold is reached.
 
-	var (
-		frequency = ms(s.check.Frequency)
-		offset    = ms(s.check.Offset)
-	)
-
-	if offset == 0 {
-		offset = randDuration(min(frequency, maxPublishInterval))
-	}
+	frequency := ms(s.check.Frequency)
+	offset := computeOffset(ms(s.check.Offset), frequency, timeFromNs(s.check.Created), time.Now())
 
 	scrapeHandler := scrapeHandler{scraper: s}
 
@@ -334,6 +328,48 @@ func ms(n int64) time.Duration {
 	return time.Duration(n) * time.Millisecond
 }
 
+func timeFromNs(ns float64) time.Time {
+	sec := int64(math.Floor(ns / 1e9))
+	nsec := int64(math.Mod(ns, 1e9))
+	return time.Unix(sec, nsec)
+}
+
+func computeOffset(offset, frequency time.Duration, t0, now time.Time) time.Duration {
+	if now.Sub(t0) < frequency {
+		// The check was created less than the frequency ago, we should
+		// starting running it right away.
+		if offset != 0 {
+			return offset
+		}
+
+		return randDuration(min(frequency, maxPublishInterval))
+	}
+
+	// The check was created more than the frequency ago, so we need to
+	// compute the time until the next time the check should run.
+	//
+	// Compute the number of runs since t0, add one for the next run and
+	// multiply by the frequency in order to obtain its timestamp. Finally,
+	// compute the remaining time until that timestamp.
+
+	runs := (now.UnixMilli() - t0.UnixMilli()) / frequency.Milliseconds()
+
+	timeUntilNextRun := t0.Add(time.Duration(runs+1) * frequency).Sub(now)
+
+	if timeUntilNextRun <= maxPublishInterval {
+		return timeUntilNextRun
+	}
+
+	// The reason why we need to ignore the computed offset is that the
+	// check ran in the past, and it's possible that it was filling the
+	// data with repeated samples that we no longer have access to. We
+	// cannot wait until the next run because that might be a long time
+	// from now, creating a gap in the data. Instead we wait for a random
+	// value that avoids creating gaps (assuming the last published sample
+	// was recent).
+	return randDuration(maxPublishInterval)
+}
+
 func randDuration(d time.Duration) time.Duration {
 	return time.Duration(rand.Int63n(int64(d)))
 }
diff --git a/internal/scraper/scraper_test.go b/internal/scraper/scraper_test.go
index 9734a171..c45f1f60 100644
--- a/internal/scraper/scraper_test.go
+++ b/internal/scraper/scraper_test.go
@@ -1991,3 +1991,113 @@ func TestTickWithOffset(t *testing.T) {
 		})
 	}
 }
+
+func TestTimeFromNs(t *testing.T) {
+	testcases := map[string]struct {
+		ns       float64
+		expected time.Time
+	}{
+		"zero": {
+			ns:       0,
+			expected: time.Unix(0, 0),
+		},
+		"one": {
+			ns:       1,
+			expected: time.Unix(0, 1),
+		},
+		"2020-01-01 00:00:00.000000000": {
+			ns:       1577836800 * 1e9,
+			expected: time.Unix(1577836800, 0),
+		},
+		"2024-07-02 21:21:50.123456768": {
+			ns:       1719955310*1e9 + 123456789,
+			expected: time.Unix(1719955310, 123456789),
+		},
+		"2262-04-11 23:47:15.999999999": {
+			ns:       9223372035*1e9 + 999999999, // This is close to the maximum value that can be represented by a time.Time
+			expected: time.Unix(9223372035, 999999999),
+		},
+	}
+
+	for name, tc := range testcases {
+		t.Run(name, func(t *testing.T) {
+			actual := timeFromNs(tc.ns)
+			// Why UnixMicro instead of UnixNano? Because some
+			// precision is lost during the conversion from int64
+			// to float64, and getting this right at the microsecond
+			// level is good enough.
+			require.InDelta(t, tc.expected.UnixMicro(), actual.UnixMicro(), 1)
+		})
+	}
+}
+
+func TestComputeOffset(t *testing.T) {
+	t0 := time.Unix(1_000_000, 0)
+
+	testcases := map[string]struct {
+		frequency time.Duration
+		offset    time.Duration
+		now       time.Time
+		expected  time.Duration
+	}{
+		"zero": {
+			offset:    0,
+			frequency: 60 * time.Second,
+			now:       t0.Add(0),
+			expected:  0,
+		},
+		"1s": {
+			offset:    1 * time.Second,
+			frequency: 60 * time.Second,
+			now:       t0.Add(0),
+			expected:  1 * time.Second,
+		},
+		"30s": {
+			offset:    30 * time.Second,
+			frequency: 60 * time.Second,
+			now:       t0.Add(0),
+			expected:  30 * time.Second,
+		},
+		"created 100 seconds ago": {
+			offset:    0 * time.Second,
+			frequency: 60 * time.Second,
+			now:       t0.Add(100 * time.Second),
+			expected:  20 * time.Second, // 100 - 60 = 40 -> 60 - 40 = 20
+		},
+		"created 1000 seconds ago": {
+			offset:    0 * time.Second,
+			frequency: 60 * time.Second,
+			now:       t0.Add(1000 * time.Second),
+			expected:  20 * time.Second, // 1000 / 60 = 16 -> 1000 - 60 * 16 = 40 -> 60 - 40 = 20
+		},
+		"slow check": {
+			offset:    0 * time.Second,
+			frequency: 5 * time.Minute,
+			now:       t0.Add(1000 * time.Minute),
+			expected:  0,
+		},
+		"slow check close to next run": {
+			offset:    0 * time.Second,
+			frequency: 5 * time.Minute,
+			now:       t0.Add(999 * time.Minute),
+			expected:  1 * time.Minute,
+		},
+		"slow check just ran": {
+			offset:    0 * time.Second,
+			frequency: 5 * time.Minute,
+			now:       t0.Add(1001 * time.Minute),
+			expected:  0,
+		},
+	}
+
+	for name, tc := range testcases {
+		t.Run(name, func(t *testing.T) {
+			actual := computeOffset(tc.offset, tc.frequency, t0, tc.now)
+			if tc.expected != 0 {
+				require.Equal(t, tc.expected, actual)
+			} else {
+				require.LessOrEqual(t, actual, maxPublishInterval)
+			}
+		})
+	}
+}