Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ACM-14080: update retry logic on forward #1697

Merged
merged 2 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions collectors/metrics/pkg/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,11 @@ func (w *Worker) LastMetrics() []*clientmodel.MetricFamily {
}

func (w *Worker) Run(ctx context.Context) {
// Forward metrics immediately on startup.
if err := w.forward(ctx); err != nil {
rlogger.Log(w.logger, rlogger.Error, "msg", "unable to forward results", "err", err)
}

ticker := time.NewTicker(w.interval)
defer ticker.Stop()

Expand Down
62 changes: 45 additions & 17 deletions collectors/metrics/pkg/metricsclient/metricsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ import (
"io"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"

"github.com/cenkalti/backoff"
"github.com/cenkalti/backoff/v4"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
Expand Down Expand Up @@ -503,12 +504,13 @@ func (c *Client) RemoteWrite(ctx context.Context, req *http.Request,
}
b.MaxElapsedTime = interval / time.Duration(halfInterval)
retryable := func() error {
return c.sendRequest(req.URL.String(), compressed)
return c.sendRequest(ctx, req.URL.String(), compressed)
}
notify := func(err error, t time.Duration) {
msg := fmt.Sprintf("error: %v happened at time: %v", err, t)
logger.Log(c.logger, logger.Warn, "msg", msg)
}

err = backoff.RetryNotify(retryable, b, notify)
if err != nil {
return err
Expand All @@ -518,43 +520,69 @@ func (c *Client) RemoteWrite(ctx context.Context, req *http.Request,
return nil
}

func (c *Client) sendRequest(serverURL string, body []byte) error {
func (c *Client) sendRequest(ctx context.Context, serverURL string, body []byte) error {
req1, err := http.NewRequest(http.MethodPost, serverURL, bytes.NewBuffer(body))
if err != nil {
msg := "failed to create forwarding request"
logger.Log(c.logger, logger.Warn, "msg", msg, "err", err)
wrappedErr := fmt.Errorf("failed to create forwarding request: %w", err)
c.metrics.ForwardRemoteWriteRequests.WithLabelValues("0").Inc()
return errors.New(msg)
return backoff.Permanent(wrappedErr)
}

// req.Header.Add("THANOS-TENANT", tenantID)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

req1 = req1.WithContext(ctx)

resp, err := c.client.Do(req1)
if err != nil {
msg := "failed to forward request"
logger.Log(c.logger, logger.Warn, "msg", msg, "err", err)
c.metrics.ForwardRemoteWriteRequests.WithLabelValues("0").Inc()
return errors.New(msg)

wrappedErr := fmt.Errorf("failed to forward request: %w", err)
if isTransientError(err) {
return wrappedErr
}

return backoff.Permanent(wrappedErr)
}

c.metrics.ForwardRemoteWriteRequests.WithLabelValues(strconv.Itoa(resp.StatusCode)).Inc()

if resp.StatusCode/100 != 2 {
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
// surfacing upstreams error to our users too
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
logger.Log(c.logger, logger.Warn, err)
}
bodyString := string(bodyBytes)
msg := fmt.Sprintf("response status code is %s, response body is %s", resp.Status, bodyString)
logger.Log(c.logger, logger.Warn, msg)
return errors.New(msg)

retErr := fmt.Errorf("response status code is %s, response body is %s", resp.Status, string(bodyBytes))

if isTransientResponseError(resp) {
return retErr
}

return backoff.Permanent(retErr)
}

return nil
}

func isTransientError(err error) bool {
if urlErr, ok := err.(*url.Error); ok {
return urlErr.Timeout()
}

return false
}

func isTransientResponseError(resp *http.Response) bool {
if resp.StatusCode >= 500 && resp.StatusCode != http.StatusNotImplemented {
return true
}

if resp.StatusCode == http.StatusTooManyRequests {
return true
}

return false
}
103 changes: 103 additions & 0 deletions collectors/metrics/pkg/metricsclient/metricsclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,22 @@
package metricsclient

import (
"bytes"
"context"
"fmt"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"time"

"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
clientmodel "github.com/prometheus/client_model/go"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
)

func TestDefaultTransport(t *testing.T) {
Expand Down Expand Up @@ -271,3 +278,99 @@ func timeseriesEqual(t1 []prompb.TimeSeries, t2 []prompb.TimeSeries) (bool, erro

return true, nil
}

func TestClient_RemoteWrite(t *testing.T) {
tests := []struct {
name string
families []*clientmodel.MetricFamily
serverHandler http.HandlerFunc
expect func(t *testing.T, err error, retryCount int)
}{
{
name: "successful write with metrics",
families: []*clientmodel.MetricFamily{mockMetricFamily()},
serverHandler: func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
},
expect: func(t *testing.T, err error, retryCount int) {
assert.NoError(t, err)
assert.Equal(t, 1, retryCount)
},
},
{
name: "no metrics to write",
families: []*clientmodel.MetricFamily{},
serverHandler: func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
},
expect: func(t *testing.T, err error, retryCount int) {
assert.NoError(t, err)
assert.Equal(t, 0, retryCount)
},
},
{
name: "retryable error",
families: []*clientmodel.MetricFamily{mockMetricFamily()},
serverHandler: func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
},
expect: func(t *testing.T, err error, retryCount int) {
assert.Error(t, err)
assert.Greater(t, retryCount, 1)
},
},
{
name: "non-retryable error",
families: []*clientmodel.MetricFamily{mockMetricFamily()},
serverHandler: func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusConflict)
},
expect: func(t *testing.T, err error, retryCount int) {
assert.Error(t, err)
assert.Equal(t, 1, retryCount)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
requestCount := 0

handler := func(w http.ResponseWriter, r *http.Request) {
requestCount++
tt.serverHandler(w, r)
}
ts := httptest.NewServer(http.HandlerFunc(handler))
defer ts.Close()

reg := prometheus.NewRegistry()
clientMetrics := &ClientMetrics{
ForwardRemoteWriteRequests: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "forward_write_requests_total",
Help: "Counter of forward remote write requests.",
}, []string{"status_code"}),
}
client := &Client{logger: log.NewNopLogger(), client: ts.Client(), metrics: clientMetrics}

req, err := http.NewRequest("POST", ts.URL, bytes.NewBuffer([]byte{}))
assert.NoError(t, err)

err = client.RemoteWrite(context.Background(), req, tt.families, 30*time.Second)

tt.expect(t, err, requestCount)
})
}
}

