Skip to content

Commit

Permalink
Introduce caller ID into the HTTP client
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Dec 4, 2023
1 parent 3191594 commit 7d4c49a
Showing 1 changed file with 44 additions and 17 deletions.
61 changes: 44 additions & 17 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type Client interface {
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)

/* Client-related methods */
// WithCallerID sets and returns a new client with the given caller ID.
WithCallerID(string) Client
// WithRespHandler sets and returns a new client with the given HTTP response handler.
// This allows the caller to customize how the response is handled, including error handling logic.
// Additionally, it is important for the caller to handle the content of the response body properly
Expand All @@ -89,11 +91,27 @@ type Client interface {

var _ Client = (*client)(nil)

type client struct {
// clientInner is the inner implementation of the PD HTTP client, which will
// implement some internal logics, such as HTTP client, service discovery, etc.
type clientInner struct {
pdAddrs []string
tlsConf *tls.Config
cli *http.Client
}

func (c *clientInner) close() {
if c.cli != nil {
c.cli.CloseIdleConnections()
}
log.Info("[pd] http client closed")
}

type client struct {
// Wrap this struct is to make sure the inner implementation
// won't be exposed and cloud be consistent during the copy.
inner *clientInner

callerID string
respHandler func(resp *http.Response, res interface{}) error

requestCounter *prometheus.CounterVec
Expand All @@ -106,15 +124,15 @@ type ClientOption func(c *client)
// WithHTTPClient configures the client with the given initialized HTTP client.
func WithHTTPClient(cli *http.Client) ClientOption {
return func(c *client) {
c.cli = cli
c.inner.cli = cli

Check warning on line 127 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L127

Added line #L127 was not covered by tests
}
}

// WithTLSConfig configures the client with the given TLS config.
// This option won't work if the client is configured with WithHTTPClient.
func WithTLSConfig(tlsConf *tls.Config) ClientOption {
return func(c *client) {
c.tlsConf = tlsConf
c.inner.tlsConf = tlsConf

Check warning on line 135 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L135

Added line #L135 was not covered by tests
}
}

Expand All @@ -134,7 +152,7 @@ func NewClient(
pdAddrs []string,
opts ...ClientOption,
) Client {
c := &client{}
c := &client{inner: &clientInner{}}
// Apply the options first.
for _, opt := range opts {
opt(c)
Expand All @@ -143,22 +161,22 @@ func NewClient(
for i, addr := range pdAddrs {
if !strings.HasPrefix(addr, httpScheme) {
var scheme string
if c.tlsConf != nil {
if c.inner.tlsConf != nil {

Check warning on line 164 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L164

Added line #L164 was not covered by tests
scheme = httpsScheme
} else {
scheme = httpScheme
}
pdAddrs[i] = fmt.Sprintf("%s://%s", scheme, addr)
}
}
c.pdAddrs = pdAddrs
c.inner.pdAddrs = pdAddrs
// Init the HTTP client if it's not configured.
if c.cli == nil {
c.cli = &http.Client{Timeout: defaultTimeout}
if c.tlsConf != nil {
if c.inner.cli == nil {
c.inner.cli = &http.Client{Timeout: defaultTimeout}
if c.inner.tlsConf != nil {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = c.tlsConf
c.cli.Transport = transport
transport.TLSClientConfig = c.inner.tlsConf
c.inner.cli.Transport = transport

Check warning on line 179 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L178-L179

Added lines #L178 - L179 were not covered by tests
}
}

Expand All @@ -167,10 +185,17 @@ func NewClient(

// Close closes the HTTP client.
func (c *client) Close() {
if c.cli != nil {
c.cli.CloseIdleConnections()
if c.inner == nil {
return

Check warning on line 189 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L189

Added line #L189 was not covered by tests
}
log.Info("[pd] http client closed")
c.inner.close()
}

// WithCallerID sets and returns a new client with the given caller ID.
func (c *client) WithCallerID(callerID string) Client {
newClient := *c
newClient.callerID = callerID
return &newClient

Check warning on line 198 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L196-L198

Added lines #L196 - L198 were not covered by tests
}

// WithRespHandler sets and returns a new client with the given HTTP response handler.
Expand Down Expand Up @@ -218,8 +243,8 @@ func (c *client) requestWithRetry(
err error
addr string
)
for idx := 0; idx < len(c.pdAddrs); idx++ {
addr = c.pdAddrs[idx]
for idx := 0; idx < len(c.inner.pdAddrs); idx++ {
addr = c.inner.pdAddrs[idx]
err = c.request(ctx, name, fmt.Sprintf("%s%s", addr, uri), method, body, res, headerOpts...)
if err == nil {
break
Expand All @@ -239,6 +264,8 @@ func (c *client) request(
logFields := []zap.Field{
zap.String("name", name),
zap.String("url", url),
zap.String("method", method),
zap.String("caller-id", c.callerID),
}
log.Debug("[pd] request the http url", logFields...)
req, err := http.NewRequestWithContext(ctx, method, url, body)
Expand All @@ -250,7 +277,7 @@ func (c *client) request(
opt(req.Header)
}
start := time.Now()
resp, err := c.cli.Do(req)
resp, err := c.inner.cli.Do(req)
if err != nil {
c.reqCounter(name, networkErrorStatus)
log.Error("[pd] do http request failed", append(logFields, zap.Error(err))...)
Expand Down

0 comments on commit 7d4c49a

Please sign in to comment.