diff --git a/.golangci.yml b/.golangci.yml index e8acff6fa553..0e5028634aee 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -42,10 +42,13 @@ linters-settings: - require-error - suite-dont-use-pkg - suite-extra-assert-call + disable: + - float-compare + - go-require gofmt: # https://golangci-lint.run/usage/linters/#gofmt # disable for faster check simplify: false rewrite-rules: - - pattern: 'interface{}' - replacement: 'any' + - pattern: "interface{}" + replacement: "any" diff --git a/Makefile b/Makefile index 5d22d522d45a..0d02189f508b 100644 --- a/Makefile +++ b/Makefile @@ -167,7 +167,7 @@ SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash) install-tools: @mkdir -p $(GO_TOOLS_BIN_PATH) - @which golangci-lint >/dev/null 2>&1 || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GO_TOOLS_BIN_PATH) v1.55.2 + @which golangci-lint >/dev/null 2>&1 || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GO_TOOLS_BIN_PATH) v1.56.2 @grep '_' tools.go | sed 's/"//g' | awk '{print $$2}' | xargs go install .PHONY: install-tools diff --git a/client/client.go b/client/client.go index ba7ac7fd075a..5231d9f3497d 100644 --- a/client/client.go +++ b/client/client.go @@ -519,7 +519,7 @@ func newClientWithKeyspaceName( return nil } - // Create a PD service discovery with null keyspace id, then query the real id wth the keyspace name, + // Create a PD service discovery with null keyspace id, then query the real id with the keyspace name, // finally update the keyspace id to the PD service discovery for the following interactions. c.pdSvcDiscovery = newPDServiceDiscovery( clientCtx, clientCancel, &c.wg, c.setServiceMode, updateKeyspaceIDCb, nullKeyspaceID, c.svrUrls, c.tlsCfg, c.option) @@ -702,6 +702,9 @@ func (c *client) UpdateOption(option DynamicOption, value any) error { return err } case EnableTSOFollowerProxy: + if c.getServiceMode() != pdpb.ServiceMode_PD_SVC_MODE { + return errors.New("[pd] tso follower proxy is only supported in PD service mode") + } enable, ok := value.(bool) if !ok { return errors.New("[pd] invalid value type for EnableTSOFollowerProxy option, it should be bool") @@ -788,10 +791,10 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur return req } - if err := tsoClient.dispatchRequest(dcLocation, req); err != nil { + if err := tsoClient.dispatchRequest(ctx, dcLocation, req); err != nil { // Wait for a while and try again time.Sleep(50 * time.Millisecond) - if err = tsoClient.dispatchRequest(dcLocation, req); err != nil { + if err = tsoClient.dispatchRequest(ctx, dcLocation, req); err != nil { req.done <- err } } diff --git a/client/grpcutil/grpcutil.go b/client/grpcutil/grpcutil.go index b6be2594b4df..742ee872b072 100644 --- a/client/grpcutil/grpcutil.go +++ b/client/grpcutil/grpcutil.go @@ -27,6 +27,7 @@ import ( "github.com/tikv/pd/client/errs" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/backoff" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" ) @@ -62,7 +63,18 @@ func GetClientConn(ctx context.Context, addr string, tlsCfg *tls.Config, do ...g if err != nil { return nil, errs.ErrURLParse.Wrap(err).GenWithStackByCause() } - cc, err := grpc.DialContext(ctx, u.Host, append(do, opt)...) + // Here we use a shorter MaxDelay to make the connection recover faster. + // The default MaxDelay is 120s, which is too long for us. 
+ backoffOpts := grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: time.Second, + Multiplier: 1.6, + Jitter: 0.2, + MaxDelay: 3 * time.Second, + }, + }) + do = append(do, opt, backoffOpts) + cc, err := grpc.DialContext(ctx, u.Host, do...) if err != nil { return nil, errs.ErrGRPCDial.Wrap(err).GenWithStackByCause() } diff --git a/client/http/interface.go b/client/http/interface.go index c7bfc37958d6..06beba85a450 100644 --- a/client/http/interface.go +++ b/client/http/interface.go @@ -91,7 +91,7 @@ type Client interface { GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error) GetPDVersion(context.Context) (string, error) /* Micro Service interfaces */ - GetMicroServiceMembers(context.Context, string) ([]string, error) + GetMicroServiceMembers(context.Context, string) ([]MicroServiceMember, error) GetMicroServicePrimary(context.Context, string) (string, error) DeleteOperators(context.Context) error @@ -856,8 +856,8 @@ func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uin } // GetMicroServiceMembers gets the members of the microservice. -func (c *client) GetMicroServiceMembers(ctx context.Context, service string) ([]string, error) { - var members []string +func (c *client) GetMicroServiceMembers(ctx context.Context, service string) ([]MicroServiceMember, error) { + var members []MicroServiceMember err := c.request(ctx, newRequestInfo(). WithName(getMicroServiceMembersName). WithURI(MicroServiceMembers(service)). diff --git a/client/http/types.go b/client/http/types.go index aaf7e1c00275..56ad0427f126 100644 --- a/client/http/types.go +++ b/client/http/types.go @@ -592,3 +592,12 @@ type MembersInfo struct { Leader *pdpb.Member `json:"leader,omitempty"` EtcdLeader *pdpb.Member `json:"etcd_leader,omitempty"` } + +// MicroServiceMember is the member info of a micro service. 
+type MicroServiceMember struct { + ServiceAddr string `json:"service-addr"` + Version string `json:"version"` + GitHash string `json:"git-hash"` + DeployPath string `json:"deploy-path"` + StartTimestamp int64 `json:"start-timestamp"` +} diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index bc446440215a..2c08f3ae833e 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -784,7 +784,7 @@ func (c *pdServiceDiscovery) GetServiceClient() ServiceClient { return leaderClient } -// GetAllServiceClients implments ServiceDiscovery +// GetAllServiceClients implements ServiceDiscovery func (c *pdServiceDiscovery) GetAllServiceClients() []ServiceClient { all := c.all.Load() if all == nil { diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index cb4c243a56f3..3930c9b1f333 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -73,7 +73,7 @@ func (c *tsoClient) scheduleUpdateTSOConnectionCtxs() { } } -func (c *tsoClient) dispatchRequest(dcLocation string, request *tsoRequest) error { +func (c *tsoClient) dispatchRequest(ctx context.Context, dcLocation string, request *tsoRequest) error { dispatcher, ok := c.tsoDispatcher.Load(dcLocation) if !ok { err := errs.ErrClientGetTSO.FastGenByArgs(fmt.Sprintf("unknown dc-location %s to the client", dcLocation)) @@ -83,7 +83,11 @@ func (c *tsoClient) dispatchRequest(dcLocation string, request *tsoRequest) erro } defer trace.StartRegion(request.requestCtx, "tsoReqEnqueue").End() - dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request + select { + case <-ctx.Done(): + return ctx.Err() + case dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request: + } return nil } @@ -311,6 +315,14 @@ func (c *tsoClient) createTSODispatcher(dcLocation string) { make(chan *tsoRequest, defaultMaxTSOBatchSize*2), defaultMaxTSOBatchSize), } + failpoint.Inject("shortDispatcherChannel", func() { + dispatcher = &tsoDispatcher{ + dispatcherCancel: dispatcherCancel, + tsoBatchController: newTSOBatchController( + make(chan *tsoRequest, 1), + defaultMaxTSOBatchSize), + } + }) if _, ok := c.tsoDispatcher.LoadOrStore(dcLocation, dispatcher); !ok { // Successfully stored the value. Start the following goroutine. @@ -372,6 +384,9 @@ func (c *tsoClient) handleDispatcher( return case <-c.option.enableTSOFollowerProxyCh: enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy() + log.Info("[tso] tso follower proxy status changed", + zap.String("dc-location", dc), + zap.Bool("enable", enableTSOFollowerProxy)) if enableTSOFollowerProxy && updateTicker.C == nil { // Because the TSO Follower Proxy is enabled, // the periodic check needs to be performed. @@ -412,7 +427,7 @@ tsoBatchLoop: } else { log.Error("[tso] fetch pending tso requests error", zap.String("dc-location", dc), - errs.ZapError(errs.ErrClientGetTSO, err)) + zap.Error(errs.ErrClientGetTSO.FastGenByArgs(err.Error()))) } return } @@ -498,7 +513,7 @@ tsoBatchLoop: log.Error("[tso] getTS error after processing requests", zap.String("dc-location", dc), zap.String("stream-addr", streamAddr), - errs.ZapError(errs.ErrClientGetTSO, err)) + zap.Error(errs.ErrClientGetTSO.FastGenByArgs(err.Error()))) // Set `stream` to nil and remove this stream from the `connectionCtxs` due to error. connectionCtxs.Delete(streamAddr) cancel() @@ -701,7 +716,11 @@ func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc s } // GC the stale one. 
connectionCtxs.Range(func(addr, cc any) bool { - if _, ok := tsoStreamBuilders[addr.(string)]; !ok { + addrStr := addr.(string) + if _, ok := tsoStreamBuilders[addrStr]; !ok { + log.Info("[tso] remove the stale tso stream", + zap.String("dc", dc), + zap.String("addr", addrStr)) cc.(*tsoConnectionContext).cancel() connectionCtxs.Delete(addr) } @@ -712,6 +731,8 @@ func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc s if _, ok = connectionCtxs.Load(addr); ok { continue } + log.Info("[tso] try to create tso stream", + zap.String("dc", dc), zap.String("addr", addr)) cctx, cancel := context.WithCancel(dispatcherCtx) // Do not proxy the leader client. if addr != leaderAddr { diff --git a/go.mod b/go.mod index e438228a7283..6e7c411802f7 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2 + github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 github.com/pingcap/tidb-dashboard v0.0.0-20240111062855-41f7c8011953 diff --git a/go.sum b/go.sum index 0e308d173a0e..9debe93e2520 100644 --- a/go.sum +++ b/go.sum @@ -433,8 +433,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2 h1:364A6VCS+l0oHBKZKotX9LzmfEtIO/NTccTIQcPp3Ug= -github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41 h1:7tDr4J6gGQ3OqBq+lZQkI9wlJIIXFitHjNK8ymU/SEo= +github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/pkg/btree/btree_generic.go b/pkg/btree/btree_generic.go index 630cb25abcd2..f918a8ac6862 100644 --- a/pkg/btree/btree_generic.go +++ b/pkg/btree/btree_generic.go @@ -821,7 +821,7 @@ type copyOnWriteContext[T Item[T]] struct { // The internal tree structure of b is marked read-only and shared between t and // t2. Writes to both t and t2 use copy-on-write logic, creating new nodes // whenever one of b's original nodes would have been modified. Read operations -// should have no performance degredation. Write operations for both t and t2 +// should have no performance degradation. Write operations for both t and t2 // will initially experience minor slow-downs caused by additional allocs and // copies due to the aforementioned copy-on-write logic, but should converge to // the original performance characteristics of the original tree. 
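Earlier in this patch, GetMicroServiceMembers in client/http changes from returning a plain []string of addresses to returning structured MicroServiceMember entries. A minimal usage sketch of the new signature follows; the NewClient call and the "tso" service name are illustrative assumptions, only the method and the struct fields come from this patch.

package main

import (
	"context"
	"fmt"

	pdhttp "github.com/tikv/pd/client/http"
)

func main() {
	// Assumed setup: a PD HTTP client pointed at a local PD endpoint.
	cli := pdhttp.NewClient("example-caller", []string{"http://127.0.0.1:2379"})
	defer cli.Close()

	// The method now returns structured entries instead of bare address strings.
	members, err := cli.GetMicroServiceMembers(context.Background(), "tso")
	if err != nil {
		panic(err)
	}
	for _, m := range members {
		// ServiceAddr, Version, GitHash, DeployPath and StartTimestamp mirror
		// the extended ServiceRegistryEntry registered by each microservice.
		fmt.Println(m.ServiceAddr, m.Version, m.GitHash, m.StartTimestamp)
	}
}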
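Also above, client/tso_dispatcher.go now threads the caller's context into dispatchRequest and selects on it rather than blocking unconditionally on the batch channel; the new shortDispatcherChannel failpoint shrinks that channel so tests can hit the full-channel case. A self-contained sketch of the pattern, using illustrative names rather than the client's real types:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// dispatch tries to enqueue a request, but gives up as soon as the caller's
// context is canceled instead of blocking forever on a full channel.
func dispatch(ctx context.Context, ch chan<- int, req int) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case ch <- req:
		return nil
	}
}

func main() {
	ch := make(chan int, 1) // deliberately tiny buffer to force blocking
	ch <- 0                 // fill it so the next send would block

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	if err := dispatch(ctx, ch, 1); errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("send abandoned after the caller's context expired")
	}
}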
diff --git a/pkg/cache/fifo.go b/pkg/cache/fifo.go index 03bc6631eab7..d544cdaec1cc 100644 --- a/pkg/cache/fifo.go +++ b/pkg/cache/fifo.go @@ -89,7 +89,7 @@ func (c *FIFO) FromElems(key uint64) []*Item { return elems } -// FromLastSameElems returns continuous items that have the same comparable attribute with the the lastest one. +// FromLastSameElems returns continuous items that have the same comparable attribute with the last one. func (c *FIFO) FromLastSameElems(checkFunc func(any) (bool, string)) []*Item { c.RLock() defer c.RUnlock() diff --git a/pkg/cgroup/cgroup_cpu_test.go b/pkg/cgroup/cgroup_cpu_test.go index f0b9239ecab3..c373f8032105 100644 --- a/pkg/cgroup/cgroup_cpu_test.go +++ b/pkg/cgroup/cgroup_cpu_test.go @@ -46,7 +46,7 @@ func checkKernelVersionNewerThan(re *require.Assertions, t *testing.T, major, mi re.Len(kernelVersion, 1, fmt.Sprintf("release str is %s", releaseStr)) kernelVersionPartRE := regexp.MustCompile(`[0-9]+`) kernelVersionParts := kernelVersionPartRE.FindAllString(kernelVersion[0], -1) - re.Len(kernelVersionParts, 3, fmt.Sprintf("kernel verion str is %s", kernelVersion[0])) + re.Len(kernelVersionParts, 3, fmt.Sprintf("kernel version str is %s", kernelVersion[0])) t.Logf("parsed kernel version parts: major %s, minor %s, patch %s", kernelVersionParts[0], kernelVersionParts[1], kernelVersionParts[2]) mustConvInt := func(s string) int { diff --git a/pkg/core/rangetree/range_tree_test.go b/pkg/core/rangetree/range_tree_test.go index 29845cf0bca1..0664a7bdbefe 100644 --- a/pkg/core/rangetree/range_tree_test.go +++ b/pkg/core/rangetree/range_tree_test.go @@ -73,11 +73,11 @@ func bucketDebrisFactory(startKey, endKey []byte, item RangeItem) []RangeItem { if bytes.Compare(left, right) >= 0 { return nil } - // the left has oen intersection like |010 - 100| and |020 - 100|. + // the left has one intersection like |010 - 100| and |020 - 100|. if !bytes.Equal(item.GetStartKey(), left) { res = append(res, newSimpleBucketItem(item.GetStartKey(), left)) } - // the right has oen intersection like |010 - 100| and |010 - 099|. + // the right has one intersection like |010 - 100| and |010 - 099|. if !bytes.Equal(right, item.GetEndKey()) { res = append(res, newSimpleBucketItem(right, item.GetEndKey())) } diff --git a/pkg/core/region.go b/pkg/core/region.go index 66f918137de5..8a552859fb22 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -1786,16 +1786,16 @@ func (r *RegionsInfo) GetAverageRegionSize() int64 { // ValidRegion is used to decide if the region is valid. func (r *RegionsInfo) ValidRegion(region *metapb.Region) error { startKey := region.GetStartKey() - currnetRegion := r.GetRegionByKey(startKey) - if currnetRegion == nil { + currentRegion := r.GetRegionByKey(startKey) + if currentRegion == nil { return errors.Errorf("region not found, request region: %v", logutil.RedactStringer(RegionToHexMeta(region))) } // If the request epoch is less than current region epoch, then returns an error. 
regionEpoch := region.GetRegionEpoch() - currnetEpoch := currnetRegion.GetMeta().GetRegionEpoch() - if regionEpoch.GetVersion() < currnetEpoch.GetVersion() || - regionEpoch.GetConfVer() < currnetEpoch.GetConfVer() { - return errors.Errorf("invalid region epoch, request: %v, current: %v", regionEpoch, currnetEpoch) + currentEpoch := currentRegion.GetMeta().GetRegionEpoch() + if regionEpoch.GetVersion() < currentEpoch.GetVersion() || + regionEpoch.GetConfVer() < currentEpoch.GetConfVer() { + return errors.Errorf("invalid region epoch, request: %v, current: %v", regionEpoch, currentEpoch) } return nil } @@ -1884,19 +1884,19 @@ func EncodeToString(src []byte) []byte { return dst } -// HexRegionKey converts region key to hex format. Used for formating region in +// HexRegionKey converts region key to hex format. Used for formatting region in // logs. func HexRegionKey(key []byte) []byte { return ToUpperASCIIInplace(EncodeToString(key)) } -// HexRegionKeyStr converts region key to hex format. Used for formating region in +// HexRegionKeyStr converts region key to hex format. Used for formatting region in // logs. func HexRegionKeyStr(key []byte) string { return String(HexRegionKey(key)) } -// RegionToHexMeta converts a region meta's keys to hex format. Used for formating +// RegionToHexMeta converts a region meta's keys to hex format. Used for formatting // region in logs. func RegionToHexMeta(meta *metapb.Region) HexRegionMeta { if meta == nil { @@ -1905,7 +1905,7 @@ func RegionToHexMeta(meta *metapb.Region) HexRegionMeta { return HexRegionMeta{meta} } -// HexRegionMeta is a region meta in the hex format. Used for formating region in logs. +// HexRegionMeta is a region meta in the hex format. Used for formatting region in logs. type HexRegionMeta struct { *metapb.Region } @@ -1917,7 +1917,7 @@ func (h HexRegionMeta) String() string { return strings.TrimSpace(proto.CompactTextString(meta)) } -// RegionsToHexMeta converts regions' meta keys to hex format. Used for formating +// RegionsToHexMeta converts regions' meta keys to hex format. Used for formatting // region in logs. func RegionsToHexMeta(regions []*metapb.Region) HexRegionsMeta { hexRegionMetas := make([]*metapb.Region, len(regions)) @@ -1925,7 +1925,7 @@ func RegionsToHexMeta(regions []*metapb.Region) HexRegionsMeta { return hexRegionMetas } -// HexRegionsMeta is a slice of regions' meta in the hex format. Used for formating +// HexRegionsMeta is a slice of regions' meta in the hex format. Used for formatting // region in logs. type HexRegionsMeta []*metapb.Region diff --git a/pkg/core/store.go b/pkg/core/store.go index 1d3362cac0e4..9b660754496e 100644 --- a/pkg/core/store.go +++ b/pkg/core/store.go @@ -600,22 +600,36 @@ func DistinctScore(labels []string, stores []*StoreInfo, other *StoreInfo) float return score } -// MergeLabels merges the passed in labels with origins, overriding duplicated -// ones. +// MergeLabels merges the passed in labels with origins, overriding duplicated ones. +// Note: To prevent potential data races, it is advisable to refrain from directly modifying the 'origin' variable. 
func MergeLabels(origin []*metapb.StoreLabel, labels []*metapb.StoreLabel) []*metapb.StoreLabel { - storeLabels := origin -L: + results := make([]*metapb.StoreLabel, 0, len(origin)) + for _, label := range origin { + results = append(results, &metapb.StoreLabel{ + Key: label.Key, + Value: label.Value, + }) + } + for _, newLabel := range labels { - for _, label := range storeLabels { + found := false + for _, label := range results { if strings.EqualFold(label.Key, newLabel.Key) { + // Update the value for an existing key. label.Value = newLabel.Value - continue L + found = true + break } } - storeLabels = append(storeLabels, newLabel) + // Add a new label if the key doesn't exist in the original slice. + if !found { + results = append(results, newLabel) + } } - res := storeLabels[:0] - for _, l := range storeLabels { + + // Filter out labels with an empty value. + res := results[:0] + for _, l := range results { if l.Value != "" { res = append(res, l) } diff --git a/pkg/core/storelimit/limit_test.go b/pkg/core/storelimit/limit_test.go index 946729f8ce25..758653303114 100644 --- a/pkg/core/storelimit/limit_test.go +++ b/pkg/core/storelimit/limit_test.go @@ -101,18 +101,18 @@ func TestWindow(t *testing.T) { token := capacity + 10 re.True(s.take(token)) re.False(s.take(token)) - re.EqualValues(s.ack(token), 0) + re.EqualValues(0, s.ack(token)) re.True(s.take(token)) - re.EqualValues(s.ack(token), 0) + re.EqualValues(0, s.ack(token)) re.Equal(s.ack(token), token) - re.EqualValues(s.getUsed(), 0) + re.EqualValues(0, s.getUsed()) // case2: the capacity of the window must greater than the minSnapSize. s.reset(minSnapSize - 1) - re.EqualValues(s.capacity, minSnapSize) + re.EqualValues(minSnapSize, s.capacity) re.True(s.take(minSnapSize)) - re.EqualValues(s.ack(minSnapSize*2), minSnapSize) - re.EqualValues(s.getUsed(), 0) + re.EqualValues(minSnapSize, s.ack(minSnapSize*2)) + re.EqualValues(0, s.getUsed()) } func TestFeedback(t *testing.T) { diff --git a/pkg/encryption/crypter.go b/pkg/encryption/crypter.go index 16de0500f925..b1f8631ae26d 100644 --- a/pkg/encryption/crypter.go +++ b/pkg/encryption/crypter.go @@ -82,7 +82,7 @@ func newIV(ivLength int) ([]byte, error) { } if n != ivLength { return nil, errs.ErrEncryptionGenerateIV.GenWithStack( - "iv length exepcted %d vs actual %d", ivLength, n) + "iv length expected %d vs actual %d", ivLength, n) } return iv, nil } diff --git a/pkg/encryption/key_manager_test.go b/pkg/encryption/key_manager_test.go index 96bdb3c0eb55..74f8b9a3b470 100644 --- a/pkg/encryption/key_manager_test.go +++ b/pkg/encryption/key_manager_test.go @@ -509,7 +509,7 @@ func TestSetLeadershipWithEncryptionMethodChanged(t *testing.T) { } err := saveKeys(leadership, masterKeyMeta, keys, defaultKeyManagerHelper()) re.NoError(err) - // Config with different encrption method. + // Config with different encryption method. config := &Config{ DataEncryptionMethod: "aes256-ctr", MasterKey: MasterKeyConfig{ @@ -579,7 +579,7 @@ func TestSetLeadershipWithCurrentKeyExposed(t *testing.T) { } err := saveKeys(leadership, masterKeyMeta, keys, defaultKeyManagerHelper()) re.NoError(err) - // Config with different encrption method. + // Config with different encryption method. 
config := &Config{ DataEncryptionMethod: "aes128-ctr", MasterKey: MasterKeyConfig{ diff --git a/pkg/encryption/kms.go b/pkg/encryption/kms.go index 3e70b2deeb52..7c52b4280c20 100644 --- a/pkg/encryption/kms.go +++ b/pkg/encryption/kms.go @@ -90,7 +90,7 @@ func newMasterKeyFromKMS( } if len(output.Plaintext) != masterKeyLength { return nil, errs.ErrEncryptionKMS.GenWithStack( - "unexpected data key length generated from AWS KMS, expectd %d vs actual %d", + "unexpected data key length generated from AWS KMS, expected %d vs actual %d", masterKeyLength, len(output.Plaintext)) } masterKey = &MasterKey{ diff --git a/pkg/encryption/region_crypter.go b/pkg/encryption/region_crypter.go index 346e8a08da0e..458c5b67d7ba 100644 --- a/pkg/encryption/region_crypter.go +++ b/pkg/encryption/region_crypter.go @@ -41,7 +41,7 @@ func processRegionKeys(region *metapb.Region, key *encryptionpb.DataKey, iv []by } // EncryptRegion encrypt the region start key and end key, using the current key return from the -// key manager. The return is an encypted copy of the region, with Encryption meta updated. +// key manager. The return is an encrypted copy of the region, with Encryption meta updated. func EncryptRegion(region *metapb.Region, keyManager KeyManager) (*metapb.Region, error) { if region == nil { return nil, errs.ErrEncryptionEncryptRegion.GenWithStack("trying to encrypt nil region") diff --git a/pkg/mcs/discovery/discover.go b/pkg/mcs/discovery/discover.go index 89c45497a876..1ce5ecda51dd 100644 --- a/pkg/mcs/discovery/discover.go +++ b/pkg/mcs/discovery/discover.go @@ -45,7 +45,7 @@ func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, er } // GetMSMembers returns all the members of the specified service name. -func GetMSMembers(name string, client *clientv3.Client) ([]string, error) { +func GetMSMembers(name string, client *clientv3.Client) ([]ServiceRegistryEntry, error) { switch name { case utils.TSOServiceName, utils.SchedulingServiceName, utils.ResourceManagerServiceName: clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath) @@ -61,7 +61,7 @@ func GetMSMembers(name string, client *clientv3.Client) ([]string, error) { return nil, errs.ErrEtcdTxnConflict.FastGenByArgs() } - var addrs []string + var entries []ServiceRegistryEntry for _, resp := range resps.Responses { for _, keyValue := range resp.GetResponseRange().GetKvs() { var entry ServiceRegistryEntry @@ -69,10 +69,10 @@ func GetMSMembers(name string, client *clientv3.Client) ([]string, error) { log.Error("try to deserialize service registry entry failed", zap.String("key", string(keyValue.Key)), zap.Error(err)) continue } - addrs = append(addrs, entry.ServiceAddr) + entries = append(entries, entry) } } - return addrs, nil + return entries, nil } return nil, errors.Errorf("unknown service name %s", name) diff --git a/pkg/mcs/discovery/registry_entry.go b/pkg/mcs/discovery/registry_entry.go index 52751b430c44..bf11ae5c8a40 100644 --- a/pkg/mcs/discovery/registry_entry.go +++ b/pkg/mcs/discovery/registry_entry.go @@ -23,7 +23,11 @@ import ( // ServiceRegistryEntry is the registry entry of a service type ServiceRegistryEntry struct { - ServiceAddr string `json:"service-addr"` + ServiceAddr string `json:"service-addr"` + Version string `json:"version"` + GitHash string `json:"git-hash"` + DeployPath string `json:"deploy-path"` + StartTimestamp int64 `json:"start-timestamp"` } // Serialize this service registry entry diff --git a/pkg/mcs/metastorage/server/grpc_service.go 
b/pkg/mcs/metastorage/server/grpc_service.go index 3da079e6109a..f5de50765e85 100644 --- a/pkg/mcs/metastorage/server/grpc_service.go +++ b/pkg/mcs/metastorage/server/grpc_service.go @@ -20,10 +20,12 @@ import ( "net/http" "github.com/pingcap/kvproto/pkg/meta_storagepb" + "github.com/pingcap/log" bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/mcs/registry" "github.com/tikv/pd/pkg/utils/apiutil" "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -88,12 +90,13 @@ func (s *Service) Watch(req *meta_storagepb.WatchRequest, server meta_storagepb. } ctx, cancel := context.WithCancel(server.Context()) defer cancel() - options := []clientv3.OpOption{} + var options []clientv3.OpOption key := string(req.GetKey()) var startRevision int64 if endKey := req.GetRangeEnd(); endKey != nil { options = append(options, clientv3.WithRange(string(endKey))) } + log.Info("watch request", zap.String("key", key), zap.String("range-end", string(req.GetRangeEnd())), zap.Int64("start-revision", req.GetStartRevision())) if startRevision = req.GetStartRevision(); startRevision != 0 { options = append(options, clientv3.WithRev(startRevision)) } @@ -126,8 +129,8 @@ func (s *Service) Watch(req *meta_storagepb.WatchRequest, server meta_storagepb. return res.Err() } - events := make([]*meta_storagepb.Event, 0, len(res.Events)) - for _, e := range res.Events { + events := make([]*meta_storagepb.Event, len(res.Events)) + for i, e := range res.Events { event := &meta_storagepb.Event{Kv: &meta_storagepb.KeyValue{ Key: e.Kv.Key, Value: e.Kv.Value, @@ -139,7 +142,7 @@ func (s *Service) Watch(req *meta_storagepb.WatchRequest, server meta_storagepb. if e.PrevKv != nil { event.PrevKv = &meta_storagepb.KeyValue{Key: e.PrevKv.Key, Value: e.PrevKv.Value} } - events = append(events, event) + events[i] = event } if len(events) > 0 { if err := server.Send(&meta_storagepb.WatchResponse{ @@ -159,7 +162,7 @@ func (s *Service) Get(ctx context.Context, req *meta_storagepb.GetRequest) (*met } ctx, cancel := context.WithCancel(ctx) defer cancel() - options := []clientv3.OpOption{} + var options []clientv3.OpOption key := string(req.GetKey()) if endKey := req.GetRangeEnd(); endKey != nil { options = append(options, clientv3.WithRange(string(endKey))) @@ -184,8 +187,9 @@ func (s *Service) Get(ctx context.Context, req *meta_storagepb.GetRequest) (*met Count: res.Count, More: res.More, } - for _, kv := range res.Kvs { - resp.Kvs = append(resp.Kvs, &meta_storagepb.KeyValue{Key: kv.Key, Value: kv.Value}) + resp.Kvs = make([]*meta_storagepb.KeyValue, len(res.Kvs)) + for i, kv := range res.Kvs { + resp.Kvs[i] = &meta_storagepb.KeyValue{Key: kv.Key, Value: kv.Value} } return resp, nil @@ -198,7 +202,7 @@ func (s *Service) Put(ctx context.Context, req *meta_storagepb.PutRequest) (*met } ctx, cancel := context.WithCancel(ctx) defer cancel() - options := []clientv3.OpOption{} + var options []clientv3.OpOption key := string(req.GetKey()) value := string(req.GetValue()) if lease := clientv3.LeaseID(req.GetLease()); lease != 0 { @@ -227,6 +231,39 @@ func (s *Service) Put(ctx context.Context, req *meta_storagepb.PutRequest) (*met return resp, nil } +// Delete deletes the key-value pair from meta storage. 
+func (s *Service) Delete(ctx context.Context, req *meta_storagepb.DeleteRequest) (*meta_storagepb.DeleteResponse, error) { + if err := s.checkServing(); err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(ctx) + defer cancel() + var options []clientv3.OpOption + key := string(req.GetKey()) + if prevKv := req.GetPrevKv(); prevKv { + options = append(options, clientv3.WithPrevKV()) + } + + cli := s.manager.GetClient() + res, err := cli.Delete(ctx, key, options...) + var revision int64 + if res != nil { + revision = res.Header.GetRevision() + } + if err != nil { + return &meta_storagepb.DeleteResponse{Header: s.wrapErrorAndRevision(revision, meta_storagepb.ErrorType_UNKNOWN, err.Error())}, nil + } + + resp := &meta_storagepb.DeleteResponse{ + Header: &meta_storagepb.ResponseHeader{ClusterId: s.manager.ClusterID(), Revision: revision}, + } + resp.PrevKvs = make([]*meta_storagepb.KeyValue, len(res.PrevKvs)) + for i, kv := range res.PrevKvs { + resp.PrevKvs[i] = &meta_storagepb.KeyValue{Key: kv.Key, Value: kv.Value} + } + return resp, nil +} + func (s *Service) wrapErrorAndRevision(revision int64, errorType meta_storagepb.ErrorType, message string) *meta_storagepb.ResponseHeader { return s.errorHeader(revision, &meta_storagepb.Error{ Type: errorType, diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 0ee6003e2f94..738140612b86 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -413,7 +413,17 @@ func (s *Server) startServer() (err error) { // different service modes provided by the same pd-server binary bs.ServerInfoGauge.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix())) bs.ServerMaxProcsGauge.Set(float64(runtime.GOMAXPROCS(0))) - s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr} + deployPath, err := os.Executable() + if err != nil { + deployPath = "" + } + s.serviceID = &discovery.ServiceRegistryEntry{ + ServiceAddr: s.cfg.AdvertiseListenAddr, + Version: versioninfo.PDReleaseVersion, + GitHash: versioninfo.PDGitHash, + DeployPath: deployPath, + StartTimestamp: s.StartTimestamp(), + } uniqueName := s.cfg.GetAdvertiseListenAddr() uniqueID := memberutil.GenerateUniqueID(uniqueName) log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID)) diff --git a/pkg/mcs/tso/server/apis/v1/api.go b/pkg/mcs/tso/server/apis/v1/api.go index 04418283007c..44f4b353d58e 100644 --- a/pkg/mcs/tso/server/apis/v1/api.go +++ b/pkg/mcs/tso/server/apis/v1/api.go @@ -95,7 +95,8 @@ func NewService(srv *tsoserver.Service) *Service { } s.RegisterAdminRouter() s.RegisterKeyspaceGroupRouter() - s.RegisterHealth() + s.RegisterHealthRouter() + s.RegisterConfigRouter() return s } @@ -112,12 +113,18 @@ func (s *Service) RegisterKeyspaceGroupRouter() { router.GET("/members", GetKeyspaceGroupMembers) } -// RegisterHealth registers the router of the health handler. -func (s *Service) RegisterHealth() { +// RegisterHealthRouter registers the router of the health handler. +func (s *Service) RegisterHealthRouter() { router := s.root.Group("health") router.GET("", GetHealth) } +// RegisterConfigRouter registers the router of the config handler. 
+func (s *Service) RegisterConfigRouter() { + router := s.root.Group("config") + router.GET("", getConfig) +} + func changeLogLevel(c *gin.Context) { svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service) var level string @@ -248,3 +255,13 @@ func GetKeyspaceGroupMembers(c *gin.Context) { } c.IndentedJSON(http.StatusOK, members) } + +// @Tags config +// @Summary Get full config. +// @Produce json +// @Success 200 {object} config.Config +// @Router /config [get] +func getConfig(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service) + c.IndentedJSON(http.StatusOK, svr.GetConfig()) +} diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 92ffc6603c32..4df43e1ebfcd 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -368,7 +368,17 @@ func (s *Server) startServer() (err error) { s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context()) legacySvcRootPath := endpoint.LegacyRootPath(s.clusterID) tsoSvcRootPath := endpoint.TSOSvcRootPath(s.clusterID) - s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr} + deployPath, err := os.Executable() + if err != nil { + deployPath = "" + } + s.serviceID = &discovery.ServiceRegistryEntry{ + ServiceAddr: s.cfg.AdvertiseListenAddr, + Version: versioninfo.PDReleaseVersion, + GitHash: versioninfo.PDGitHash, + DeployPath: deployPath, + StartTimestamp: s.StartTimestamp(), + } s.keyspaceGroupManager = tso.NewKeyspaceGroupManager( s.serverLoopCtx, s.serviceID, s.GetClient(), s.GetHTTPClient(), s.cfg.AdvertiseListenAddr, discovery.TSOPath(s.clusterID), legacySvcRootPath, tsoSvcRootPath, s.cfg) diff --git a/pkg/movingaverage/weight_allocator.go b/pkg/movingaverage/weight_allocator.go index 06be4616a855..f63ce377e08a 100644 --- a/pkg/movingaverage/weight_allocator.go +++ b/pkg/movingaverage/weight_allocator.go @@ -37,7 +37,7 @@ func NewWeightAllocator(length, segNum int) *WeightAllocator { segLength := length / segNum // segMod is used for split seg when is length not divisible by segNum. segMod := length % segNum - segIndexs := make([]int, 0, segNum) + segIndexes := make([]int, 0, segNum) weights := make([]float64, 0, length) unitCount := 0 for i := 0; i < segNum; i++ { @@ -46,11 +46,11 @@ func NewWeightAllocator(length, segNum int) *WeightAllocator { next++ } unitCount += (segNum - i) * next - segIndexs = append(segIndexs, next) + segIndexes = append(segIndexes, next) } unitWeight := 1.0 / float64(unitCount) for i := 0; i < segNum; i++ { - for j := 0; j < segIndexs[i]; j++ { + for j := 0; j < segIndexes[i]; j++ { weights = append(weights, unitWeight*float64(segNum-i)) } } diff --git a/pkg/progress/progress.go b/pkg/progress/progress.go index 345e4928c413..9354d21b0e94 100644 --- a/pkg/progress/progress.go +++ b/pkg/progress/progress.go @@ -30,13 +30,13 @@ const speedStatisticalWindow = 10 * time.Minute // Manager is used to maintain the progresses we care about. type Manager struct { syncutil.RWMutex - progesses map[string]*progressIndicator + progresses map[string]*progressIndicator } // NewManager creates a new Manager. func NewManager() *Manager { return &Manager{ - progesses: make(map[string]*progressIndicator), + progresses: make(map[string]*progressIndicator), } } @@ -59,7 +59,7 @@ func (m *Manager) Reset() { m.Lock() defer m.Unlock() - m.progesses = make(map[string]*progressIndicator) + m.progresses = make(map[string]*progressIndicator) } // AddProgress adds a progress into manager if it doesn't exist. 
@@ -69,8 +69,8 @@ func (m *Manager) AddProgress(progress string, current, total float64, updateInt history := list.New() history.PushBack(current) - if _, exist = m.progesses[progress]; !exist { - m.progesses[progress] = &progressIndicator{ + if _, exist = m.progresses[progress]; !exist { + m.progresses[progress] = &progressIndicator{ total: total, remaining: total, history: history, @@ -86,7 +86,7 @@ func (m *Manager) UpdateProgress(progress string, current, remaining float64, is m.Lock() defer m.Unlock() - if p, exist := m.progesses[progress]; exist { + if p, exist := m.progresses[progress]; exist { p.remaining = remaining if p.total < remaining { p.total = remaining @@ -120,7 +120,7 @@ func (m *Manager) UpdateProgressTotal(progress string, total float64) { m.Lock() defer m.Unlock() - if p, exist := m.progesses[progress]; exist { + if p, exist := m.progresses[progress]; exist { p.total = total } } @@ -130,8 +130,8 @@ func (m *Manager) RemoveProgress(progress string) (exist bool) { m.Lock() defer m.Unlock() - if _, exist = m.progesses[progress]; exist { - delete(m.progesses, progress) + if _, exist = m.progresses[progress]; exist { + delete(m.progresses, progress) return } return @@ -143,7 +143,7 @@ func (m *Manager) GetProgresses(filter func(p string) bool) []string { defer m.RUnlock() processes := []string{} - for p := range m.progesses { + for p := range m.progresses { if filter(p) { processes = append(processes, p) } @@ -156,7 +156,7 @@ func (m *Manager) Status(progress string) (process, leftSeconds, currentSpeed fl m.RLock() defer m.RUnlock() - if p, exist := m.progesses[progress]; exist { + if p, exist := m.progresses[progress]; exist { process = 1 - p.remaining/p.total if process < 0 { process = 0 diff --git a/pkg/progress/progress_test.go b/pkg/progress/progress_test.go index e6799fb0ff89..8f8b0ebcb565 100644 --- a/pkg/progress/progress_test.go +++ b/pkg/progress/progress_test.go @@ -49,7 +49,7 @@ func TestProgress(t *testing.T) { for i := 0; i < 100; i++ { m.UpdateProgress(n, 30, 30, false) } - re.Equal(61, m.progesses[n].history.Len()) + re.Equal(61, m.progresses[n].history.Len()) p, ls, cs, err = m.Status(n) re.NoError(err) re.Equal(0.7, p) diff --git a/pkg/replication/replication_mode_test.go b/pkg/replication/replication_mode_test.go index 5cf9f1a14504..038807d7d94e 100644 --- a/pkg/replication/replication_mode_test.go +++ b/pkg/replication/replication_mode_test.go @@ -260,7 +260,7 @@ func TestStateSwitch(t *testing.T) { rep.tickUpdateState() re.Equal(drStateSync, rep.drGetState()) - // once zone2 down, swith to async state. + // once zone2 down, switch to async state. setStoreState(cluster, "up", "up", "up", "up", "down", "down") rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) diff --git a/pkg/response/store.go b/pkg/response/store.go index 6aebd65b8cfe..1efe11bfb39c 100644 --- a/pkg/response/store.go +++ b/pkg/response/store.go @@ -34,18 +34,18 @@ type MetaStore struct { type SlowTrend struct { // CauseValue is the slow trend detecting raw input, it changes by the performance and pressure along time of the store. // The value itself is not important, what matter is: - // - The comparition result from store to store. + // - The comparison result from store to store. // - The change magnitude along time (represented by CauseRate). - // Currently it's one of store's internal latency (duration of waiting in the task queue of raftstore.store). + // Currently, it's one of store's internal latency (duration of waiting in the task queue of raftstore.store). 
CauseValue float64 `json:"cause_value"` - // CauseRate is for mesuring the change magnitude of CauseValue of the store, + // CauseRate is for measuring the change magnitude of CauseValue of the store, // - CauseRate > 0 means the store is become slower currently // - CauseRate < 0 means the store is become faster currently // - CauseRate == 0 means the store's performance and pressure does not have significant changes CauseRate float64 `json:"cause_rate"` // ResultValue is the current gRPC QPS of the store. ResultValue float64 `json:"result_value"` - // ResultRate is for mesuring the change magnitude of ResultValue of the store. + // ResultRate is for measuring the change magnitude of ResultValue of the store. ResultRate float64 `json:"result_rate"` } diff --git a/pkg/schedule/checker/rule_checker_test.go b/pkg/schedule/checker/rule_checker_test.go index 2668ac8cc431..ccd6abdc098d 100644 --- a/pkg/schedule/checker/rule_checker_test.go +++ b/pkg/schedule/checker/rule_checker_test.go @@ -1018,20 +1018,20 @@ func (suite *ruleCheckerTestSuite) TestFixOrphanPeerWithDisconnectedStoreAndRule op = suite.rc.Check(suite.cluster.GetRegion(1)) re.NotNil(op) re.Contains(op.Desc(), "orphan") - var removedPeerStroeID uint64 + var removedPeerStoreID uint64 newLeaderStoreID := r1.GetLeader().GetStoreId() for i := 0; i < op.Len(); i++ { if s, ok := op.Step(i).(operator.RemovePeer); ok { - removedPeerStroeID = s.FromStore + removedPeerStoreID = s.FromStore } if s, ok := op.Step(i).(operator.TransferLeader); ok { newLeaderStoreID = s.ToStore } } - re.NotZero(removedPeerStroeID) + re.NotZero(removedPeerStoreID) r1 = r1.Clone( core.WithLeader(r1.GetStorePeer(newLeaderStoreID)), - core.WithRemoveStorePeer(removedPeerStroeID)) + core.WithRemoveStorePeer(removedPeerStoreID)) suite.cluster.PutRegion(r1) r1 = suite.cluster.GetRegion(1) re.Len(r1.GetPeers(), 6-j) @@ -1571,7 +1571,7 @@ func (suite *ruleCheckerTestSuite) TestFixOfflinePeer() { re.Nil(suite.rc.Check(region)) } -func (suite *ruleCheckerTestSuite) TestFixOfflinePeerWithAvaliableWitness() { +func (suite *ruleCheckerTestSuite) TestFixOfflinePeerWithAvailableWitness() { re := suite.Require() suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z1"}) diff --git a/pkg/schedule/filter/filters_test.go b/pkg/schedule/filter/filters_test.go index f030dff81a41..f061a472d658 100644 --- a/pkg/schedule/filter/filters_test.go +++ b/pkg/schedule/filter/filters_test.go @@ -313,7 +313,7 @@ func TestStoreStateFilterReason(t *testing.T) { } } - // No reason catched + // No reason caught store = store.Clone(core.SetLastHeartbeatTS(time.Now())) testCases := []testCase{ {2, "store-state-ok-filter", "store-state-ok-filter"}, diff --git a/pkg/schedule/labeler/labeler_test.go b/pkg/schedule/labeler/labeler_test.go index 87773ce892d1..910a5558eb3a 100644 --- a/pkg/schedule/labeler/labeler_test.go +++ b/pkg/schedule/labeler/labeler_test.go @@ -369,7 +369,7 @@ func TestLabelerRuleTTL(t *testing.T) { start, _ := hex.DecodeString("1234") end, _ := hex.DecodeString("5678") region := core.NewTestRegionInfo(1, 1, start, end) - // the region has no lable rule at the beginning. + // the region has no label rule at the beginning. re.Empty(labeler.GetRegionLabels(region)) // set rules for the region. 
@@ -386,11 +386,11 @@ func TestLabelerRuleTTL(t *testing.T) { re.Len(labels, 2) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/labeler/regionLabelExpireSub1Minute")) - // rule2 should be exist since `GetRegionLabels` won't clear it physically. - checkRuleInMemoryAndStoage(re, labeler, "rule2", true) + // rule2 should be existed since `GetRegionLabels` won't clear it physically. + checkRuleInMemoryAndStorage(re, labeler, "rule2", true) re.Nil(labeler.GetLabelRule("rule2")) // rule2 should be physically clear. - checkRuleInMemoryAndStoage(re, labeler, "rule2", false) + checkRuleInMemoryAndStorage(re, labeler, "rule2", false) re.Equal("", labeler.GetRegionLabel(region, "k2")) @@ -398,7 +398,7 @@ func TestLabelerRuleTTL(t *testing.T) { re.NotNil(labeler.GetLabelRule("rule1")) } -func checkRuleInMemoryAndStoage(re *require.Assertions, labeler *RegionLabeler, ruleID string, exist bool) { +func checkRuleInMemoryAndStorage(re *require.Assertions, labeler *RegionLabeler, ruleID string, exist bool) { re.Equal(exist, labeler.labelRules[ruleID] != nil) existInStorage := false labeler.storage.LoadRegionRules(func(k, v string) { @@ -419,10 +419,10 @@ func TestGC(t *testing.T) { start, _ := hex.DecodeString("1234") end, _ := hex.DecodeString("5678") region := core.NewTestRegionInfo(1, 1, start, end) - // the region has no lable rule at the beginning. + // the region has no label rule at the beginning. re.Empty(labeler.GetRegionLabels(region)) - labels := []RegionLabel{} + labels := make([]RegionLabel, 0, len(ttls)) for id, ttl := range ttls { labels = append(labels, RegionLabel{Key: fmt.Sprintf("k%d", id), Value: fmt.Sprintf("v%d", id), TTL: ttl}) rule := &LabelRule{ @@ -436,7 +436,7 @@ func TestGC(t *testing.T) { re.Len(labeler.labelRules, len(ttls)) - // check all rules unitl some rule expired. + // check all rules until some rule expired. 
for { time.Sleep(time.Millisecond * 5) labels := labeler.GetRegionLabels(region) diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index baef0c6d5643..07cafb9c566e 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -486,6 +486,7 @@ func (oc *Controller) addOperatorLocked(op *Operator) bool { return false } oc.operators[regionID] = op + oc.counts[op.SchedulerKind()]++ operatorCounter.WithLabelValues(op.Desc(), "start").Inc() operatorSizeHist.WithLabelValues(op.Desc()).Observe(float64(op.ApproximateSize)) opInfluence := NewTotalOpInfluence([]*Operator{op}, oc.cluster) @@ -505,7 +506,6 @@ func (oc *Controller) addOperatorLocked(op *Operator) bool { storeLimitCostCounter.WithLabelValues(strconv.FormatUint(storeID, 10), n).Add(float64(stepCost) / float64(storelimit.RegionInfluence[v])) } } - oc.updateCounts(oc.operators) var step OpStep if region := oc.cluster.GetRegion(op.RegionID()); region != nil { @@ -560,6 +560,7 @@ func (oc *Controller) removeOperatorsLocked() []*Operator { var removed []*Operator for regionID, op := range oc.operators { delete(oc.operators, regionID) + oc.counts[op.SchedulerKind()]-- operatorCounter.WithLabelValues(op.Desc(), "remove").Inc() oc.ack(op) if op.Kind()&OpMerge != 0 { @@ -567,7 +568,6 @@ func (oc *Controller) removeOperatorsLocked() []*Operator { } removed = append(removed, op) } - oc.updateCounts(oc.operators) return removed } @@ -602,7 +602,7 @@ func (oc *Controller) removeOperatorLocked(op *Operator) bool { regionID := op.RegionID() if cur := oc.operators[regionID]; cur == op { delete(oc.operators, regionID) - oc.updateCounts(oc.operators) + oc.counts[op.SchedulerKind()]-- operatorCounter.WithLabelValues(op.Desc(), "remove").Inc() oc.ack(op) if op.Kind()&OpMerge != 0 { @@ -783,16 +783,6 @@ func (oc *Controller) GetHistory(start time.Time) []OpHistory { return history } -// updateCounts updates resource counts using current pending operators. -func (oc *Controller) updateCounts(operators map[uint64]*Operator) { - for k := range oc.counts { - delete(oc.counts, k) - } - for _, op := range operators { - oc.counts[op.SchedulerKind()]++ - } -} - // OperatorCount gets the count of operators filtered by kind. // kind only has one OpKind. func (oc *Controller) OperatorCount(kind OpKind) uint64 { @@ -862,7 +852,7 @@ func (oc *Controller) SetOperator(op *Operator) { oc.Lock() defer oc.Unlock() oc.operators[op.RegionID()] = op - oc.updateCounts(oc.operators) + oc.counts[op.SchedulerKind()]++ } // OpWithStatus records the operator and its status. 
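The operator controller hunks above drop the full updateCounts rebuild and instead increment or decrement oc.counts[op.SchedulerKind()] at every place an operator is added or removed. A small sketch of the trade-off with stand-in types (not the controller's actual ones): the per-operation cost falls from O(number of pending operators) to O(1), at the price that every mutation path must keep the counter in sync, which is why both removal paths in the hunks above now decrement it.

package main

import "fmt"

type opKind string

type controller struct {
	operators map[uint64]opKind // pending operators keyed by region ID
	counts    map[opKind]uint64 // maintained incrementally instead of being rebuilt
}

// add registers an operator and bumps the counter for its kind in O(1).
func (c *controller) add(regionID uint64, kind opKind) {
	c.operators[regionID] = kind
	c.counts[kind]++
}

// remove must mirror add exactly, otherwise the counter drifts.
func (c *controller) remove(regionID uint64) {
	if kind, ok := c.operators[regionID]; ok {
		delete(c.operators, regionID)
		c.counts[kind]--
	}
}

func main() {
	c := &controller{operators: map[uint64]opKind{}, counts: map[opKind]uint64{}}
	c.add(1, "leader")
	c.add(2, "region")
	c.remove(1)
	fmt.Println(c.counts["leader"], c.counts["region"]) // 0 1
}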
diff --git a/pkg/schedule/placement/fit.go b/pkg/schedule/placement/fit.go index d907bcd011a4..30530462664b 100644 --- a/pkg/schedule/placement/fit.go +++ b/pkg/schedule/placement/fit.go @@ -314,8 +314,8 @@ func pickPeersFromBinaryInt(candidates []*fitPeer, binaryNumber uint) []*fitPeer return selected } -func unSelectPeers(seleted []*fitPeer) { - for _, p := range seleted { +func unSelectPeers(selected []*fitPeer) { + for _, p := range selected { p.selected = false } } diff --git a/pkg/schedule/placement/fit_region_test.go b/pkg/schedule/placement/fit_region_test.go index 5bc62d9cc121..2006801e71a5 100644 --- a/pkg/schedule/placement/fit_region_test.go +++ b/pkg/schedule/placement/fit_region_test.go @@ -299,26 +299,26 @@ func BenchmarkFitRegionWithMoreRulesAndStoreLabels(b *testing.B) { values := []string{} for id := 1; id < 100; id++ { values = append(values, fmt.Sprintf("value_%08d", id)) - labelContaint := LabelConstraint{ + labelConstraint := LabelConstraint{ Key: fmt.Sprintf("key_%08d", id), Op: NotIn, Values: values, } - rule.LabelConstraints = append(rule.LabelConstraints, labelContaint) + rule.LabelConstraints = append(rule.LabelConstraints, labelConstraint) } - // add an exclusive containt. + // add an exclusive constraint. values = append(values, "exclusive") - labelContaint := LabelConstraint{ + labelConstraint := LabelConstraint{ Key: "exclusive", Op: In, Values: values, } - rule.LabelConstraints = append(rule.LabelConstraints, labelContaint) + rule.LabelConstraints = append(rule.LabelConstraints, labelConstraint) rules = append(rules, rule) } - // create stores, with each stores has 101 normal labels(1 exclusive label). + // create stores, with each store has 101 normal labels(1 exclusive label). lists := make([]*core.StoreInfo, 0) - labels := []*metapb.StoreLabel{} + labels := make([]*metapb.StoreLabel, 0, 101) for labID := 0; labID < 100; labID++ { label := &metapb.StoreLabel{Key: fmt.Sprintf("store_%08d", labID), Value: fmt.Sprintf("value_%08d", labID)} labels = append(labels, label) @@ -349,7 +349,7 @@ func BenchmarkFitRegionWithMoreRulesAndStoreLabels(b *testing.B) { func BenchmarkFitRegionWithLocationLabels(b *testing.B) { region := mockRegion(5, 5) - rules := []*Rule{} + var rules []*Rule rule := &Rule{ GroupID: DefaultGroupID, ID: "followers", diff --git a/pkg/schedule/placement/fit_test.go b/pkg/schedule/placement/fit_test.go index f3c1fe530587..aa5c66059f7d 100644 --- a/pkg/schedule/placement/fit_test.go +++ b/pkg/schedule/placement/fit_test.go @@ -151,7 +151,7 @@ func TestReplace(t *testing.T) { } for _, tc := range testCases { region := makeRegion(tc.region) - var rules []*Rule + rules := make([]*Rule, 0, len(tc.rules)) for _, r := range tc.rules { rules = append(rules, makeRule(r)) } @@ -196,7 +196,7 @@ func TestFitRegion(t *testing.T) { for _, testCase := range testCases { region := makeRegion(testCase.region) - var rules []*Rule + rules := make([]*Rule, 0, len(testCase.rules)) for _, r := range testCase.rules { rules = append(rules, makeRule(r)) } diff --git a/pkg/schedule/plan/status.go b/pkg/schedule/plan/status.go index 4242b6314939..636b9ceaaca3 100644 --- a/pkg/schedule/plan/status.go +++ b/pkg/schedule/plan/status.go @@ -49,7 +49,7 @@ const ( const ( // StatusStoreRejectLeader represents the store is restricted by the special configuration. e.g. reject label setting, evict leader/slow store scheduler. StatusStoreRejectLeader = iota + 300 - // StatusNotMatchIsolation represents the isolation cannot satisfy the requirement. 
+ // StatusStoreNotMatchIsolation represents the isolation cannot satisfy the requirement. StatusStoreNotMatchIsolation ) @@ -189,7 +189,7 @@ func (s *Status) String() string { return StatusText(s.StatusCode) } -// IsNormal returns true if the status is noraml. +// IsNormal returns true if the status is normal. func (s *Status) IsNormal() bool { return int(s.StatusCode)/10 == 10 } diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index 3db8b44c925a..5cd59583767b 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -40,7 +40,7 @@ const ( EvictLeaderName = "evict-leader-scheduler" // EvictLeaderType is evict leader scheduler type. EvictLeaderType = "evict-leader" - // EvictLeaderBatchSize is the number of operators to to transfer + // EvictLeaderBatchSize is the number of operators to transfer // leaders by one scheduling EvictLeaderBatchSize = 3 lastStoreDeleteInfo = "The last store has been deleted" diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index b75ff27bd9a2..d919c1c0f0ab 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -597,7 +597,7 @@ func checkStoreSlowerThanOthers(cluster sche.SchedulerCluster, target *core.Stor } slowTrend := store.GetSlowTrend() // Use `SlowTrend.ResultValue` at first, but not good, `CauseValue` is better - // Greater `CuaseValue` means slower + // Greater `CauseValue` means slower if slowTrend != nil && (targetSlowTrend.CauseValue-slowTrend.CauseValue) > alterEpsilon && slowTrend.CauseValue > alterEpsilon { slowerThanStoresNum += 1 } diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index ede4bbec034b..4b627b9a6105 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -203,6 +203,7 @@ func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) { statisticsInterval = 0 checkHotWriteRegionScheduleByteRateOnly(re, false /* disable placement rules */) checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */) + checkHotWriteRegionPlacement(re, true) } func TestSplitIfRegionTooHot(t *testing.T) { @@ -393,6 +394,60 @@ func TestSplitBucketsByLoad(t *testing.T) { } } +func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules bool) { + cancel, _, tc, oc := prepareSchedulersTest() + defer cancel() + tc.SetEnableUseJointConsensus(true) + tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.ConfChangeV2)) + tc.SetEnablePlacementRules(enablePlacementRules) + labels := []string{"zone", "host"} + tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...) 
+ hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + re.NoError(err) + tc.SetHotRegionCacheHitsThreshold(0) + + tc.AddLabelsStore(1, 2, map[string]string{"zone": "z1", "host": "h1"}) + tc.AddLabelsStore(2, 2, map[string]string{"zone": "z1", "host": "h2"}) + tc.AddLabelsStore(3, 2, map[string]string{"zone": "z2", "host": "h3"}) + tc.AddLabelsStore(4, 2, map[string]string{"zone": "z2", "host": "h4"}) + tc.AddLabelsStore(5, 2, map[string]string{"zone": "z2", "host": "h5"}) + tc.AddLabelsStore(6, 2, map[string]string{"zone": "z2", "host": "h6"}) + tc.RuleManager.SetRule(&placement.Rule{ + GroupID: "pd", ID: "leader", Role: placement.Leader, Count: 1, LabelConstraints: []placement.LabelConstraint{{Key: "zone", Op: "in", Values: []string{"z1"}}}, + }) + tc.RuleManager.SetRule(&placement.Rule{ + GroupID: "pd", ID: "voter", Role: placement.Follower, Count: 2, LabelConstraints: []placement.LabelConstraint{{Key: "zone", Op: "in", Values: []string{"z2"}}}, + }) + tc.RuleManager.DeleteRule("pd", "default") + + tc.UpdateStorageWrittenBytes(1, 10*units.MiB*utils.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(2, 0) + tc.UpdateStorageWrittenBytes(3, 6*units.MiB*utils.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(4, 3*units.MiB*utils.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(5, 3*units.MiB*utils.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenBytes(6, 6*units.MiB*utils.StoreHeartBeatReportInterval) + + // Region 1, 2 and 3 are hot regions. + addRegionInfo(tc, utils.Write, []testRegionInfo{ + {1, []uint64{1, 3, 5}, 512 * units.KiB, 0, 0}, + {2, []uint64{1, 4, 6}, 512 * units.KiB, 0, 0}, + {3, []uint64{1, 3, 6}, 512 * units.KiB, 0, 0}, + }) + ops, _ := hb.Schedule(tc, false) + re.NotEmpty(ops) + re.NotContains(ops[0].Step(1).String(), "transfer leader") + clearPendingInfluence(hb.(*hotScheduler)) + + tc.RuleManager.SetRule(&placement.Rule{ + GroupID: "pd", ID: "voter", Role: placement.Voter, Count: 2, LabelConstraints: []placement.LabelConstraint{{Key: "zone", Op: "in", Values: []string{"z2"}}}, + }) + tc.RuleManager.DeleteRule("pd", "follower") + ops, _ = hb.Schedule(tc, false) + re.NotEmpty(ops) + // TODO: fix the test + // re.NotContains(ops[0].Step(1).String(), "transfer leader") +} + func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlacementRules bool) { cancel, opt, tc, oc := prepareSchedulersTest() defer cancel() diff --git a/pkg/schedule/schedulers/hot_region_v2.go b/pkg/schedule/schedulers/hot_region_v2.go index 04ba0fc978f9..40cb35cd16b3 100644 --- a/pkg/schedule/schedulers/hot_region_v2.go +++ b/pkg/schedule/schedulers/hot_region_v2.go @@ -138,7 +138,7 @@ func (bs *balanceSolver) filterUniformStoreV2() (string, bool) { if !bs.enableExpectation() { return "", false } - // Because region is available for src and dst, so stddev is the same for both, only need to calcurate one. + // Because region is available for src and dst, so stddev is the same for both, only need to calculate one. isUniformFirstPriority, isUniformSecondPriority := bs.isUniformFirstPriority(bs.cur.srcStore), bs.isUniformSecondPriority(bs.cur.srcStore) if isUniformFirstPriority && isUniformSecondPriority { // If both dims are enough uniform, any schedule is unnecessary. 
diff --git a/pkg/statistics/store_test.go b/pkg/statistics/store_test.go index a0e7140a8823..ccf85caaa721 100644 --- a/pkg/statistics/store_test.go +++ b/pkg/statistics/store_test.go @@ -24,7 +24,7 @@ import ( "github.com/tikv/pd/pkg/core" ) -func TestFilterUnhealtyStore(t *testing.T) { +func TestFilterUnhealthyStore(t *testing.T) { re := require.New(t) stats := NewStoresStats() cluster := core.NewBasicCluster() diff --git a/pkg/storage/hot_region_storage_test.go b/pkg/storage/hot_region_storage_test.go index 629c638c1ff1..1486fb8271d2 100644 --- a/pkg/storage/hot_region_storage_test.go +++ b/pkg/storage/hot_region_storage_test.go @@ -172,14 +172,14 @@ func TestHotRegionWrite(t *testing.T) { func TestHotRegionDelete(t *testing.T) { re := require.New(t) defaultRemainDay := 7 - defaultDelteData := 30 + defaultDeleteData := 30 deleteDate := time.Now().AddDate(0, 0, 0) packHotRegionInfo := &MockPackHotRegionInfo{} store, clean, err := newTestHotRegionStorage(10*time.Minute, uint64(defaultRemainDay), packHotRegionInfo) re.NoError(err) defer clean() historyHotRegions := make([]HistoryHotRegion, 0) - for i := 0; i < defaultDelteData; i++ { + for i := 0; i < defaultDeleteData; i++ { historyHotRegion := HistoryHotRegion{ UpdateTime: deleteDate.UnixNano() / int64(time.Millisecond), RegionID: 1, diff --git a/pkg/storage/leveldb_backend.go b/pkg/storage/leveldb_backend.go index d25044e9c205..8fb1db196c12 100644 --- a/pkg/storage/leveldb_backend.go +++ b/pkg/storage/leveldb_backend.go @@ -18,9 +18,7 @@ import ( "context" "time" - "github.com/gogo/protobuf/proto" "github.com/pingcap/failpoint" - "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/syndtr/goleveldb/leveldb" "github.com/tikv/pd/pkg/encryption" @@ -32,25 +30,27 @@ import ( ) const ( - // DefaultFlushRegionRate is the ttl to sync the regions to region storage. - defaultFlushRegionRate = 3 * time.Second - // DefaultBatchSize is the batch size to save the regions to region storage. + // defaultFlushRate is the default interval to flush the data into the local storage. + defaultFlushRate = 3 * time.Second + // defaultBatchSize is the default batch size to save the data to the local storage. defaultBatchSize = 100 + // defaultDirtyFlushTick + defaultDirtyFlushTick = time.Second ) // levelDBBackend is a storage backend that stores data in LevelDB, -// which is mainly used by the PD region storage. +// which is mainly used to store the PD Region meta information. type levelDBBackend struct { *endpoint.StorageEndpoint - ekm *encryption.Manager - mu syncutil.RWMutex - batchRegions map[string]*metapb.Region - batchSize int - cacheSize int - flushRate time.Duration - flushTime time.Time - regionStorageCtx context.Context - regionStorageCancel context.CancelFunc + ekm *encryption.Manager + mu syncutil.RWMutex + batch map[string][]byte + batchSize int + cacheSize int + flushRate time.Duration + flushTime time.Time + ctx context.Context + cancel context.CancelFunc } // newLevelDBBackend is used to create a new LevelDB backend. 
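The struct change above replaces the Region-specific batchRegions map with a generic batch of raw bytes, so proto marshaling (and, previously, region-key encryption) moves out of the backend; the old SaveRegion/saveRegions code is removed in the hunks that follow. A hedged sketch of what a caller might now do — this helper is illustrative and not part of the patch, though proto.Marshal, endpoint.RegionPath and errs.ErrProtoMarshal all appear in the code being deleted, and any encryption step is omitted here:

package storage

import (
	"github.com/gogo/protobuf/proto"
	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/tikv/pd/pkg/errs"
	"github.com/tikv/pd/pkg/storage/endpoint"
)

// saveRegionViaBatch is a hypothetical caller of the refactored backend: it
// serializes the region itself and hands the raw bytes to SaveIntoBatch,
// leaving the backend unaware of what the value represents.
func saveRegionViaBatch(lb *levelDBBackend, region *metapb.Region) error {
	value, err := proto.Marshal(region)
	if err != nil {
		return errs.ErrProtoMarshal.Wrap(err).GenWithStackByCause()
	}
	return lb.SaveIntoBatch(endpoint.RegionPath(region.GetId()), value)
}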
@@ -63,23 +63,19 @@ func newLevelDBBackend( if err != nil { return nil, err } - regionStorageCtx, regionStorageCancel := context.WithCancel(ctx) lb := &levelDBBackend{ - StorageEndpoint: endpoint.NewStorageEndpoint(levelDB, ekm), - ekm: ekm, - batchSize: defaultBatchSize, - flushRate: defaultFlushRegionRate, - batchRegions: make(map[string]*metapb.Region, defaultBatchSize), - flushTime: time.Now().Add(defaultFlushRegionRate), - regionStorageCtx: regionStorageCtx, - regionStorageCancel: regionStorageCancel, + StorageEndpoint: endpoint.NewStorageEndpoint(levelDB, ekm), + ekm: ekm, + batchSize: defaultBatchSize, + flushRate: defaultFlushRate, + batch: make(map[string][]byte, defaultBatchSize), + flushTime: time.Now().Add(defaultFlushRate), } + lb.ctx, lb.cancel = context.WithCancel(ctx) go lb.backgroundFlush() return lb, nil } -var dirtyFlushTick = time.Second - func (lb *levelDBBackend) backgroundFlush() { defer logutil.LogPanic() @@ -87,14 +83,14 @@ func (lb *levelDBBackend) backgroundFlush() { isFlush bool err error ) - ticker := time.NewTicker(dirtyFlushTick) + ticker := time.NewTicker(defaultDirtyFlushTick) defer ticker.Stop() for { select { case <-ticker.C: lb.mu.RLock() isFlush = lb.flushTime.Before(time.Now()) - failpoint.Inject("regionStorageFastFlush", func() { + failpoint.Inject("levelDBStorageFastFlush", func() { isFlush = true }) lb.mu.RUnlock() @@ -102,42 +98,32 @@ func (lb *levelDBBackend) backgroundFlush() { continue } if err = lb.Flush(); err != nil { - log.Error("flush regions meet error", errs.ZapError(err)) + log.Error("flush data meet error", errs.ZapError(err)) } - case <-lb.regionStorageCtx.Done(): + case <-lb.ctx.Done(): return } } } -func (lb *levelDBBackend) SaveRegion(region *metapb.Region) error { - region, err := encryption.EncryptRegion(region, lb.ekm) - if err != nil { - return err - } +// SaveIntoBatch saves the key-value pair into the batch cache, and it will +// only be saved to the underlying storage when the `Flush` method is +// called or the cache is full. +func (lb *levelDBBackend) SaveIntoBatch(key string, value []byte) error { lb.mu.Lock() defer lb.mu.Unlock() if lb.cacheSize < lb.batchSize-1 { - lb.batchRegions[endpoint.RegionPath(region.GetId())] = region + lb.batch[key] = value lb.cacheSize++ lb.flushTime = time.Now().Add(lb.flushRate) return nil } - lb.batchRegions[endpoint.RegionPath(region.GetId())] = region - err = lb.flushLocked() - - if err != nil { - return err - } - return nil -} - -func (lb *levelDBBackend) DeleteRegion(region *metapb.Region) error { - return lb.Remove(endpoint.RegionPath(region.GetId())) + lb.batch[key] = value + return lb.flushLocked() } -// Flush saves the cache region to the underlying storage. +// Flush saves the batch cache to the underlying storage. 
func (lb *levelDBBackend) Flush() error { lb.mu.Lock() defer lb.mu.Unlock() @@ -145,38 +131,32 @@ func (lb *levelDBBackend) Flush() error { } func (lb *levelDBBackend) flushLocked() error { - if err := lb.saveRegions(lb.batchRegions); err != nil { + if err := lb.saveBatchLocked(); err != nil { return err } lb.cacheSize = 0 - lb.batchRegions = make(map[string]*metapb.Region, lb.batchSize) + lb.batch = make(map[string][]byte, lb.batchSize) return nil } -func (lb *levelDBBackend) saveRegions(regions map[string]*metapb.Region) error { +func (lb *levelDBBackend) saveBatchLocked() error { batch := new(leveldb.Batch) - - for key, r := range regions { - value, err := proto.Marshal(r) - if err != nil { - return errs.ErrProtoMarshal.Wrap(err).GenWithStackByCause() - } + for key, value := range lb.batch { batch.Put([]byte(key), value) } - if err := lb.Base.(*kv.LevelDBKV).Write(batch, nil); err != nil { return errs.ErrLevelDBWrite.Wrap(err).GenWithStackByCause() } return nil } -// Close closes the LevelDB kv. It will call Flush() once before closing. +// Close will gracefully close the LevelDB backend and flush the data to the underlying storage before closing. func (lb *levelDBBackend) Close() error { err := lb.Flush() if err != nil { - log.Error("meet error before close the region storage", errs.ZapError(err)) + log.Error("meet error before closing the leveldb storage", errs.ZapError(err)) } - lb.regionStorageCancel() + lb.cancel() err = lb.Base.(*kv.LevelDBKV).Close() if err != nil { return errs.ErrLevelDBClose.Wrap(err).GenWithStackByArgs() diff --git a/pkg/storage/leveldb_backend_test.go b/pkg/storage/leveldb_backend_test.go new file mode 100644 index 000000000000..f727dd69ee36 --- /dev/null +++ b/pkg/storage/leveldb_backend_test.go @@ -0,0 +1,121 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestLevelDBBackend(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + backend, err := newLevelDBBackend(ctx, t.TempDir(), nil) + re.NoError(err) + re.NotNil(backend) + key, value := "k1", "v1" + // Save without flush. + err = backend.SaveIntoBatch(key, []byte(value)) + re.NoError(err) + val, err := backend.Load(key) + re.NoError(err) + re.Empty(val) + // Flush and load. + err = backend.Flush() + re.NoError(err) + val, err = backend.Load(key) + re.NoError(err) + re.Equal(value, val) + // Delete and load. + err = backend.Remove(key) + re.NoError(err) + val, err = backend.Load(key) + re.NoError(err) + re.Empty(val) + // Save twice without flush. + err = backend.SaveIntoBatch(key, []byte(value)) + re.NoError(err) + val, err = backend.Load(key) + re.NoError(err) + re.Empty(val) + value = "v2" + err = backend.SaveIntoBatch(key, []byte(value)) + re.NoError(err) + val, err = backend.Load(key) + re.NoError(err) + re.Empty(val) + // Delete before flush. 
+ err = backend.Remove(key) + re.NoError(err) + val, err = backend.Load(key) + re.NoError(err) + re.Empty(val) + // Flush and load. + err = backend.Flush() + re.NoError(err) + val, err = backend.Load(key) + re.NoError(err) + re.Equal(value, val) + // Delete and load. + err = backend.Remove(key) + re.NoError(err) + val, err = backend.Load(key) + re.NoError(err) + re.Empty(val) + // Test the background flush. + backend.flushRate = defaultDirtyFlushTick + err = backend.SaveIntoBatch(key, []byte(value)) + re.NoError(err) + val, err = backend.Load(key) + re.NoError(err) + re.Empty(val) + time.Sleep(defaultDirtyFlushTick * 2) + val, err = backend.Load(key) + re.NoError(err) + re.Equal(value, val) + err = backend.Remove(key) + re.NoError(err) + val, err = backend.Load(key) + re.NoError(err) + re.Empty(val) + backend.flushRate = defaultFlushRate + // Test the flush when the cache is full. + backend.flushRate = time.Minute + for i := 0; i < backend.batchSize; i++ { + key, value = fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i) + err = backend.SaveIntoBatch(key, []byte(value)) + re.NoError(err) + if i < backend.batchSize-1 { + // The cache is not full yet. + val, err = backend.Load(key) + re.NoError(err) + re.Empty(val) + } else { + // The cache is full, and the flush is triggered. + val, err = backend.Load(key) + re.NoError(err) + re.Equal(value, val) + } + } + backend.flushRate = defaultFlushRate + // Close the backend. + err = backend.Close() + re.NoError(err) +} diff --git a/pkg/storage/region_storage.go b/pkg/storage/region_storage.go new file mode 100644 index 000000000000..11bc6a7cc214 --- /dev/null +++ b/pkg/storage/region_storage.go @@ -0,0 +1,79 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + + "github.com/gogo/protobuf/proto" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/encryption" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" +) + +// RegionStorage is a storage for the PD region meta information based on LevelDB, +// which will override the default implementation of the `endpoint.RegionStorage`. +type RegionStorage struct { + kv.Base + backend *levelDBBackend +} + +var _ endpoint.RegionStorage = (*RegionStorage)(nil) + +func newRegionStorage(backend *levelDBBackend) *RegionStorage { + return &RegionStorage{Base: backend.Base, backend: backend} +} + +// LoadRegion implements the `endpoint.RegionStorage` interface. +func (s *RegionStorage) LoadRegion(regionID uint64, region *metapb.Region) (bool, error) { + return s.backend.LoadRegion(regionID, region) +} + +// LoadRegions implements the `endpoint.RegionStorage` interface. +func (s *RegionStorage) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error { + return s.backend.LoadRegions(ctx, f) +} + +// SaveRegion implements the `endpoint.RegionStorage` interface. 
+// Instead of saving the region directly, it will encrypt the region and then save it in batch. +func (s *RegionStorage) SaveRegion(region *metapb.Region) error { + encryptedRegion, err := encryption.EncryptRegion(region, s.backend.ekm) + if err != nil { + return err + } + value, err := proto.Marshal(encryptedRegion) + if err != nil { + return errs.ErrProtoMarshal.Wrap(err).GenWithStackByCause() + } + return s.backend.SaveIntoBatch(endpoint.RegionPath(region.GetId()), value) +} + +// DeleteRegion implements the `endpoint.RegionStorage` interface. +func (s *RegionStorage) DeleteRegion(region *metapb.Region) error { + return s.backend.Remove((endpoint.RegionPath(region.GetId()))) +} + +// Flush implements the `endpoint.RegionStorage` interface. +func (s *RegionStorage) Flush() error { + return s.backend.Flush() +} + +// Close implements the `endpoint.RegionStorage` interface. +func (s *RegionStorage) Close() error { + return s.backend.Close() +} diff --git a/pkg/storage/region_storage_test.go b/pkg/storage/region_storage_test.go new file mode 100644 index 000000000000..f6670f8c82ec --- /dev/null +++ b/pkg/storage/region_storage_test.go @@ -0,0 +1,95 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "testing" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/storage/endpoint" +) + +func TestRegionStorage(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var ( + regionStorage endpoint.RegionStorage + err error + ) + regionStorage, err = NewRegionStorageWithLevelDBBackend(ctx, t.TempDir(), nil) + re.NoError(err) + re.NotNil(regionStorage) + // Load regions from the storage. + regions := make([]*core.RegionInfo, 0) + appendRegionFunc := func(region *core.RegionInfo) []*core.RegionInfo { + regions = append(regions, region) + return nil + } + err = regionStorage.LoadRegions(ctx, appendRegionFunc) + re.NoError(err) + re.Empty(regions) + // Save regions to the storage. + region1 := newTestRegionMeta(1) + err = regionStorage.SaveRegion(region1) + re.NoError(err) + region2 := newTestRegionMeta(2) + err = regionStorage.SaveRegion(region2) + re.NoError(err) + regions = make([]*core.RegionInfo, 0) + err = regionStorage.LoadRegions(ctx, appendRegionFunc) + re.NoError(err) + re.Empty(regions) + // Flush and load. 
+ err = regionStorage.Flush() + re.NoError(err) + regions = make([]*core.RegionInfo, 0) + err = regionStorage.LoadRegions(ctx, appendRegionFunc) + re.NoError(err) + re.Len(regions, 2) + re.Equal(region1, regions[0].GetMeta()) + re.Equal(region2, regions[1].GetMeta()) + newRegion := &metapb.Region{} + ok, err := regionStorage.LoadRegion(3, newRegion) + re.NoError(err) + re.False(ok) + ok, err = regionStorage.LoadRegion(1, newRegion) + re.NoError(err) + re.True(ok) + re.Equal(region1, newRegion) + ok, err = regionStorage.LoadRegion(2, newRegion) + re.NoError(err) + re.True(ok) + re.Equal(region2, newRegion) + // Delete and load. + err = regionStorage.DeleteRegion(region1) + re.NoError(err) + regions = make([]*core.RegionInfo, 0) + err = regionStorage.LoadRegions(ctx, appendRegionFunc) + re.NoError(err) + re.Len(regions, 1) + re.Equal(region2, regions[0].GetMeta()) + ok, err = regionStorage.LoadRegion(2, newRegion) + re.NoError(err) + re.True(ok) + re.Equal(region2, newRegion) + re.Equal(regions[0].GetMeta(), newRegion) + // Close the storage. + err = regionStorage.Close() + re.NoError(err) +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index aba01dfa8063..5e006133d22b 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -57,13 +57,18 @@ func NewStorageWithEtcdBackend(client *clientv3.Client, rootPath string) Storage return newEtcdBackend(client, rootPath) } -// NewStorageWithLevelDBBackend creates a new storage with LevelDB backend. -func NewStorageWithLevelDBBackend( +// NewRegionStorageWithLevelDBBackend will create a specialized storage to +// store region meta information based on a LevelDB backend. +func NewRegionStorageWithLevelDBBackend( ctx context.Context, filePath string, ekm *encryption.Manager, -) (Storage, error) { - return newLevelDBBackend(ctx, filePath, ekm) +) (*RegionStorage, error) { + levelDBBackend, err := newLevelDBBackend(ctx, filePath, ekm) + if err != nil { + return nil, err + } + return newRegionStorage(levelDBBackend), nil } // TODO: support other KV storage backends like BadgerDB in the future. @@ -88,15 +93,14 @@ func NewCoreStorage(defaultStorage Storage, regionStorage endpoint.RegionStorage } } -// TryGetLocalRegionStorage gets the local region storage. Returns nil if not present. -func TryGetLocalRegionStorage(s Storage) endpoint.RegionStorage { +// RetrieveRegionStorage retrieve the region storage from the given storage. +// If it's a `coreStorage`, it will return the regionStorage inside, otherwise it will return the original storage. +func RetrieveRegionStorage(s Storage) endpoint.RegionStorage { switch ps := s.(type) { case *coreStorage: return ps.regionStorage - case *levelDBBackend, *memoryStorage: - return ps default: - return nil + return ps } } diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index dbb5a03b264f..4525ec6091c0 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -209,6 +209,57 @@ func TestLoadMinServiceGCSafePoint(t *testing.T) { re.Equal(uint64(2), ssp.SafePoint) } +func TestTryGetLocalRegionStorage(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Memory backend integrated into core storage. 
+ defaultStorage := NewStorageWithMemoryBackend() + var regionStorage endpoint.RegionStorage = NewStorageWithMemoryBackend() + coreStorage := NewCoreStorage(defaultStorage, regionStorage) + storage := RetrieveRegionStorage(coreStorage) + re.NotNil(storage) + re.Equal(regionStorage, storage) + // RegionStorage with LevelDB backend integrated into core storage. + defaultStorage = NewStorageWithMemoryBackend() + regionStorage, err := NewRegionStorageWithLevelDBBackend(ctx, t.TempDir(), nil) + re.NoError(err) + coreStorage = NewCoreStorage(defaultStorage, regionStorage) + storage = RetrieveRegionStorage(coreStorage) + re.NotNil(storage) + re.Equal(regionStorage, storage) + // Raw LevelDB backend integrated into core storage. + defaultStorage = NewStorageWithMemoryBackend() + regionStorage, err = newLevelDBBackend(ctx, t.TempDir(), nil) + re.NoError(err) + coreStorage = NewCoreStorage(defaultStorage, regionStorage) + storage = RetrieveRegionStorage(coreStorage) + re.NotNil(storage) + re.Equal(regionStorage, storage) + defaultStorage = NewStorageWithMemoryBackend() + regionStorage, err = newLevelDBBackend(ctx, t.TempDir(), nil) + re.NoError(err) + coreStorage = NewCoreStorage(defaultStorage, regionStorage) + storage = RetrieveRegionStorage(coreStorage) + re.NotNil(storage) + re.Equal(regionStorage, storage) + // Without core storage. + defaultStorage = NewStorageWithMemoryBackend() + storage = RetrieveRegionStorage(defaultStorage) + re.NotNil(storage) + re.Equal(defaultStorage, storage) + defaultStorage, err = newLevelDBBackend(ctx, t.TempDir(), nil) + re.NoError(err) + storage = RetrieveRegionStorage(defaultStorage) + re.NotNil(storage) + re.Equal(defaultStorage, storage) + defaultStorage, err = newLevelDBBackend(ctx, t.TempDir(), nil) + re.NoError(err) + storage = RetrieveRegionStorage(defaultStorage) + re.NotNil(storage) + re.Equal(defaultStorage, storage) +} + func TestLoadRegions(t *testing.T) { re := require.New(t) storage := NewStorageWithMemoryBackend() @@ -367,7 +418,7 @@ func randomMerge(regions []*metapb.Region, n int, ratio int) { } } -func saveRegions(lb *levelDBBackend, n int, ratio int) error { +func saveRegions(storage endpoint.RegionStorage, n int, ratio int) error { keys := generateKeys(n) regions := make([]*metapb.Region, 0, n) for i := uint64(0); i < uint64(n); i++ { @@ -398,36 +449,36 @@ func saveRegions(lb *levelDBBackend, n int, ratio int) error { } for _, region := range regions { - err := lb.SaveRegion(region) + err := storage.SaveRegion(region) if err != nil { return err } } - return lb.Flush() + return storage.Flush() } func benchmarkLoadRegions(b *testing.B, n int, ratio int) { re := require.New(b) ctx := context.Background() dir := b.TempDir() - lb, err := newLevelDBBackend(ctx, dir, nil) + regionStorage, err := NewRegionStorageWithLevelDBBackend(ctx, dir, nil) if err != nil { b.Fatal(err) } cluster := core.NewBasicCluster() - err = saveRegions(lb, n, ratio) + err = saveRegions(regionStorage, n, ratio) if err != nil { b.Fatal(err) } defer func() { - err = lb.Close() + err = regionStorage.Close() if err != nil { b.Fatal(err) } }() b.ResetTimer() - err = lb.LoadRegions(context.Background(), cluster.CheckAndPutRegion) + err = regionStorage.LoadRegions(ctx, cluster.CheckAndPutRegion) re.NoError(err) } diff --git a/pkg/syncer/client_test.go b/pkg/syncer/client_test.go index ba389b5de6dc..6770fae44acf 100644 --- a/pkg/syncer/client_test.go +++ b/pkg/syncer/client_test.go @@ -34,7 +34,7 @@ import ( func TestLoadRegion(t *testing.T) { re := require.New(t) tempDir := 
t.TempDir() - rs, err := storage.NewStorageWithLevelDBBackend(context.Background(), tempDir, nil) + rs, err := storage.NewRegionStorageWithLevelDBBackend(context.Background(), tempDir, nil) re.NoError(err) server := &mockServer{ @@ -62,7 +62,7 @@ func TestLoadRegion(t *testing.T) { func TestErrorCode(t *testing.T) { re := require.New(t) tempDir := t.TempDir() - rs, err := storage.NewStorageWithLevelDBBackend(context.Background(), tempDir, nil) + rs, err := storage.NewRegionStorageWithLevelDBBackend(context.Background(), tempDir, nil) re.NoError(err) server := &mockServer{ ctx: context.Background(), diff --git a/pkg/syncer/server.go b/pkg/syncer/server.go index 4fb38614de0c..ccc32b133039 100644 --- a/pkg/syncer/server.go +++ b/pkg/syncer/server.go @@ -88,19 +88,16 @@ type RegionSyncer struct { streamingRunning atomic.Bool } -// NewRegionSyncer returns a region syncer. -// The final consistency is ensured by the heartbeat. -// Strong consistency is not guaranteed. -// Usually open the region syncer in huge cluster and the server -// no longer etcd but go-leveldb. +// NewRegionSyncer returns a region syncer that ensures final consistency through the heartbeat, +// but it does not guarantee strong consistency. It uses the same storage backend as the region storage. func NewRegionSyncer(s Server) *RegionSyncer { - localRegionStorage := storage.TryGetLocalRegionStorage(s.GetStorage()) - if localRegionStorage == nil { + regionStorage := storage.RetrieveRegionStorage(s.GetStorage()) + if regionStorage == nil { return nil } syncer := &RegionSyncer{ server: s, - history: newHistoryBuffer(defaultHistoryBufferSize, localRegionStorage.(kv.Base)), + history: newHistoryBuffer(defaultHistoryBufferSize, regionStorage.(kv.Base)), limit: ratelimit.NewRateLimiter(defaultBucketRate, defaultBucketCapacity), tlsConfig: s.GetTLSConfig(), } diff --git a/pkg/unsaferecovery/unsafe_recovery_controller_test.go b/pkg/unsaferecovery/unsafe_recovery_controller_test.go index 956b9b8729c9..6f1fab621648 100644 --- a/pkg/unsaferecovery/unsafe_recovery_controller_test.go +++ b/pkg/unsaferecovery/unsafe_recovery_controller_test.go @@ -1755,7 +1755,7 @@ func TestRunning(t *testing.T) { re.True(recoveryController.IsRunning()) } -func TestEpochComparsion(t *testing.T) { +func TestEpochComparison(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/pkg/utils/etcdutil/health_checker.go b/pkg/utils/etcdutil/health_checker.go index d1a57e97c5f3..51c1808de4a5 100644 --- a/pkg/utils/etcdutil/health_checker.go +++ b/pkg/utils/etcdutil/health_checker.go @@ -274,12 +274,14 @@ func (checker *healthChecker) updateEvictedEps(lastEps, pickedEps []string) { pickedSet[ep] = true } // Reset the count to 0 if it's in evictedEps but not in the pickedEps.
- checker.evictedEps.Range(func(key, _ any) bool { + checker.evictedEps.Range(func(key, value any) bool { ep := key.(string) - if !pickedSet[ep] { + count := value.(int) + if count > 0 && !pickedSet[ep] { checker.evictedEps.Store(ep, 0) log.Info("reset evicted etcd endpoint picked count", zap.String("endpoint", ep), + zap.Int("previous-count", count), zap.String("source", checker.source)) } return true diff --git a/pkg/utils/grpcutil/grpcutil.go b/pkg/utils/grpcutil/grpcutil.go index e54ddfe17119..d108a6f5d444 100644 --- a/pkg/utils/grpcutil/grpcutil.go +++ b/pkg/utils/grpcutil/grpcutil.go @@ -30,6 +30,7 @@ import ( "go.etcd.io/etcd/pkg/transport" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/backoff" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" @@ -147,7 +148,18 @@ func GetClientConn(ctx context.Context, addr string, tlsCfg *tls.Config, do ...g if err != nil { return nil, errs.ErrURLParse.Wrap(err).GenWithStackByCause() } - cc, err := grpc.DialContext(ctx, u.Host, append(do, opt)...) + // Here we use a shorter MaxDelay to make the connection recover faster. + // The default MaxDelay is 120s, which is too long for us. + backoffOpts := grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: time.Second, + Multiplier: 1.6, + Jitter: 0.2, + MaxDelay: 3 * time.Second, + }, + }) + do = append(do, opt, backoffOpts) + cc, err := grpc.DialContext(ctx, u.Host, do...) if err != nil { return nil, errs.ErrGRPCDial.Wrap(err).GenWithStackByCause() } diff --git a/pkg/utils/requestutil/context.go b/pkg/utils/requestutil/context.go index 1fdbac08a975..c49b94399d0e 100644 --- a/pkg/utils/requestutil/context.go +++ b/pkg/utils/requestutil/context.go @@ -44,7 +44,7 @@ func WithEndTime(parent context.Context, endTime int64) context.Context { return context.WithValue(parent, endTimeKey, endTime) } -// EndTimeFrom returns the value of the excution info key on the ctx +// EndTimeFrom returns the value of the execution info key on the ctx func EndTimeFrom(ctx context.Context) (int64, bool) { info, ok := ctx.Value(endTimeKey).(int64) return info, ok diff --git a/pkg/utils/typeutil/clone_test.go b/pkg/utils/typeutil/clone_test.go index f7a7ef5d57af..58da51d0d0db 100644 --- a/pkg/utils/typeutil/clone_test.go +++ b/pkg/utils/typeutil/clone_test.go @@ -28,9 +28,9 @@ func TestDeepClone(t *testing.T) { re := assert.New(t) src := &fate{ID: 1} dst := DeepClone(src, fateFactory) - re.EqualValues(dst.ID, 1) + re.EqualValues(1, dst.ID) dst.ID = 2 - re.EqualValues(src.ID, 1) + re.EqualValues(1, src.ID) // case2: the source is nil var src2 *fate diff --git a/server/api/region.go b/server/api/region.go index 28249629db51..5e3e271c4ee0 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -488,7 +488,7 @@ func calHist(bound int, list *[]int64) *[]*histItem { } // @Tags region -// @Summary List all range holes whitout any region info. +// @Summary List all range holes without any region info. // @Produce json // @Success 200 {object} [][]string // @Router /regions/range-holes [get] diff --git a/server/apiv2/handlers/micro_service.go b/server/apiv2/handlers/micro_service.go index 48310d26deab..51668ab9ea80 100644 --- a/server/apiv2/handlers/micro_service.go +++ b/server/apiv2/handlers/micro_service.go @@ -35,7 +35,7 @@ func RegisterMicroService(r *gin.RouterGroup) { // @Tags members // @Summary Get all members of the cluster for the specified service. 
// @Produce json -// @Success 200 {object} []string +// @Success 200 {object} []discovery.ServiceRegistryEntry // @Router /ms/members/{service} [get] func GetMembers(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) @@ -45,12 +45,12 @@ func GetMembers(c *gin.Context) { } if service := c.Param("service"); len(service) > 0 { - addrs, err := discovery.GetMSMembers(service, svr.GetClient()) + entries, err := discovery.GetMSMembers(service, svr.GetClient()) if err != nil { c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) return } - c.IndentedJSON(http.StatusOK, addrs) + c.IndentedJSON(http.StatusOK, entries) return } diff --git a/server/grpc_service.go b/server/grpc_service.go index 9d1636199ccc..b6cdce4c8b88 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -2566,7 +2566,7 @@ func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegion // SplitAndScatterRegions split regions by the given split keys, and scatter regions. // Only regions which split successfully will be scattered. -// scatterFinishedPercentage indicates the percentage of successfully splited regions that are scattered. +// scatterFinishedPercentage indicates the percentage of successfully split regions that are scattered. func (s *GrpcServer) SplitAndScatterRegions(ctx context.Context, request *pdpb.SplitAndScatterRegionsRequest) (*pdpb.SplitAndScatterRegionsResponse, error) { if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { fName := currentFunction() diff --git a/server/server.go b/server/server.go index ab69c2a3ad74..4c1632f634af 100644 --- a/server/server.go +++ b/server/server.go @@ -449,11 +449,16 @@ func (s *Server) startServer(ctx context.Context) error { Label: idAllocLabel, Member: s.member.MemberValue(), }) - regionStorage, err := storage.NewStorageWithLevelDBBackend(ctx, filepath.Join(s.cfg.DataDir, "region-meta"), s.encryptionKeyManager) + // Initialize an etcd storage as the default storage. + defaultStorage := storage.NewStorageWithEtcdBackend(s.client, s.rootPath) + // Initialize a specialized LevelDB storage to store the region-related meta info independently. 
+ regionStorage, err := storage.NewRegionStorageWithLevelDBBackend( + ctx, + filepath.Join(s.cfg.DataDir, "region-meta"), + s.encryptionKeyManager) if err != nil { return err } - defaultStorage := storage.NewStorageWithEtcdBackend(s.client, s.rootPath) s.storage = storage.NewCoreStorage(defaultStorage, regionStorage) s.tsoDispatcher = tsoutil.NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize) s.tsoProtoFactory = &tsoutil.TSOProtoFactory{} diff --git a/tests/cluster.go b/tests/cluster.go index 3b798f8738ac..198b49ce7284 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -852,8 +852,8 @@ func (c *TestCluster) CheckClusterDCLocation() { wg := sync.WaitGroup{} for _, server := range c.GetServers() { wg.Add(1) - go func(ser *TestServer) { - ser.GetTSOAllocatorManager().ClusterDCLocationChecker() + go func(s *TestServer) { + s.GetTSOAllocatorManager().ClusterDCLocationChecker() wg.Done() }(server) } diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index cfb896878f25..b66b15d8243d 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -24,6 +24,7 @@ import ( "reflect" "sort" "strconv" + "strings" "sync" "testing" "time" @@ -39,6 +40,7 @@ import ( "github.com/tikv/pd/client/retry" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/tso" @@ -49,6 +51,7 @@ import ( "github.com/tikv/pd/server" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" + "github.com/tikv/pd/tests/integrations/mcs" "go.etcd.io/etcd/clientv3" "go.uber.org/goleak" ) @@ -319,6 +322,30 @@ func TestTSOFollowerProxy(t *testing.T) { wg.Wait() } +func TestTSOFollowerProxyWithTSOService(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestAPICluster(ctx, 1) + re.NoError(err) + defer cluster.Destroy() + err = cluster.RunInitialServers() + re.NoError(err) + leaderName := cluster.WaitLeader() + pdLeaderServer := cluster.GetServer(leaderName) + re.NoError(pdLeaderServer.BootstrapCluster()) + backendEndpoints := pdLeaderServer.GetAddr() + tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, backendEndpoints) + re.NoError(err) + defer tsoCluster.Destroy() + cli := mcs.SetupClientWithKeyspaceID(ctx, re, utils.DefaultKeyspaceID, strings.Split(backendEndpoints, ",")) + re.NotNil(cli) + defer cli.Close() + // TSO service does not support the follower proxy, so enabling it should fail. 
+ err = cli.UpdateOption(pd.EnableTSOFollowerProxy, true) + re.Error(err) +} + // TestUnavailableTimeAfterLeaderIsReady is used to test https://github.com/tikv/pd/issues/5207 func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) { re := require.New(t) @@ -831,6 +858,51 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() { re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastCheckAvailable")) } +func (suite *followerForwardAndHandleTestSuite) TestGetTSFuture() { + re := suite.Require() + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + + re.NoError(failpoint.Enable("github.com/tikv/pd/client/shortDispatcherChannel", "return(true)")) + + cli := setupCli(re, ctx, suite.endpoints) + + ctxs := make([]context.Context, 20) + cancels := make([]context.CancelFunc, 20) + for i := 0; i < 20; i++ { + ctxs[i], cancels[i] = context.WithCancel(ctx) + } + start := time.Now() + wg1 := sync.WaitGroup{} + wg2 := sync.WaitGroup{} + wg3 := sync.WaitGroup{} + wg1.Add(1) + go func() { + <-time.After(time.Second) + for i := 0; i < 20; i++ { + cancels[i]() + } + wg1.Done() + }() + wg2.Add(1) + go func() { + cli.Close() + wg2.Done() + }() + wg3.Add(1) + go func() { + for i := 0; i < 20; i++ { + cli.GetTSAsync(ctxs[i]) + } + wg3.Done() + }() + wg1.Wait() + wg2.Wait() + wg3.Wait() + re.Less(time.Since(start), time.Second*2) + re.NoError(failpoint.Disable("github.com/tikv/pd/client/shortDispatcherChannel")) +} + func checkTS(re *require.Assertions, cli pd.Client, lastTS uint64) uint64 { for i := 0; i < tsoRequestRound; i++ { physical, logical, err := cli.GetTS(context.TODO()) diff --git a/tests/integrations/go.mod b/tests/integrations/go.mod index 31d43cb86f63..f1da9295e8ff 100644 --- a/tests/integrations/go.mod +++ b/tests/integrations/go.mod @@ -17,7 +17,7 @@ require ( github.com/go-sql-driver/mysql v1.7.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c - github.com/pingcap/kvproto v0.0.0-20231226064240-4f28b82c7860 + github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_model v0.5.0 diff --git a/tests/integrations/go.sum b/tests/integrations/go.sum index 556932a24480..dbcffd14b42c 100644 --- a/tests/integrations/go.sum +++ b/tests/integrations/go.sum @@ -410,8 +410,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ= github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20231226064240-4f28b82c7860 h1:yv9mYJJCKv2mKcW2nEYUgfRkfeyapRWB3GktKEE4sv8= -github.com/pingcap/kvproto v0.0.0-20231226064240-4f28b82c7860/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41 h1:7tDr4J6gGQ3OqBq+lZQkI9wlJIIXFitHjNK8ymU/SEo= +github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 
h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index 8d8243ed7d74..074988d9abad 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -679,7 +679,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { _, penalty, _, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2) re.NoError(err) re.Equal(60.0, penalty.WriteBytes) - re.InEpsilon(penalty.TotalCpuTimeMs, 10.0/1000.0/1000.0, 1e-6) + re.InEpsilon(10.0/1000.0/1000.0, penalty.TotalCpuTimeMs, 1e-6) _, err = c.OnResponse(resourceGroupName, req2, resp2) re.NoError(err) @@ -1108,7 +1108,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupRUConsumption() { re.NoError(err) re.Equal(g.RUStats, testConsumption) - // update resoruce group, ru stats not change + // update resource group, ru stats not change g.RUSettings.RU.Settings.FillRate = 12345 _, err = cli.ModifyResourceGroup(suite.ctx, g) re.NoError(err) diff --git a/tests/integrations/mcs/tso/api_test.go b/tests/integrations/mcs/tso/api_test.go index 0a026aff916a..327254184620 100644 --- a/tests/integrations/mcs/tso/api_test.go +++ b/tests/integrations/mcs/tso/api_test.go @@ -266,3 +266,22 @@ func (suite *tsoAPITestSuite) TestStatus() { re.Equal(versioninfo.PDGitHash, s.GitHash) re.Equal(versioninfo.PDReleaseVersion, s.Version) } + +func (suite *tsoAPITestSuite) TestConfig() { + re := suite.Require() + + primary := suite.tsoCluster.WaitForDefaultPrimaryServing(re) + resp, err := http.Get(primary.GetConfig().GetAdvertiseListenAddr() + "/tso/api/v1/config") + re.NoError(err) + defer resp.Body.Close() + re.Equal(http.StatusOK, resp.StatusCode) + respBytes, err := io.ReadAll(resp.Body) + re.NoError(err) + var cfg tso.Config + re.NoError(json.Unmarshal(respBytes, &cfg)) + re.Equal(cfg.GetListenAddr(), primary.GetConfig().GetListenAddr()) + re.Equal(cfg.GetTSOSaveInterval(), primary.GetConfig().GetTSOSaveInterval()) + re.Equal(cfg.IsLocalTSOEnabled(), primary.GetConfig().IsLocalTSOEnabled()) + re.Equal(cfg.GetTSOUpdatePhysicalInterval(), primary.GetConfig().GetTSOUpdatePhysicalInterval()) + re.Equal(cfg.GetMaxResetTSGap(), primary.GetConfig().GetMaxResetTSGap()) +} diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index c5cc6ec5d6dc..3d7b099f342b 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -269,10 +269,6 @@ func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() { // TestGetMinTS tests the correctness of GetMinTS. 
func (suite *tsoClientTestSuite) TestGetMinTS() { re := suite.Require() - if !suite.legacy { - suite.waitForAllKeyspaceGroupsInServing(re) - } - var wg sync.WaitGroup wg.Add(tsoRequestConcurrencyNumber * len(suite.clients)) for i := 0; i < tsoRequestConcurrencyNumber; i++ { diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go index a91bbaf6b403..1470173e0ed7 100644 --- a/tests/server/region_syncer/region_syncer_test.go +++ b/tests/server/region_syncer/region_syncer_test.go @@ -36,7 +36,7 @@ func TestMain(m *testing.M) { func TestRegionSyncer(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/storage/regionStorageFastFlush", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/storage/levelDBStorageFastFlush", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/syncer/noFastExitSync", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/syncer/disableClientStreaming", `return(true)`)) @@ -73,7 +73,7 @@ func TestRegionSyncer(t *testing.T) { } // merge case // region2 -> region1 -> region0 - // merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver + // merge A to B will increases version to max(versionA, versionB)+1, but does not increase conversion // region0 version is max(1, max(1, 1)+1)+1=3 regions[0] = regions[0].Clone(core.WithEndKey(regions[2].GetEndKey()), core.WithIncVersion(), core.WithIncVersion()) err = rc.HandleRegionHeartbeat(regions[0]) @@ -81,7 +81,7 @@ func TestRegionSyncer(t *testing.T) { // merge case // region3 -> region4 - // merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver + // merge A to B will increases version to max(versionA, versionB)+1, but does not increase conversion // region4 version is max(1, 1)+1=2 regions[4] = regions[3].Clone(core.WithEndKey(regions[4].GetEndKey()), core.WithIncVersion()) err = rc.HandleRegionHeartbeat(regions[4]) @@ -89,7 +89,7 @@ func TestRegionSyncer(t *testing.T) { // merge case // region0 -> region4 - // merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver + // merge A to B will increases version to max(versionA, versionB)+1, but does not increase conversion // region4 version is max(3, 2)+1=4 regions[4] = regions[0].Clone(core.WithEndKey(regions[4].GetEndKey()), core.WithIncVersion()) err = rc.HandleRegionHeartbeat(regions[4]) @@ -156,7 +156,7 @@ func TestRegionSyncer(t *testing.T) { re.Equal(region.GetBuckets(), r.GetBuckets()) } re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/syncer/noFastExitSync")) - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/storage/regionStorageFastFlush")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/storage/levelDBStorageFastFlush")) } func TestFullSyncWithAddMember(t *testing.T) { diff --git a/tests/server/tso/allocator_test.go b/tests/server/tso/allocator_test.go index 41f544729c29..3bc4d56ac581 100644 --- a/tests/server/tso/allocator_test.go +++ b/tests/server/tso/allocator_test.go @@ -162,10 +162,10 @@ func TestPriorityAndDifferentLocalTSO(t *testing.T) { wg := sync.WaitGroup{} wg.Add(len(dcLocationConfig)) for serverName, dcLocation := range dcLocationConfig { - go func(serName, dc string) { + go func(name, dc string) { defer wg.Done() testutil.Eventually(re, func() bool { - return cluster.WaitAllocatorLeader(dc) == serName + return 
cluster.WaitAllocatorLeader(dc) == name }, testutil.WithWaitFor(90*time.Second), testutil.WithTickInterval(time.Second)) }(serverName, dcLocation) } @@ -188,8 +188,8 @@ func waitAllocatorPriorityCheck(cluster *tests.TestCluster) { wg := sync.WaitGroup{} for _, server := range cluster.GetServers() { wg.Add(1) - go func(ser *tests.TestServer) { - ser.GetTSOAllocatorManager().PriorityChecker() + go func(s *tests.TestServer) { + s.GetTSOAllocatorManager().PriorityChecker() wg.Done() }(server) } diff --git a/tools/go.mod b/tools/go.mod index 767ada3c8ccf..3e63141a89b0 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -24,7 +24,7 @@ require ( github.com/mattn/go-shellwords v1.0.12 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2 + github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.18.0 diff --git a/tools/go.sum b/tools/go.sum index 54acc216ec07..8711bc3f8aa0 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -411,8 +411,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2 h1:364A6VCS+l0oHBKZKotX9LzmfEtIO/NTccTIQcPp3Ug= -github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41 h1:7tDr4J6gGQ3OqBq+lZQkI9wlJIIXFitHjNK8ymU/SEo= +github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tools/pd-api-bench/cases/cases.go b/tools/pd-api-bench/cases/cases.go index 4a39e54d43c9..473a11d749ad 100644 --- a/tools/pd-api-bench/cases/cases.go +++ b/tools/pd-api-bench/cases/cases.go @@ -122,11 +122,11 @@ type ETCDCase interface { Unary(context.Context, *clientv3.Client) error } -// ETCDCraeteFn is function type to create ETCDCase. -type ETCDCraeteFn func() ETCDCase +// ETCDCreateFn is function type to create ETCDCase. +type ETCDCreateFn func() ETCDCase // ETCDCaseFnMap is the map for all ETCD case creation function. -var ETCDCaseFnMap = map[string]ETCDCraeteFn{ +var ETCDCaseFnMap = map[string]ETCDCreateFn{ "Get": newGetKV(), "Put": newPutKV(), "Delete": newDeleteKV(), @@ -139,11 +139,11 @@ type GRPCCase interface { Unary(context.Context, pd.Client) error } -// GRPCCraeteFn is function type to create GRPCCase. -type GRPCCraeteFn func() GRPCCase +// GRPCCreateFn is function type to create GRPCCase. +type GRPCCreateFn func() GRPCCase // GRPCCaseFnMap is the map for all gRPC case creation function. 
-var GRPCCaseFnMap = map[string]GRPCCraeteFn{ +var GRPCCaseFnMap = map[string]GRPCCreateFn{ "GetRegion": newGetRegion(), "GetRegionEnableFollower": newGetRegionEnableFollower(), "GetStore": newGetStore(), @@ -160,11 +160,11 @@ type HTTPCase interface { Do(context.Context, pdHttp.Client) error } -// HTTPCraeteFn is function type to create HTTPCase. -type HTTPCraeteFn func() HTTPCase +// HTTPCreateFn is function type to create HTTPCase. +type HTTPCreateFn func() HTTPCase // HTTPCaseFnMap is the map for all HTTP case creation function. -var HTTPCaseFnMap = map[string]HTTPCraeteFn{ +var HTTPCaseFnMap = map[string]HTTPCreateFn{ "GetRegionStatus": newRegionStats(), "GetMinResolvedTS": newMinResolvedTS(), } diff --git a/tools/pd-api-bench/cases/controller.go b/tools/pd-api-bench/cases/controller.go index db42c469843c..00d9261cba8d 100644 --- a/tools/pd-api-bench/cases/controller.go +++ b/tools/pd-api-bench/cases/controller.go @@ -196,7 +196,7 @@ type httpController struct { wg sync.WaitGroup } -func newHTTPController(ctx context.Context, clis []pdHttp.Client, fn HTTPCraeteFn) *httpController { +func newHTTPController(ctx context.Context, clis []pdHttp.Client, fn HTTPCreateFn) *httpController { c := &httpController{ pctx: ctx, clients: clis, @@ -220,21 +220,25 @@ func (c *httpController) run() { c.wg.Add(1) go func(hCli pdHttp.Client) { defer c.wg.Done() - var ticker = time.NewTicker(tt) - defer ticker.Stop() - for { - select { - case <-ticker.C: - for i := int64(0); i < burst; i++ { - err := c.Do(c.ctx, hCli) - if err != nil { - log.Error("meet erorr when doing HTTP request", zap.String("case", c.Name()), zap.Error(err)) + c.wg.Add(int(burst)) + for i := int64(0); i < burst; i++ { + go func() { + defer c.wg.Done() + var ticker = time.NewTicker(tt) + defer ticker.Stop() + for { + select { + case <-ticker.C: + err := c.Do(c.ctx, hCli) + if err != nil { + log.Error("meet erorr when doing HTTP request", zap.String("case", c.Name()), zap.Error(err)) + } + case <-c.ctx.Done(): + log.Info("Got signal to exit running HTTP case") + return } } - case <-c.ctx.Done(): - log.Info("Got signal to exit running HTTP case") - return - } + }() } }(hCli) } @@ -261,7 +265,7 @@ type gRPCController struct { wg sync.WaitGroup } -func newGRPCController(ctx context.Context, clis []pd.Client, fn GRPCCraeteFn) *gRPCController { +func newGRPCController(ctx context.Context, clis []pd.Client, fn GRPCCreateFn) *gRPCController { c := &gRPCController{ pctx: ctx, clients: clis, @@ -285,21 +289,25 @@ func (c *gRPCController) run() { c.wg.Add(1) go func(cli pd.Client) { defer c.wg.Done() - var ticker = time.NewTicker(tt) - defer ticker.Stop() - for { - select { - case <-ticker.C: - for i := int64(0); i < burst; i++ { - err := c.Unary(c.ctx, cli) - if err != nil { - log.Error("meet erorr when doing gRPC request", zap.String("case", c.Name()), zap.Error(err)) + c.wg.Add(int(burst)) + for i := int64(0); i < burst; i++ { + go func() { + defer c.wg.Done() + var ticker = time.NewTicker(tt) + defer ticker.Stop() + for { + select { + case <-ticker.C: + err := c.Unary(c.ctx, cli) + if err != nil { + log.Error("meet erorr when doing gRPC request", zap.String("case", c.Name()), zap.Error(err)) + } + case <-c.ctx.Done(): + log.Info("Got signal to exit running gRPC case") + return } } - case <-c.ctx.Done(): - log.Info("Got signal to exit running gRPC case") - return - } + }() } }(cli) } @@ -326,7 +334,7 @@ type etcdController struct { wg sync.WaitGroup } -func newEtcdController(ctx context.Context, clis []*clientv3.Client, fn ETCDCraeteFn) 
*etcdController { +func newEtcdController(ctx context.Context, clis []*clientv3.Client, fn ETCDCreateFn) *etcdController { c := &etcdController{ pctx: ctx, clients: clis, @@ -355,21 +363,25 @@ func (c *etcdController) run() { c.wg.Add(1) go func(cli *clientv3.Client) { defer c.wg.Done() - var ticker = time.NewTicker(tt) - defer ticker.Stop() - for { - select { - case <-ticker.C: - for i := int64(0); i < burst; i++ { - err := c.Unary(c.ctx, cli) - if err != nil { - log.Error("meet erorr when doing etcd request", zap.String("case", c.Name()), zap.Error(err)) + c.wg.Add(int(burst)) + for i := int64(0); i < burst; i++ { + go func() { + defer c.wg.Done() + var ticker = time.NewTicker(tt) + defer ticker.Stop() + for { + select { + case <-ticker.C: + err := c.Unary(c.ctx, cli) + if err != nil { + log.Error("meet erorr when doing etcd request", zap.String("case", c.Name()), zap.Error(err)) + } + case <-c.ctx.Done(): + log.Info("Got signal to exit running etcd case") + return } } - case <-c.ctx.Done(): - log.Info("Got signal to exit running etcd case") - return - } + }() } }(cli) } diff --git a/tools/pd-ctl/pdctl/command/config_command.go b/tools/pd-ctl/pdctl/command/config_command.go index 0e6fa7e93407..c70c33e26c32 100644 --- a/tools/pd-ctl/pdctl/command/config_command.go +++ b/tools/pd-ctl/pdctl/command/config_command.go @@ -408,7 +408,7 @@ func setReplicationModeCommandFunc(cmd *cobra.Command, args []string) { } else if len(args) == 3 { t := reflectutil.FindFieldByJSONTag(reflect.TypeOf(config.ReplicationModeConfig{}), []string{args[0], args[1]}) if t != nil && t.Kind() == reflect.Int { - // convert to number for numberic fields. + // convert to number for numeric fields. arg2, err := strconv.ParseInt(args[2], 10, 64) if err != nil { cmd.Printf("value %v cannot covert to number: %v", args[2], err) diff --git a/tools/pd-recover/main.go b/tools/pd-recover/main.go index 375a9398a4ff..9b5d08013db6 100644 --- a/tools/pd-recover/main.go +++ b/tools/pd-recover/main.go @@ -62,7 +62,7 @@ func main() { fs.BoolVar(&v, "V", false, "print version information") fs.BoolVar(&fromOldMember, "from-old-member", false, "recover from a member of an existing cluster") fs.StringVar(&endpoints, "endpoints", "http://127.0.0.1:2379", "endpoints urls") - fs.Uint64Var(&allocID, "alloc-id", 0, "please make sure alloced ID is safe") + fs.Uint64Var(&allocID, "alloc-id", 0, "please make sure allocated ID is safe") fs.Uint64Var(&clusterID, "cluster-id", 0, "please make cluster ID match with tikv") fs.StringVar(&caPath, "cacert", "", "path of file that contains list of trusted SSL CAs") fs.StringVar(&certPath, "cert", "", "path of file that contains list of trusted SSL CAs")
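To close, a compact, self-contained Go sketch of the concurrency pattern the pd-api-bench controllers switch to above: instead of one goroutine issuing `burst` requests on every tick, each client starts `burst` goroutines, each driven by its own ticker and stopped by the shared context. All names below are illustrative rather than taken from the tool:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runWorkers starts `burst` workers, each with its own ticker, and blocks
// until the context is canceled and every worker has returned.
func runWorkers(ctx context.Context, burst int, interval time.Duration, do func(context.Context) error) {
	var wg sync.WaitGroup
	wg.Add(burst)
	for i := 0; i < burst; i++ {
		go func() {
			defer wg.Done()
			ticker := time.NewTicker(interval)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					if err := do(ctx); err != nil {
						fmt.Println("request failed:", err) // the real tool logs the error and keeps going
					}
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	wg.Wait()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	runWorkers(ctx, 4, 100*time.Millisecond, func(context.Context) error { return nil })
}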