From a435b9f0146cf0dad613be11d50827e73f4224f2 Mon Sep 17 00:00:00 2001 From: Saswata Mukherjee Date: Thu, 19 Dec 2024 11:29:26 +0000 Subject: [PATCH] Fix leaky goroutine on metrics-collector (#1733) * Fix leaky goroutine on metrics-collector Signed-off-by: Saswata Mukherjee * Check simulated series file Signed-off-by: Saswata Mukherjee * Fix test Signed-off-by: Saswata Mukherjee --------- Signed-off-by: Saswata Mukherjee --- .../metrics/cmd/metrics-collector/main.go | 23 ++-- .../cmd/metrics-collector/main_test.go | 100 ++++++++++++++++++ 2 files changed, 114 insertions(+), 9 deletions(-) diff --git a/collectors/metrics/cmd/metrics-collector/main.go b/collectors/metrics/cmd/metrics-collector/main.go index f86a41c38..4ddac29e0 100644 --- a/collectors/metrics/cmd/metrics-collector/main.go +++ b/collectors/metrics/cmd/metrics-collector/main.go @@ -38,15 +38,16 @@ import ( func main() { opt := &Options{ - From: "http://localhost:9090", - Listen: "localhost:9002", - LimitBytes: 200 * 1024, - Matchers: []string{`{__name__="up"}`}, - Interval: 4*time.Minute + 30*time.Second, - EvaluateInterval: 30 * time.Second, - WorkerNum: 1, - DisableHyperShift: false, - DisableStatusReporting: false, + From: "http://localhost:9090", + Listen: "localhost:9002", + LimitBytes: 200 * 1024, + Matchers: []string{`{__name__="up"}`}, + Interval: 4*time.Minute + 30*time.Second, + EvaluateInterval: 30 * time.Second, + WorkerNum: 1, + DisableHyperShift: false, + DisableStatusReporting: false, + SimulatedTimeseriesFile: "", } cmd := &cobra.Command{ Short: "Remote write federated metrics from prometheus", @@ -722,6 +723,10 @@ func initShardedConfigs(o *Options, agent Agent) ([]*forwarder.Config, error) { } func runMultiWorkers(o *Options, cfg *forwarder.Config) error { + if o.WorkerNum > 1 && o.SimulatedTimeseriesFile == "" { + return nil + } + for i := 1; i < int(o.WorkerNum); i++ { opt := &Options{ From: o.From, diff --git a/collectors/metrics/cmd/metrics-collector/main_test.go b/collectors/metrics/cmd/metrics-collector/main_test.go index 5ee34be01..b47a14e7c 100644 --- a/collectors/metrics/cmd/metrics-collector/main_test.go +++ b/collectors/metrics/cmd/metrics-collector/main_test.go @@ -5,6 +5,7 @@ package main import ( + "fmt" stdlog "log" "os" "testing" @@ -65,3 +66,102 @@ func TestMultiWorkers(t *testing.T) { time.Sleep(1 * time.Second) } + +func TestSplitMatchersIntoShards(t *testing.T) { + tests := []struct { + name string + matchers []string + shardCount int + want [][]string + }{ + { + name: "single shard", + matchers: []string{"match1", "match2", "match3"}, + shardCount: 1, + want: [][]string{{"match1", "match2", "match3"}}, + }, + { + name: "two shards", + matchers: []string{"match1", "match2", "match3", "match4"}, + shardCount: 2, + want: [][]string{ + {"match1", "match3"}, + {"match2", "match4"}, + }, + }, + // This case should not happen and is restricted by CLI option validation. + { + name: "two shards", + matchers: []string{"match1", "match2", "match3", "match4"}, + shardCount: 6, + want: [][]string{ + {"match1"}, + {"match2"}, + {"match3"}, + {"match4"}, + {}, + {}, + }, + }, + { + name: "three shards", + matchers: []string{"match1", "match2", "match3", "match4", "match5"}, + shardCount: 3, + want: [][]string{ + {"match1", "match4"}, + {"match2", "match5"}, + {"match3"}, + }, + }, + { + name: "more shards than matchers", + matchers: []string{"match1", "match2"}, + shardCount: 3, + want: [][]string{ + {"match1"}, + {"match2"}, + {}, + }, + }, + { + name: "zero shards", + matchers: []string{"match1", "match2", "match3"}, + shardCount: 0, + want: [][]string{{"match1", "match2", "match3"}}, + }, + { + name: "negative shards", + matchers: []string{"match1", "match2", "match3"}, + shardCount: -1, + want: [][]string{{"match1", "match2", "match3"}}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := splitMatchersIntoShards(tt.matchers, tt.shardCount) + fmt.Println(got) + // Check if number of shards matches + if len(got) != len(tt.want) { + t.Errorf("splitMatchersIntoShards() got %d shards, want %d shards", + len(got), len(tt.want)) + return + } + + // Check if each shard contains the expected matchers + for i := 0; i < len(got); i++ { + if len(got[i]) != len(tt.want[i]) { + t.Errorf("shard %d: got %d matchers, want %d matchers", + i, len(got[i]), len(tt.want[i])) + continue + } + for j := 0; j < len(got[i]); j++ { + if got[i][j] != tt.want[i][j] { + t.Errorf("shard %d matcher %d: got %s, want %s", + i, j, got[i][j], tt.want[i][j]) + } + } + } + }) + } +}