func mockMetricFamily() *clientmodel.MetricFamily {
return &clientmodel.MetricFamily{
Name: proto.String("test_metric"),
Type: clientmodel.MetricType_COUNTER.Enum(),
Metric: []*clientmodel.Metric{
{
Counter: &clientmodel.Counter{Value: proto.Float64(1)},
TimestampMs: proto.Int64(time.Now().UnixNano() / int64(time.Millisecond)),
},
},
}
}
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22.0

require (
github.com/IBM/controller-filtered-cache v0.3.6
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cenkalti/backoff/v4 v4.3.0
github.com/cloudflare/cfssl v1.6.3
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
github.com/go-co-op/gocron v1.37.0
Expand Down Expand Up @@ -75,7 +75,6 @@ require (
github.com/bgentry/speakeasy v0.1.0 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/brancz/locutus v0.0.0-20210511124350-7a84f4d1bcb3 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa // indirect
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ github.com/caarlos0/ctrlc v1.0.0/go.mod h1:CdXpj4rmq0q/1Eb44M9zi2nKB0QraNKuRGYGr
github.com/campoy/unique v0.0.0-20180121183637-88950e537e7e/go.mod h1:9IOqJGCPMSc6E5ydlp5NIonxObaeu/Iub/X03EKPVYo=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cavaliercoder/go-cpio v0.0.0-20180626203310-925f9528c45e/go.mod h1:oDpT4efm8tSYHXV5tHSdRvBet/b/QzxZ+XyyPehvm3A=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
Expand Down
Loading