From 7d4c49a9c7bf1ae0ae146008776bfd2afa973e3e Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 4 Dec 2023 13:25:13 +0800 Subject: [PATCH] Introduce caller ID into the HTTP client Signed-off-by: JmPotato --- client/http/client.go | 61 +++++++++++++++++++++++++++++++------------ 1 file changed, 44 insertions(+), 17 deletions(-) diff --git a/client/http/client.go b/client/http/client.go index 36355a90d192..b7b4b6ee0186 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -79,6 +79,8 @@ type Client interface { GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error) /* Client-related methods */ + // WithCallerID sets and returns a new client with the given caller ID. + WithCallerID(string) Client // WithRespHandler sets and returns a new client with the given HTTP response handler. // This allows the caller to customize how the response is handled, including error handling logic. // Additionally, it is important for the caller to handle the content of the response body properly @@ -89,11 +91,27 @@ type Client interface { var _ Client = (*client)(nil) -type client struct { +// clientInner is the inner implementation of the PD HTTP client, which will +// implement some internal logics, such as HTTP client, service discovery, etc. +type clientInner struct { pdAddrs []string tlsConf *tls.Config cli *http.Client +} + +func (c *clientInner) close() { + if c.cli != nil { + c.cli.CloseIdleConnections() + } + log.Info("[pd] http client closed") +} + +type client struct { + // Wrap this struct is to make sure the inner implementation + // won't be exposed and cloud be consistent during the copy. + inner *clientInner + callerID string respHandler func(resp *http.Response, res interface{}) error requestCounter *prometheus.CounterVec @@ -106,7 +124,7 @@ type ClientOption func(c *client) // WithHTTPClient configures the client with the given initialized HTTP client. func WithHTTPClient(cli *http.Client) ClientOption { return func(c *client) { - c.cli = cli + c.inner.cli = cli } } @@ -114,7 +132,7 @@ func WithHTTPClient(cli *http.Client) ClientOption { // This option won't work if the client is configured with WithHTTPClient. func WithTLSConfig(tlsConf *tls.Config) ClientOption { return func(c *client) { - c.tlsConf = tlsConf + c.inner.tlsConf = tlsConf } } @@ -134,7 +152,7 @@ func NewClient( pdAddrs []string, opts ...ClientOption, ) Client { - c := &client{} + c := &client{inner: &clientInner{}} // Apply the options first. for _, opt := range opts { opt(c) @@ -143,7 +161,7 @@ func NewClient( for i, addr := range pdAddrs { if !strings.HasPrefix(addr, httpScheme) { var scheme string - if c.tlsConf != nil { + if c.inner.tlsConf != nil { scheme = httpsScheme } else { scheme = httpScheme @@ -151,14 +169,14 @@ func NewClient( pdAddrs[i] = fmt.Sprintf("%s://%s", scheme, addr) } } - c.pdAddrs = pdAddrs + c.inner.pdAddrs = pdAddrs // Init the HTTP client if it's not configured. - if c.cli == nil { - c.cli = &http.Client{Timeout: defaultTimeout} - if c.tlsConf != nil { + if c.inner.cli == nil { + c.inner.cli = &http.Client{Timeout: defaultTimeout} + if c.inner.tlsConf != nil { transport := http.DefaultTransport.(*http.Transport).Clone() - transport.TLSClientConfig = c.tlsConf - c.cli.Transport = transport + transport.TLSClientConfig = c.inner.tlsConf + c.inner.cli.Transport = transport } } @@ -167,10 +185,17 @@ func NewClient( // Close closes the HTTP client. func (c *client) Close() { - if c.cli != nil { - c.cli.CloseIdleConnections() + if c.inner == nil { + return } - log.Info("[pd] http client closed") + c.inner.close() +} + +// WithCallerID sets and returns a new client with the given caller ID. +func (c *client) WithCallerID(callerID string) Client { + newClient := *c + newClient.callerID = callerID + return &newClient } // WithRespHandler sets and returns a new client with the given HTTP response handler. @@ -218,8 +243,8 @@ func (c *client) requestWithRetry( err error addr string ) - for idx := 0; idx < len(c.pdAddrs); idx++ { - addr = c.pdAddrs[idx] + for idx := 0; idx < len(c.inner.pdAddrs); idx++ { + addr = c.inner.pdAddrs[idx] err = c.request(ctx, name, fmt.Sprintf("%s%s", addr, uri), method, body, res, headerOpts...) if err == nil { break @@ -239,6 +264,8 @@ func (c *client) request( logFields := []zap.Field{ zap.String("name", name), zap.String("url", url), + zap.String("method", method), + zap.String("caller-id", c.callerID), } log.Debug("[pd] request the http url", logFields...) req, err := http.NewRequestWithContext(ctx, method, url, body) @@ -250,7 +277,7 @@ func (c *client) request( opt(req.Header) } start := time.Now() - resp, err := c.cli.Do(req) + resp, err := c.inner.cli.Do(req) if err != nil { c.reqCounter(name, networkErrorStatus) log.Error("[pd] do http request failed", append(logFields, zap.Error(err))...)