Skip to content

Commit

Permalink
Merge branch 'master' into mcs-comments
Browse files Browse the repository at this point in the history
  • Loading branch information
lhy1024 authored Jun 12, 2024
2 parents 609f4a5 + c015f14 commit b2d087e
Show file tree
Hide file tree
Showing 119 changed files with 1,990 additions and 1,964 deletions.
26 changes: 26 additions & 0 deletions OWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- AndreMouche
- binshi-bing
- bufferflies
- CabinfeverB
- Connor1996
- disksing
- huachaohuang
- HunDunDM
- HuSharp
- JmPotato
- lhy1024
- nolouch
- overvenus
- qiuyesuifeng
- rleungx
- siddontang
- Yisaer
- zhouqiang-cl
reviewers:
- BusyJay
- howardlau1999
- Luffbee
- shafreeck
- xhebox
6 changes: 6 additions & 0 deletions OWNERS_ALIASES
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Sort the member alphabetically.
aliases:
sig-critical-approvers-config:
- easonn7
- kevin-xianliu
- niubell
11 changes: 0 additions & 11 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1431,17 +1431,6 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint
return resp, nil
}

// IsLeaderChange will determine whether there is a leader change.
func IsLeaderChange(err error) bool {
if err == errs.ErrClientTSOStreamClosed {
return true
}
errMsg := err.Error()
return strings.Contains(errMsg, errs.NotLeaderErr) ||
strings.Contains(errMsg, errs.MismatchLeaderErr) ||
strings.Contains(errMsg, errs.NotServedErr)
}

