Try to align runs with frequency #765

Open
wants to merge 1 commit into
base: main
52 changes: 44 additions & 8 deletions internal/scraper/scraper.go
@@ -216,14 +216,8 @@ func (s *Scraper) Run(ctx context.Context) {
// TODO(mem): keep count of the number of successive errors and
// collect logs if threshold is reached.

-var (
-frequency = ms(s.check.Frequency)
-offset = ms(s.check.Offset)
-)
-
-if offset == 0 {
-offset = randDuration(min(frequency, maxPublishInterval))
-}
+frequency := ms(s.check.Frequency)
+offset := computeOffset(ms(s.check.Offset), frequency, timeFromNs(s.check.Created), time.Now())

scrapeHandler := scrapeHandler{scraper: s}

@@ -334,6 +328,48 @@ func ms(n int64) time.Duration {
return time.Duration(n) * time.Millisecond
}

func timeFromNs(ns float64) time.Time {
sec := int64(math.Floor(ns / 1e9))
nsec := int64(math.Mod(ns, 1e9))
return time.Unix(sec, nsec)
}
Comment on lines +331 to +335
Member

Isn't this redundant? I thought you could just call time.Unix(0, ns):
https://go.dev/play/p/zIkG3JFBBvL

The docs state you can:

// It is valid to pass nsec outside the range [0, 999999999].


func computeOffset(offset, frequency time.Duration, t0, now time.Time) time.Duration {
Member

Small nit, but I'd suggest naming t0 created instead. t0 suggests some beginning of time, but it isn't immediately obvious which one.

Member

Same about offset, I can't figure out what it represents 🤔

if now.Sub(t0) < frequency {
// The check was created less than the frequency ago, so we should
// start running it right away.
if offset != 0 {
return offset
}

return randDuration(min(frequency, maxPublishInterval))
}

// The check was created more than the frequency ago, so we need to
// compute the time until the next time the check should run.
//
// Compute the number of runs since t0, add one for the next run and
// multiply by the frequency in order to obtain its timestamp. Finally,
// compute the remaining time until that timestamp.

runs := (now.UnixMilli() - t0.UnixMilli()) / frequency.Milliseconds()

timeUntilNextRun := t0.Add(time.Duration(runs+1) * frequency).Sub(now)
Comment on lines +348 to +357
Member

I think I barely grasp what we're doing here, but not enough to try and find gaps in the logic. Looks right but I'm not positive about that.


if timeUntilNextRun <= maxPublishInterval {
return timeUntilNextRun
}

// The reason why we need to ignore the computed offset is that the
// check ran in the past, and it's possible that it was filling the
// data with repeated samples that we no longer have access to. We
// cannot wait until the next run because that might be a long time
// from now, creating a gap in the data. Instead we wait for a random
// value that avoids creating gaps (assuming the last published sample
// was recent).
return randDuration(maxPublishInterval)
}

func randDuration(d time.Duration) time.Duration {
return time.Duration(rand.Int63n(int64(d)))
}
110 changes: 110 additions & 0 deletions internal/scraper/scraper_test.go
@@ -1991,3 +1991,113 @@ func TestTickWithOffset(t *testing.T) {
})
}
}

func TestTimeFromNs(t *testing.T) {
testcases := map[string]struct {
ns float64
expected time.Time
}{
"zero": {
ns: 0,
expected: time.Unix(0, 0),
},
"one": {
ns: 1,
expected: time.Unix(0, 1),
},
"2020-01-01 00:00:00.000000000": {
ns: 1577836800 * 1e9,
expected: time.Unix(1577836800, 0),
},
"2024-07-02 21:21:50.123456768": {
ns: 1719955310*1e9 + 123456789,
expected: time.Unix(1719955310, 123456789),
},
"2262-04-11 23:47:15.999999999": {
ns: 9223372035*1e9 + 999999999, // This is close to the maximum timestamp that fits in an int64 count of nanoseconds (the limit of UnixNano)
expected: time.Unix(9223372035, 999999999),
},
}

for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
actual := timeFromNs(tc.ns)
// Why UnixMicro instead of UnixNano? Because some
// precision is lost during the conversion from int64
// to float64, and getting this right at the microsecond
// level is good enough.
require.InDelta(t, tc.expected.UnixMicro(), actual.UnixMicro(), 1)
})
}
}

func TestComputeOffset(t *testing.T) {
t0 := time.Unix(1_000_000, 0)

testcases := map[string]struct {
frequency time.Duration
offset time.Duration
now time.Time
expected time.Duration
}{
"zero": {
offset: 0,
frequency: 60 * time.Second,
now: t0.Add(0),
expected: 0,
},
"1s": {
offset: 1 * time.Second,
frequency: 60 * time.Second,
now: t0.Add(0),
expected: 1 * time.Second,
},
"30s": {
offset: 30 * time.Second,
frequency: 60 * time.Second,
now: t0.Add(0),
expected: 30 * time.Second,
},
"created 100 seconds ago": {
offset: 0 * time.Second,
frequency: 60 * time.Second,
now: t0.Add(100 * time.Second),
expected: 20 * time.Second, // 100 - 60 = 40 -> 60 - 40 = 20
},
"created 1000 seconds ago": {
offset: 0 * time.Second,
frequency: 60 * time.Second,
now: t0.Add(1000 * time.Second),
expected: 20 * time.Second, // 1000 / 60 = 16 -> 1000 - 60 * 16 = 40 -> 60 - 40 = 20
},
"slow check": {
offset: 0 * time.Second,
frequency: 5 * time.Minute,
now: t0.Add(1000 * time.Minute),
expected: 0,
},
"slow check close to next run": {
offset: 0 * time.Second,
frequency: 5 * time.Minute,
now: t0.Add(999 * time.Minute),
expected: 1 * time.Minute,
},
"slow check just ran": {
offset: 0 * time.Second,
frequency: 5 * time.Minute,
now: t0.Add(1001 * time.Minute),
expected: 0,
},
}

for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
actual := computeOffset(tc.offset, tc.frequency, t0, tc.now)
if tc.expected != 0 {
require.Equal(t, tc.expected, actual)
} else {
require.LessOrEqual(t, actual, maxPublishInterval)
}
})
}
}
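
The precision note in TestTimeFromNs can be checked in isolation. The sketch below assumes nothing beyond the standard library; `roundTrip` is a hypothetical helper that converts a nanosecond timestamp to float64 and back, which is the loss the test tolerates by comparing at microsecond resolution.

```go
package main

import "fmt"

// roundTrip converts an int64 nanosecond timestamp to float64 and back,
// showing how much precision the float64 representation drops.
func roundTrip(ns int64) int64 {
	return int64(float64(ns))
}

func main() {
	// Same instant as the "2024-07-02 21:21:50" test case.
	const ns int64 = 1719955310123456789

	back := roundTrip(ns)
	fmt.Println(back)      // 1719955310123456768
	fmt.Println(ns - back) // 21
}
```

At this magnitude adjacent float64 values are 256 ns apart, so the error stays well under a microsecond, which matches the `.123456768` in the test case name.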