From 9e27a2ca5b8d9a3a12777727caeb58885f2bb515 Mon Sep 17 00:00:00 2001 From: warmans Date: Wed, 28 Jul 2021 22:37:26 +0200 Subject: [PATCH] Refactor target storage. Prevent target duplication --- .gitignore | 3 +- Makefile | 26 ++++++++ README.md | 16 +++++ cmd/main.go | 177 ++++++++++++++++++++++++++++++++++------------------ 4 files changed, 161 insertions(+), 61 deletions(-) diff --git a/.gitignore b/.gitignore index 1d02479..73d18a6 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ /.tarballs /bin /VERSION -/*.iml \ No newline at end of file +/*.iml +.cache diff --git a/Makefile b/Makefile index 74c21c5..0e7fec8 100644 --- a/Makefile +++ b/Makefile @@ -18,6 +18,32 @@ build: echo ">> building binaries" go build -o ${BIN_DIR}/prometheus-aggregate-exporter -ldflags "-X main.Version=${GIT_TAG}" ./cmd + +# Manual Testing +#---------------------------------------------------------------------- +.PHONY: test.run-fixture-server +test.run-fixture-server: + cd fixture; go run serve.go + +.PHONY: test.run +test.run: build + ./bin/prometheus-aggregate-exporter \ + -targets="t1=http://localhost:3000/histogram.txt,t2=http://localhost:3000/histogram-2.txt" \ + -server.bind=":8080" \ + -verbose=true \ + -targets.dynamic.registration=true \ + -targets.cache.path=".cache" + +.PHONY: test.scrape +test.scrape: + curl localhost:8080/metrics + +test.unregister: + curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d "name=t1&address=localhost:3000/histogram.txt" localhost:8080/unregister + +test.register: + curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d "name=t1&address=localhost:3000/histogram.txt" localhost:8080/register + # Packaging #----------------------------------------------------------------------- diff --git a/README.md b/README.md index d23b59a..e2ef95a 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,22 @@ And you'll have Docker compile the binary and make it available under the image Alternatively the image is available though docker-hub: https://hub.docker.com/r/warmans/prometheus-aggregate-exporter +### Manual Testing + +You can run the exporter against some static fixture files by running the following make targets +in separate terminals. + +```shell +$ make test.run-fixture-server +$ make test.run +``` + +then to view the `/metrics` page: + +```shell +$ make test.scrape +``` + ### Example Usage ``` ./bin/prometheus-aggregate-exporter \ diff --git a/cmd/main.go b/cmd/main.go index 167ca47..53fecec 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -6,6 +6,7 @@ import ( "log" "os" "strings" + "sync" "crypto/tls" "fmt" @@ -41,8 +42,8 @@ var ( targetScrapeTimeout *int targets *string insecureSkipVerifyFlag *bool - cacheFilePath *string - dynamicRegistration *bool + cacheFilePath *string + dynamicRegistration *bool ) func init() { @@ -82,17 +83,23 @@ func main() { if len(config.Targets) < 1 { if *dynamicRegistration { - log.Print("WARN: no targets configured, using registration only") + log.Print("No initial targets configured, using registration only") } else { - log.Fatal("No targets configured and dynamic registration is disabled") + log.Fatal("FATAL: No initial targets configured and dynamic registration is disabled.") } } + var cacheFile string if *dynamicRegistration { log.Println("Dynamic target registration enabled") if *cacheFilePath != "" { - config.Targets = appendCachedTargets(config.Targets, *cacheFilePath) log.Printf("Using targets cache file %s\n", *cacheFilePath) + cacheFile = *cacheFilePath + } + } else { + if *cacheFilePath != "" { + // cache makes no sense if dynamic registration is not enabled. + log.Printf("WARN: Dynamic registration is disabled but cache file path was given. Cache will be ignored.") } } @@ -104,6 +111,8 @@ func main() { aggregator := &Aggregator{HTTP: &http.Client{Timeout: time.Duration(config.Timeout) * time.Millisecond}} + targets := NewTargets(config.Targets, cacheFile) + mux := http.NewServeMux() mux.HandleFunc("/metrics", func(rw http.ResponseWriter, r *http.Request) { defer r.Body.Close() @@ -114,13 +123,14 @@ func main() { } if t := r.Form.Get("t"); t != "" { targetKey, err := strconv.Atoi(t) - if err != nil || len(config.Targets)-1 < targetKey { + targetList := targets.Targets() + if err != nil || len(targetList)-1 < targetKey { http.Error(rw, "Bad Request", http.StatusBadRequest) return } - aggregator.Aggregate([]string{config.Targets[targetKey]}, rw) + aggregator.Aggregate([]string{targetList[targetKey]}, rw) } else { - aggregator.Aggregate(config.Targets, rw) + aggregator.Aggregate(targets.Targets(), rw) } }) mux.HandleFunc("/alive", func(rw http.ResponseWriter, r *http.Request) { @@ -148,11 +158,7 @@ func main() { } uri := schema + "://" + address - target := name + "=" + uri - config.Targets = append(config.Targets, target) - if *cacheFilePath != "" { - saveTargets(config.Targets, *cacheFilePath) - } + targets.AddTarget(name + "=" + uri) log.Printf("Registered target %s with name %s\n", uri, name) }) mux.HandleFunc("/unregister", func(rw http.ResponseWriter, r *http.Request) { @@ -175,17 +181,13 @@ func main() { } uri := schema + "://" + address - target := name + "=" + uri - config.Targets = removeTarget(config.Targets, target) - if *cacheFilePath != "" { - saveTargets(config.Targets, *cacheFilePath) - } + targets.RemoveTarget(name + "=" + uri) log.Printf("Unregistered target %s with name %s\n", uri, name) }) } log.Printf("Starting server on %s with targets:\n", config.Server.Bind) - for _, t := range config.Targets { + for _, t := range targets.Targets() { log.Printf(" - %s\n", t) } @@ -212,67 +214,114 @@ func main() { } -type Result struct { - URL string - Name string - SecondsTaken float64 - MetricFamily map[string]*io_prometheus_client.MetricFamily - Error error +func NewTargets(initialTargets []string, cachePath string) *Targets { + t := &Targets{ + cachePath: cachePath, + targets: make(map[string]struct{}), + lock: sync.RWMutex{}, + } + t.tryLoadCache() + for _, v := range initialTargets { + t.AddTarget(v) + } + return t } -type Aggregator struct { - HTTP *http.Client +type Targets struct { + cachePath string + targets map[string]struct{} + lock sync.RWMutex } -func indexOf(element string, data []string) int { - for k, v := range data { - if element == v { - return k - } +func (t *Targets) AddTarget(target string) { + target = strings.TrimSpace(target) + if target == "" { + return } - return -1 //not found. + t.lock.Lock() + defer func() { + t.lock.Unlock() + t.updateCache() + }() + t.targets[target] = struct{}{} } -func removeTarget(targets []string, target string) []string { - index := indexOf(target, targets) - if index == -1 { - log.Printf("There is no currently registered target %s", target) - return targets - } - targets[index] = targets[len(targets)-1] - // We do not need to put s[i] at the end, as it will be discarded anyway - return targets[:len(targets)-1] +func (t *Targets) RemoveTarget(target string) { + target = strings.TrimSpace(target) + t.lock.Lock() + defer func() { + t.lock.Unlock() + t.updateCache() + }() + delete(t.targets, target) } -func appendCachedTargets(targets []string, cacheFilePath string) []string { - targetsFromFile, err := readLines(cacheFilePath) - result := targets - if err == nil { - for i := range targetsFromFile { - target := targetsFromFile[i] - if indexOf(target, result) == -1 { - result = append(result, target) - log.Printf("Recovered target %s from cache file\n", target) - } - } +func (t *Targets) Targets() []string { + t.lock.RLock() + defer t.lock.RUnlock() + + ts := []string{} + for k := range t.targets { + ts = append(ts, k) } - return result + return ts } -func saveTargets(targets []string, cacheFilePath string){ - err := writeLines(targets, cacheFilePath) +func (t *Targets) updateCache() { + if t.cachePath == "" { + return + } + err := writeLines(t.Targets(), t.cachePath) if err != nil { log.Fatal("Error while saving targets cache") } } +func (t *Targets) tryLoadCache() { + if t.cachePath == "" { + return + } + targetsFromFile, err := readLines(t.cachePath) + if err == nil { + for _, v := range targetsFromFile { + t.AddTarget(v) + log.Printf("Recovered target %s from cache file\n", v) + } + } else { + log.Printf("Failed to load cache: %s\n", err.Error()) + } +} + +type Result struct { + URL string + Name string + SecondsTaken float64 + MetricFamily map[string]*io_prometheus_client.MetricFamily + Error error +} + +type Aggregator struct { + HTTP *http.Client +} + func readLines(path string) ([]string, error) { + if _, err := os.Stat(path); err != nil { + if os.IsNotExist(err) { + return []string{}, nil + } else { + return nil, err + } + } file, err := os.Open(path) if err != nil { return nil, err } - defer file.Close() + defer func() { + if err := file.Close(); err != nil { + log.Printf("WARN: failed to close cache file after reading") + } + }() var lines []string scanner := bufio.NewScanner(file) @@ -288,11 +337,17 @@ func writeLines(lines []string, path string) error { if err != nil { return err } - defer file.Close() + defer func() { + if err := file.Close(); err != nil { + log.Printf("WARN: failed to close cache file after writing") + } + }() w := bufio.NewWriter(file) for _, line := range lines { - fmt.Fprintln(w, line) + if _, err := fmt.Fprintln(w, line); err != nil { + return err + } } return w.Flush() } @@ -346,7 +401,9 @@ func (f *Aggregator) Aggregate(targets []string, output io.Writer) { encoder := expfmt.NewEncoder(output, expfmt.FmtText) for _, f := range allFamilies { - encoder.Encode(f) + if err := encoder.Encode(f); err != nil { + log.Printf("Failed to encode familty: %s", err.Error()) + } } }(len(targets), resultChan)