From 8b2896bb2b29bde2f0bf5c46ef532cfc7914f266 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 16 Oct 2023 19:38:00 +0800 Subject: [PATCH 1/3] tests: add store limit test (#7214) ref tikv/pd#5839 Signed-off-by: Ryan Leung --- .../mcs/scheduling/server_test.go | 121 ++++++++++++++++++ 1 file changed, 121 insertions(+) diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 0760bd1fbb57..324c8e5cad5d 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -28,7 +28,9 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/core/storelimit" mcs "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server" @@ -377,3 +379,122 @@ func (suite *serverTestSuite) TestForwardRegionHeartbeat() { reflect.DeepEqual(region.GetDownPeers(), downPeers) && reflect.DeepEqual(region.GetPendingPeers(), pendingPeers) }) } + +func (suite *serverTestSuite) TestStoreLimit() { + re := suite.Require() + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + + oc := tc.GetPrimaryServer().GetCluster().GetCoordinator().GetOperatorController() + leaderServer := suite.pdLeader.GetServer() + conf := leaderServer.GetReplicationConfig().Clone() + conf.MaxReplicas = 1 + leaderServer.SetReplicationConfig(*conf) + grpcPDClient := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr()) + for i := uint64(1); i <= 2; i++ { + resp, err := grpcPDClient.PutStore( + context.Background(), &pdpb.PutStoreRequest{ + Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()}, + Store: &metapb.Store{ + Id: i, + Address: fmt.Sprintf("mock://%d", i), + State: metapb.StoreState_Up, + Version: "7.0.0", + }, + }, + ) + re.NoError(err) + re.Empty(resp.GetHeader().GetError()) + } + + stream, err := grpcPDClient.RegionHeartbeat(suite.ctx) + re.NoError(err) + for i := uint64(2); i <= 10; i++ { + peers := []*metapb.Peer{{Id: i, StoreId: 1}} + regionReq := &pdpb.RegionHeartbeatRequest{ + Header: testutil.NewRequestHeader(suite.pdLeader.GetClusterID()), + Region: &metapb.Region{ + Id: i, + Peers: peers, + StartKey: []byte(fmt.Sprintf("t%d", i)), + EndKey: []byte(fmt.Sprintf("t%d", i+1)), + }, + Leader: peers[0], + ApproximateSize: 10 * units.MiB, + } + err = stream.Send(regionReq) + re.NoError(err) + } + + leaderServer.GetRaftCluster().SetStoreLimit(1, storelimit.AddPeer, 60) + leaderServer.GetRaftCluster().SetStoreLimit(1, storelimit.RemovePeer, 60) + leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.AddPeer, 60) + leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.RemovePeer, 60) + // There is a time window between setting store limit in API service side and capturing the change in scheduling service. 
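+ // waitSyncFinish (defined at the end of this file) polls the scheduling server's persisted config until the expected limit for store 2 is observed.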
+ waitSyncFinish(re, tc, storelimit.AddPeer, 60) + for i := uint64(1); i <= 5; i++ { + op := operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100}) + checkOperatorSuccess(re, oc, op) + } + op := operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100}) + checkOperatorFail(re, oc, op) + + leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.AddPeer, 120) + waitSyncFinish(re, tc, storelimit.AddPeer, 120) + for i := uint64(1); i <= 10; i++ { + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100}) + checkOperatorSuccess(re, oc, op) + } + leaderServer.GetRaftCluster().SetAllStoresLimit(storelimit.AddPeer, 60) + waitSyncFinish(re, tc, storelimit.AddPeer, 60) + for i := uint64(1); i <= 5; i++ { + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100}) + checkOperatorSuccess(re, oc, op) + } + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100}) + checkOperatorFail(re, oc, op) + + leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.RemovePeer, 60) + waitSyncFinish(re, tc, storelimit.RemovePeer, 60) + for i := uint64(1); i <= 5; i++ { + op := operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) + checkOperatorSuccess(re, oc, op) + } + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) + checkOperatorFail(re, oc, op) + + leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.RemovePeer, 120) + waitSyncFinish(re, tc, storelimit.RemovePeer, 120) + for i := uint64(1); i <= 10; i++ { + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) + checkOperatorSuccess(re, oc, op) + } + leaderServer.GetRaftCluster().SetAllStoresLimit(storelimit.RemovePeer, 60) + waitSyncFinish(re, tc, storelimit.RemovePeer, 60) + for i := uint64(1); i <= 5; i++ { + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) + checkOperatorSuccess(re, oc, op) + } + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) + checkOperatorFail(re, oc, op) +} + +func checkOperatorSuccess(re *require.Assertions, oc *operator.Controller, op *operator.Operator) { + re.True(oc.AddOperator(op)) + re.True(oc.RemoveOperator(op)) + re.True(op.IsEnd()) + re.Equal(op, oc.GetOperatorStatus(op.RegionID()).Operator) +} + +func checkOperatorFail(re *require.Assertions, oc *operator.Controller, op *operator.Operator) { + re.False(oc.AddOperator(op)) + re.False(oc.RemoveOperator(op)) +} + +func waitSyncFinish(re *require.Assertions, tc *tests.TestSchedulingCluster, typ storelimit.Type, expectedLimit float64) { + testutil.Eventually(re, func() bool { + return tc.GetPrimaryServer().GetPersistConfig().GetStoreLimitByType(2, typ) == expectedLimit + }) +} From c9a97a83d16ea74ae5c81e0dc754687f226a972e Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 17 Oct 2023 14:33:29 +0800 Subject: [PATCH 2/3] mcs: support forwarding split and scatter request (#7190) ref tikv/pd#5839 Signed-off-by: Ryan Leung --- go.mod | 2 +- go.sum | 4 +- pkg/mcs/scheduling/server/cluster.go | 12 +++ pkg/mcs/scheduling/server/grpc_service.go | 105 +++++++++++++++++++-- pkg/tso/allocator_manager.go | 6 
+- server/grpc_service.go | 110 +++++++++++++++++++++- tests/integrations/client/go.mod | 2 +- tests/integrations/client/go.sum | 4 +- tests/integrations/mcs/go.mod | 2 +- tests/integrations/mcs/go.sum | 4 +- tests/integrations/tso/go.mod | 2 +- tests/integrations/tso/go.sum | 4 +- tools/pd-api-bench/go.mod | 2 +- tools/pd-api-bench/go.sum | 4 +- 14 files changed, 236 insertions(+), 27 deletions(-) diff --git a/go.mod b/go.mod index 0984d5bc9ec6..73f3d783db22 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b + github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 diff --git a/go.sum b/go.sum index 93c886cf13e9..6f73b8068607 100644 --- a/go.sum +++ b/go.sum @@ -439,8 +439,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b h1:XwwIxepR+uuSYWhdQtstEdr67XUE7X6lpSIHVh5iWjs= -github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA= +github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 5031f07bd496..e4983eca7ea2 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -20,7 +20,9 @@ import ( "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/schedule/scatter" "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/schedule/splitter" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/statistics/buckets" @@ -130,6 +132,16 @@ func (c *Cluster) GetRegionLabeler() *labeler.RegionLabeler { return c.labelerManager } +// GetRegionSplitter returns the region splitter. +func (c *Cluster) GetRegionSplitter() *splitter.RegionSplitter { + return c.coordinator.GetRegionSplitter() +} + +// GetRegionScatterer returns the region scatter. +func (c *Cluster) GetRegionScatterer() *scatter.RegionScatterer { + return c.coordinator.GetRegionScatterer() +} + // GetStoresLoads returns load stats of all stores. 
func (c *Cluster) GetStoresLoads() map[uint64][]float64 { return c.hotStat.GetStoresLoads() diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index b9b682dfa74d..e2e2c8af32d5 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -140,13 +140,7 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat c := s.GetCluster() if c == nil { - resp := &schedulingpb.RegionHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ - ClusterId: s.clusterID, - Error: &schedulingpb.Error{ - Type: schedulingpb.ErrorType_NOT_BOOTSTRAPPED, - Message: "scheduling server is not initialized yet", - }, - }} + resp := &schedulingpb.RegionHeartbeatResponse{Header: s.notBootstrappedHeader()} err := server.Send(resp) return errors.WithStack(err) } @@ -177,7 +171,7 @@ func (s *Service) StoreHeartbeat(ctx context.Context, request *schedulingpb.Stor if c == nil { // TODO: add metrics log.Info("cluster isn't initialized") - return &schedulingpb.StoreHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ClusterId: s.clusterID}}, nil + return &schedulingpb.StoreHeartbeatResponse{Header: s.notBootstrappedHeader()}, nil } if c.GetStore(request.GetStats().GetStoreId()) == nil { @@ -191,6 +185,75 @@ func (s *Service) StoreHeartbeat(ctx context.Context, request *schedulingpb.Stor return &schedulingpb.StoreHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ClusterId: s.clusterID}}, nil } +// SplitRegions split regions by the given split keys +func (s *Service) SplitRegions(ctx context.Context, request *schedulingpb.SplitRegionsRequest) (*schedulingpb.SplitRegionsResponse, error) { + c := s.GetCluster() + if c == nil { + return &schedulingpb.SplitRegionsResponse{Header: s.notBootstrappedHeader()}, nil + } + finishedPercentage, newRegionIDs := c.GetRegionSplitter().SplitRegions(ctx, request.GetSplitKeys(), int(request.GetRetryLimit())) + return &schedulingpb.SplitRegionsResponse{ + Header: s.header(), + RegionsId: newRegionIDs, + FinishedPercentage: uint64(finishedPercentage), + }, nil +} + +// ScatterRegions implements gRPC PDServer. +func (s *Service) ScatterRegions(ctx context.Context, request *schedulingpb.ScatterRegionsRequest) (*schedulingpb.ScatterRegionsResponse, error) { + c := s.GetCluster() + if c == nil { + return &schedulingpb.ScatterRegionsResponse{Header: s.notBootstrappedHeader()}, nil + } + + opsCount, failures, err := c.GetRegionScatterer().ScatterRegionsByID(request.GetRegionsId(), request.GetGroup(), int(request.GetRetryLimit()), request.GetSkipStoreLimit()) + if err != nil { + return nil, err + } + percentage := 100 + if len(failures) > 0 { + percentage = 100 - 100*len(failures)/(opsCount+len(failures)) + log.Debug("scatter regions", zap.Errors("failures", func() []error { + r := make([]error, 0, len(failures)) + for _, err := range failures { + r = append(r, err) + } + return r + }())) + } + return &schedulingpb.ScatterRegionsResponse{ + Header: s.header(), + FinishedPercentage: uint64(percentage), + }, nil +} + +// GetOperator gets information about the operator belonging to the specify region. 
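+// It returns a header with an UNKNOWN error type and a "Not Found" message when no operator is found for the requested region.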
+func (s *Service) GetOperator(ctx context.Context, request *schedulingpb.GetOperatorRequest) (*schedulingpb.GetOperatorResponse, error) { + c := s.GetCluster() + if c == nil { + return &schedulingpb.GetOperatorResponse{Header: s.notBootstrappedHeader()}, nil + } + + opController := c.GetCoordinator().GetOperatorController() + requestID := request.GetRegionId() + r := opController.GetOperatorStatus(requestID) + if r == nil { + header := s.errorHeader(&schedulingpb.Error{ + Type: schedulingpb.ErrorType_UNKNOWN, + Message: "Not Found", + }) + return &schedulingpb.GetOperatorResponse{Header: header}, nil + } + + return &schedulingpb.GetOperatorResponse{ + Header: s.header(), + RegionId: requestID, + Desc: []byte(r.Desc()), + Kind: []byte(r.Kind().String()), + Status: r.Status, + }, nil +} + // RegisterGRPCService registers the service to gRPC server. func (s *Service) RegisterGRPCService(g *grpc.Server) { schedulingpb.RegisterSchedulingServer(g, s) @@ -201,3 +264,29 @@ func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler handler, group := SetUpRestHandler(s) apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) } + +func (s *Service) errorHeader(err *schedulingpb.Error) *schedulingpb.ResponseHeader { + return &schedulingpb.ResponseHeader{ + ClusterId: s.clusterID, + Error: err, + } +} + +func (s *Service) notBootstrappedHeader() *schedulingpb.ResponseHeader { + return s.errorHeader(&schedulingpb.Error{ + Type: schedulingpb.ErrorType_NOT_BOOTSTRAPPED, + Message: "cluster is not initialized", + }) +} + +func (s *Service) header() *schedulingpb.ResponseHeader { + if s.clusterID == 0 { + return s.wrapErrorToHeader(schedulingpb.ErrorType_NOT_BOOTSTRAPPED, "cluster id is not ready") + } + return &schedulingpb.ResponseHeader{ClusterId: s.clusterID} +} + +func (s *Service) wrapErrorToHeader( + errorType schedulingpb.ErrorType, message string) *schedulingpb.ResponseHeader { + return s.errorHeader(&schedulingpb.Error{Type: errorType, Message: message}) +} diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 55632f9f5ba9..df0ca0affc97 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -413,7 +413,7 @@ func (am *AllocatorManager) GetClusterDCLocationsFromEtcd() (clusterDCLocations if err != nil { log.Warn("get server id and dcLocation from etcd failed, invalid server id", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.Any("splitted-serverPath", serverPath), + zap.Any("split-serverPath", serverPath), zap.String("dc-location", dcLocation), errs.ZapError(err)) continue @@ -973,8 +973,8 @@ func (am *AllocatorManager) getDCLocationSuffixMapFromEtcd() (map[string]int32, if err != nil { return nil, err } - splittedKey := strings.Split(string(kv.Key), "/") - dcLocation := splittedKey[len(splittedKey)-1] + splitKey := strings.Split(string(kv.Key), "/") + dcLocation := splitKey[len(splitKey)-1] dcLocationSuffix[dcLocation] = int32(suffix) } return dcLocationSuffix, nil diff --git a/server/grpc_service.go b/server/grpc_service.go index d9f37a64d7dc..4fbfaa0f4075 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1770,6 +1770,35 @@ func (s *GrpcServer) PutClusterConfig(ctx context.Context, request *pdpb.PutClus // ScatterRegion implements gRPC PDServer. 
func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterRegionRequest) (*pdpb.ScatterRegionResponse, error) { + if s.IsAPIServiceMode() { + s.updateSchedulingClient(ctx) + if s.schedulingClient.Load() != nil { + regionsID := request.GetRegionsId() + if len(regionsID) == 0 { + return &pdpb.ScatterRegionResponse{ + Header: s.invalidValue("regions id is required"), + }, nil + } + req := &schedulingpb.ScatterRegionsRequest{ + Header: &schedulingpb.RequestHeader{ + ClusterId: request.GetHeader().GetClusterId(), + SenderId: request.GetHeader().GetSenderId(), + }, + RegionsId: regionsID, + Group: request.GetGroup(), + RetryLimit: request.GetRetryLimit(), + SkipStoreLimit: request.GetSkipStoreLimit(), + } + resp, err := s.schedulingClient.Load().(*schedulingClient).getClient().ScatterRegions(ctx, req) + if err != nil { + // reset to let it be updated in the next request + s.schedulingClient.Store(&schedulingClient{}) + return s.convertScatterResponse(resp), err + } + return s.convertScatterResponse(resp), nil + } + } + fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).ScatterRegion(ctx, request) } @@ -1960,6 +1989,25 @@ func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb // GetOperator gets information about the operator belonging to the specify region. func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorRequest) (*pdpb.GetOperatorResponse, error) { + if s.IsAPIServiceMode() { + s.updateSchedulingClient(ctx) + if s.schedulingClient.Load() != nil { + req := &schedulingpb.GetOperatorRequest{ + Header: &schedulingpb.RequestHeader{ + ClusterId: request.GetHeader().GetClusterId(), + SenderId: request.GetHeader().GetSenderId(), + }, + RegionId: request.GetRegionId(), + } + resp, err := s.schedulingClient.Load().(*schedulingClient).getClient().GetOperator(ctx, req) + if err != nil { + // reset to let it be updated in the next request + s.schedulingClient.Store(&schedulingClient{}) + return s.convertOperatorResponse(resp), err + } + return s.convertOperatorResponse(resp), nil + } + } fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).GetOperator(ctx, request) } @@ -2049,6 +2097,45 @@ func (s *GrpcServer) invalidValue(msg string) *pdpb.ResponseHeader { }) } +func (s *GrpcServer) convertHeader(header *schedulingpb.ResponseHeader) *pdpb.ResponseHeader { + switch header.GetError().GetType() { + case schedulingpb.ErrorType_UNKNOWN: + return &pdpb.ResponseHeader{ + ClusterId: header.GetClusterId(), + Error: &pdpb.Error{ + Type: pdpb.ErrorType_UNKNOWN, + Message: header.GetError().GetMessage(), + }, + } + default: + return &pdpb.ResponseHeader{ClusterId: header.GetClusterId()} + } +} + +func (s *GrpcServer) convertSplitResponse(resp *schedulingpb.SplitRegionsResponse) *pdpb.SplitRegionsResponse { + return &pdpb.SplitRegionsResponse{ + Header: s.convertHeader(resp.GetHeader()), + FinishedPercentage: resp.GetFinishedPercentage(), + } +} + +func (s *GrpcServer) convertScatterResponse(resp *schedulingpb.ScatterRegionsResponse) *pdpb.ScatterRegionResponse { + return &pdpb.ScatterRegionResponse{ + Header: s.convertHeader(resp.GetHeader()), + FinishedPercentage: resp.GetFinishedPercentage(), + } +} + +func (s *GrpcServer) convertOperatorResponse(resp *schedulingpb.GetOperatorResponse) *pdpb.GetOperatorResponse { + return &pdpb.GetOperatorResponse{ + Header: s.convertHeader(resp.GetHeader()), + RegionId: resp.GetRegionId(), 
+ Desc: resp.GetDesc(), + Kind: resp.GetKind(), + Status: resp.GetStatus(), + } +} + // Only used for the TestLocalAllocatorLeaderChange. var mockLocalAllocatorLeaderChangeFlag = false @@ -2153,6 +2240,27 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest // SplitRegions split regions by the given split keys func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegionsRequest) (*pdpb.SplitRegionsResponse, error) { + if s.IsAPIServiceMode() { + s.updateSchedulingClient(ctx) + if s.schedulingClient.Load() != nil { + req := &schedulingpb.SplitRegionsRequest{ + Header: &schedulingpb.RequestHeader{ + ClusterId: request.GetHeader().GetClusterId(), + SenderId: request.GetHeader().GetSenderId(), + }, + SplitKeys: request.GetSplitKeys(), + RetryLimit: request.GetRetryLimit(), + } + resp, err := s.schedulingClient.Load().(*schedulingClient).getClient().SplitRegions(ctx, req) + if err != nil { + // reset to let it be updated in the next request + s.schedulingClient.Store(&schedulingClient{}) + return s.convertSplitResponse(resp), err + } + return s.convertSplitResponse(resp), nil + } + } + fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).SplitRegions(ctx, request) } @@ -2175,7 +2283,7 @@ func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegion } // SplitAndScatterRegions split regions by the given split keys, and scatter regions. -// Only regions which splited successfully will be scattered. +// Only regions which split successfully will be scattered. // scatterFinishedPercentage indicates the percentage of successfully splited regions that are scattered. func (s *GrpcServer) SplitAndScatterRegions(ctx context.Context, request *pdpb.SplitAndScatterRegionsRequest) (*pdpb.SplitAndScatterRegionsResponse, error) { fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { diff --git a/tests/integrations/client/go.mod b/tests/integrations/client/go.mod index 8a6f8bd3a97d..ea933ae09961 100644 --- a/tests/integrations/client/go.mod +++ b/tests/integrations/client/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/docker/go-units v0.4.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b + github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/client/go.sum b/tests/integrations/client/go.sum index 1b83e4907a0d..785d69bbc330 100644 --- a/tests/integrations/client/go.sum +++ b/tests/integrations/client/go.sum @@ -402,8 +402,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b h1:XwwIxepR+uuSYWhdQtstEdr67XUE7X6lpSIHVh5iWjs= -github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto 
v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA= +github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod index 075b31549ccb..536f34edbcf9 100644 --- a/tests/integrations/mcs/go.mod +++ b/tests/integrations/mcs/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/docker/go-units v0.4.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b + github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/mcs/go.sum b/tests/integrations/mcs/go.sum index d7f02d2fec99..ad36f60c3ce7 100644 --- a/tests/integrations/mcs/go.sum +++ b/tests/integrations/mcs/go.sum @@ -407,8 +407,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b h1:XwwIxepR+uuSYWhdQtstEdr67XUE7X6lpSIHVh5iWjs= -github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA= +github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tests/integrations/tso/go.mod b/tests/integrations/tso/go.mod index 1f4edf955101..f8a742362c9f 100644 --- a/tests/integrations/tso/go.mod +++ b/tests/integrations/tso/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c - github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b + github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a github.com/stretchr/testify v1.8.4 github.com/tikv/pd v0.0.0-00010101000000-000000000000 github.com/tikv/pd/client v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/tso/go.sum b/tests/integrations/tso/go.sum index 848c96952334..7ae1d41a8232 100644 --- a/tests/integrations/tso/go.sum +++ b/tests/integrations/tso/go.sum @@ -401,8 +401,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue 
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ= github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b h1:XwwIxepR+uuSYWhdQtstEdr67XUE7X6lpSIHVh5iWjs= -github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA= +github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tools/pd-api-bench/go.mod b/tools/pd-api-bench/go.mod index ad1cc12827a1..94aa1ff0dc9b 100644 --- a/tools/pd-api-bench/go.mod +++ b/tools/pd-api-bench/go.mod @@ -69,7 +69,7 @@ require ( github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 // indirect - github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b // indirect + github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a // indirect github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.11.1 // indirect diff --git a/tools/pd-api-bench/go.sum b/tools/pd-api-bench/go.sum index 26080f5cb673..aaa400df5fd7 100644 --- a/tools/pd-api-bench/go.sum +++ b/tools/pd-api-bench/go.sum @@ -238,8 +238,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b h1:XwwIxepR+uuSYWhdQtstEdr67XUE7X6lpSIHVh5iWjs= -github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA= +github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= From a85f29ced89a32fb28f9cacc8b020360083d042f Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 17 Oct 2023 17:09:29 +0800 Subject: [PATCH 3/3] scheduling/watcher, storage: integrate rule watcher with the managers (#7213) ref tikv/pd#5839 Integrate rule watcher with the 
managers. Signed-off-by: JmPotato Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/mcs/scheduling/server/rule/watcher.go | 83 +++++++++--- pkg/mcs/scheduling/server/server.go | 17 ++- pkg/schedule/labeler/labeler.go | 2 +- pkg/schedule/placement/rule_manager.go | 6 +- pkg/storage/endpoint/rule.go | 6 + .../integrations/mcs/scheduling/rule_test.go | 122 +++++++----------- 6 files changed, 132 insertions(+), 104 deletions(-) diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index 4cad6fdcbaed..912fb9c01e56 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -20,6 +20,9 @@ import ( "sync" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/schedule/checker" + "github.com/tikv/pd/pkg/schedule/labeler" + "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" "go.etcd.io/etcd/clientv3" @@ -49,18 +52,27 @@ type Watcher struct { etcdClient *clientv3.Client ruleStorage endpoint.RuleStorage + // checkerController is used to add the suspect key ranges to the checker when the rule changed. + checkerController *checker.Controller + // ruleManager is used to manage the placement rules. + ruleManager *placement.RuleManager + // regionLabeler is used to manage the region label rules. + regionLabeler *labeler.RegionLabeler + ruleWatcher *etcdutil.LoopWatcher groupWatcher *etcdutil.LoopWatcher labelWatcher *etcdutil.LoopWatcher } // NewWatcher creates a new watcher to watch the Placement Rule change from PD API server. -// Please use `GetRuleStorage` to get the underlying storage to access the Placement Rules. func NewWatcher( ctx context.Context, etcdClient *clientv3.Client, clusterID uint64, ruleStorage endpoint.RuleStorage, + checkerController *checker.Controller, + ruleManager *placement.RuleManager, + regionLabeler *labeler.RegionLabeler, ) (*Watcher, error) { ctx, cancel := context.WithCancel(ctx) rw := &Watcher{ @@ -71,6 +83,9 @@ func NewWatcher( regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID), etcdClient: etcdClient, ruleStorage: ruleStorage, + checkerController: checkerController, + ruleManager: ruleManager, + regionLabeler: regionLabeler, } err := rw.initializeRuleWatcher() if err != nil { @@ -90,17 +105,31 @@ func NewWatcher( func (rw *Watcher) initializeRuleWatcher() error { prefixToTrim := rw.rulesPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - // Since the PD API server will validate the rule before saving it to etcd, - // so we could directly save the string rule in JSON to the storage here. log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) - return rw.ruleStorage.SaveRuleJSON( - strings.TrimPrefix(string(kv.Key), prefixToTrim), - string(kv.Value), - ) + rule, err := placement.NewRuleFromJSON(kv.Value) + if err != nil { + return err + } + // Update the suspect key ranges in the checker. 
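+ // Both the new rule's key range and, if a rule with the same group and ID already exists, the old rule's key range are marked as suspect so the affected regions are re-checked.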
+ rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) + if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil { + rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey) + } + return rw.ruleManager.SetRule(rule) } deleteFn := func(kv *mvccpb.KeyValue) error { - log.Info("delete placement rule", zap.String("key", string(kv.Key))) - return rw.ruleStorage.DeleteRule(strings.TrimPrefix(string(kv.Key), prefixToTrim)) + key := string(kv.Key) + log.Info("delete placement rule", zap.String("key", key)) + ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, prefixToTrim)) + if err != nil { + return err + } + rule, err := placement.NewRuleFromJSON([]byte(ruleJSON)) + if err != nil { + return err + } + rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) + return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID) } postEventFn := func() error { return nil @@ -120,14 +149,24 @@ func (rw *Watcher) initializeGroupWatcher() error { prefixToTrim := rw.ruleGroupPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) - return rw.ruleStorage.SaveRuleGroupJSON( - strings.TrimPrefix(string(kv.Key), prefixToTrim), - string(kv.Value), - ) + ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value) + if err != nil { + return err + } + // Add all rule key ranges within the group to the suspect key ranges. + for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) { + rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) + } + return rw.ruleManager.SetRuleGroup(ruleGroup) } deleteFn := func(kv *mvccpb.KeyValue) error { - log.Info("delete placement rule group", zap.String("key", string(kv.Key))) - return rw.ruleStorage.DeleteRuleGroup(strings.TrimPrefix(string(kv.Key), prefixToTrim)) + key := string(kv.Key) + log.Info("delete placement rule group", zap.String("key", key)) + trimmedKey := strings.TrimPrefix(key, prefixToTrim) + for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) { + rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) + } + return rw.ruleManager.DeleteRuleGroup(trimmedKey) } postEventFn := func() error { return nil @@ -147,14 +186,16 @@ func (rw *Watcher) initializeRegionLabelWatcher() error { prefixToTrim := rw.regionLabelPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) - return rw.ruleStorage.SaveRegionRuleJSON( - strings.TrimPrefix(string(kv.Key), prefixToTrim), - string(kv.Value), - ) + rule, err := labeler.NewLabelRuleFromJSON(kv.Value) + if err != nil { + return err + } + return rw.regionLabeler.SetLabelRule(rule) } deleteFn := func(kv *mvccpb.KeyValue) error { - log.Info("delete region label rule", zap.String("key", string(kv.Key))) - return rw.ruleStorage.DeleteRegionRule(strings.TrimPrefix(string(kv.Key), prefixToTrim)) + key := string(kv.Key) + log.Info("delete region label rule", zap.String("key", key)) + return rw.regionLabeler.DeleteLabelRule(strings.TrimPrefix(key, prefixToTrim)) } postEventFn := func() error { return nil diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 02cb1ba3c704..9caae9320375 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -455,7 +455,7 @@ func (s *Server) startServer() (err error) { func (s *Server) 
startCluster(context.Context) error { s.basicCluster = core.NewBasicCluster() s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) - err := s.startWatcher() + err := s.startMetaConfWatcher() if err != nil { return err } @@ -464,7 +464,13 @@ func (s *Server) startCluster(context.Context) error { if err != nil { return err } + // Inject the cluster components into the config watcher after the scheduler controller is created. s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController()) + // Start the rule watcher after the cluster is created. + err = s.startRuleWatcher() + if err != nil { + return err + } s.cluster.StartBackgroundJobs() return nil } @@ -474,7 +480,7 @@ func (s *Server) stopCluster() { s.stopWatcher() } -func (s *Server) startWatcher() (err error) { +func (s *Server) startMetaConfWatcher() (err error) { s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.basicCluster) if err != nil { return err @@ -483,7 +489,12 @@ func (s *Server) startWatcher() (err error) { if err != nil { return err } - s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.storage) + return err +} + +func (s *Server) startRuleWatcher() (err error) { + s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.storage, + s.cluster.GetCoordinator().GetCheckerController(), s.cluster.GetRuleManager(), s.cluster.GetRegionLabeler()) return err } diff --git a/pkg/schedule/labeler/labeler.go b/pkg/schedule/labeler/labeler.go index c525ac5c44f6..39722b1a0384 100644 --- a/pkg/schedule/labeler/labeler.go +++ b/pkg/schedule/labeler/labeler.go @@ -254,7 +254,7 @@ func (l *RegionLabeler) Patch(patch LabelRulePatch) error { } } - // update inmemory states. + // update in-memory states. l.Lock() defer l.Unlock() diff --git a/pkg/schedule/placement/rule_manager.go b/pkg/schedule/placement/rule_manager.go index 909c0fa10785..bdca4cc1b19d 100644 --- a/pkg/schedule/placement/rule_manager.go +++ b/pkg/schedule/placement/rule_manager.go @@ -135,8 +135,10 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolat } func (m *RuleManager) loadRules() error { - var toSave []*Rule - var toDelete []string + var ( + toSave []*Rule + toDelete []string + ) err := m.storage.LoadRules(func(k, v string) { r, err := NewRuleFromJSON([]byte(v)) if err != nil { diff --git a/pkg/storage/endpoint/rule.go b/pkg/storage/endpoint/rule.go index 7e2813c23bde..125c5bc31eb8 100644 --- a/pkg/storage/endpoint/rule.go +++ b/pkg/storage/endpoint/rule.go @@ -22,6 +22,7 @@ import ( // RuleStorage defines the storage operations on the rule. type RuleStorage interface { + LoadRule(ruleKey string) (string, error) LoadRules(f func(k, v string)) error SaveRule(ruleKey string, rule interface{}) error SaveRuleJSON(ruleKey, rule string) error @@ -93,6 +94,11 @@ func (se *StorageEndpoint) DeleteRegionRule(ruleKey string) error { return se.Remove(regionLabelKeyPath(ruleKey)) } +// LoadRule load a placement rule from storage. +func (se *StorageEndpoint) LoadRule(ruleKey string) (string, error) { + return se.Load(ruleKeyPath(ruleKey)) +} + // LoadRules loads placement rules from storage. 
func (se *StorageEndpoint) LoadRules(f func(k, v string)) error { return se.loadRangeByPrefix(rulesPath+"/", f) diff --git a/tests/integrations/mcs/scheduling/rule_test.go b/tests/integrations/mcs/scheduling/rule_test.go index 104204dd6256..bffa58d0fe64 100644 --- a/tests/integrations/mcs/scheduling/rule_test.go +++ b/tests/integrations/mcs/scheduling/rule_test.go @@ -19,15 +19,9 @@ import ( "sort" "testing" - "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/tikv/pd/pkg/keyspace" - "github.com/tikv/pd/pkg/mcs/scheduling/server/rule" - "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/placement" - "github.com/tikv/pd/pkg/storage/endpoint" - "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/tests" ) @@ -41,7 +35,8 @@ type ruleTestSuite struct { // The PD cluster. cluster *tests.TestCluster // pdLeaderServer is the leader server of the PD cluster. - pdLeaderServer *tests.TestServer + pdLeaderServer *tests.TestServer + backendEndpoint string } func TestRule(t *testing.T) { @@ -59,6 +54,7 @@ func (suite *ruleTestSuite) SetupSuite() { re.NoError(err) leaderName := suite.cluster.WaitLeader() suite.pdLeaderServer = suite.cluster.GetServer(leaderName) + suite.backendEndpoint = suite.pdLeaderServer.GetAddr() re.NoError(suite.pdLeaderServer.BootstrapCluster()) } @@ -67,50 +63,18 @@ func (suite *ruleTestSuite) TearDownSuite() { suite.cluster.Destroy() } -func loadRules(re *require.Assertions, ruleStorage endpoint.RuleStorage) (rules []*placement.Rule) { - err := ruleStorage.LoadRules(func(_, v string) { - r, err := placement.NewRuleFromJSON([]byte(v)) - re.NoError(err) - rules = append(rules, r) - }) - re.NoError(err) - return -} - -func loadRuleGroups(re *require.Assertions, ruleStorage endpoint.RuleStorage) (groups []*placement.RuleGroup) { - err := ruleStorage.LoadRuleGroups(func(_, v string) { - rg, err := placement.NewRuleGroupFromJSON([]byte(v)) - re.NoError(err) - groups = append(groups, rg) - }) - re.NoError(err) - return -} - -func loadRegionRules(re *require.Assertions, ruleStorage endpoint.RuleStorage) (rules []*labeler.LabelRule) { - err := ruleStorage.LoadRegionRules(func(_, v string) { - lr, err := labeler.NewLabelRuleFromJSON([]byte(v)) - re.NoError(err) - rules = append(rules, lr) - }) - re.NoError(err) - return -} - func (suite *ruleTestSuite) TestRuleWatch() { re := suite.Require() - ruleStorage := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) - // Create a rule watcher. - _, err := rule.NewWatcher( - suite.ctx, - suite.pdLeaderServer.GetEtcdClient(), - suite.cluster.GetCluster().GetId(), - ruleStorage, - ) + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoint) re.NoError(err) - // Check the default rule. - rules := loadRules(re, ruleStorage) + defer tc.Destroy() + + tc.WaitForPrimaryServing(re) + cluster := tc.GetPrimaryServer().GetCluster() + ruleManager := cluster.GetRuleManager() + // Check the default rule and rule group. + rules := ruleManager.GetAllRules() re.Len(rules, 1) re.Equal("pd", rules[0].GroupID) re.Equal("default", rules[0].ID) @@ -119,12 +83,13 @@ func (suite *ruleTestSuite) TestRuleWatch() { re.Empty(rules[0].EndKey) re.Equal(placement.Voter, rules[0].Role) re.Empty(rules[0].LocationLabels) - // Check the empty rule group. 
- ruleGroups := loadRuleGroups(re, ruleStorage) - re.NoError(err) - re.Empty(ruleGroups) + ruleGroups := ruleManager.GetRuleGroups() + re.Len(ruleGroups, 1) + re.Equal("pd", ruleGroups[0].ID) + re.Equal(0, ruleGroups[0].Index) + re.False(ruleGroups[0].Override) // Set a new rule via the PD API server. - ruleManager := suite.pdLeaderServer.GetRaftCluster().GetRuleManager() + apiRuleManager := suite.pdLeaderServer.GetRaftCluster().GetRuleManager() rule := &placement.Rule{ GroupID: "2", ID: "3", @@ -133,10 +98,10 @@ func (suite *ruleTestSuite) TestRuleWatch() { StartKeyHex: "22", EndKeyHex: "dd", } - err = ruleManager.SetRule(rule) + err = apiRuleManager.SetRule(rule) re.NoError(err) testutil.Eventually(re, func() bool { - rules = loadRules(re, ruleStorage) + rules = ruleManager.GetAllRules() return len(rules) == 2 }) sort.Slice(rules, func(i, j int) bool { @@ -150,10 +115,10 @@ func (suite *ruleTestSuite) TestRuleWatch() { re.Equal(rule.StartKeyHex, rules[1].StartKeyHex) re.Equal(rule.EndKeyHex, rules[1].EndKeyHex) // Delete the rule. - err = ruleManager.DeleteRule(rule.GroupID, rule.ID) + err = apiRuleManager.DeleteRule(rule.GroupID, rule.ID) re.NoError(err) testutil.Eventually(re, func() bool { - rules = loadRules(re, ruleStorage) + rules = ruleManager.GetAllRules() return len(rules) == 1 }) re.Len(rules, 1) @@ -164,30 +129,35 @@ func (suite *ruleTestSuite) TestRuleWatch() { Index: 100, Override: true, } - err = ruleManager.SetRuleGroup(ruleGroup) + err = apiRuleManager.SetRuleGroup(ruleGroup) re.NoError(err) testutil.Eventually(re, func() bool { - ruleGroups = loadRuleGroups(re, ruleStorage) - return len(ruleGroups) == 1 + ruleGroups = ruleManager.GetRuleGroups() + return len(ruleGroups) == 2 }) - re.Len(ruleGroups, 1) - re.Equal(ruleGroup.ID, ruleGroups[0].ID) - re.Equal(ruleGroup.Index, ruleGroups[0].Index) - re.Equal(ruleGroup.Override, ruleGroups[0].Override) + re.Len(ruleGroups, 2) + re.Equal(ruleGroup.ID, ruleGroups[1].ID) + re.Equal(ruleGroup.Index, ruleGroups[1].Index) + re.Equal(ruleGroup.Override, ruleGroups[1].Override) // Delete the rule group. - err = ruleManager.DeleteRuleGroup(ruleGroup.ID) + err = apiRuleManager.DeleteRuleGroup(ruleGroup.ID) re.NoError(err) testutil.Eventually(re, func() bool { - ruleGroups = loadRuleGroups(re, ruleStorage) - return len(ruleGroups) == 0 + ruleGroups = ruleManager.GetRuleGroups() + return len(ruleGroups) == 1 }) - re.Empty(ruleGroups) + re.Len(ruleGroups, 1) // Test the region label rule watch. - labelRules := loadRegionRules(re, ruleStorage) - re.Len(labelRules, 1) - defaultKeyspaceRule := keyspace.MakeLabelRule(utils.DefaultKeyspaceID) - re.Equal(defaultKeyspaceRule, labelRules[0]) + regionLabeler := cluster.GetRegionLabeler() + labelRules := regionLabeler.GetAllLabelRules() + apiRegionLabeler := suite.pdLeaderServer.GetRaftCluster().GetRegionLabeler() + apiLabelRules := apiRegionLabeler.GetAllLabelRules() + re.Len(labelRules, len(apiLabelRules)) + re.Equal(apiLabelRules[0].ID, labelRules[0].ID) + re.Equal(apiLabelRules[0].Index, labelRules[0].Index) + re.Equal(apiLabelRules[0].Labels, labelRules[0].Labels) + re.Equal(apiLabelRules[0].RuleType, labelRules[0].RuleType) // Set a new region label rule. 
labelRule := &labeler.LabelRule{ ID: "rule1", @@ -195,11 +165,10 @@ func (suite *ruleTestSuite) TestRuleWatch() { RuleType: "key-range", Data: labeler.MakeKeyRanges("1234", "5678"), } - regionLabeler := suite.pdLeaderServer.GetRaftCluster().GetRegionLabeler() - err = regionLabeler.SetLabelRule(labelRule) + err = apiRegionLabeler.SetLabelRule(labelRule) re.NoError(err) testutil.Eventually(re, func() bool { - labelRules = loadRegionRules(re, ruleStorage) + labelRules = regionLabeler.GetAllLabelRules() return len(labelRules) == 2 }) sort.Slice(labelRules, func(i, j int) bool { @@ -220,17 +189,16 @@ func (suite *ruleTestSuite) TestRuleWatch() { SetRules: []*labeler.LabelRule{labelRule}, DeleteRules: []string{"rule1"}, } - err = regionLabeler.Patch(patch) + err = apiRegionLabeler.Patch(patch) re.NoError(err) testutil.Eventually(re, func() bool { - labelRules = loadRegionRules(re, ruleStorage) + labelRules = regionLabeler.GetAllLabelRules() return len(labelRules) == 2 }) sort.Slice(labelRules, func(i, j int) bool { return labelRules[i].ID < labelRules[j].ID }) re.Len(labelRules, 2) - re.Equal(defaultKeyspaceRule, labelRules[0]) re.Equal(labelRule.ID, labelRules[1].ID) re.Equal(labelRule.Labels, labelRules[1].Labels) re.Equal(labelRule.RuleType, labelRules[1].RuleType)