From 43ff40873ee7f92f375bfeb6ca41d2def2331430 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 19 Oct 2023 17:34:59 +0800 Subject: [PATCH 1/4] mcs: forward ask split request (#7208) ref tikv/pd#5839 Signed-off-by: Ryan Leung --- go.mod | 2 +- go.sum | 4 +- pkg/core/region.go | 17 ++++++ pkg/mcs/scheduling/server/config/config.go | 5 ++ pkg/mcs/scheduling/server/grpc_service.go | 71 ++++++++++++++++++++++ server/cluster/cluster.go | 5 ++ server/cluster/cluster_worker.go | 21 +------ server/grpc_service.go | 27 ++++++++ tests/integrations/client/go.mod | 2 +- tests/integrations/client/go.sum | 4 +- tests/integrations/mcs/go.mod | 2 +- tests/integrations/mcs/go.sum | 4 +- tests/integrations/tso/go.mod | 2 +- tests/integrations/tso/go.sum | 4 +- tests/server/cluster/cluster_work_test.go | 8 +-- tools/pd-api-bench/go.mod | 2 +- tools/pd-api-bench/go.sum | 4 +- 17 files changed, 146 insertions(+), 38 deletions(-) diff --git a/go.mod b/go.mod index 73f3d783db2..86f56089347 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a + github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 diff --git a/go.sum b/go.sum index 6f73b806860..9392644f181 100644 --- a/go.sum +++ b/go.sum @@ -439,8 +439,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto 
v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c h1:5cpE29kMHjc8fv+mRiXbLTDfoZHiX5BTK6knVWlWvqk= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/pkg/core/region.go b/pkg/core/region.go index 6631005a608..6875844b7a6 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -1675,6 +1675,23 @@ func (r *RegionsInfo) GetAverageRegionSize() int64 { return r.tree.TotalSize() / int64(r.tree.length()) } +// ValidRegion is used to decide if the region is valid. +func (r *RegionsInfo) ValidRegion(region *metapb.Region) error { + startKey := region.GetStartKey() + currnetRegion := r.GetRegionByKey(startKey) + if currnetRegion == nil { + return errors.Errorf("region not found, request region: %v", logutil.RedactStringer(RegionToHexMeta(region))) + } + // If the request epoch is less than current region epoch, then returns an error. 
+ regionEpoch := region.GetRegionEpoch() + currnetEpoch := currnetRegion.GetMeta().GetRegionEpoch() + if regionEpoch.GetVersion() < currnetEpoch.GetVersion() || + regionEpoch.GetConfVer() < currnetEpoch.GetConfVer() { + return errors.Errorf("invalid region epoch, request: %v, current: %v", regionEpoch, currnetEpoch) + } + return nil +} + // DiffRegionPeersInfo return the difference of peers info between two RegionInfo func DiffRegionPeersInfo(origin *RegionInfo, other *RegionInfo) string { var ret []string diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index d462a9a58b5..001a433ba07 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -650,6 +650,11 @@ func (o *PersistConfig) IsRaftKV2() bool { return o.GetStoreConfig().IsRaftKV2() } +// IsTikvRegionSplitEnabled returns whether tikv split region is enabled. +func (o *PersistConfig) IsTikvRegionSplitEnabled() bool { + return o.GetScheduleConfig().EnableTiKVSplitRegion +} + // TODO: implement the following methods // AddSchedulerCfg adds the scheduler configurations. 
diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index e2e2c8af32d..8e47e7380f9 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -22,13 +22,16 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/schedulingpb" "github.com/pingcap/log" bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/registry" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/versioninfo" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -254,6 +257,74 @@ func (s *Service) GetOperator(ctx context.Context, request *schedulingpb.GetOper }, nil } +// AskBatchSplit implements gRPC PDServer. +func (s *Service) AskBatchSplit(ctx context.Context, request *schedulingpb.AskBatchSplitRequest) (*schedulingpb.AskBatchSplitResponse, error) { + c := s.GetCluster() + if c == nil { + return &schedulingpb.AskBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil + } + + if request.GetRegion() == nil { + return &schedulingpb.AskBatchSplitResponse{ + Header: s.wrapErrorToHeader(schedulingpb.ErrorType_UNKNOWN, + "missing region for split"), + }, nil + } + + if c.persistConfig.IsSchedulingHalted() { + return nil, errs.ErrSchedulingIsHalted.FastGenByArgs() + } + if !c.persistConfig.IsTikvRegionSplitEnabled() { + return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs() + } + reqRegion := request.GetRegion() + splitCount := request.GetSplitCount() + err := c.ValidRegion(reqRegion) + if err != nil { + return nil, err + } + splitIDs := make([]*pdpb.SplitID, 0, splitCount) + recordRegions := make([]uint64, 0, splitCount+1) + + for i := 0; i < int(splitCount); i++ { + newRegionID, err := c.AllocID() + if err != nil { + return nil, errs.ErrSchedulerNotFound.FastGenByArgs() + } + + peerIDs 
:= make([]uint64, len(request.Region.Peers)) + for i := 0; i < len(peerIDs); i++ { + if peerIDs[i], err = c.AllocID(); err != nil { + return nil, err + } + } + + recordRegions = append(recordRegions, newRegionID) + splitIDs = append(splitIDs, &pdpb.SplitID{ + NewRegionId: newRegionID, + NewPeerIds: peerIDs, + }) + + log.Info("alloc ids for region split", zap.Uint64("region-id", newRegionID), zap.Uint64s("peer-ids", peerIDs)) + } + + recordRegions = append(recordRegions, reqRegion.GetId()) + if versioninfo.IsFeatureSupported(c.persistConfig.GetClusterVersion(), versioninfo.RegionMerge) { + // Disable merge the regions in a period of time. + c.GetCoordinator().GetMergeChecker().RecordRegionSplit(recordRegions) + } + + // If region splits during the scheduling process, regions with abnormal + // status may be left, and these regions need to be checked with higher + // priority. + c.GetCoordinator().GetCheckerController().AddSuspectRegions(recordRegions...) + + return &schedulingpb.AskBatchSplitResponse{ + Header: s.header(), + Ids: splitIDs, + }, nil +} + // RegisterGRPCService registers the service to gRPC server. func (s *Service) RegisterGRPCService(g *grpc.Server) { schedulingpb.RegisterSchedulingServer(g, s) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 7a0e99efa5f..25a47a7fca9 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1236,6 +1236,11 @@ func (c *RaftCluster) GetRegions() []*core.RegionInfo { return c.core.GetRegions() } +// ValidRegion is used to decide if the region is valid. 
+func (c *RaftCluster) ValidRegion(region *metapb.Region) error { + return c.core.ValidRegion(region) +} + // GetTotalRegionCount returns total count of regions func (c *RaftCluster) GetTotalRegionCount() int { return c.core.GetTotalRegionCount() diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index c1da97363b5..a38ae86123f 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -50,7 +50,7 @@ func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSp return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs() } reqRegion := request.GetRegion() - err := c.ValidRequestRegion(reqRegion) + err := c.ValidRegion(reqRegion) if err != nil { return nil, err } @@ -90,23 +90,6 @@ func (c *RaftCluster) isSchedulingHalted() bool { return c.opt.IsSchedulingHalted() } -// ValidRequestRegion is used to decide if the region is valid. -func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error { - startKey := reqRegion.GetStartKey() - region := c.GetRegionByKey(startKey) - if region == nil { - return errors.Errorf("region not found, request region: %v", logutil.RedactStringer(core.RegionToHexMeta(reqRegion))) - } - // If the request epoch is less than current region epoch, then returns an error. - reqRegionEpoch := reqRegion.GetRegionEpoch() - regionEpoch := region.GetMeta().GetRegionEpoch() - if reqRegionEpoch.GetVersion() < regionEpoch.GetVersion() || - reqRegionEpoch.GetConfVer() < regionEpoch.GetConfVer() { - return errors.Errorf("invalid region epoch, request: %v, current: %v", reqRegionEpoch, regionEpoch) - } - return nil -} - // HandleAskBatchSplit handles the batch split request. 
func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) { if c.isSchedulingHalted() { @@ -117,7 +100,7 @@ func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (* } reqRegion := request.GetRegion() splitCount := request.GetSplitCount() - err := c.ValidRequestRegion(reqRegion) + err := c.ValidRegion(reqRegion) if err != nil { return nil, err } diff --git a/server/grpc_service.go b/server/grpc_service.go index 4fbfaa0f407..9208aa0c5dc 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1622,6 +1622,26 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest // AskBatchSplit implements gRPC PDServer. func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) { + if s.IsAPIServiceMode() { + s.updateSchedulingClient(ctx) + if s.schedulingClient.Load() != nil { + req := &schedulingpb.AskBatchSplitRequest{ + Header: &schedulingpb.RequestHeader{ + ClusterId: request.GetHeader().GetClusterId(), + SenderId: request.GetHeader().GetSenderId(), + }, + Region: request.GetRegion(), + SplitCount: request.GetSplitCount(), + } + resp, err := s.schedulingClient.Load().(*schedulingClient).getClient().AskBatchSplit(ctx, req) + if err != nil { + // reset to let it be updated in the next request + s.schedulingClient.Store(&schedulingClient{}) + return s.convertAskSplitResponse(resp), err + } + return s.convertAskSplitResponse(resp), nil + } + } fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).AskBatchSplit(ctx, request) } @@ -2136,6 +2156,13 @@ func (s *GrpcServer) convertOperatorResponse(resp *schedulingpb.GetOperatorRespo } } +func (s *GrpcServer) convertAskSplitResponse(resp *schedulingpb.AskBatchSplitResponse) *pdpb.AskBatchSplitResponse { + return &pdpb.AskBatchSplitResponse{ + Header: s.convertHeader(resp.GetHeader()), + Ids: 
resp.GetIds(), + } +} + // Only used for the TestLocalAllocatorLeaderChange. var mockLocalAllocatorLeaderChangeFlag = false diff --git a/tests/integrations/client/go.mod b/tests/integrations/client/go.mod index ea933ae0996..e38efbeb438 100644 --- a/tests/integrations/client/go.mod +++ b/tests/integrations/client/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/docker/go-units v0.4.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a + github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/client/go.sum b/tests/integrations/client/go.sum index 785d69bbc33..c745c4fa518 100644 --- a/tests/integrations/client/go.sum +++ b/tests/integrations/client/go.sum @@ -402,8 +402,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c h1:5cpE29kMHjc8fv+mRiXbLTDfoZHiX5BTK6knVWlWvqk= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod 
h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod index 536f34edbcf..000bfdc8312 100644 --- a/tests/integrations/mcs/go.mod +++ b/tests/integrations/mcs/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/docker/go-units v0.4.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a + github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/mcs/go.sum b/tests/integrations/mcs/go.sum index ad36f60c3ce..0da75329284 100644 --- a/tests/integrations/mcs/go.sum +++ b/tests/integrations/mcs/go.sum @@ -407,8 +407,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c h1:5cpE29kMHjc8fv+mRiXbLTDfoZHiX5BTK6knVWlWvqk= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c/go.mod 
h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tests/integrations/tso/go.mod b/tests/integrations/tso/go.mod index f8a742362c9..f8a5cfac75f 100644 --- a/tests/integrations/tso/go.mod +++ b/tests/integrations/tso/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c - github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a + github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c github.com/stretchr/testify v1.8.4 github.com/tikv/pd v0.0.0-00010101000000-000000000000 github.com/tikv/pd/client v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/tso/go.sum b/tests/integrations/tso/go.sum index 7ae1d41a823..63327985f0d 100644 --- a/tests/integrations/tso/go.sum +++ b/tests/integrations/tso/go.sum @@ -401,8 +401,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ= github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c h1:5cpE29kMHjc8fv+mRiXbLTDfoZHiX5BTK6knVWlWvqk= 
+github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tests/server/cluster/cluster_work_test.go b/tests/server/cluster/cluster_work_test.go index ef09e522305..eabecf8e29b 100644 --- a/tests/server/cluster/cluster_work_test.go +++ b/tests/server/cluster/cluster_work_test.go @@ -64,13 +64,13 @@ func TestValidRequestRegion(t *testing.T) { err = rc.HandleRegionHeartbeat(r1) re.NoError(err) r2 := &metapb.Region{Id: 2, StartKey: []byte("a"), EndKey: []byte("b")} - re.Error(rc.ValidRequestRegion(r2)) + re.Error(rc.ValidRegion(r2)) r3 := &metapb.Region{Id: 1, StartKey: []byte(""), EndKey: []byte("a"), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}} - re.Error(rc.ValidRequestRegion(r3)) + re.Error(rc.ValidRegion(r3)) r4 := &metapb.Region{Id: 1, StartKey: []byte(""), EndKey: []byte("a"), RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 1}} - re.Error(rc.ValidRequestRegion(r4)) + re.Error(rc.ValidRegion(r4)) r5 := &metapb.Region{Id: 1, StartKey: []byte(""), EndKey: []byte("a"), RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 2}} - re.NoError(rc.ValidRequestRegion(r5)) + re.NoError(rc.ValidRegion(r5)) rc.Stop() } diff --git a/tools/pd-api-bench/go.mod b/tools/pd-api-bench/go.mod index 94aa1ff0dc9..e6e896a0797 100644 --- a/tools/pd-api-bench/go.mod +++ b/tools/pd-api-bench/go.mod @@ -69,7 +69,7 @@ require ( github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 // indirect - github.com/pingcap/kvproto 
v0.0.0-20231017055627-c06b434f3c3a // indirect + github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c // indirect github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.11.1 // indirect diff --git a/tools/pd-api-bench/go.sum b/tools/pd-api-bench/go.sum index aaa400df5fd..f40a4fe2f5a 100644 --- a/tools/pd-api-bench/go.sum +++ b/tools/pd-api-bench/go.sum @@ -238,8 +238,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a h1:xbak0nFiTkinvdtr/EUK0dvdkX32twKRpIgsMRiiVQA= -github.com/pingcap/kvproto v0.0.0-20231017055627-c06b434f3c3a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c h1:5cpE29kMHjc8fv+mRiXbLTDfoZHiX5BTK6knVWlWvqk= +github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= From 445319f8115db75ed55ed92718c105dffd4a56fb Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 19 Oct 2023 18:39:29 +0800 Subject: [PATCH 2/4] mcs: fix panic of forwarding request (#7220) ref tikv/pd#5839 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] 
<108142056+ti-chi-bot[bot]@users.noreply.github.com> --- server/grpc_service.go | 49 +++++++++++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/server/grpc_service.go b/server/grpc_service.go index 9208aa0c5dc..da9f71170c3 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -69,6 +69,7 @@ var ( ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout") ErrNotFoundTSOAddr = status.Errorf(codes.NotFound, "not found tso address") + ErrNotFoundSchedulingAddr = status.Errorf(codes.NotFound, "not found scheduling address") ErrForwardTSOTimeout = status.Errorf(codes.DeadlineExceeded, "forward tso request timeout") ErrMaxCountTSOProxyRoutinesExceeded = status.Errorf(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded") ErrTSOProxyRecvFromClientTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy timeout when receiving from client; stream closed by server") @@ -1002,8 +1003,8 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear s.handleDamagedStore(request.GetStats()) storeHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds()) if s.IsAPIServiceMode() { - s.updateSchedulingClient(ctx) - if s.schedulingClient.Load() != nil { + forwardCli, _ := s.updateSchedulingClient(ctx) + if forwardCli != nil { req := &schedulingpb.StoreHeartbeatRequest{ Header: &schedulingpb.RequestHeader{ ClusterId: request.GetHeader().GetClusterId(), @@ -1011,7 +1012,7 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear }, Stats: request.GetStats(), } - if _, err := s.schedulingClient.Load().(*schedulingClient).getClient().StoreHeartbeat(ctx, req); err != nil { + if _, err := forwardCli.StoreHeartbeat(ctx, req); err != nil { // reset to let it be updated in the next request 
s.schedulingClient.Store(&schedulingClient{}) } @@ -1030,7 +1031,7 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear return resp, nil } -func (s *GrpcServer) updateSchedulingClient(ctx context.Context) { +func (s *GrpcServer) updateSchedulingClient(ctx context.Context) (schedulingpb.SchedulingClient, error) { forwardedHost, _ := s.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName) pre := s.schedulingClient.Load() if forwardedHost != "" && ((pre == nil) || (pre != nil && forwardedHost != pre.(*schedulingClient).getPrimaryAddr())) { @@ -1038,11 +1039,14 @@ func (s *GrpcServer) updateSchedulingClient(ctx context.Context) { if err != nil { log.Error("get delegate client failed", zap.Error(err)) } - s.schedulingClient.Store(&schedulingClient{ + forwardCli := &schedulingClient{ client: schedulingpb.NewSchedulingClient(client), lastPrimary: forwardedHost, - }) + } + s.schedulingClient.Store(forwardCli) + return forwardCli.getClient(), nil } + return nil, ErrNotFoundSchedulingAddr } // bucketHeartbeatServer wraps PD_ReportBucketsServer to ensure when any error @@ -1791,8 +1795,13 @@ func (s *GrpcServer) PutClusterConfig(ctx context.Context, request *pdpb.PutClus // ScatterRegion implements gRPC PDServer. 
func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterRegionRequest) (*pdpb.ScatterRegionResponse, error) { if s.IsAPIServiceMode() { - s.updateSchedulingClient(ctx) - if s.schedulingClient.Load() != nil { + forwardCli, err := s.updateSchedulingClient(ctx) + if err != nil { + return &pdpb.ScatterRegionResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + if forwardCli != nil { regionsID := request.GetRegionsId() if len(regionsID) == 0 { return &pdpb.ScatterRegionResponse{ @@ -1809,7 +1818,7 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg RetryLimit: request.GetRetryLimit(), SkipStoreLimit: request.GetSkipStoreLimit(), } - resp, err := s.schedulingClient.Load().(*schedulingClient).getClient().ScatterRegions(ctx, req) + resp, err := forwardCli.ScatterRegions(ctx, req) if err != nil { // reset to let it be updated in the next request s.schedulingClient.Store(&schedulingClient{}) @@ -2010,8 +2019,13 @@ func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb // GetOperator gets information about the operator belonging to the specify region. 
func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorRequest) (*pdpb.GetOperatorResponse, error) { if s.IsAPIServiceMode() { - s.updateSchedulingClient(ctx) - if s.schedulingClient.Load() != nil { + forwardCli, err := s.updateSchedulingClient(ctx) + if err != nil { + return &pdpb.GetOperatorResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + if forwardCli != nil { req := &schedulingpb.GetOperatorRequest{ Header: &schedulingpb.RequestHeader{ ClusterId: request.GetHeader().GetClusterId(), @@ -2019,7 +2033,7 @@ func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorR }, RegionId: request.GetRegionId(), } - resp, err := s.schedulingClient.Load().(*schedulingClient).getClient().GetOperator(ctx, req) + resp, err := forwardCli.GetOperator(ctx, req) if err != nil { // reset to let it be updated in the next request s.schedulingClient.Store(&schedulingClient{}) @@ -2268,8 +2282,13 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest // SplitRegions split regions by the given split keys func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegionsRequest) (*pdpb.SplitRegionsResponse, error) { if s.IsAPIServiceMode() { - s.updateSchedulingClient(ctx) - if s.schedulingClient.Load() != nil { + forwardCli, err := s.updateSchedulingClient(ctx) + if err != nil { + return &pdpb.SplitRegionsResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + if forwardCli != nil { req := &schedulingpb.SplitRegionsRequest{ Header: &schedulingpb.RequestHeader{ ClusterId: request.GetHeader().GetClusterId(), @@ -2278,7 +2297,7 @@ func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegion SplitKeys: request.GetSplitKeys(), RetryLimit: request.GetRetryLimit(), } - resp, err := s.schedulingClient.Load().(*schedulingClient).getClient().SplitRegions(ctx, req) + resp, err := forwardCli.SplitRegions(ctx, req) 
if err != nil { // reset to let it be updated in the next request s.schedulingClient.Store(&schedulingClient{}) From d9dbf77a410362f54b8c79134b5908fd3964046a Mon Sep 17 00:00:00 2001 From: lucasliang Date: Thu, 19 Oct 2023 18:52:59 +0800 Subject: [PATCH 3/4] schedulers: transplant the `recovery-duration` setting to `evict-slow-store` scheduler. (#7225) ref tikv/pd#7156 This pr is used to transplant the `recovery-duration` imported in `evict-slow-trend` scheduler into `evict-slow-store` scheduler. It's useful and necessary for users who wanna use `evict-slow-score` scheduler before we GA `evict-slow-trend` scheduler. Signed-off-by: lucasliang Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/schedule/schedulers/evict_slow_store.go | 97 +++++++++++++++++-- .../schedulers/evict_slow_store_test.go | 4 + pkg/schedule/schedulers/init.go | 2 +- tests/pdctl/scheduler/scheduler_test.go | 31 +++--- tools/pd-ctl/pdctl/command/scheduler.go | 20 ++++ 5 files changed, 130 insertions(+), 24 deletions(-) diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index a6665c3e5e7..cc1b16300c5 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -15,6 +15,11 @@ package schedulers import ( + "net/http" + "sync/atomic" + "time" + + "github.com/gorilla/mux" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" @@ -23,6 +28,8 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/unrolled/render" "go.uber.org/zap" ) @@ -40,8 +47,27 @@ const ( var evictSlowStoreCounter = schedulerCounter.WithLabelValues(EvictSlowStoreName, "schedule") type evictSlowStoreSchedulerConfig struct { - storage endpoint.ConfigStorage - EvictedStores []uint64 `json:"evict-stores"` + storage 
endpoint.ConfigStorage + // Last timestamp of the chosen slow store for eviction. + lastSlowStoreCaptureTS time.Time + // Duration gap for recovering the candidate, unit: s. + RecoveryDurationGap uint64 `json:"recovery-duration"` + EvictedStores []uint64 `json:"evict-stores"` +} + +func initEvictSlowStoreSchedulerConfig(storage endpoint.ConfigStorage) *evictSlowStoreSchedulerConfig { + return &evictSlowStoreSchedulerConfig{ + storage: storage, + lastSlowStoreCaptureTS: time.Time{}, + RecoveryDurationGap: defaultRecoveryDurationGap, + EvictedStores: make([]uint64, 0), + } +} + +func (conf *evictSlowStoreSchedulerConfig) Clone() *evictSlowStoreSchedulerConfig { + return &evictSlowStoreSchedulerConfig{ + RecoveryDurationGap: atomic.LoadUint64(&conf.RecoveryDurationGap), + } } func (conf *evictSlowStoreSchedulerConfig) Persist() error { @@ -78,8 +104,18 @@ func (conf *evictSlowStoreSchedulerConfig) evictStore() uint64 { return conf.EvictedStores[0] } +// readyForRecovery checks whether the last captured candidate is ready for recovery. 
+func (conf *evictSlowStoreSchedulerConfig) readyForRecovery() bool { + recoveryDurationGap := atomic.LoadUint64(&conf.RecoveryDurationGap) + failpoint.Inject("transientRecoveryGap", func() { + recoveryDurationGap = 0 + }) + return uint64(time.Since(conf.lastSlowStoreCaptureTS).Seconds()) >= recoveryDurationGap +} + func (conf *evictSlowStoreSchedulerConfig) setStoreAndPersist(id uint64) error { conf.EvictedStores = []uint64{id} + conf.lastSlowStoreCaptureTS = time.Now() return conf.Persist() } @@ -87,14 +123,58 @@ func (conf *evictSlowStoreSchedulerConfig) clearAndPersist() (oldID uint64, err oldID = conf.evictStore() if oldID > 0 { conf.EvictedStores = []uint64{} + conf.lastSlowStoreCaptureTS = time.Time{} err = conf.Persist() } return } +type evictSlowStoreHandler struct { + rd *render.Render + config *evictSlowStoreSchedulerConfig +} + +func newEvictSlowStoreHandler(config *evictSlowStoreSchedulerConfig) http.Handler { + h := &evictSlowStoreHandler{ + config: config, + rd: render.New(render.Options{IndentJSON: true}), + } + router := mux.NewRouter() + router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet) + return router +} + +func (handler *evictSlowStoreHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { + var input map[string]interface{} + if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { + return + } + recoveryDurationGapFloat, ok := input["recovery-duration"].(float64) + if !ok { + handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error()) + return + } + recoveryDurationGap := (uint64)(recoveryDurationGapFloat) + prevRecoveryDurationGap := atomic.LoadUint64(&handler.config.RecoveryDurationGap) + atomic.StoreUint64(&handler.config.RecoveryDurationGap, recoveryDurationGap) + log.Info("evict-slow-store-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", 
prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap)) + handler.rd.JSON(w, http.StatusOK, nil) +} + +func (handler *evictSlowStoreHandler) ListConfig(w http.ResponseWriter, r *http.Request) { + conf := handler.config.Clone() + handler.rd.JSON(w, http.StatusOK, conf) +} + type evictSlowStoreScheduler struct { *BaseScheduler - conf *evictSlowStoreSchedulerConfig + conf *evictSlowStoreSchedulerConfig + handler http.Handler +} + +func (s *evictSlowStoreScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.handler.ServeHTTP(w, r) } func (s *evictSlowStoreScheduler) GetName() string { @@ -168,7 +248,7 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, dryRun // slow node next time. log.Info("slow store has been removed", zap.Uint64("store-id", store.GetID())) - } else if store.GetSlowScore() <= slowStoreRecoverThreshold { + } else if store.GetSlowScore() <= slowStoreRecoverThreshold && s.conf.readyForRecovery() { log.Info("slow store has been recovered", zap.Uint64("store-id", store.GetID())) } else { @@ -211,11 +291,10 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, dryRun // newEvictSlowStoreScheduler creates a scheduler that detects and evicts slow stores. 
func newEvictSlowStoreScheduler(opController *operator.Controller, conf *evictSlowStoreSchedulerConfig) Scheduler { - base := NewBaseScheduler(opController) - - s := &evictSlowStoreScheduler{ - BaseScheduler: base, + handler := newEvictSlowStoreHandler(conf) + return &evictSlowStoreScheduler{ + BaseScheduler: NewBaseScheduler(opController), conf: conf, + handler: handler, } - return s } diff --git a/pkg/schedule/schedulers/evict_slow_store_test.go b/pkg/schedule/schedulers/evict_slow_store_test.go index 0b0c1d9ad39..813d17ae541 100644 --- a/pkg/schedule/schedulers/evict_slow_store_test.go +++ b/pkg/schedule/schedulers/evict_slow_store_test.go @@ -67,6 +67,7 @@ func (suite *evictSlowStoreTestSuite) TearDownTest() { } func (suite *evictSlowStoreTestSuite) TestEvictSlowStore() { + suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/schedulers/transientRecoveryGap", "return(true)")) storeInfo := suite.tc.GetStore(1) newStoreInfo := storeInfo.Clone(func(store *core.StoreInfo) { store.GetStoreStats().SlowScore = 100 @@ -113,6 +114,8 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStore() { suite.NoError(err) suite.Equal(es2.conf.EvictedStores, persistValue.EvictedStores) suite.Zero(persistValue.evictStore()) + suite.True(persistValue.readyForRecovery()) + suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/schedulers/transientRecoveryGap")) } func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePrepare() { @@ -124,6 +127,7 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePrepare() { es2.conf.setStoreAndPersist(1) suite.Equal(uint64(1), es2.conf.evictStore()) + suite.False(es2.conf.readyForRecovery()) // prepare with evict store. 
suite.es.Prepare(suite.tc) } diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index dde10610643..d45602b90e1 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -159,7 +159,7 @@ func schedulersRegister() { }) RegisterScheduler(EvictSlowStoreType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) { - conf := &evictSlowStoreSchedulerConfig{storage: storage, EvictedStores: make([]uint64, 0)} + conf := initEvictSlowStoreSchedulerConfig(storage) if err := decoder(conf); err != nil { return nil, err } diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index c5b118a9f5e..b3d9f356ad1 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -407,20 +407,23 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil) re.Contains(echo, "Success!") - // test evict-slow-trend scheduler config - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-slow-trend-scheduler"}, nil) - re.Contains(echo, "Success!") - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) - re.Contains(echo, "evict-slow-trend-scheduler") - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-slow-trend-scheduler", "set", "recovery-duration", "100"}, nil) - re.Contains(echo, "Success!") - conf = make(map[string]interface{}) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-slow-trend-scheduler", "show"}, &conf) - re.Equal(100., conf["recovery-duration"]) - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-slow-trend-scheduler"}, nil) - re.Contains(echo, "Success!") - echo = mustExec(re, cmd, []string{"-u", pdAddr, 
"scheduler", "show"}, nil) - re.NotContains(echo, "evict-slow-trend-scheduler") + // test evict-slow-store && evict-slow-trend schedulers config + evictSlownessSchedulers := []string{"evict-slow-store-scheduler", "evict-slow-trend-scheduler"} + for _, schedulerName := range evictSlownessSchedulers { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", schedulerName}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + re.Contains(echo, schedulerName) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "set", "recovery-duration", "100"}, nil) + re.Contains(echo, "Success!") + conf = make(map[string]interface{}) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "show"}, &conf) + re.Equal(100., conf["recovery-duration"]) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", schedulerName}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + re.NotContains(echo, schedulerName) + } // test show scheduler with paused and disabled status. 
checkSchedulerWithStatusCommand := func(status string, expected []string) { diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go index 40bf9a07a4d..4349735f06d 100644 --- a/tools/pd-ctl/pdctl/command/scheduler.go +++ b/tools/pd-ctl/pdctl/command/scheduler.go @@ -499,6 +499,7 @@ func NewConfigSchedulerCommand() *cobra.Command { newConfigGrantHotRegionCommand(), newConfigBalanceLeaderCommand(), newSplitBucketCommand(), + newConfigEvictSlowStoreCommand(), newConfigEvictSlowTrendCommand(), ) return c @@ -776,6 +777,25 @@ func setShuffleRegionSchedulerRolesCommandFunc(cmd *cobra.Command, args []string cmd.Println("Success!") } +func newConfigEvictSlowStoreCommand() *cobra.Command { + c := &cobra.Command{ + Use: "evict-slow-store-scheduler", + Short: "evict-slow-store-scheduler config", + Run: listSchedulerConfigCommandFunc, + } + + c.AddCommand(&cobra.Command{ + Use: "show", + Short: "list the config item", + Run: listSchedulerConfigCommandFunc, + }, &cobra.Command{ + Use: "set <key> <value>", + Short: "set the config item", + Run: func(cmd *cobra.Command, args []string) { postSchedulerConfigCommandFunc(cmd, c.Name(), args) }, + }) + return c +} + func newConfigEvictSlowTrendCommand() *cobra.Command { c := &cobra.Command{ Use: "evict-slow-trend-scheduler", From 43551e221d8689e43bc4ee2e3e6a13e10ea7c491 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Fri, 20 Oct 2023 14:13:59 +0800 Subject: [PATCH 4/4] coordinator, mcs/scheduling: fix the default schedulers initialization (#7236) close tikv/pd#7169 Fix the default scheduler initialization of the scheduling service. 
Signed-off-by: JmPotato --- pkg/mcs/scheduling/server/config/config.go | 18 ++++++--- pkg/schedule/coordinator.go | 21 +++++++--- .../mcs/scheduling/server_test.go | 38 +++++++++++++------ 3 files changed, 54 insertions(+), 23 deletions(-) diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index 001a433ba07..4f9caca41e6 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -18,6 +18,7 @@ import ( "fmt" "os" "path/filepath" + "reflect" "strings" "sync/atomic" "time" @@ -232,6 +233,14 @@ func (o *PersistConfig) getSchedulersUpdatingNotifier() chan<- struct{} { return v.(chan<- struct{}) } +func (o *PersistConfig) tryNotifySchedulersUpdating() { + notifier := o.getSchedulersUpdatingNotifier() + if notifier == nil { + return + } + notifier <- struct{}{} +} + // GetClusterVersion returns the cluster version. func (o *PersistConfig) GetClusterVersion() *semver.Version { return (*semver.Version)(atomic.LoadPointer(&o.clusterVersion)) @@ -251,11 +260,10 @@ func (o *PersistConfig) GetScheduleConfig() *sc.ScheduleConfig { func (o *PersistConfig) SetScheduleConfig(cfg *sc.ScheduleConfig) { old := o.GetScheduleConfig() o.schedule.Store(cfg) - // The coordinator is not aware of the underlying scheduler config changes, however, it - // should react on the scheduler number changes to handle the add/remove scheduler events. - if notifier := o.getSchedulersUpdatingNotifier(); notifier != nil && - len(old.Schedulers) != len(cfg.Schedulers) { - notifier <- struct{}{} + // The coordinator is not aware of the underlying scheduler config changes, + // we should notify it to update the schedulers proactively. 
+ if !reflect.DeepEqual(old.Schedulers, cfg.Schedulers) { + o.tryNotifySchedulersUpdating() } } diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index f35bd6d4de3..8fb9ec8b286 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -458,13 +458,16 @@ func (c *Coordinator) InitSchedulers(needRun bool) { log.Error("can not create scheduler with independent configuration", zap.String("scheduler-name", name), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) continue } - log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName())) if needRun { + log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName())) if err = c.schedulers.AddScheduler(s); err != nil { log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) } - } else if err = c.schedulers.AddSchedulerHandler(s); err != nil { - log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) + } else { + log.Info("create scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName())) + if err = c.schedulers.AddSchedulerHandler(s); err != nil { + log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) + } } } @@ -484,8 +487,8 @@ func (c *Coordinator) InitSchedulers(needRun bool) { continue } - log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args)) if needRun { + log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args)) if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); 
err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) } else { @@ -493,8 +496,14 @@ func (c *Coordinator) InitSchedulers(needRun bool) { scheduleCfg.Schedulers[k] = schedulerCfg k++ } - } else if err = c.schedulers.AddSchedulerHandler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { - log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) + } else { + log.Info("create scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args)) + if err = c.schedulers.AddSchedulerHandler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { + log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) + } else { + scheduleCfg.Schedulers[k] = schedulerCfg + k++ + } } } diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 324c8e5cad5..85cf84361b4 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -265,18 +265,32 @@ func (suite *serverTestSuite) TestSchedulerSync() { api.MustDeleteScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName) checkEvictLeaderSchedulerExist(re, schedulersController, false) - // TODO: test more schedulers. - // Fixme: the following code will fail because the scheduler is not removed but not synced. 
- // checkDelete := func(schedulerName string) { - // re.NotNil(schedulersController.GetScheduler(schedulers.BalanceLeaderName) != nil) - // api.MustDeleteScheduler(re, suite.backendEndpoints, schedulers.BalanceLeaderName) - // testutil.Eventually(re, func() bool { - // return schedulersController.GetScheduler(schedulers.BalanceLeaderName) == nil - // }) - // } - // checkDelete(schedulers.BalanceLeaderName) - // checkDelete(schedulers.BalanceRegionName) - // checkDelete(schedulers.HotRegionName) + // The default scheduler could not be deleted, it could only be disabled. + defaultSchedulerNames := []string{ + schedulers.BalanceLeaderName, + schedulers.BalanceRegionName, + schedulers.BalanceWitnessName, + schedulers.HotRegionName, + schedulers.TransferWitnessLeaderName, + } + checkDisabled := func(name string, shouldDisabled bool) { + re.NotNil(schedulersController.GetScheduler(name), name) + testutil.Eventually(re, func() bool { + disabled, err := schedulersController.IsSchedulerDisabled(name) + re.NoError(err, name) + return disabled == shouldDisabled + }) + } + for _, name := range defaultSchedulerNames { + checkDisabled(name, false) + api.MustDeleteScheduler(re, suite.backendEndpoints, name) + checkDisabled(name, true) + } + for _, name := range defaultSchedulerNames { + checkDisabled(name, true) + api.MustAddScheduler(re, suite.backendEndpoints, name, nil) + checkDisabled(name, false) + } } func checkEvictLeaderSchedulerExist(re *require.Assertions, sc *schedulers.Controller, exist bool) {