diff --git a/hammer/hammer.go b/hammer/hammer.go index c0fe4ef8..dfa0fbd3 100644 --- a/hammer/hammer.go +++ b/hammer/hammer.go @@ -89,7 +89,7 @@ func main() { klog.Exitf("Failed to get initial state of the log: %v", err) } - ha := newHammerAnalyser(&tracker, 100) + ha := newHammerAnalyser(func() uint64 { return tracker.LatestConsistent.Size }) go ha.updateStatsLoop(ctx) go ha.errorLoop(ctx) @@ -181,38 +181,38 @@ func (h *Hammer) updateCheckpointLoop(ctx context.Context) { } } -func newHammerAnalyser(tracker *client.LogStateTracker, chanSize int) *HammerAnalyser { - leafSampleChan := make(chan leafTime, chanSize) +func newHammerAnalyser(treeSizeFn func() uint64) *HammerAnalyser { + leafSampleChan := make(chan leafTime, 100) errChan := make(chan error, 20) return &HammerAnalyser{ - tracker: tracker, + treeSizeFn: treeSizeFn, seqLeafChan: leafSampleChan, errChan: errChan, - integrationTime: movingaverage.New(30), - queueTime: movingaverage.New(30), + integrationTime: movingaverage.Concurrent(movingaverage.New(30)), + queueTime: movingaverage.Concurrent(movingaverage.New(30)), } } // HammerAnalyser is responsible for measuring and interpreting the result of hammering. type HammerAnalyser struct { - tracker *client.LogStateTracker + treeSizeFn func() uint64 seqLeafChan chan leafTime errChan chan error - queueTime *movingaverage.MovingAverage - integrationTime *movingaverage.MovingAverage + queueTime *movingaverage.ConcurrentMovingAverage + integrationTime *movingaverage.ConcurrentMovingAverage } func (a *HammerAnalyser) updateStatsLoop(ctx context.Context) { tick := time.NewTicker(100 * time.Millisecond) - size := a.tracker.LatestConsistent.Size + size := a.treeSizeFn() for { select { case <-ctx.Done(): return case <-tick.C: } - newSize := a.tracker.LatestConsistent.Size + newSize := a.treeSizeFn() if newSize <= size { continue } @@ -221,13 +221,18 @@ func (a *HammerAnalyser) updateStatsLoop(ctx context.Context) { queueLatency := time.Duration(0) numLeaves := 0 var sample *leafTime + ReadLoop: for { if sample == nil { - l, ok := <-a.seqLeafChan - if !ok { - break + select { + case l, ok := <-a.seqLeafChan: + if !ok { + break ReadLoop + } + sample = &l + default: + break ReadLoop } - sample = &l } // Stop considering leaf times once we've caught up with that cross // either the current checkpoint or "now": diff --git a/hammer/hammer_test.go b/hammer/hammer_test.go new file mode 100644 index 00000000..25941fe0 --- /dev/null +++ b/hammer/hammer_test.go @@ -0,0 +1,67 @@ +// Copyright 2024 The Tessera authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "sync" + "testing" + "time" +) + +func TestHammerAnalyser_Stats(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var treeSize treeSizeState + ha := newHammerAnalyser(treeSize.getSize) + + go ha.updateStatsLoop(ctx) + + time.Sleep(100 * time.Millisecond) + + baseTime := time.Now().Add(-1 * time.Minute) + for i := 0; i < 10; i++ { + ha.seqLeafChan <- leafTime{ + idx: uint64(i), + queuedAt: baseTime, + assignedAt: baseTime.Add(time.Duration(i) * time.Second), + } + } + treeSize.setSize(10) + time.Sleep(500 * time.Millisecond) + + avg := ha.queueTime.Avg() + if want := float64(4500); avg != want { + t.Errorf("integration time avg: got != want (%f != %f)", avg, want) + } +} + +type treeSizeState struct { + size uint64 + mux sync.RWMutex +} + +func (s *treeSizeState) getSize() uint64 { + s.mux.RLock() + defer s.mux.RUnlock() + return s.size +} + +func (s *treeSizeState) setSize(size uint64) { + s.mux.Lock() + defer s.mux.Unlock() + s.size = size +} diff --git a/hammer/tui.go b/hammer/tui.go index eb1c77bc..c7d9ace1 100644 --- a/hammer/tui.go +++ b/hammer/tui.go @@ -112,7 +112,7 @@ func (c *tuiController) Run(ctx context.Context) { } func (c *tuiController) updateStatsLoop(ctx context.Context, interval time.Duration) { - formatMovingAverage := func(ma *movingaverage.MovingAverage) string { + formatMovingAverage := func(ma *movingaverage.ConcurrentMovingAverage) string { aMin, _ := ma.Min() aMax, _ := ma.Max() aAvg := ma.Avg()