From 33f42c69a5a51c5a28f6181623ba13da2e91d6a3 Mon Sep 17 00:00:00 2001 From: Martin Hutchinson Date: Wed, 7 Aug 2024 16:26:28 +0000 Subject: [PATCH] Tests for the hammer analysis Basic tests to make sure that the average reported is correct and that async behaviour is reliable. --- hammer/hammer.go | 27 +++++++++++++---------- hammer/hammer_test.go | 51 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 11 deletions(-) create mode 100644 hammer/hammer_test.go diff --git a/hammer/hammer.go b/hammer/hammer.go index c0fe4ef8..c956f321 100644 --- a/hammer/hammer.go +++ b/hammer/hammer.go @@ -89,7 +89,7 @@ func main() { klog.Exitf("Failed to get initial state of the log: %v", err) } - ha := newHammerAnalyser(&tracker, 100) + ha := newHammerAnalyser(func() uint64 { return tracker.LatestConsistent.Size }) go ha.updateStatsLoop(ctx) go ha.errorLoop(ctx) @@ -181,11 +181,11 @@ func (h *Hammer) updateCheckpointLoop(ctx context.Context) { } } -func newHammerAnalyser(tracker *client.LogStateTracker, chanSize int) *HammerAnalyser { - leafSampleChan := make(chan leafTime, chanSize) +func newHammerAnalyser(treeSizeFn func() uint64) *HammerAnalyser { + leafSampleChan := make(chan leafTime, 100) errChan := make(chan error, 20) return &HammerAnalyser{ - tracker: tracker, + treeSizeFn: treeSizeFn, seqLeafChan: leafSampleChan, errChan: errChan, integrationTime: movingaverage.New(30), @@ -195,7 +195,7 @@ func newHammerAnalyser(tracker *client.LogStateTracker, chanSize int) *HammerAna // HammerAnalyser is responsible for measuring and interpreting the result of hammering. type HammerAnalyser struct { - tracker *client.LogStateTracker + treeSizeFn func() uint64 seqLeafChan chan leafTime errChan chan error @@ -205,14 +205,14 @@ type HammerAnalyser struct { func (a *HammerAnalyser) updateStatsLoop(ctx context.Context) { tick := time.NewTicker(100 * time.Millisecond) - size := a.tracker.LatestConsistent.Size + size := a.treeSizeFn() for { select { case <-ctx.Done(): return case <-tick.C: } - newSize := a.tracker.LatestConsistent.Size + newSize := a.treeSizeFn() if newSize <= size { continue } @@ -221,13 +221,18 @@ func (a *HammerAnalyser) updateStatsLoop(ctx context.Context) { queueLatency := time.Duration(0) numLeaves := 0 var sample *leafTime + ReadLoop: for { if sample == nil { - l, ok := <-a.seqLeafChan - if !ok { - break + select { + case l, ok := <-a.seqLeafChan: + if !ok { + break ReadLoop + } + sample = &l + default: + break ReadLoop } - sample = &l } // Stop considering leaf times once we've caught up with that cross // either the current checkpoint or "now": diff --git a/hammer/hammer_test.go b/hammer/hammer_test.go new file mode 100644 index 00000000..410f17aa --- /dev/null +++ b/hammer/hammer_test.go @@ -0,0 +1,51 @@ +// Copyright 2024 The Tessera authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "testing" + "time" +) + +func TestHammerAnalyser_Stats(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var treeSize uint64 + treeSizeFn := func() uint64 { return treeSize } + ha := newHammerAnalyser(treeSizeFn) + + go ha.updateStatsLoop(ctx) + + time.Sleep(100 * time.Millisecond) + + baseTime := time.Now().Add(-1 * time.Minute) + for i := 0; i < 10; i++ { + ha.seqLeafChan <- leafTime{ + idx: uint64(i), + queuedAt: baseTime, + assignedAt: baseTime.Add(time.Duration(i) * time.Second), + } + } + // Update treeSize so that treeSizeFn returns the new tree size when requested async + treeSize = 10 + time.Sleep(500 * time.Millisecond) + + avg := ha.queueTime.Avg() + if want := float64(4500); avg != want { + t.Errorf("integration time avg: got != want (%f != %f)", avg, want) + } +}