diff --git a/collectors/metrics/pkg/forwarder/forwarder.go b/collectors/metrics/pkg/forwarder/forwarder.go index fd5a41e9d..1df1760c3 100644 --- a/collectors/metrics/pkg/forwarder/forwarder.go +++ b/collectors/metrics/pkg/forwarder/forwarder.go @@ -405,6 +405,11 @@ func (w *Worker) LastMetrics() []*clientmodel.MetricFamily { } func (w *Worker) Run(ctx context.Context) { + // Forward metrics immediately on startup. + if err := w.forward(ctx); err != nil { + rlogger.Log(w.logger, rlogger.Error, "msg", "unable to forward results", "err", err) + } + ticker := time.NewTicker(w.interval) defer ticker.Stop() diff --git a/collectors/metrics/pkg/metricsclient/metricsclient.go b/collectors/metrics/pkg/metricsclient/metricsclient.go index d74f4c872..4f28c065e 100644 --- a/collectors/metrics/pkg/metricsclient/metricsclient.go +++ b/collectors/metrics/pkg/metricsclient/metricsclient.go @@ -16,6 +16,7 @@ import ( "io" "net" "net/http" + "net/url" "os" "path/filepath" "sort" @@ -23,7 +24,7 @@ import ( "strings" "time" - "github.com/cenkalti/backoff" + "github.com/cenkalti/backoff/v4" "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" @@ -503,12 +504,13 @@ func (c *Client) RemoteWrite(ctx context.Context, req *http.Request, } b.MaxElapsedTime = interval / time.Duration(halfInterval) retryable := func() error { - return c.sendRequest(req.URL.String(), compressed) + return c.sendRequest(ctx, req.URL.String(), compressed) } notify := func(err error, t time.Duration) { msg := fmt.Sprintf("error: %v happened at time: %v", err, t) logger.Log(c.logger, logger.Warn, "msg", msg) } + err = backoff.RetryNotify(retryable, b, notify) if err != nil { return err @@ -518,43 +520,69 @@ func (c *Client) RemoteWrite(ctx context.Context, req *http.Request, return nil } -func (c *Client) sendRequest(serverURL string, body []byte) error { +func (c *Client) sendRequest(ctx context.Context, serverURL string, body []byte) error { req1, err := http.NewRequest(http.MethodPost, serverURL, bytes.NewBuffer(body)) if err != nil { - msg := "failed to create forwarding request" - logger.Log(c.logger, logger.Warn, "msg", msg, "err", err) + wrappedErr := fmt.Errorf("failed to create forwarding request: %w", err) c.metrics.ForwardRemoteWriteRequests.WithLabelValues("0").Inc() - return errors.New(msg) + return backoff.Permanent(wrappedErr) } - // req.Header.Add("THANOS-TENANT", tenantID) - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() req1 = req1.WithContext(ctx) resp, err := c.client.Do(req1) if err != nil { - msg := "failed to forward request" - logger.Log(c.logger, logger.Warn, "msg", msg, "err", err) c.metrics.ForwardRemoteWriteRequests.WithLabelValues("0").Inc() - return errors.New(msg) + + wrappedErr := fmt.Errorf("failed to forward request: %w", err) + if isTransientError(err) { + return wrappedErr + } + + return backoff.Permanent(wrappedErr) } c.metrics.ForwardRemoteWriteRequests.WithLabelValues(strconv.Itoa(resp.StatusCode)).Inc() - if resp.StatusCode/100 != 2 { + if resp.StatusCode < 200 || resp.StatusCode >= 300 { // surfacing upstreams error to our users too + defer resp.Body.Close() bodyBytes, err := io.ReadAll(resp.Body) if err != nil { logger.Log(c.logger, logger.Warn, err) } - bodyString := string(bodyBytes) - msg := fmt.Sprintf("response status code is %s, response body is %s", resp.Status, bodyString) - logger.Log(c.logger, logger.Warn, msg) - return errors.New(msg) + retErr := fmt.Errorf("response status code is %s, response body is %s", resp.Status, string(bodyBytes)) + + if isTransientResponseError(resp) { + return retErr + } + + return backoff.Permanent(retErr) } + return nil } + +func isTransientError(err error) bool { + if urlErr, ok := err.(*url.Error); ok { + return urlErr.Timeout() + } + + return false +} + +func isTransientResponseError(resp *http.Response) bool { + if resp.StatusCode >= 500 && resp.StatusCode != http.StatusNotImplemented { + return true + } + + if resp.StatusCode == http.StatusTooManyRequests { + return true + } + + return false +} diff --git a/collectors/metrics/pkg/metricsclient/metricsclient_test.go b/collectors/metrics/pkg/metricsclient/metricsclient_test.go index 0c3e3aece..05402bac4 100644 --- a/collectors/metrics/pkg/metricsclient/metricsclient_test.go +++ b/collectors/metrics/pkg/metricsclient/metricsclient_test.go @@ -5,15 +5,22 @@ package metricsclient import ( + "bytes" + "context" "fmt" "net/http" + "net/http/httptest" "reflect" "testing" "time" "github.com/go-kit/log" + "github.com/gogo/protobuf/proto" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" clientmodel "github.com/prometheus/client_model/go" "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/assert" ) func TestDefaultTransport(t *testing.T) { @@ -271,3 +278,99 @@ func timeseriesEqual(t1 []prompb.TimeSeries, t2 []prompb.TimeSeries) (bool, erro return true, nil } + +func TestClient_RemoteWrite(t *testing.T) { + tests := []struct { + name string + families []*clientmodel.MetricFamily + serverHandler http.HandlerFunc + expect func(t *testing.T, err error, retryCount int) + }{ + { + name: "successful write with metrics", + families: []*clientmodel.MetricFamily{mockMetricFamily()}, + serverHandler: func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }, + expect: func(t *testing.T, err error, retryCount int) { + assert.NoError(t, err) + assert.Equal(t, 1, retryCount) + }, + }, + { + name: "no metrics to write", + families: []*clientmodel.MetricFamily{}, + serverHandler: func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }, + expect: func(t *testing.T, err error, retryCount int) { + assert.NoError(t, err) + assert.Equal(t, 0, retryCount) + }, + }, + { + name: "retryable error", + families: []*clientmodel.MetricFamily{mockMetricFamily()}, + serverHandler: func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + }, + expect: func(t *testing.T, err error, retryCount int) { + assert.Error(t, err) + assert.Greater(t, retryCount, 1) + }, + }, + { + name: "non-retryable error", + families: []*clientmodel.MetricFamily{mockMetricFamily()}, + serverHandler: func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusConflict) + }, + expect: func(t *testing.T, err error, retryCount int) { + assert.Error(t, err) + assert.Equal(t, 1, retryCount) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + requestCount := 0 + + handler := func(w http.ResponseWriter, r *http.Request) { + requestCount++ + tt.serverHandler(w, r) + } + ts := httptest.NewServer(http.HandlerFunc(handler)) + defer ts.Close() + + reg := prometheus.NewRegistry() + clientMetrics := &ClientMetrics{ + ForwardRemoteWriteRequests: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "forward_write_requests_total", + Help: "Counter of forward remote write requests.", + }, []string{"status_code"}), + } + client := &Client{logger: log.NewNopLogger(), client: ts.Client(), metrics: clientMetrics} + + req, err := http.NewRequest("POST", ts.URL, bytes.NewBuffer([]byte{})) + assert.NoError(t, err) + + err = client.RemoteWrite(context.Background(), req, tt.families, 30*time.Second) + + tt.expect(t, err, requestCount) + }) + } +} + +func mockMetricFamily() *clientmodel.MetricFamily { + return &clientmodel.MetricFamily{ + Name: proto.String("test_metric"), + Type: clientmodel.MetricType_COUNTER.Enum(), + Metric: []*clientmodel.Metric{ + { + Counter: &clientmodel.Counter{Value: proto.Float64(1)}, + TimestampMs: proto.Int64(time.Now().UnixNano() / int64(time.Millisecond)), + }, + }, + } +} diff --git a/go.mod b/go.mod index f410ba1db..c98eb23f4 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.22.0 require ( github.com/IBM/controller-filtered-cache v0.3.6 - github.com/cenkalti/backoff v2.2.1+incompatible + github.com/cenkalti/backoff/v4 v4.3.0 github.com/cloudflare/cfssl v1.6.3 github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 github.com/go-co-op/gocron v1.37.0 @@ -75,7 +75,6 @@ require ( github.com/bgentry/speakeasy v0.1.0 // indirect github.com/blang/semver/v4 v4.0.0 // indirect github.com/brancz/locutus v0.0.0-20210511124350-7a84f4d1bcb3 // indirect - github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa // indirect diff --git a/go.sum b/go.sum index bf2d5773c..2829f3e42 100644 --- a/go.sum +++ b/go.sum @@ -189,7 +189,6 @@ github.com/caarlos0/ctrlc v1.0.0/go.mod h1:CdXpj4rmq0q/1Eb44M9zi2nKB0QraNKuRGYGr github.com/campoy/unique v0.0.0-20180121183637-88950e537e7e/go.mod h1:9IOqJGCPMSc6E5ydlp5NIonxObaeu/Iub/X03EKPVYo= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cavaliercoder/go-cpio v0.0.0-20180626203310-925f9528c45e/go.mod h1:oDpT4efm8tSYHXV5tHSdRvBet/b/QzxZ+XyyPehvm3A= -github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=