Skip to content

Commit

Permalink
update retry logic on forward
Browse files Browse the repository at this point in the history
Signed-off-by: Thibault Mange <[email protected]>
  • Loading branch information
thibaultmg authored and openshift-cherrypick-robot committed Dec 4, 2024
1 parent b5fc667 commit d69a6c4
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 20 deletions.
5 changes: 5 additions & 0 deletions collectors/metrics/pkg/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,11 @@ func (w *Worker) LastMetrics() []*clientmodel.MetricFamily {
}

func (w *Worker) Run(ctx context.Context) {
// Forward metrics immediately on startup.
if err := w.forward(ctx); err != nil {
rlogger.Log(w.logger, rlogger.Error, "msg", "unable to forward results", "err", err)
}

ticker := time.NewTicker(w.interval)
defer ticker.Stop()

Expand Down
62 changes: 45 additions & 17 deletions collectors/metrics/pkg/metricsclient/metricsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ import (
"io"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"

"github.com/cenkalti/backoff"
"github.com/cenkalti/backoff/v4"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
Expand Down Expand Up @@ -503,12 +504,13 @@ func (c *Client) RemoteWrite(ctx context.Context, req *http.Request,
}
b.MaxElapsedTime = interval / time.Duration(halfInterval)
retryable := func() error {
return c.sendRequest(req.URL.String(), compressed)
return c.sendRequest(ctx, req.URL.String(), compressed)
}
notify := func(err error, t time.Duration) {
msg := fmt.Sprintf("error: %v happened at time: %v", err, t)
logger.Log(c.logger, logger.Warn, "msg", msg)
}

err = backoff.RetryNotify(retryable, b, notify)
if err != nil {
return err
Expand All @@ -518,43 +520,69 @@ func (c *Client) RemoteWrite(ctx context.Context, req *http.Request,
return nil
}

func (c *Client) sendRequest(serverURL string, body []byte) error {
func (c *Client) sendRequest(ctx context.Context, serverURL string, body []byte) error {
req1, err := http.NewRequest(http.MethodPost, serverURL, bytes.NewBuffer(body))
if err != nil {
msg := "failed to create forwarding request"
logger.Log(c.logger, logger.Warn, "msg", msg, "err", err)
wrappedErr := fmt.Errorf("failed to create forwarding request: %w", err)
c.metrics.ForwardRemoteWriteRequests.WithLabelValues("0").Inc()
return errors.New(msg)
return backoff.Permanent(wrappedErr)
}

// req.Header.Add("THANOS-TENANT", tenantID)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

req1 = req1.WithContext(ctx)

resp, err := c.client.Do(req1)
if err != nil {
msg := "failed to forward request"
logger.Log(c.logger, logger.Warn, "msg", msg, "err", err)
c.metrics.ForwardRemoteWriteRequests.WithLabelValues("0").Inc()
return errors.New(msg)

wrappedErr := fmt.Errorf("failed to forward request: %w", err)
if isTransientError(err) {
return wrappedErr
}

return backoff.Permanent(wrappedErr)
}

c.metrics.ForwardRemoteWriteRequests.WithLabelValues(strconv.Itoa(resp.StatusCode)).Inc()

if resp.StatusCode/100 != 2 {
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
// surfacing upstreams error to our users too
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
logger.Log(c.logger, logger.Warn, err)
}
bodyString := string(bodyBytes)
msg := fmt.Sprintf("response status code is %s, response body is %s", resp.Status, bodyString)
logger.Log(c.logger, logger.Warn, msg)
return errors.New(msg)

retErr := fmt.Errorf("response status code is %s, response body is %s", resp.Status, string(bodyBytes))

if isTransientResponseError(resp) {
return retErr
}

return backoff.Permanent(retErr)
}

return nil
}

func isTransientError(err error) bool {
if urlErr, ok := err.(*url.Error); ok {
return urlErr.Timeout()
}

return false
}

func isTransientResponseError(resp *http.Response) bool {
if resp.StatusCode >= 500 && resp.StatusCode != http.StatusNotImplemented {
return true
}

if resp.StatusCode == http.StatusTooManyRequests {
return true
}

return false
}
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22.0

require (
github.com/IBM/controller-filtered-cache v0.3.6
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cenkalti/backoff/v4 v4.3.0
github.com/cloudflare/cfssl v1.6.3
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
github.com/go-co-op/gocron v1.37.0
Expand Down Expand Up @@ -75,7 +75,6 @@ require (
github.com/bgentry/speakeasy v0.1.0 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/brancz/locutus v0.0.0-20210511124350-7a84f4d1bcb3 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa // indirect
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ github.com/caarlos0/ctrlc v1.0.0/go.mod h1:CdXpj4rmq0q/1Eb44M9zi2nKB0QraNKuRGYGr
github.com/campoy/unique v0.0.0-20180121183637-88950e537e7e/go.mod h1:9IOqJGCPMSc6E5ydlp5NIonxObaeu/Iub/X03EKPVYo=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cavaliercoder/go-cpio v0.0.0-20180626203310-925f9528c45e/go.mod h1:oDpT4efm8tSYHXV5tHSdRvBet/b/QzxZ+XyyPehvm3A=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
Expand Down

0 comments on commit d69a6c4

Please sign in to comment.