From 74b1b42265de025a58afc803c80dc5b7714b2344 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 21 Sep 2023 14:24:13 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #7122 close tikv/pd#7121 Signed-off-by: ti-chi-bot --- pkg/mcs/scheduling/server/cluster.go | 466 ++++++++++++++++++++ pkg/mock/mockcluster/mockcluster.go | 5 + pkg/schedule/checker/rule_checker_test.go | 35 ++ pkg/schedule/placement/rule_manager.go | 5 +- pkg/schedule/placement/rule_manager_test.go | 9 +- pkg/statistics/region_collection_test.go | 4 +- server/api/operator_test.go | 4 +- server/cluster/cluster.go | 2 +- server/cluster/cluster_test.go | 10 +- server/config/persist_options.go | 7 + server/server.go | 15 +- tests/pdctl/config/config_test.go | 40 +- 12 files changed, 583 insertions(+), 19 deletions(-) create mode 100644 pkg/mcs/scheduling/server/cluster.go diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go new file mode 100644 index 00000000000..b2986f722df --- /dev/null +++ b/pkg/mcs/scheduling/server/cluster.go @@ -0,0 +1,466 @@ +package server + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/kvproto/pkg/schedulingpb" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/cluster" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/scheduling/server/config" + "github.com/tikv/pd/pkg/schedule" + sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/schedule/hbstream" + "github.com/tikv/pd/pkg/schedule/labeler" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/slice" + "github.com/tikv/pd/pkg/statistics" + "github.com/tikv/pd/pkg/statistics/buckets" + "github.com/tikv/pd/pkg/statistics/utils" + "github.com/tikv/pd/pkg/storage" + "github.com/tikv/pd/pkg/utils/logutil" + "go.uber.org/zap" +) + +// Cluster is used to manage all information for scheduling purpose. +type Cluster struct { + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + *core.BasicCluster + persistConfig *config.PersistConfig + ruleManager *placement.RuleManager + labelerManager *labeler.RegionLabeler + regionStats *statistics.RegionStatistics + labelStats *statistics.LabelStatistics + hotStat *statistics.HotStat + storage storage.Storage + coordinator *schedule.Coordinator + checkMembershipCh chan struct{} + apiServerLeader atomic.Value + clusterID uint64 +} + +const regionLabelGCInterval = time.Hour + +// NewCluster creates a new cluster. +func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, storage storage.Storage, basicCluster *core.BasicCluster, hbStreams *hbstream.HeartbeatStreams, clusterID uint64, checkMembershipCh chan struct{}) (*Cluster, error) { + ctx, cancel := context.WithCancel(parentCtx) + labelerManager, err := labeler.NewRegionLabeler(ctx, storage, regionLabelGCInterval) + if err != nil { + cancel() + return nil, err + } + ruleManager := placement.NewRuleManager(storage, basicCluster, persistConfig) + c := &Cluster{ + ctx: ctx, + cancel: cancel, + BasicCluster: basicCluster, + ruleManager: ruleManager, + labelerManager: labelerManager, + persistConfig: persistConfig, + hotStat: statistics.NewHotStat(ctx), + labelStats: statistics.NewLabelStatistics(), + regionStats: statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager), + storage: storage, + clusterID: clusterID, + checkMembershipCh: checkMembershipCh, + } + c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams) + err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel()) + if err != nil { + cancel() + return nil, err + } + return c, nil +} + +// GetCoordinator returns the coordinator +func (c *Cluster) GetCoordinator() *schedule.Coordinator { + return c.coordinator +} + +// GetHotStat gets hot stat. +func (c *Cluster) GetHotStat() *statistics.HotStat { + return c.hotStat +} + +// GetRegionStats gets region statistics. +func (c *Cluster) GetRegionStats() *statistics.RegionStatistics { + return c.regionStats +} + +// GetLabelStats gets label statistics. +func (c *Cluster) GetLabelStats() *statistics.LabelStatistics { + return c.labelStats +} + +// GetBasicCluster returns the basic cluster. +func (c *Cluster) GetBasicCluster() *core.BasicCluster { + return c.BasicCluster +} + +// GetSharedConfig returns the shared config. +func (c *Cluster) GetSharedConfig() sc.SharedConfigProvider { + return c.persistConfig +} + +// GetRuleManager returns the rule manager. +func (c *Cluster) GetRuleManager() *placement.RuleManager { + return c.ruleManager +} + +// GetRegionLabeler returns the region labeler. +func (c *Cluster) GetRegionLabeler() *labeler.RegionLabeler { + return c.labelerManager +} + +// GetStoresLoads returns load stats of all stores. +func (c *Cluster) GetStoresLoads() map[uint64][]float64 { + return c.hotStat.GetStoresLoads() +} + +// IsRegionHot checks if a region is in hot state. +func (c *Cluster) IsRegionHot(region *core.RegionInfo) bool { + return c.hotStat.IsRegionHot(region, c.persistConfig.GetHotRegionCacheHitsThreshold()) +} + +// GetHotPeerStat returns hot peer stat with specified regionID and storeID. +func (c *Cluster) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat { + return c.hotStat.GetHotPeerStat(rw, regionID, storeID) +} + +// RegionReadStats returns hot region's read stats. +// The result only includes peers that are hot enough. +// RegionStats is a thread-safe method +func (c *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat { + // As read stats are reported by store heartbeat, the threshold needs to be adjusted. + threshold := c.persistConfig.GetHotRegionCacheHitsThreshold() * + (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) + return c.hotStat.RegionStats(utils.Read, threshold) +} + +// RegionWriteStats returns hot region's write stats. +// The result only includes peers that are hot enough. +func (c *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { + // RegionStats is a thread-safe method + return c.hotStat.RegionStats(utils.Write, c.persistConfig.GetHotRegionCacheHitsThreshold()) +} + +// BucketsStats returns hot region's buckets stats. +func (c *Cluster) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat { + return c.hotStat.BucketsStats(degree, regionIDs...) +} + +// GetStorage returns the storage. +func (c *Cluster) GetStorage() storage.Storage { + return c.storage +} + +// GetCheckerConfig returns the checker config. +func (c *Cluster) GetCheckerConfig() sc.CheckerConfigProvider { return c.persistConfig } + +// GetSchedulerConfig returns the scheduler config. +func (c *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider { return c.persistConfig } + +// GetStoreConfig returns the store config. +func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConfig } + +// AllocID allocates a new ID. +func (c *Cluster) AllocID() (uint64, error) { + client, err := c.getAPIServerLeaderClient() + if err != nil { + return 0, err + } + resp, err := client.AllocID(c.ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}}) + if err != nil { + c.checkMembershipCh <- struct{}{} + return 0, err + } + return resp.GetId(), nil +} + +func (c *Cluster) getAPIServerLeaderClient() (pdpb.PDClient, error) { + cli := c.apiServerLeader.Load() + if cli == nil { + c.checkMembershipCh <- struct{}{} + return nil, errors.New("API server leader is not found") + } + return cli.(pdpb.PDClient), nil +} + +// SwitchAPIServerLeader switches the API server leader. +func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool { + old := c.apiServerLeader.Load() + return c.apiServerLeader.CompareAndSwap(old, new) +} + +// updateScheduler listens on the schedulers updating notifier and manage the scheduler creation and deletion. +func (c *Cluster) updateScheduler() { + defer logutil.LogPanic() + defer c.wg.Done() + + // Make sure the coordinator has initialized all the existing schedulers. + c.waitSchedulersInitialized() + // Establish a notifier to listen the schedulers updating. + notifier := make(chan struct{}, 1) + // Make sure the check will be triggered once later. + notifier <- struct{}{} + c.persistConfig.SetSchedulersUpdatingNotifier(notifier) + for { + select { + case <-c.ctx.Done(): + log.Info("cluster is closing, stop listening the schedulers updating notifier") + return + case <-notifier: + // This is triggered by the watcher when the schedulers are updated. + } + + log.Info("schedulers updating notifier is triggered, try to update the scheduler") + var ( + schedulersController = c.coordinator.GetSchedulersController() + latestSchedulersConfig = c.persistConfig.GetScheduleConfig().Schedulers + ) + // Create the newly added schedulers. + for _, scheduler := range latestSchedulersConfig { + s, err := schedulers.CreateScheduler( + scheduler.Type, + c.coordinator.GetOperatorController(), + c.storage, + schedulers.ConfigSliceDecoder(scheduler.Type, scheduler.Args), + schedulersController.RemoveScheduler, + ) + if err != nil { + log.Error("failed to create scheduler", + zap.String("scheduler-type", scheduler.Type), + zap.Strings("scheduler-args", scheduler.Args), + errs.ZapError(err)) + continue + } + name := s.GetName() + if existed, _ := schedulersController.IsSchedulerExisted(name); existed { + log.Info("scheduler has already existed, skip adding it", + zap.String("scheduler-name", name), + zap.Strings("scheduler-args", scheduler.Args)) + continue + } + if err := schedulersController.AddScheduler(s, scheduler.Args...); err != nil { + log.Error("failed to add scheduler", + zap.String("scheduler-name", name), + zap.Strings("scheduler-args", scheduler.Args), + errs.ZapError(err)) + continue + } + log.Info("add scheduler successfully", + zap.String("scheduler-name", name), + zap.Strings("scheduler-args", scheduler.Args)) + } + // Remove the deleted schedulers. + for _, name := range schedulersController.GetSchedulerNames() { + scheduler := schedulersController.GetScheduler(name) + if slice.AnyOf(latestSchedulersConfig, func(i int) bool { + return latestSchedulersConfig[i].Type == scheduler.GetType() + }) { + continue + } + if err := schedulersController.RemoveScheduler(name); err != nil { + log.Error("failed to remove scheduler", + zap.String("scheduler-name", name), + errs.ZapError(err)) + continue + } + log.Info("remove scheduler successfully", + zap.String("scheduler-name", name)) + } + } +} + +func (c *Cluster) waitSchedulersInitialized() { + ticker := time.NewTicker(time.Millisecond * 100) + defer ticker.Stop() + for { + if c.coordinator.AreSchedulersInitialized() { + return + } + select { + case <-c.ctx.Done(): + log.Info("cluster is closing, stop waiting the schedulers initialization") + return + case <-ticker.C: + } + } +} + +// TODO: implement the following methods + +// UpdateRegionsLabelLevelStats updates the status of the region label level by types. +func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { + for _, region := range regions { + c.labelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels()) + } +} + +func (c *Cluster) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo { + stores := make([]*core.StoreInfo, 0, len(region.GetPeers())) + for _, p := range region.GetPeers() { + if store := c.GetStore(p.GetStoreId()); store != nil && !core.IsStoreContainLabel(store.GetMeta(), key, value) { + stores = append(stores, store) + } + } + return stores +} + +// HandleStoreHeartbeat updates the store status. +func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatRequest) error { + stats := heartbeat.GetStats() + storeID := stats.GetStoreId() + store := c.GetStore(storeID) + if store == nil { + return errors.Errorf("store %v not found", storeID) + } + + nowTime := time.Now() + newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime)) + + if store := c.GetStore(storeID); store != nil { + statistics.UpdateStoreHeartbeatMetrics(store) + } + c.PutStore(newStore) + c.hotStat.Observe(storeID, newStore.GetStoreStats()) + c.hotStat.FilterUnhealthyStore(c) + reportInterval := stats.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + + regions := make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats())) + for _, peerStat := range stats.GetPeerStats() { + regionID := peerStat.GetRegionId() + region := c.GetRegion(regionID) + regions[regionID] = region + if region == nil { + log.Warn("discard hot peer stat for unknown region", + zap.Uint64("region-id", regionID), + zap.Uint64("store-id", storeID)) + continue + } + peer := region.GetStorePeer(storeID) + if peer == nil { + log.Warn("discard hot peer stat for unknown region peer", + zap.Uint64("region-id", regionID), + zap.Uint64("store-id", storeID)) + continue + } + readQueryNum := core.GetReadQueryNum(peerStat.GetQueryStats()) + loads := []float64{ + utils.RegionReadBytes: float64(peerStat.GetReadBytes()), + utils.RegionReadKeys: float64(peerStat.GetReadKeys()), + utils.RegionReadQueryNum: float64(readQueryNum), + utils.RegionWriteBytes: 0, + utils.RegionWriteKeys: 0, + utils.RegionWriteQueryNum: 0, + } + peerInfo := core.NewPeerInfo(peer, loads, interval) + c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) + } + + // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. + c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval)) + return nil +} + +// runUpdateStoreStats updates store stats periodically. +func (c *Cluster) runUpdateStoreStats() { + defer logutil.LogPanic() + defer c.wg.Done() + + ticker := time.NewTicker(9 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-c.ctx.Done(): + log.Info("update store stats background jobs has been stopped") + return + case <-ticker.C: + c.UpdateAllStoreStatus() + } + } +} + +// StartBackgroundJobs starts background jobs. +func (c *Cluster) StartBackgroundJobs() { + c.wg.Add(2) + go c.updateScheduler() + go c.runUpdateStoreStats() +} + +// StopBackgroundJobs stops background jobs. +func (c *Cluster) StopBackgroundJobs() { + c.cancel() + c.wg.Wait() +} + +// HandleRegionHeartbeat processes RegionInfo reports from client. +func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { + if err := c.processRegionHeartbeat(region); err != nil { + return err + } + + c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL) + return nil +} + +// processRegionHeartbeat updates the region information. +func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error { + origin, _, err := c.PreCheckPutRegion(region) + if err != nil { + return err + } + if c.GetStoreConfig().IsEnableRegionBucket() { + region.InheritBuckets(origin) + } + + cluster.HandleStatsAsync(c, region) + + hasRegionStats := c.regionStats != nil + // Save to storage if meta is updated, except for flashback. + // Save to cache if meta or leader is updated, or contains any down/pending peer. + // Mark isNew if the region in cache does not have leader. + changed := core.GenerateRegionGuideFunc(true)(region, origin) + if !changed.SaveCache && !changed.IsNew { + // Due to some config changes need to update the region stats as well, + // so we do some extra checks here. + if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { + c.regionStats.Observe(region, c.GetRegionStores(region)) + } + return nil + } + + var overlaps []*core.RegionInfo + if changed.SaveCache { + // To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one, + // check its validation again here. + // + // However it can't solve the race condition of concurrent heartbeats from the same region. + if overlaps, err = c.AtomicCheckAndPutRegion(region); err != nil { + return err + } + + cluster.HandleOverlaps(c, overlaps) + } + + cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, changed.IsNew, c.IsPrepared()) + return nil +} + +// IsPrepared return true if the prepare checker is ready. +func (c *Cluster) IsPrepared() bool { + return c.coordinator.GetPrepareChecker().IsPrepared() +} diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 87cbc8479b2..f9fed9ed56d 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -188,8 +188,13 @@ func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { func (mc *Cluster) initRuleManager() { if mc.RuleManager == nil { +<<<<<<< HEAD mc.RuleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), mc, mc.GetOpts()) mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels) +======= + mc.RuleManager = placement.NewRuleManager(mc.GetStorage(), mc, mc.GetSharedConfig()) + mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels, mc.GetReplicationConfig().IsolationLevel) +>>>>>>> 5b3d0172b (*: fix sync isolation level to default placement rule (#7122)) } } diff --git a/pkg/schedule/checker/rule_checker_test.go b/pkg/schedule/checker/rule_checker_test.go index cbd7624f3b1..ad140e91606 100644 --- a/pkg/schedule/checker/rule_checker_test.go +++ b/pkg/schedule/checker/rule_checker_test.go @@ -112,6 +112,41 @@ func (suite *ruleCheckerTestSuite) TestAddRulePeerWithIsolationLevel() { suite.Equal(uint64(4), op.Step(0).(operator.AddLearner).ToStore) } +func (suite *ruleCheckerTestSuite) TestReplaceDownPeerWithIsolationLevel() { + suite.cluster.SetMaxStoreDownTime(100 * time.Millisecond) + suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "host": "h1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z1", "host": "h2"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z2", "host": "h3"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"zone": "z2", "host": "h4"}) + suite.cluster.AddLabelsStore(5, 1, map[string]string{"zone": "z3", "host": "h5"}) + suite.cluster.AddLabelsStore(6, 1, map[string]string{"zone": "z3", "host": "h6"}) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 3, 5) + suite.ruleManager.DeleteRule("pd", "default") + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "test", + Index: 100, + Override: true, + Role: placement.Voter, + Count: 3, + LocationLabels: []string{"zone", "host"}, + IsolationLevel: "zone", + }) + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Nil(op) + region := suite.cluster.GetRegion(1) + downPeer := []*pdpb.PeerStats{ + {Peer: region.GetStorePeer(5), DownSeconds: 6000}, + } + region = region.Clone(core.WithDownPeers(downPeer)) + suite.cluster.PutRegion(region) + suite.cluster.SetStoreDown(5) + suite.cluster.SetStoreDown(6) + time.Sleep(200 * time.Millisecond) + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Nil(op) +} + func (suite *ruleCheckerTestSuite) TestFixPeer() { suite.cluster.AddLeaderStore(1, 1) suite.cluster.AddLeaderStore(2, 1) diff --git a/pkg/schedule/placement/rule_manager.go b/pkg/schedule/placement/rule_manager.go index d3f6bda066b..c56a9525b84 100644 --- a/pkg/schedule/placement/rule_manager.go +++ b/pkg/schedule/placement/rule_manager.go @@ -66,7 +66,7 @@ func NewRuleManager(storage endpoint.RuleStorage, storeSetInformer core.StoreSet // Initialize loads rules from storage. If Placement Rules feature is never enabled, it creates default rule that is // compatible with previous configuration. -func (m *RuleManager) Initialize(maxReplica int, locationLabels []string) error { +func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolationLevel string) error { m.Lock() defer m.Unlock() if m.initialized { @@ -93,6 +93,7 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string) error Role: Voter, Count: maxReplica - witnessCount, LocationLabels: locationLabels, + IsolationLevel: isolationLevel, }, { GroupID: "pd", @@ -101,6 +102,7 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string) error Count: witnessCount, IsWitness: true, LocationLabels: locationLabels, + IsolationLevel: isolationLevel, }, }..., ) @@ -111,6 +113,7 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string) error Role: Voter, Count: maxReplica, LocationLabels: locationLabels, + IsolationLevel: isolationLevel, }) } for _, defaultRule := range defaultRules { diff --git a/pkg/schedule/placement/rule_manager_test.go b/pkg/schedule/placement/rule_manager_test.go index 894f78f1fef..1d2500edd79 100644 --- a/pkg/schedule/placement/rule_manager_test.go +++ b/pkg/schedule/placement/rule_manager_test.go @@ -33,8 +33,13 @@ func newTestManager(t *testing.T, enableWitness bool) (endpoint.RuleStorage, *Ru store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) var err error manager := NewRuleManager(store, nil, mockconfig.NewTestOptions()) +<<<<<<< HEAD manager.conf.SetWitnessEnabled(enableWitness) err = manager.Initialize(3, []string{"zone", "rack", "host"}) +======= + manager.conf.SetEnableWitness(enableWitness) + err = manager.Initialize(3, []string{"zone", "rack", "host"}, "") +>>>>>>> 5b3d0172b (*: fix sync isolation level to default placement rule (#7122)) re.NoError(err) return store, manager } @@ -157,7 +162,7 @@ func TestSaveLoad(t *testing.T) { } m2 := NewRuleManager(store, nil, nil) - err := m2.Initialize(3, []string{"no", "labels"}) + err := m2.Initialize(3, []string{"no", "labels"}, "") re.NoError(err) re.Len(m2.GetAllRules(), 3) re.Equal(rules[0].String(), m2.GetRule("pd", "default").String()) @@ -173,7 +178,7 @@ func TestSetAfterGet(t *testing.T) { manager.SetRule(rule) m2 := NewRuleManager(store, nil, nil) - err := m2.Initialize(100, []string{}) + err := m2.Initialize(100, []string{}, "") re.NoError(err) rule = m2.GetRule("pd", "default") re.Equal(1, rule.Count) diff --git a/pkg/statistics/region_collection_test.go b/pkg/statistics/region_collection_test.go index 1e071900708..005b2ae84f4 100644 --- a/pkg/statistics/region_collection_test.go +++ b/pkg/statistics/region_collection_test.go @@ -30,7 +30,7 @@ func TestRegionStatistics(t *testing.T) { re := require.New(t) store := storage.NewStorageWithMemoryBackend() manager := placement.NewRuleManager(store, nil, nil) - err := manager.Initialize(3, []string{"zone", "rack", "host"}) + err := manager.Initialize(3, []string{"zone", "rack", "host"}, "") re.NoError(err) opt := mockconfig.NewTestOptions() opt.SetPlacementRuleEnabled(false) @@ -135,7 +135,7 @@ func TestRegionStatisticsWithPlacementRule(t *testing.T) { re := require.New(t) store := storage.NewStorageWithMemoryBackend() manager := placement.NewRuleManager(store, nil, nil) - err := manager.Initialize(3, []string{"zone", "rack", "host"}) + err := manager.Initialize(3, []string{"zone", "rack", "host"}, "") re.NoError(err) opt := mockconfig.NewTestOptions() opt.SetPlacementRuleEnabled(true) diff --git a/server/api/operator_test.go b/server/api/operator_test.go index ddb605c7d87..ee849552f09 100644 --- a/server/api/operator_test.go +++ b/server/api/operator_test.go @@ -383,7 +383,9 @@ func (suite *transferRegionOperatorTestSuite) TestTransferRegionWithPlacementRul if testCase.placementRuleEnable { err := suite.svr.GetRaftCluster().GetRuleManager().Initialize( suite.svr.GetRaftCluster().GetOpts().GetMaxReplicas(), - suite.svr.GetRaftCluster().GetOpts().GetLocationLabels()) + suite.svr.GetRaftCluster().GetOpts().GetLocationLabels(), + suite.svr.GetRaftCluster().GetOpts().GetIsolationLevel(), + ) suite.NoError(err) } if len(testCase.rules) > 0 { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 0e2d825895b..d12feb41e27 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -284,7 +284,7 @@ func (c *RaftCluster) Start(s Server) error { } c.ruleManager = placement.NewRuleManager(c.storage, c, c.GetOpts()) if c.opt.IsPlacementRulesEnabled() { - err = c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels()) + err = c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel()) if err != nil { return err } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 06c878b9faf..9408ce4931d 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -231,7 +231,7 @@ func TestSetOfflineStore(t *testing.T) { cluster.coordinator = newCoordinator(ctx, cluster, nil) cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { - err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } @@ -428,7 +428,7 @@ func TestUpStore(t *testing.T) { cluster.coordinator = newCoordinator(ctx, cluster, nil) cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { - err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } @@ -531,7 +531,7 @@ func TestDeleteStoreUpdatesClusterVersion(t *testing.T) { cluster.coordinator = newCoordinator(ctx, cluster, nil) cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { - err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } @@ -1257,7 +1257,7 @@ func TestOfflineAndMerge(t *testing.T) { cluster.coordinator = newCoordinator(ctx, cluster, nil) cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { - err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } @@ -2044,7 +2044,7 @@ func newTestRaftCluster( rc.InitCluster(id, opt, s, basicCluster, nil) rc.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), rc, opt) if opt.IsPlacementRulesEnabled() { - err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 3bc9604a1e5..ce4565b5502 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -319,6 +319,13 @@ func (o *PersistOptions) SetEnableWitness(enable bool) { o.SetScheduleConfig(v) } +// SetMaxStoreDownTime to set the max store down time. It's only used to test. +func (o *PersistOptions) SetMaxStoreDownTime(time time.Duration) { + v := o.GetScheduleConfig().Clone() + v.MaxStoreDownTime = typeutil.NewDuration(time) + o.SetScheduleConfig(v) +} + // SetMaxMergeRegionSize sets the max merge region size. func (o *PersistOptions) SetMaxMergeRegionSize(maxMergeRegionSize uint64) { v := o.GetScheduleConfig().Clone() diff --git a/server/server.go b/server/server.go index ab675280981..cdea963b23f 100644 --- a/server/server.go +++ b/server/server.go @@ -969,7 +969,7 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { } if cfg.EnablePlacementRules { // initialize rule manager. - if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil { + if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels, cfg.IsolationLevel); err != nil { return err } } else { @@ -992,19 +992,27 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { defaultRule := rc.GetRuleManager().GetRule("pd", "default") CheckInDefaultRule := func() error { - // replication config won't work when placement rule is enabled and exceeds one default rule + // replication config won't work when placement rule is enabled and exceeds one default rule if !(defaultRule != nil && len(defaultRule.StartKey) == 0 && len(defaultRule.EndKey) == 0) { - return errors.New("cannot update MaxReplicas or LocationLabels when placement rules feature is enabled and not only default rule exists, please update rule instead") + return errors.New("cannot update MaxReplicas, LocationLabels or IsolationLevel when placement rules feature is enabled and not only default rule exists, please update rule instead") } +<<<<<<< HEAD if !(defaultRule.Count == int(old.MaxReplicas) && typeutil.StringsEqual(defaultRule.LocationLabels, []string(old.LocationLabels))) { +======= + if !(defaultRule.Count == int(old.MaxReplicas) && typeutil.AreStringSlicesEqual(defaultRule.LocationLabels, []string(old.LocationLabels)) && defaultRule.IsolationLevel == old.IsolationLevel) { +>>>>>>> 5b3d0172b (*: fix sync isolation level to default placement rule (#7122)) return errors.New("cannot to update replication config, the default rules do not consistent with replication config, please update rule instead") } return nil } +<<<<<<< HEAD if !(cfg.MaxReplicas == old.MaxReplicas && typeutil.StringsEqual(cfg.LocationLabels, old.LocationLabels)) { +======= + if !(cfg.MaxReplicas == old.MaxReplicas && typeutil.AreStringSlicesEqual(cfg.LocationLabels, old.LocationLabels) && cfg.IsolationLevel == old.IsolationLevel) { +>>>>>>> 5b3d0172b (*: fix sync isolation level to default placement rule (#7122)) if err := CheckInDefaultRule(); err != nil { return err } @@ -1015,6 +1023,7 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { if rule != nil { rule.Count = int(cfg.MaxReplicas) rule.LocationLabels = cfg.LocationLabels + rule.IsolationLevel = cfg.IsolationLevel rc := s.GetRaftCluster() if rc == nil { return errs.ErrNotBootstrapped.GenWithStackByArgs() diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index 9634737e18b..5d5d1c15f54 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -684,7 +684,7 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { re.Equal(expect, replicationCfg.MaxReplicas) } - checkLocaltionLabels := func(expect int) { + checkLocationLabels := func(expect int) { args := []string{"-u", pdAddr, "config", "show", "replication"} output, err := pdctl.ExecuteCommand(cmd, args...) re.NoError(err) @@ -693,6 +693,15 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { re.Len(replicationCfg.LocationLabels, expect) } + checkIsolationLevel := func(expect string) { + args := []string{"-u", pdAddr, "config", "show", "replication"} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + replicationCfg := sc.ReplicationConfig{} + re.NoError(json.Unmarshal(output, &replicationCfg)) + re.Equal(replicationCfg.IsolationLevel, expect) + } + checkRuleCount := func(expect int) { args := []string{"-u", pdAddr, "config", "placement-rules", "show", "--group", "pd", "--id", "default"} output, err := pdctl.ExecuteCommand(cmd, args...) @@ -711,6 +720,15 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { re.Len(rule.LocationLabels, expect) } + checkRuleIsolationLevel := func(expect string) { + args := []string{"-u", pdAddr, "config", "placement-rules", "show", "--group", "pd", "--id", "default"} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + rule := placement.Rule{} + re.NoError(json.Unmarshal(output, &rule)) + re.Equal(rule.IsolationLevel, expect) + } + // update successfully when placement rules is not enabled. output, err := pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "max-replicas", "2") re.NoError(err) @@ -719,8 +737,13 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "location-labels", "zone,host") re.NoError(err) re.Contains(string(output), "Success!") - checkLocaltionLabels(2) + output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "isolation-level", "zone") + re.NoError(err) + re.Contains(string(output), "Success!") + checkLocationLabels(2) checkRuleLocationLabels(2) + checkIsolationLevel("zone") + checkRuleIsolationLevel("zone") // update successfully when only one default rule exists. output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "placement-rules", "enable") @@ -733,11 +756,18 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { checkMaxReplicas(3) checkRuleCount(3) + // We need to change isolation first because we will validate + // if the location label contains the isolation level when setting location labels. + output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "isolation-level", "host") + re.NoError(err) + re.Contains(string(output), "Success!") output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "location-labels", "host") re.NoError(err) re.Contains(string(output), "Success!") - checkLocaltionLabels(1) + checkLocationLabels(1) checkRuleLocationLabels(1) + checkIsolationLevel("host") + checkRuleIsolationLevel("host") // update unsuccessfully when many rule exists. fname := t.TempDir() @@ -761,8 +791,10 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { re.NoError(err) checkMaxReplicas(4) checkRuleCount(4) - checkLocaltionLabels(1) + checkLocationLabels(1) checkRuleLocationLabels(1) + checkIsolationLevel("host") + checkRuleIsolationLevel("host") } func TestPDServerConfig(t *testing.T) { From d479546646ad7df8724edca39f5069d08b265634 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 22 Sep 2023 17:07:01 +0800 Subject: [PATCH 2/2] resolve conflicts Signed-off-by: Ryan Leung --- pkg/mcs/scheduling/server/cluster.go | 466 -------------------- pkg/mock/mockcluster/mockcluster.go | 5 - pkg/schedule/placement/rule_manager_test.go | 5 - server/server.go | 12 +- tests/pdctl/config/config_test.go | 2 +- 5 files changed, 3 insertions(+), 487 deletions(-) delete mode 100644 pkg/mcs/scheduling/server/cluster.go diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go deleted file mode 100644 index b2986f722df..00000000000 --- a/pkg/mcs/scheduling/server/cluster.go +++ /dev/null @@ -1,466 +0,0 @@ -package server - -import ( - "context" - "sync" - "sync/atomic" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/pingcap/kvproto/pkg/schedulingpb" - "github.com/pingcap/log" - "github.com/tikv/pd/pkg/cluster" - "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/mcs/scheduling/server/config" - "github.com/tikv/pd/pkg/schedule" - sc "github.com/tikv/pd/pkg/schedule/config" - "github.com/tikv/pd/pkg/schedule/hbstream" - "github.com/tikv/pd/pkg/schedule/labeler" - "github.com/tikv/pd/pkg/schedule/operator" - "github.com/tikv/pd/pkg/schedule/placement" - "github.com/tikv/pd/pkg/schedule/schedulers" - "github.com/tikv/pd/pkg/slice" - "github.com/tikv/pd/pkg/statistics" - "github.com/tikv/pd/pkg/statistics/buckets" - "github.com/tikv/pd/pkg/statistics/utils" - "github.com/tikv/pd/pkg/storage" - "github.com/tikv/pd/pkg/utils/logutil" - "go.uber.org/zap" -) - -// Cluster is used to manage all information for scheduling purpose. -type Cluster struct { - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - *core.BasicCluster - persistConfig *config.PersistConfig - ruleManager *placement.RuleManager - labelerManager *labeler.RegionLabeler - regionStats *statistics.RegionStatistics - labelStats *statistics.LabelStatistics - hotStat *statistics.HotStat - storage storage.Storage - coordinator *schedule.Coordinator - checkMembershipCh chan struct{} - apiServerLeader atomic.Value - clusterID uint64 -} - -const regionLabelGCInterval = time.Hour - -// NewCluster creates a new cluster. -func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, storage storage.Storage, basicCluster *core.BasicCluster, hbStreams *hbstream.HeartbeatStreams, clusterID uint64, checkMembershipCh chan struct{}) (*Cluster, error) { - ctx, cancel := context.WithCancel(parentCtx) - labelerManager, err := labeler.NewRegionLabeler(ctx, storage, regionLabelGCInterval) - if err != nil { - cancel() - return nil, err - } - ruleManager := placement.NewRuleManager(storage, basicCluster, persistConfig) - c := &Cluster{ - ctx: ctx, - cancel: cancel, - BasicCluster: basicCluster, - ruleManager: ruleManager, - labelerManager: labelerManager, - persistConfig: persistConfig, - hotStat: statistics.NewHotStat(ctx), - labelStats: statistics.NewLabelStatistics(), - regionStats: statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager), - storage: storage, - clusterID: clusterID, - checkMembershipCh: checkMembershipCh, - } - c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams) - err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel()) - if err != nil { - cancel() - return nil, err - } - return c, nil -} - -// GetCoordinator returns the coordinator -func (c *Cluster) GetCoordinator() *schedule.Coordinator { - return c.coordinator -} - -// GetHotStat gets hot stat. -func (c *Cluster) GetHotStat() *statistics.HotStat { - return c.hotStat -} - -// GetRegionStats gets region statistics. -func (c *Cluster) GetRegionStats() *statistics.RegionStatistics { - return c.regionStats -} - -// GetLabelStats gets label statistics. -func (c *Cluster) GetLabelStats() *statistics.LabelStatistics { - return c.labelStats -} - -// GetBasicCluster returns the basic cluster. -func (c *Cluster) GetBasicCluster() *core.BasicCluster { - return c.BasicCluster -} - -// GetSharedConfig returns the shared config. -func (c *Cluster) GetSharedConfig() sc.SharedConfigProvider { - return c.persistConfig -} - -// GetRuleManager returns the rule manager. -func (c *Cluster) GetRuleManager() *placement.RuleManager { - return c.ruleManager -} - -// GetRegionLabeler returns the region labeler. -func (c *Cluster) GetRegionLabeler() *labeler.RegionLabeler { - return c.labelerManager -} - -// GetStoresLoads returns load stats of all stores. -func (c *Cluster) GetStoresLoads() map[uint64][]float64 { - return c.hotStat.GetStoresLoads() -} - -// IsRegionHot checks if a region is in hot state. -func (c *Cluster) IsRegionHot(region *core.RegionInfo) bool { - return c.hotStat.IsRegionHot(region, c.persistConfig.GetHotRegionCacheHitsThreshold()) -} - -// GetHotPeerStat returns hot peer stat with specified regionID and storeID. -func (c *Cluster) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *statistics.HotPeerStat { - return c.hotStat.GetHotPeerStat(rw, regionID, storeID) -} - -// RegionReadStats returns hot region's read stats. -// The result only includes peers that are hot enough. -// RegionStats is a thread-safe method -func (c *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat { - // As read stats are reported by store heartbeat, the threshold needs to be adjusted. - threshold := c.persistConfig.GetHotRegionCacheHitsThreshold() * - (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) - return c.hotStat.RegionStats(utils.Read, threshold) -} - -// RegionWriteStats returns hot region's write stats. -// The result only includes peers that are hot enough. -func (c *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { - // RegionStats is a thread-safe method - return c.hotStat.RegionStats(utils.Write, c.persistConfig.GetHotRegionCacheHitsThreshold()) -} - -// BucketsStats returns hot region's buckets stats. -func (c *Cluster) BucketsStats(degree int, regionIDs ...uint64) map[uint64][]*buckets.BucketStat { - return c.hotStat.BucketsStats(degree, regionIDs...) -} - -// GetStorage returns the storage. -func (c *Cluster) GetStorage() storage.Storage { - return c.storage -} - -// GetCheckerConfig returns the checker config. -func (c *Cluster) GetCheckerConfig() sc.CheckerConfigProvider { return c.persistConfig } - -// GetSchedulerConfig returns the scheduler config. -func (c *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider { return c.persistConfig } - -// GetStoreConfig returns the store config. -func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConfig } - -// AllocID allocates a new ID. -func (c *Cluster) AllocID() (uint64, error) { - client, err := c.getAPIServerLeaderClient() - if err != nil { - return 0, err - } - resp, err := client.AllocID(c.ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}}) - if err != nil { - c.checkMembershipCh <- struct{}{} - return 0, err - } - return resp.GetId(), nil -} - -func (c *Cluster) getAPIServerLeaderClient() (pdpb.PDClient, error) { - cli := c.apiServerLeader.Load() - if cli == nil { - c.checkMembershipCh <- struct{}{} - return nil, errors.New("API server leader is not found") - } - return cli.(pdpb.PDClient), nil -} - -// SwitchAPIServerLeader switches the API server leader. -func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool { - old := c.apiServerLeader.Load() - return c.apiServerLeader.CompareAndSwap(old, new) -} - -// updateScheduler listens on the schedulers updating notifier and manage the scheduler creation and deletion. -func (c *Cluster) updateScheduler() { - defer logutil.LogPanic() - defer c.wg.Done() - - // Make sure the coordinator has initialized all the existing schedulers. - c.waitSchedulersInitialized() - // Establish a notifier to listen the schedulers updating. - notifier := make(chan struct{}, 1) - // Make sure the check will be triggered once later. - notifier <- struct{}{} - c.persistConfig.SetSchedulersUpdatingNotifier(notifier) - for { - select { - case <-c.ctx.Done(): - log.Info("cluster is closing, stop listening the schedulers updating notifier") - return - case <-notifier: - // This is triggered by the watcher when the schedulers are updated. - } - - log.Info("schedulers updating notifier is triggered, try to update the scheduler") - var ( - schedulersController = c.coordinator.GetSchedulersController() - latestSchedulersConfig = c.persistConfig.GetScheduleConfig().Schedulers - ) - // Create the newly added schedulers. - for _, scheduler := range latestSchedulersConfig { - s, err := schedulers.CreateScheduler( - scheduler.Type, - c.coordinator.GetOperatorController(), - c.storage, - schedulers.ConfigSliceDecoder(scheduler.Type, scheduler.Args), - schedulersController.RemoveScheduler, - ) - if err != nil { - log.Error("failed to create scheduler", - zap.String("scheduler-type", scheduler.Type), - zap.Strings("scheduler-args", scheduler.Args), - errs.ZapError(err)) - continue - } - name := s.GetName() - if existed, _ := schedulersController.IsSchedulerExisted(name); existed { - log.Info("scheduler has already existed, skip adding it", - zap.String("scheduler-name", name), - zap.Strings("scheduler-args", scheduler.Args)) - continue - } - if err := schedulersController.AddScheduler(s, scheduler.Args...); err != nil { - log.Error("failed to add scheduler", - zap.String("scheduler-name", name), - zap.Strings("scheduler-args", scheduler.Args), - errs.ZapError(err)) - continue - } - log.Info("add scheduler successfully", - zap.String("scheduler-name", name), - zap.Strings("scheduler-args", scheduler.Args)) - } - // Remove the deleted schedulers. - for _, name := range schedulersController.GetSchedulerNames() { - scheduler := schedulersController.GetScheduler(name) - if slice.AnyOf(latestSchedulersConfig, func(i int) bool { - return latestSchedulersConfig[i].Type == scheduler.GetType() - }) { - continue - } - if err := schedulersController.RemoveScheduler(name); err != nil { - log.Error("failed to remove scheduler", - zap.String("scheduler-name", name), - errs.ZapError(err)) - continue - } - log.Info("remove scheduler successfully", - zap.String("scheduler-name", name)) - } - } -} - -func (c *Cluster) waitSchedulersInitialized() { - ticker := time.NewTicker(time.Millisecond * 100) - defer ticker.Stop() - for { - if c.coordinator.AreSchedulersInitialized() { - return - } - select { - case <-c.ctx.Done(): - log.Info("cluster is closing, stop waiting the schedulers initialization") - return - case <-ticker.C: - } - } -} - -// TODO: implement the following methods - -// UpdateRegionsLabelLevelStats updates the status of the region label level by types. -func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { - for _, region := range regions { - c.labelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels()) - } -} - -func (c *Cluster) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo { - stores := make([]*core.StoreInfo, 0, len(region.GetPeers())) - for _, p := range region.GetPeers() { - if store := c.GetStore(p.GetStoreId()); store != nil && !core.IsStoreContainLabel(store.GetMeta(), key, value) { - stores = append(stores, store) - } - } - return stores -} - -// HandleStoreHeartbeat updates the store status. -func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatRequest) error { - stats := heartbeat.GetStats() - storeID := stats.GetStoreId() - store := c.GetStore(storeID) - if store == nil { - return errors.Errorf("store %v not found", storeID) - } - - nowTime := time.Now() - newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime)) - - if store := c.GetStore(storeID); store != nil { - statistics.UpdateStoreHeartbeatMetrics(store) - } - c.PutStore(newStore) - c.hotStat.Observe(storeID, newStore.GetStoreStats()) - c.hotStat.FilterUnhealthyStore(c) - reportInterval := stats.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - - regions := make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats())) - for _, peerStat := range stats.GetPeerStats() { - regionID := peerStat.GetRegionId() - region := c.GetRegion(regionID) - regions[regionID] = region - if region == nil { - log.Warn("discard hot peer stat for unknown region", - zap.Uint64("region-id", regionID), - zap.Uint64("store-id", storeID)) - continue - } - peer := region.GetStorePeer(storeID) - if peer == nil { - log.Warn("discard hot peer stat for unknown region peer", - zap.Uint64("region-id", regionID), - zap.Uint64("store-id", storeID)) - continue - } - readQueryNum := core.GetReadQueryNum(peerStat.GetQueryStats()) - loads := []float64{ - utils.RegionReadBytes: float64(peerStat.GetReadBytes()), - utils.RegionReadKeys: float64(peerStat.GetReadKeys()), - utils.RegionReadQueryNum: float64(readQueryNum), - utils.RegionWriteBytes: 0, - utils.RegionWriteKeys: 0, - utils.RegionWriteQueryNum: 0, - } - peerInfo := core.NewPeerInfo(peer, loads, interval) - c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) - } - - // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. - c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval)) - return nil -} - -// runUpdateStoreStats updates store stats periodically. -func (c *Cluster) runUpdateStoreStats() { - defer logutil.LogPanic() - defer c.wg.Done() - - ticker := time.NewTicker(9 * time.Millisecond) - defer ticker.Stop() - - for { - select { - case <-c.ctx.Done(): - log.Info("update store stats background jobs has been stopped") - return - case <-ticker.C: - c.UpdateAllStoreStatus() - } - } -} - -// StartBackgroundJobs starts background jobs. -func (c *Cluster) StartBackgroundJobs() { - c.wg.Add(2) - go c.updateScheduler() - go c.runUpdateStoreStats() -} - -// StopBackgroundJobs stops background jobs. -func (c *Cluster) StopBackgroundJobs() { - c.cancel() - c.wg.Wait() -} - -// HandleRegionHeartbeat processes RegionInfo reports from client. -func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { - if err := c.processRegionHeartbeat(region); err != nil { - return err - } - - c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL) - return nil -} - -// processRegionHeartbeat updates the region information. -func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error { - origin, _, err := c.PreCheckPutRegion(region) - if err != nil { - return err - } - if c.GetStoreConfig().IsEnableRegionBucket() { - region.InheritBuckets(origin) - } - - cluster.HandleStatsAsync(c, region) - - hasRegionStats := c.regionStats != nil - // Save to storage if meta is updated, except for flashback. - // Save to cache if meta or leader is updated, or contains any down/pending peer. - // Mark isNew if the region in cache does not have leader. - changed := core.GenerateRegionGuideFunc(true)(region, origin) - if !changed.SaveCache && !changed.IsNew { - // Due to some config changes need to update the region stats as well, - // so we do some extra checks here. - if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { - c.regionStats.Observe(region, c.GetRegionStores(region)) - } - return nil - } - - var overlaps []*core.RegionInfo - if changed.SaveCache { - // To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one, - // check its validation again here. - // - // However it can't solve the race condition of concurrent heartbeats from the same region. - if overlaps, err = c.AtomicCheckAndPutRegion(region); err != nil { - return err - } - - cluster.HandleOverlaps(c, overlaps) - } - - cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, changed.IsNew, c.IsPrepared()) - return nil -} - -// IsPrepared return true if the prepare checker is ready. -func (c *Cluster) IsPrepared() bool { - return c.coordinator.GetPrepareChecker().IsPrepared() -} diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index f9fed9ed56d..3a6d1805af8 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -188,13 +188,8 @@ func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { func (mc *Cluster) initRuleManager() { if mc.RuleManager == nil { -<<<<<<< HEAD mc.RuleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), mc, mc.GetOpts()) - mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels) -======= - mc.RuleManager = placement.NewRuleManager(mc.GetStorage(), mc, mc.GetSharedConfig()) mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels, mc.GetReplicationConfig().IsolationLevel) ->>>>>>> 5b3d0172b (*: fix sync isolation level to default placement rule (#7122)) } } diff --git a/pkg/schedule/placement/rule_manager_test.go b/pkg/schedule/placement/rule_manager_test.go index 1d2500edd79..a9c413ffb73 100644 --- a/pkg/schedule/placement/rule_manager_test.go +++ b/pkg/schedule/placement/rule_manager_test.go @@ -33,13 +33,8 @@ func newTestManager(t *testing.T, enableWitness bool) (endpoint.RuleStorage, *Ru store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) var err error manager := NewRuleManager(store, nil, mockconfig.NewTestOptions()) -<<<<<<< HEAD manager.conf.SetWitnessEnabled(enableWitness) - err = manager.Initialize(3, []string{"zone", "rack", "host"}) -======= - manager.conf.SetEnableWitness(enableWitness) err = manager.Initialize(3, []string{"zone", "rack", "host"}, "") ->>>>>>> 5b3d0172b (*: fix sync isolation level to default placement rule (#7122)) re.NoError(err) return store, manager } diff --git a/server/server.go b/server/server.go index cdea963b23f..058aaa6f816 100644 --- a/server/server.go +++ b/server/server.go @@ -997,22 +997,14 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { len(defaultRule.StartKey) == 0 && len(defaultRule.EndKey) == 0) { return errors.New("cannot update MaxReplicas, LocationLabels or IsolationLevel when placement rules feature is enabled and not only default rule exists, please update rule instead") } -<<<<<<< HEAD - if !(defaultRule.Count == int(old.MaxReplicas) && typeutil.StringsEqual(defaultRule.LocationLabels, []string(old.LocationLabels))) { -======= - if !(defaultRule.Count == int(old.MaxReplicas) && typeutil.AreStringSlicesEqual(defaultRule.LocationLabels, []string(old.LocationLabels)) && defaultRule.IsolationLevel == old.IsolationLevel) { ->>>>>>> 5b3d0172b (*: fix sync isolation level to default placement rule (#7122)) + if !(defaultRule.Count == int(old.MaxReplicas) && typeutil.StringsEqual(defaultRule.LocationLabels, []string(old.LocationLabels)) && defaultRule.IsolationLevel == old.IsolationLevel) { return errors.New("cannot to update replication config, the default rules do not consistent with replication config, please update rule instead") } return nil } -<<<<<<< HEAD - if !(cfg.MaxReplicas == old.MaxReplicas && typeutil.StringsEqual(cfg.LocationLabels, old.LocationLabels)) { -======= - if !(cfg.MaxReplicas == old.MaxReplicas && typeutil.AreStringSlicesEqual(cfg.LocationLabels, old.LocationLabels) && cfg.IsolationLevel == old.IsolationLevel) { ->>>>>>> 5b3d0172b (*: fix sync isolation level to default placement rule (#7122)) + if !(cfg.MaxReplicas == old.MaxReplicas && typeutil.StringsEqual(cfg.LocationLabels, old.LocationLabels) && cfg.IsolationLevel == old.IsolationLevel) { if err := CheckInDefaultRule(); err != nil { return err } diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index 5d5d1c15f54..e082a1b051f 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -697,7 +697,7 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { args := []string{"-u", pdAddr, "config", "show", "replication"} output, err := pdctl.ExecuteCommand(cmd, args...) re.NoError(err) - replicationCfg := sc.ReplicationConfig{} + replicationCfg := config.ReplicationConfig{} re.NoError(json.Unmarshal(output, &replicationCfg)) re.Equal(replicationCfg.IsolationLevel, expect) }