diff --git a/collectors/metrics/pkg/forwarder/forwarder.go b/collectors/metrics/pkg/forwarder/forwarder.go index fd5a41e9d..1df1760c3 100644 --- a/collectors/metrics/pkg/forwarder/forwarder.go +++ b/collectors/metrics/pkg/forwarder/forwarder.go @@ -405,6 +405,11 @@ func (w *Worker) LastMetrics() []*clientmodel.MetricFamily { } func (w *Worker) Run(ctx context.Context) { + // Forward metrics immediately on startup. + if err := w.forward(ctx); err != nil { + rlogger.Log(w.logger, rlogger.Error, "msg", "unable to forward results", "err", err) + } + ticker := time.NewTicker(w.interval) defer ticker.Stop() diff --git a/collectors/metrics/pkg/metricsclient/metricsclient.go b/collectors/metrics/pkg/metricsclient/metricsclient.go index d74f4c872..4f28c065e 100644 --- a/collectors/metrics/pkg/metricsclient/metricsclient.go +++ b/collectors/metrics/pkg/metricsclient/metricsclient.go @@ -16,6 +16,7 @@ import ( "io" "net" "net/http" + "net/url" "os" "path/filepath" "sort" @@ -23,7 +24,7 @@ import ( "strings" "time" - "github.com/cenkalti/backoff" + "github.com/cenkalti/backoff/v4" "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" @@ -503,12 +504,13 @@ func (c *Client) RemoteWrite(ctx context.Context, req *http.Request, } b.MaxElapsedTime = interval / time.Duration(halfInterval) retryable := func() error { - return c.sendRequest(req.URL.String(), compressed) + return c.sendRequest(ctx, req.URL.String(), compressed) } notify := func(err error, t time.Duration) { msg := fmt.Sprintf("error: %v happened at time: %v", err, t) logger.Log(c.logger, logger.Warn, "msg", msg) } + err = backoff.RetryNotify(retryable, b, notify) if err != nil { return err @@ -518,43 +520,69 @@ func (c *Client) RemoteWrite(ctx context.Context, req *http.Request, return nil } -func (c *Client) sendRequest(serverURL string, body []byte) error { +func (c *Client) sendRequest(ctx context.Context, serverURL string, body []byte) error { req1, err := http.NewRequest(http.MethodPost, serverURL, bytes.NewBuffer(body)) if err != nil { - msg := "failed to create forwarding request" - logger.Log(c.logger, logger.Warn, "msg", msg, "err", err) + wrappedErr := fmt.Errorf("failed to create forwarding request: %w", err) c.metrics.ForwardRemoteWriteRequests.WithLabelValues("0").Inc() - return errors.New(msg) + return backoff.Permanent(wrappedErr) } - // req.Header.Add("THANOS-TENANT", tenantID) - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() req1 = req1.WithContext(ctx) resp, err := c.client.Do(req1) if err != nil { - msg := "failed to forward request" - logger.Log(c.logger, logger.Warn, "msg", msg, "err", err) c.metrics.ForwardRemoteWriteRequests.WithLabelValues("0").Inc() - return errors.New(msg) + + wrappedErr := fmt.Errorf("failed to forward request: %w", err) + if isTransientError(err) { + return wrappedErr + } + + return backoff.Permanent(wrappedErr) } c.metrics.ForwardRemoteWriteRequests.WithLabelValues(strconv.Itoa(resp.StatusCode)).Inc() - if resp.StatusCode/100 != 2 { + if resp.StatusCode < 200 || resp.StatusCode >= 300 { // surfacing upstreams error to our users too + defer resp.Body.Close() bodyBytes, err := io.ReadAll(resp.Body) if err != nil { logger.Log(c.logger, logger.Warn, err) } - bodyString := string(bodyBytes) - msg := fmt.Sprintf("response status code is %s, response body is %s", resp.Status, bodyString) - logger.Log(c.logger, logger.Warn, msg) - return errors.New(msg) + retErr := fmt.Errorf("response status code is %s, response body is %s", resp.Status, string(bodyBytes)) + + if isTransientResponseError(resp) { + return retErr + } + + return backoff.Permanent(retErr) } + return nil } + +func isTransientError(err error) bool { + if urlErr, ok := err.(*url.Error); ok { + return urlErr.Timeout() + } + + return false +} + +func isTransientResponseError(resp *http.Response) bool { + if resp.StatusCode >= 500 && resp.StatusCode != http.StatusNotImplemented { + return true + } + + if resp.StatusCode == http.StatusTooManyRequests { + return true + } + + return false +} diff --git a/go.mod b/go.mod index f7942a40d..c277ae897 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.22.0 require ( github.com/IBM/controller-filtered-cache v0.3.6 - github.com/cenkalti/backoff v2.2.1+incompatible + github.com/cenkalti/backoff/v4 v4.3.0 github.com/cloudflare/cfssl v1.6.3 github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 github.com/go-co-op/gocron v1.37.0 @@ -75,7 +75,6 @@ require ( github.com/bgentry/speakeasy v0.1.0 // indirect github.com/blang/semver/v4 v4.0.0 // indirect github.com/brancz/locutus v0.0.0-20210511124350-7a84f4d1bcb3 // indirect - github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa // indirect diff --git a/go.sum b/go.sum index 04aa65dd9..c3545d0f6 100644 --- a/go.sum +++ b/go.sum @@ -189,7 +189,6 @@ github.com/caarlos0/ctrlc v1.0.0/go.mod h1:CdXpj4rmq0q/1Eb44M9zi2nKB0QraNKuRGYGr github.com/campoy/unique v0.0.0-20180121183637-88950e537e7e/go.mod h1:9IOqJGCPMSc6E5ydlp5NIonxObaeu/Iub/X03EKPVYo= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cavaliercoder/go-cpio v0.0.0-20180626203310-925f9528c45e/go.mod h1:oDpT4efm8tSYHXV5tHSdRvBet/b/QzxZ+XyyPehvm3A= -github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=