const (
httpSchemePrefix = "http://"
httpsSchemePrefix = "https://"
Expand Down
13 changes: 6 additions & 7 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,20 @@ import (
"github.com/pingcap/errors"
)

// Note: keep the same as the ones defined on the server side to ensure the client can use them correctly.
const (
// NoLeaderErr indicates there is no leader in the cluster currently.
NoLeaderErr = "no leader"
// NotLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotLeaderErr = "is not leader"
NotLeaderErr = "not leader"
// MismatchLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
MismatchLeaderErr = "mismatch leader id"
// NotServedErr indicates an tso node/pod received the requests for the keyspace groups which are not served by it.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotServedErr = "is not served"
// RetryTimeoutErr indicates the server is busy.
RetryTimeoutErr = "retry timeout"
// NotPrimaryErr indicates the non-primary member received the requests which should be received by primary.
NotPrimaryErr = "not primary"
)

// client errors
Expand Down
18 changes: 18 additions & 0 deletions client/errs/errs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,29 @@
package errs

import (
"strings"

"github.com/pingcap/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// IsLeaderChange will determine whether there is a leader/primary change.
func IsLeaderChange(err error) bool {
if err == nil {
return false
}
if err == ErrClientTSOStreamClosed {
return true
}
errMsg := err.Error()
return strings.Contains(errMsg, NoLeaderErr) ||
strings.Contains(errMsg, NotLeaderErr) ||
strings.Contains(errMsg, MismatchLeaderErr) ||
strings.Contains(errMsg, NotServedErr) ||
strings.Contains(errMsg, NotPrimaryErr)
}

// ZapError is used to make the log output easier.
func ZapError(err error, causeError ...error) zap.Field {
if err == nil {
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ require (
github.com/stretchr/testify v1.8.2
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.11
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
google.golang.org/grpc v1.62.1
Expand All @@ -34,6 +33,7 @@ require (
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
membersPrefix = "/pd/api/v1/members"
leaderPrefix = "/pd/api/v1/leader"
transferLeader = "/pd/api/v1/leader/transfer"
health = "/pd/api/v1/health"
// Config
Config = "/pd/api/v1/config"
ClusterVersion = "/pd/api/v1/config/cluster-version"
Expand Down
54 changes: 36 additions & 18 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,28 +120,47 @@ func (ci *clientInner) requestWithRetry(
headerOpts ...HeaderOption,
) error {
var (
serverURL string
isLeader bool
statusCode int
err error
logFields = append(reqInfo.logFields(), zap.String("source", ci.source))
)
execFunc := func() error {
defer func() {
// If the status code is 503, it indicates that there may be PD leader/follower changes.
// If the error message contains the leader/primary change information, it indicates that there may be PD leader/primary change.
if statusCode == http.StatusServiceUnavailable || errs.IsLeaderChange(err) {
ci.sd.ScheduleCheckMemberChanged()
}
log.Debug("[pd] http request finished", append(logFields,
zap.String("server-url", serverURL),
zap.Bool("is-leader", isLeader),
zap.Int("status-code", statusCode),
zap.Error(err))...)
}()
// It will try to send the request to the PD leader first and then try to send the request to the other PD followers.
clients := ci.sd.GetAllServiceClients()
if len(clients) == 0 {
return errs.ErrClientNoAvailableMember
}
skipNum := 0
for _, cli := range clients {
url := cli.GetURL()
if reqInfo.targetURL != "" && reqInfo.targetURL != url {
serverURL = cli.GetURL()
isLeader = cli.IsConnectedToLeader()
if len(reqInfo.targetURL) > 0 && reqInfo.targetURL != serverURL {
skipNum++
continue
}
statusCode, err = ci.doRequest(ctx, url, reqInfo, headerOpts...)
statusCode, err = ci.doRequest(ctx, serverURL, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
return err
}
log.Debug("[pd] request url failed",
zap.String("source", ci.source), zap.Bool("is-leader", cli.IsConnectedToLeader()), zap.String("url", url), zap.Error(err))
log.Debug("[pd] http request url failed", append(logFields,
zap.String("server-url", serverURL),
zap.Bool("is-leader", isLeader),
zap.Int("status-code", statusCode),
zap.Error(err))...)
}
if skipNum == len(clients) {
return errs.ErrClientNoTargetMember
Expand All @@ -153,10 +172,11 @@ func (ci *clientInner) requestWithRetry(
}
// Copy a new backoffer for each request.
bo := *reqInfo.bo
// Backoffer also needs to check the status code to determine whether to retry.
// Set the retryable checker for the backoffer if it's not set.
bo.SetRetryableChecker(func(err error) bool {
// Backoffer also needs to check the status code to determine whether to retry.
return err != nil && !noNeedRetry(statusCode)
})
}, false)
return bo.Exec(ctx, execFunc)
}

Expand All @@ -168,26 +188,21 @@ func noNeedRetry(statusCode int) bool {

func (ci *clientInner) doRequest(
ctx context.Context,
url string, reqInfo *requestInfo,
serverURL string, reqInfo *requestInfo,
headerOpts ...HeaderOption,
) (int, error) {
var (
source = ci.source
callerID = reqInfo.callerID
name = reqInfo.name
method = reqInfo.method
body = reqInfo.body
res = reqInfo.res
respHandler = reqInfo.respHandler
url = reqInfo.getURL(serverURL)
logFields = append(reqInfo.logFields(),
zap.String("source", ci.source),
zap.String("url", url))
)
url = reqInfo.getURL(url)
logFields := []zap.Field{
zap.String("source", source),
zap.String("name", name),
zap.String("url", url),
zap.String("method", method),
zap.String("caller-id", callerID),
}
log.Debug("[pd] request the http url", logFields...)
req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(body))
if err != nil {
Expand Down Expand Up @@ -228,11 +243,14 @@ func (ci *clientInner) doRequest(
if readErr != nil {
logFields = append(logFields, zap.NamedError("read-body-error", err))
} else {
// API server will return a JSON body containing the detailed error message
// when the status code is not `http.StatusOK` 200.
bs = bytes.TrimSpace(bs)
logFields = append(logFields, zap.ByteString("body", bs))
}

log.Error("[pd] request failed with a non-200 status", logFields...)
return resp.StatusCode, errors.Errorf("request pd http api failed with status: '%s'", resp.Status)
return resp.StatusCode, errors.Errorf("request pd http api failed with status: '%s', body: '%s'", resp.Status, bs)
}

if res == nil {
Expand Down
24 changes: 24 additions & 0 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ type Client interface {
GetRegionStatusByKeyRange(context.Context, *KeyRange, bool) (*RegionStats, error)
GetStores(context.Context) (*StoresInfo, error)
GetStore(context.Context, uint64) (*StoreInfo, error)
DeleteStore(context.Context, uint64) error
SetStoreLabels(context.Context, int64, map[string]string) error
GetHealthStatus(context.Context) ([]Health, error)
/* Config-related interfaces */
GetConfig(context.Context) (map[string]any, error)
SetConfig(context.Context, map[string]any, ...float64) error
Expand Down Expand Up @@ -337,6 +339,20 @@ func (c *client) SetStoreLabels(ctx context.Context, storeID int64, storeLabels
WithBody(jsonInput))
}

// GetHealthStatus gets the health status of the cluster.
func (c *client) GetHealthStatus(ctx context.Context) ([]Health, error) {
var healths []Health
err := c.request(ctx, newRequestInfo().
WithName(getHealthStatusName).
WithURI(health).
WithMethod(http.MethodGet).
WithResp(&healths))
if err != nil {
return nil, err
}
return healths, nil
}

// GetConfig gets the configurations.
func (c *client) GetConfig(ctx context.Context) (map[string]any, error) {
var config map[string]any
Expand Down Expand Up @@ -425,6 +441,14 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*StoreInfo, erro
return &store, nil
}

// DeleteStore deletes the store by ID.
func (c *client) DeleteStore(ctx context.Context, storeID uint64) error {
return c.request(ctx, newRequestInfo().
WithName(deleteStoreName).
WithURI(StoreByID(storeID)).
WithMethod(http.MethodDelete))
}

// GetClusterVersion gets the cluster version.
func (c *client) GetClusterVersion(ctx context.Context) (string, error) {
var version string
Expand Down
13 changes: 13 additions & 0 deletions client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"

"github.com/tikv/pd/client/retry"
"go.uber.org/zap"
)

// The following constants are the names of the requests.
Expand All @@ -38,7 +39,9 @@ const (
getRegionStatusByKeyRangeName = "GetRegionStatusByKeyRange"
getStoresName = "GetStores"
getStoreName = "GetStore"
deleteStoreName = "DeleteStore"
setStoreLabelsName = "SetStoreLabels"
getHealthStatusName = "GetHealthStatus"
getConfigName = "GetConfig"
setConfigName = "SetConfig"
getScheduleConfigName = "GetScheduleConfig"
Expand Down Expand Up @@ -156,3 +159,13 @@ func (ri *requestInfo) WithTargetURL(targetURL string) *requestInfo {
func (ri *requestInfo) getURL(addr string) string {
return fmt.Sprintf("%s%s", addr, ri.uri)
}

func (ri *requestInfo) logFields() []zap.Field {
return []zap.Field{
zap.String("caller-id", ri.callerID),
zap.String("name", ri.name),
zap.String("uri", ri.uri),
zap.String("method", ri.method),
zap.String("target-url", ri.targetURL),
}
}
9 changes: 9 additions & 0 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,3 +661,12 @@ func stringToKeyspaceState(str string) (keyspacepb.KeyspaceState, error) {
return keyspacepb.KeyspaceState(0), fmt.Errorf("invalid KeyspaceState string: %s", str)
}
}

// Health reflects the cluster's health.
// NOTE: This type is moved from `server/api/health.go`, maybe move them to the same place later.
type Health struct {
Name string `json:"name"`
MemberID uint64 `json:"member_id"`
ClientUrls []string `json:"client_urls"`
Health bool `json:"health"`
}
3 changes: 2 additions & 1 deletion client/pd_service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/testutil"
"google.golang.org/grpc"
Expand Down Expand Up @@ -205,7 +206,7 @@ func (suite *serviceClientTestSuite) TestServiceClient() {
re.NotNil(leaderConn)

_, err := pb.NewGreeterClient(followerConn).SayHello(suite.ctx, &pb.HelloRequest{Name: "pd"})
re.ErrorContains(err, "not leader")
re.ErrorContains(err, errs.NotLeaderErr)
resp, err := pb.NewGreeterClient(leaderConn).SayHello(suite.ctx, &pb.HelloRequest{Name: "pd"})
re.NoError(err)
re.Equal("Hello pd", resp.GetMessage())
Expand Down
7 changes: 7 additions & 0 deletions client/resource_group/controller/OWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# See the OWNERS docs at https://go.k8s.io/owners
options:
no_parent_owners: true
filters:
"(OWNERS|config\\.go)$":
approvers:
- sig-critical-approvers-config
Loading

0 comments on commit b2d087e

Please sign in to comment.