diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index 2b2a454d9b5..2bc570364b8 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/log" scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server" "github.com/tikv/pd/pkg/mcs/utils" - "github.com/tikv/pd/pkg/schedule" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/handler" "github.com/tikv/pd/pkg/schedule/operator" @@ -68,15 +67,11 @@ type Service struct { } type server struct { - server *scheserver.Server -} - -func (s *server) GetCoordinator() *schedule.Coordinator { - return s.server.GetCoordinator() + *scheserver.Server } func (s *server) GetCluster() sche.SharedCluster { - return s.server.GetCluster() + return s.Server.GetCluster() } func createIndentRender() *render.Render { @@ -98,7 +93,7 @@ func NewService(srv *scheserver.Service) *Service { apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression)) apiHandlerEngine.Use(func(c *gin.Context) { c.Set(multiservicesapi.ServiceContextKey, srv.Server) - c.Set(handlerKey, handler.NewHandler(&server{server: srv.Server})) + c.Set(handlerKey, handler.NewHandler(&server{srv.Server})) c.Next() }) apiHandlerEngine.Use(multiservicesapi.ServiceRedirector()) diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index e1d680069ce..13634461682 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -414,6 +414,16 @@ func (o *PersistConfig) GetHotRegionCacheHitsThreshold() int { return int(o.GetScheduleConfig().HotRegionCacheHitsThreshold) } +// GetHotRegionsWriteInterval gets interval for PD to store Hot Region information. +func (o *PersistConfig) GetHotRegionsWriteInterval() time.Duration { + return o.GetScheduleConfig().HotRegionsWriteInterval.Duration +} + +// GetHotRegionsReservedDays gets days hot region information is kept. +func (o *PersistConfig) GetHotRegionsReservedDays() uint64 { + return o.GetScheduleConfig().HotRegionsReservedDays +} + // GetMaxMovableHotPeerSize returns the max movable hot peer size. 
func (o *PersistConfig) GetMaxMovableHotPeerSize() int64 { return o.GetScheduleConfig().MaxMovableHotPeerSize diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 32788284b70..aafa47f8d48 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -20,6 +20,7 @@ import ( "net/http" "os" "os/signal" + "path/filepath" "strconv" "sync" "sync/atomic" @@ -37,6 +38,7 @@ import ( "github.com/spf13/cobra" bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/encryption" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/discovery" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" @@ -46,8 +48,12 @@ import ( "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/schedule" + sc "github.com/tikv/pd/pkg/schedule/config" + sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/handler" "github.com/tikv/pd/pkg/schedule/hbstream" "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/apiutil" @@ -99,9 +105,11 @@ type Server struct { serviceID *discovery.ServiceRegistryEntry serviceRegister *discovery.ServiceRegister - cluster *Cluster - hbStreams *hbstream.HeartbeatStreams - storage *endpoint.StorageEndpoint + cluster *Cluster + hbStreams *hbstream.HeartbeatStreams + storage *endpoint.StorageEndpoint + hotRegionStorage *storage.HotRegionStorage + encryptionKeyManager *encryption.Manager // for watching the PD API server meta info updates that are related to the scheduling. configWatcher *config.Watcher @@ -155,9 +163,10 @@ func (s *Server) Run() error { func (s *Server) startServerLoop() { s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context()) - s.serverLoopWg.Add(2) + s.serverLoopWg.Add(3) go s.primaryElectionLoop() go s.updateAPIServerMemberLoop() + go s.encryptionKeyManagerLoop() } func (s *Server) updateAPIServerMemberLoop() { @@ -239,6 +248,17 @@ func (s *Server) primaryElectionLoop() { } } +// encryptionKeyManagerLoop is used to start monitor encryption key changes. +func (s *Server) encryptionKeyManagerLoop() { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + ctx, cancel := context.WithCancel(s.serverLoopCtx) + defer cancel() + s.encryptionKeyManager.StartBackgroundLoop(ctx) + log.Info("server is closed, exist encryption key manager loop") +} + func (s *Server) campaignLeader() { log.Info("start to campaign the primary/leader", zap.String("campaign-scheduling-primary-name", s.participant.Name())) if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil { @@ -324,6 +344,11 @@ func (s *Server) Close() { if s.GetHTTPClient() != nil { s.GetHTTPClient().CloseIdleConnections() } + + if err := s.hotRegionStorage.Close(); err != nil { + log.Error("close hot region storage meet error", errs.ZapError(err)) + } + log.Info("scheduling server is closed") } @@ -364,7 +389,21 @@ func (s *Server) GetBasicCluster() *core.BasicCluster { // GetCoordinator returns the coordinator. func (s *Server) GetCoordinator() *schedule.Coordinator { - return s.GetCluster().GetCoordinator() + c := s.GetCluster() + if c == nil { + return nil + } + return c.GetCoordinator() +} + +// GetEncryptionKeyManager returns the encryption key manager. 
+func (s *Server) GetEncryptionKeyManager() *encryption.Manager { + return s.encryptionKeyManager +} + +// GetSharedConfig returns the shared config. +func (s *Server) GetSharedConfig() sc.SharedConfigProvider { + return s.persistConfig } // ServerLoopWgDone decreases the server loop wait group. @@ -392,6 +431,14 @@ func (s *Server) GetLeaderListenUrls() []string { return s.participant.GetLeaderListenUrls() } +type svr struct { + *Server +} + +func (s *svr) GetCluster() sche.SharedCluster { + return s.Server.GetCluster() +} + func (s *Server) startServer() (err error) { if s.clusterID, err = utils.InitClusterID(s.Context(), s.GetClient()); err != nil { return err @@ -419,6 +466,18 @@ func (s *Server) startServer() (err error) { return err } + s.encryptionKeyManager, err = encryption.NewManager(s.GetClient(), &s.cfg.Security.Encryption) + if err != nil { + return err + } + + h := handler.NewHandler(&svr{s}) + s.hotRegionStorage, err = storage.NewHotRegionsStorage( + s.Context(), filepath.Join(s.cfg.DataDir, "hot-region"), s.encryptionKeyManager, h) + if err != nil { + return err + } + serverReadyChan := make(chan struct{}) defer close(serverReadyChan) s.startServerLoop() @@ -489,12 +548,6 @@ func (s *Server) stopWatcher() { s.metaWatcher.Close() } -// GetPersistConfig returns the persist config. -// It's used to test. -func (s *Server) GetPersistConfig() *config.PersistConfig { - return s.persistConfig -} - // CreateServer creates the Server func CreateServer(ctx context.Context, cfg *config.Config) *Server { svr := &Server{ diff --git a/pkg/mcs/scheduling/server/testutil.go b/pkg/mcs/scheduling/server/testutil.go index 74baac44808..5736f1dc40a 100644 --- a/pkg/mcs/scheduling/server/testutil.go +++ b/pkg/mcs/scheduling/server/testutil.go @@ -73,6 +73,6 @@ func GenerateConfig(c *config.Config) (*config.Config, error) { if err != nil { return nil, err } - + cfg.DataDir = c.DataDir return cfg, nil } diff --git a/pkg/schedule/config/config_provider.go b/pkg/schedule/config/config_provider.go index 00f11a5950f..8b8bd485db8 100644 --- a/pkg/schedule/config/config_provider.go +++ b/pkg/schedule/config/config_provider.go @@ -115,8 +115,11 @@ type SharedConfigProvider interface { IsWitnessAllowed() bool IsPlacementRulesCacheEnabled() bool SetHaltScheduling(bool, string) + GetHotRegionsWriteInterval() time.Duration + GetHotRegionsReservedDays() uint64 // for test purpose + SetPlacementRuleEnabled(bool) SetPlacementRulesCacheEnabled(bool) SetEnableWitness(bool) } diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index bcbef0a1a08..03105bf966c 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -615,6 +615,25 @@ func (c *Coordinator) GetHotRegionsByType(typ utils.RWType) *statistics.StoreHot return infos } +// GetHotRegions gets hot regions' statistics by RWType and storeIDs. +// If storeIDs is empty, it returns all hot regions' statistics by RWType. +func (c *Coordinator) GetHotRegions(typ utils.RWType, storeIDs ...uint64) *statistics.StoreHotPeersInfos { + hotRegions := c.GetHotRegionsByType(typ) + if len(storeIDs) > 0 && hotRegions != nil { + asLeader := statistics.StoreHotPeersStat{} + asPeer := statistics.StoreHotPeersStat{} + for _, storeID := range storeIDs { + asLeader[storeID] = hotRegions.AsLeader[storeID] + asPeer[storeID] = hotRegions.AsPeer[storeID] + } + return &statistics.StoreHotPeersInfos{ + AsLeader: asLeader, + AsPeer: asPeer, + } + } + return hotRegions +} + // GetWaitGroup returns the wait group. Only for test purpose. 
func (c *Coordinator) GetWaitGroup() *sync.WaitGroup { return &c.wg diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index d45fd685fa2..6c6d25d27d4 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -26,14 +26,19 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/encryption" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/schedule" + sc "github.com/tikv/pd/pkg/schedule/config" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/filter" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/schedule/scatter" "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/statistics" + "github.com/tikv/pd/pkg/statistics/utils" + "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/typeutil" "go.uber.org/zap" ) @@ -41,8 +46,11 @@ import ( // Server is the interface for handler about schedule. // TODO: remove it after GetCluster is unified between PD server and Scheduling server. type Server interface { + IsServing() bool GetCoordinator() *schedule.Coordinator GetCluster() sche.SharedCluster + GetEncryptionKeyManager() *encryption.Manager + GetSharedConfig() sc.SharedConfigProvider } // Handler is a handler to handle http request about schedule. @@ -381,6 +389,9 @@ func (h *Handler) HandleOperatorCreation(input map[string]interface{}) (int, int // AddTransferLeaderOperator adds an operator to transfer leader to the store. func (h *Handler) AddTransferLeaderOperator(regionID uint64, storeID uint64) error { c := h.GetCluster() + if c == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } region := c.GetRegion(regionID) if region == nil { return errs.ErrRegionNotFound.FastGenByArgs(regionID) @@ -402,6 +413,9 @@ func (h *Handler) AddTransferLeaderOperator(regionID uint64, storeID uint64) err // AddTransferRegionOperator adds an operator to transfer region to the stores. func (h *Handler) AddTransferRegionOperator(regionID uint64, storeIDs map[uint64]placement.PeerRoleType) error { c := h.GetCluster() + if c == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } region := c.GetRegion(regionID) if region == nil { return errs.ErrRegionNotFound.FastGenByArgs(regionID) @@ -439,6 +453,9 @@ func (h *Handler) AddTransferRegionOperator(regionID uint64, storeIDs map[uint64 // AddTransferPeerOperator adds an operator to transfer peer. func (h *Handler) AddTransferPeerOperator(regionID uint64, fromStoreID, toStoreID uint64) error { c := h.GetCluster() + if c == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } region := c.GetRegion(regionID) if region == nil { return errs.ErrRegionNotFound.FastGenByArgs(regionID) @@ -465,6 +482,9 @@ func (h *Handler) AddTransferPeerOperator(regionID uint64, fromStoreID, toStoreI // checkAdminAddPeerOperator checks adminAddPeer operator with given region ID and store ID. func (h *Handler) checkAdminAddPeerOperator(regionID uint64, toStoreID uint64) (sche.SharedCluster, *core.RegionInfo, error) { c := h.GetCluster() + if c == nil { + return nil, nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } region := c.GetRegion(regionID) if region == nil { return nil, nil, errs.ErrRegionNotFound.FastGenByArgs(regionID) @@ -520,6 +540,9 @@ func (h *Handler) AddAddLearnerOperator(regionID uint64, toStoreID uint64) error // AddRemovePeerOperator adds an operator to remove peer. 
func (h *Handler) AddRemovePeerOperator(regionID uint64, fromStoreID uint64) error { c := h.GetCluster() + if c == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } region := c.GetRegion(regionID) if region == nil { return errs.ErrRegionNotFound.FastGenByArgs(regionID) @@ -540,6 +563,9 @@ func (h *Handler) AddRemovePeerOperator(regionID uint64, fromStoreID uint64) err // AddMergeRegionOperator adds an operator to merge region. func (h *Handler) AddMergeRegionOperator(regionID uint64, targetID uint64) error { c := h.GetCluster() + if c == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } region := c.GetRegion(regionID) if region == nil { return errs.ErrRegionNotFound.FastGenByArgs(regionID) @@ -575,6 +601,9 @@ func (h *Handler) AddMergeRegionOperator(regionID uint64, targetID uint64) error // AddSplitRegionOperator adds an operator to split a region. func (h *Handler) AddSplitRegionOperator(regionID uint64, policyStr string, keys []string) error { c := h.GetCluster() + if c == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } region := c.GetRegion(regionID) if region == nil { return errs.ErrRegionNotFound.FastGenByArgs(regionID) @@ -607,6 +636,9 @@ func (h *Handler) AddSplitRegionOperator(regionID uint64, policyStr string, keys // AddScatterRegionOperator adds an operator to scatter a region. func (h *Handler) AddScatterRegionOperator(regionID uint64, group string) error { c := h.GetCluster() + if c == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } region := c.GetRegion(regionID) if region == nil { return errs.ErrRegionNotFound.FastGenByArgs(regionID) @@ -871,3 +903,75 @@ func (h *Handler) PauseOrResumeChecker(name string, t int64) (err error) { } return err } + +// GetHotRegionsWriteInterval gets interval for PD to store Hot Region information.. +func (h *Handler) GetHotRegionsWriteInterval() time.Duration { + return h.GetSharedConfig().GetHotRegionsWriteInterval() +} + +// GetHotRegionsReservedDays gets days hot region information is kept. +func (h *Handler) GetHotRegionsReservedDays() uint64 { + return h.GetSharedConfig().GetHotRegionsReservedDays() +} + +// GetHotRegions gets hot regions' statistics by RWType and storeIDs. +// If storeIDs is empty, it returns all hot regions' statistics by RWType. +func (h *Handler) GetHotRegions(typ utils.RWType, storeIDs ...uint64) (*statistics.StoreHotPeersInfos, error) { + co := h.GetCoordinator() + if co == nil { + return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + return co.GetHotRegions(typ, storeIDs...), nil +} + +// GetHistoryHotRegions get hot region info in HistoryHotRegion form. 
+func (h *Handler) GetHistoryHotRegions(typ utils.RWType) ([]storage.HistoryHotRegion, error) { + hotRegions, err := h.GetHotRegions(typ) + if hotRegions == nil || err != nil { + return nil, err + } + hotPeers := hotRegions.AsPeer + return h.packHotRegions(hotPeers, typ.String()) +} + +func (h *Handler) packHotRegions(hotPeersStat statistics.StoreHotPeersStat, hotRegionType string) (historyHotRegions []storage.HistoryHotRegion, err error) { + c := h.GetCluster() + if c == nil { + return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + for _, hotPeersStat := range hotPeersStat { + stats := hotPeersStat.Stats + for _, hotPeerStat := range stats { + region := c.GetRegion(hotPeerStat.RegionID) + if region == nil { + continue + } + meta := region.GetMeta() + meta, err := encryption.EncryptRegion(meta, h.GetEncryptionKeyManager()) + if err != nil { + return nil, err + } + stat := storage.HistoryHotRegion{ + // store in ms. + // TODO: distinguish store heartbeat interval and region heartbeat interval + // read statistic from store heartbeat, write statistic from region heartbeat + UpdateTime: int64(region.GetInterval().GetEndTimestamp() * 1000), + RegionID: hotPeerStat.RegionID, + StoreID: hotPeerStat.StoreID, + PeerID: region.GetStorePeer(hotPeerStat.StoreID).GetId(), + IsLeader: hotPeerStat.IsLeader, + IsLearner: core.IsLearner(region.GetPeer(hotPeerStat.StoreID)), + HotDegree: int64(hotPeerStat.HotDegree), + FlowBytes: hotPeerStat.ByteRate, + KeyRate: hotPeerStat.KeyRate, + QueryRate: hotPeerStat.QueryRate, + StartKey: string(region.GetStartKey()), + EndKey: string(region.GetEndKey()), + EncryptionMeta: meta.GetEncryptionMeta(), + HotRegionType: hotRegionType, + } + historyHotRegions = append(historyHotRegions, stat) + } + } + return +} diff --git a/pkg/storage/hot_region_storage.go b/pkg/storage/hot_region_storage.go index c33fc89221f..a8452e3aa3f 100644 --- a/pkg/storage/hot_region_storage.go +++ b/pkg/storage/hot_region_storage.go @@ -46,12 +46,12 @@ import ( // Close() must be called after the use. type HotRegionStorage struct { *kv.LevelDBKV - ekm *encryption.Manager - hotRegionLoopWg sync.WaitGroup - batchHotInfo map[string]*HistoryHotRegion - hotRegionInfoCtx context.Context - hotRegionInfoCancel context.CancelFunc - hotRegionStorageHandler HotRegionStorageHandler + ekm *encryption.Manager + hotRegionLoopWg sync.WaitGroup + batchHotInfo map[string]*HistoryHotRegion + hotRegionInfoCtx context.Context + hotRegionInfoCancel context.CancelFunc + hotRegionStorageHelper HotRegionStorageHelper curReservedDays uint64 curInterval time.Duration @@ -89,14 +89,12 @@ type HistoryHotRegion struct { EncryptionMeta *encryptionpb.EncryptionMeta `json:"encryption_meta,omitempty"` } -// HotRegionStorageHandler help hot region storage get hot region info. -type HotRegionStorageHandler interface { - // PackHistoryHotReadRegions get read hot region info in HistoryHotRegion form. - PackHistoryHotReadRegions() ([]HistoryHotRegion, error) - // PackHistoryHotWriteRegions get write hot region info in HistoryHotRegion form. - PackHistoryHotWriteRegions() ([]HistoryHotRegion, error) - // IsLeader return true means this server is leader. - IsLeader() bool +// HotRegionStorageHelper help hot region storage get hot region info. +type HotRegionStorageHelper interface { + // GetHistoryHotRegions get hot region info in HistoryHotRegion form. 
+ GetHistoryHotRegions(typ utils.RWType) ([]HistoryHotRegion, error) + // IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise. + IsServing() bool // GetHotRegionsWriteInterval gets interval for PD to store Hot Region information. GetHotRegionsWriteInterval() time.Duration // GetHotRegionsReservedDays gets days hot region information is kept. @@ -119,7 +117,7 @@ func NewHotRegionsStorage( ctx context.Context, filePath string, ekm *encryption.Manager, - hotRegionStorageHandler HotRegionStorageHandler, + hotRegionStorageHelper HotRegionStorageHelper, ) (*HotRegionStorage, error) { levelDB, err := kv.NewLevelDBKV(filePath) if err != nil { @@ -127,14 +125,14 @@ func NewHotRegionsStorage( } hotRegionInfoCtx, hotRegionInfoCancel := context.WithCancel(ctx) h := HotRegionStorage{ - LevelDBKV: levelDB, - ekm: ekm, - batchHotInfo: make(map[string]*HistoryHotRegion), - hotRegionInfoCtx: hotRegionInfoCtx, - hotRegionInfoCancel: hotRegionInfoCancel, - hotRegionStorageHandler: hotRegionStorageHandler, - curReservedDays: hotRegionStorageHandler.GetHotRegionsReservedDays(), - curInterval: hotRegionStorageHandler.GetHotRegionsWriteInterval(), + LevelDBKV: levelDB, + ekm: ekm, + batchHotInfo: make(map[string]*HistoryHotRegion), + hotRegionInfoCtx: hotRegionInfoCtx, + hotRegionInfoCancel: hotRegionInfoCancel, + hotRegionStorageHelper: hotRegionStorageHelper, + curReservedDays: hotRegionStorageHelper.GetHotRegionsReservedDays(), + curInterval: hotRegionStorageHelper.GetHotRegionsWriteInterval(), } h.hotRegionLoopWg.Add(2) go h.backgroundFlush() @@ -199,7 +197,7 @@ func (h *HotRegionStorage) backgroundFlush() { if h.getCurReservedDays() == 0 { continue } - if h.hotRegionStorageHandler.IsLeader() { + if h.hotRegionStorageHelper.IsServing() { if err := h.pullHotRegionInfo(); err != nil { log.Error("get hot_region stat meet error", errs.ZapError(err)) } @@ -240,19 +238,18 @@ func (h *HotRegionStorage) Close() error { } func (h *HotRegionStorage) pullHotRegionInfo() error { - historyHotReadRegions, err := h.hotRegionStorageHandler.PackHistoryHotReadRegions() + historyHotReadRegions, err := h.hotRegionStorageHelper.GetHistoryHotRegions(utils.Read) if err != nil { return err } if err := h.packHistoryHotRegions(historyHotReadRegions, utils.Read.String()); err != nil { return err } - historyHotWriteRegions, err := h.hotRegionStorageHandler.PackHistoryHotWriteRegions() + historyHotWriteRegions, err := h.hotRegionStorageHelper.GetHistoryHotRegions(utils.Write) if err != nil { return err } - err = h.packHistoryHotRegions(historyHotWriteRegions, utils.Write.String()) - return err + return h.packHistoryHotRegions(historyHotWriteRegions, utils.Write.String()) } func (h *HotRegionStorage) packHistoryHotRegions(historyHotRegions []HistoryHotRegion, hotRegionType string) error { @@ -278,7 +275,7 @@ func (h *HotRegionStorage) packHistoryHotRegions(historyHotRegions []HistoryHotR func (h *HotRegionStorage) updateInterval() { h.mu.Lock() defer h.mu.Unlock() - interval := h.hotRegionStorageHandler.GetHotRegionsWriteInterval() + interval := h.hotRegionStorageHelper.GetHotRegionsWriteInterval() if interval != h.curInterval { log.Info("hot region write interval changed", zap.Duration("previous-interval", h.curInterval), @@ -296,7 +293,7 @@ func (h *HotRegionStorage) getCurInterval() time.Duration { func (h *HotRegionStorage) updateReservedDays() { h.mu.Lock() defer h.mu.Unlock() - reservedDays := h.hotRegionStorageHandler.GetHotRegionsReservedDays() + reservedDays := 
h.hotRegionStorageHelper.GetHotRegionsReservedDays() if reservedDays != h.curReservedDays { log.Info("hot region reserved days changed", zap.Uint64("previous-reserved-days", h.curReservedDays), diff --git a/pkg/storage/hot_region_storage_test.go b/pkg/storage/hot_region_storage_test.go index 42c505a1341..ea803db3dda 100644 --- a/pkg/storage/hot_region_storage_test.go +++ b/pkg/storage/hot_region_storage_test.go @@ -37,22 +37,22 @@ type MockPackHotRegionInfo struct { pullInterval time.Duration } -// PackHistoryHotWriteRegions get read hot region info in HistoryHotRegion from. -func (m *MockPackHotRegionInfo) PackHistoryHotReadRegions() ([]HistoryHotRegion, error) { - result := make([]HistoryHotRegion, len(m.historyHotReads)) - copy(result, m.historyHotReads) - return result, nil -} - -// PackHistoryHotWriteRegions get write hot region info in HistoryHotRegion form. -func (m *MockPackHotRegionInfo) PackHistoryHotWriteRegions() ([]HistoryHotRegion, error) { - result := make([]HistoryHotRegion, len(m.historyHotWrites)) - copy(result, m.historyHotWrites) - return result, nil +// GetHistoryHotRegions get hot region info in HistoryHotRegion form. +func (m *MockPackHotRegionInfo) GetHistoryHotRegions(typ utils.RWType) ([]HistoryHotRegion, error) { + switch typ { + case utils.Write: + result := make([]HistoryHotRegion, len(m.historyHotWrites)) + copy(result, m.historyHotWrites) + return result, nil + default: // case utils.Read: + result := make([]HistoryHotRegion, len(m.historyHotReads)) + copy(result, m.historyHotReads) + return result, nil + } } -// IsLeader return isLeader. -func (m *MockPackHotRegionInfo) IsLeader() bool { +// IsServing return isLeader. +func (m *MockPackHotRegionInfo) IsServing() bool { return m.isLeader } diff --git a/server/handler.go b/server/handler.go index 802c9ee14e5..d6c7397c72a 100644 --- a/server/handler.go +++ b/server/handler.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/storelimit" - "github.com/tikv/pd/pkg/encryption" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/schedule" sc "github.com/tikv/pd/pkg/schedule/config" @@ -37,7 +36,6 @@ import ( "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/statistics/buckets" - "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/apiutil" @@ -55,7 +53,11 @@ type server struct { } func (s *server) GetCoordinator() *schedule.Coordinator { - return s.GetRaftCluster().GetCoordinator() + c := s.GetRaftCluster() + if c == nil { + return nil + } + return c.GetCoordinator() } func (s *server) GetCluster() sche.SharedCluster { @@ -154,16 +156,6 @@ func (h *Handler) GetHotReadRegions() *statistics.StoreHotPeersInfos { return c.GetHotReadRegions() } -// GetHotRegionsWriteInterval gets interval for PD to store Hot Region information.. -func (h *Handler) GetHotRegionsWriteInterval() time.Duration { - return h.opt.GetHotRegionsWriteInterval() -} - -// GetHotRegionsReservedDays gets days hot region information is kept. -func (h *Handler) GetHotRegionsReservedDays() uint64 { - return h.opt.GetHotRegionsReservedDays() -} - // GetStoresLoads gets all hot write stores stats. 
func (h *Handler) GetStoresLoads() map[uint64][]float64 { rc := h.s.GetRaftCluster() @@ -488,73 +480,6 @@ func (h *Handler) SetStoreLimitTTL(data string, value float64, ttl time.Duration }, ttl) } -// IsLeader return true if this server is leader -func (h *Handler) IsLeader() bool { - return h.s.member.IsLeader() -} - -// PackHistoryHotReadRegions get read hot region info in HistoryHotRegion form. -func (h *Handler) PackHistoryHotReadRegions() ([]storage.HistoryHotRegion, error) { - hotReadRegions := h.GetHotReadRegions() - if hotReadRegions == nil { - return nil, nil - } - hotReadPeerRegions := hotReadRegions.AsPeer - return h.packHotRegions(hotReadPeerRegions, utils.Read.String()) -} - -// PackHistoryHotWriteRegions get write hot region info in HistoryHotRegion from -func (h *Handler) PackHistoryHotWriteRegions() ([]storage.HistoryHotRegion, error) { - hotWriteRegions := h.GetHotWriteRegions() - if hotWriteRegions == nil { - return nil, nil - } - hotWritePeerRegions := hotWriteRegions.AsPeer - return h.packHotRegions(hotWritePeerRegions, utils.Write.String()) -} - -func (h *Handler) packHotRegions(hotPeersStat statistics.StoreHotPeersStat, hotRegionType string) (historyHotRegions []storage.HistoryHotRegion, err error) { - c, err := h.GetRaftCluster() - if err != nil { - return nil, err - } - for _, hotPeersStat := range hotPeersStat { - stats := hotPeersStat.Stats - for _, hotPeerStat := range stats { - region := c.GetRegion(hotPeerStat.RegionID) - if region == nil { - continue - } - meta := region.GetMeta() - meta, err := encryption.EncryptRegion(meta, h.s.encryptionKeyManager) - if err != nil { - return nil, err - } - stat := storage.HistoryHotRegion{ - // store in ms. - // TODO: distinguish store heartbeat interval and region heartbeat interval - // read statistic from store heartbeat, write statistic from region heartbeat - UpdateTime: int64(region.GetInterval().GetEndTimestamp() * 1000), - RegionID: hotPeerStat.RegionID, - StoreID: hotPeerStat.StoreID, - PeerID: region.GetStorePeer(hotPeerStat.StoreID).GetId(), - IsLeader: hotPeerStat.IsLeader, - IsLearner: core.IsLearner(region.GetPeer(hotPeerStat.StoreID)), - HotDegree: int64(hotPeerStat.HotDegree), - FlowBytes: hotPeerStat.ByteRate, - KeyRate: hotPeerStat.KeyRate, - QueryRate: hotPeerStat.QueryRate, - StartKey: string(region.GetStartKey()), - EndKey: string(region.GetEndKey()), - EncryptionMeta: meta.GetEncryptionMeta(), - HotRegionType: hotRegionType, - } - historyHotRegions = append(historyHotRegions, stat) - } - } - return -} - // GetHistoryHotRegionIter return a iter which iter all qualified item . func (h *Handler) GetHistoryHotRegionIter( hotRegionTypes []string, diff --git a/server/server.go b/server/server.go index c15e0156db0..30038fd0729 100644 --- a/server/server.go +++ b/server/server.go @@ -489,10 +489,12 @@ func (s *Server) startServer(ctx context.Context) error { s.safePointV2Manager = gc.NewSafePointManagerV2(s.ctx, s.storage, s.storage, s.storage) s.hbStreams = hbstream.NewHeartbeatStreams(ctx, s.clusterID, "", s.cluster) // initial hot_region_storage in here. 
- s.hotRegionStorage, err = storage.NewHotRegionsStorage( - ctx, filepath.Join(s.cfg.DataDir, "hot-region"), s.encryptionKeyManager, s.handler) - if err != nil { - return err + if !s.IsAPIServiceMode() { + s.hotRegionStorage, err = storage.NewHotRegionsStorage( + ctx, filepath.Join(s.cfg.DataDir, "hot-region"), s.encryptionKeyManager, s.handler) + if err != nil { + return err + } } // Run callbacks log.Info("triggering the start callback functions") @@ -550,9 +552,10 @@ func (s *Server) Close() { if err := s.storage.Close(); err != nil { log.Error("close storage meet error", errs.ZapError(err)) } - - if err := s.hotRegionStorage.Close(); err != nil { - log.Error("close hot region storage meet error", errs.ZapError(err)) + if s.hotRegionStorage != nil { + if err := s.hotRegionStorage.Close(); err != nil { + log.Error("close hot region storage meet error", errs.ZapError(err)) + } } // Run callbacks @@ -2088,6 +2091,16 @@ func (s *Server) GetMaxResetTSGap() time.Duration { return s.persistOptions.GetMaxResetTSGap() } +// GetEncryptionKeyManager returns the encryption key manager. +func (s *Server) GetEncryptionKeyManager() *encryption.Manager { + return s.encryptionKeyManager +} + +// GetSharedConfig returns the shared config. +func (s *Server) GetSharedConfig() sc.SharedConfigProvider { + return s.persistOptions +} + // SetClient sets the etcd client. // Notes: it is only used for test. func (s *Server) SetClient(client *clientv3.Client) { diff --git a/tests/scheduling_cluster.go b/tests/scheduling_cluster.go index 1768c4128cc..8e778d8d2fe 100644 --- a/tests/scheduling_cluster.go +++ b/tests/scheduling_cluster.go @@ -16,6 +16,7 @@ package tests import ( "context" + "os" "time" "github.com/stretchr/testify/require" @@ -59,6 +60,8 @@ func (tc *TestSchedulingCluster) AddServer(addr string) error { cfg.BackendEndpoints = tc.backendEndpoints cfg.ListenAddr = addr cfg.Name = cfg.ListenAddr + tempDir, _ := os.MkdirTemp("/tmp", "pd-tests") + cfg.DataDir = tempDir generatedCfg, err := scheduling.GenerateConfig(cfg) if err != nil { return err diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go index a6f11a49889..64ed5114646 100644 --- a/tests/server/api/operator_test.go +++ b/tests/server/api/operator_test.go @@ -412,7 +412,7 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te suite.T().Log(testCase.name) // TODO: remove this after we can sync this config to all servers. if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { - sche.GetPersistConfig().SetPlacementRuleEnabled(testCase.placementRuleEnable) + sche.GetCluster().GetSchedulerConfig().SetPlacementRuleEnabled(testCase.placementRuleEnable) } else { svr.GetRaftCluster().GetOpts().SetPlacementRuleEnabled(testCase.placementRuleEnable) }
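For reference, a minimal sketch of how a caller can satisfy the renamed HotRegionStorageHelper interface from pkg/storage/hot_region_storage.go and hand it to NewHotRegionsStorage. This is not part of the patch; the stubHelper type, its return values, and the nil encryption manager are illustrative assumptions modeled on MockPackHotRegionInfo in hot_region_storage_test.go, and only the interface methods and the NewHotRegionsStorage signature come from the diff above.

package main

import (
	"context"
	"time"

	"github.com/tikv/pd/pkg/statistics/utils"
	"github.com/tikv/pd/pkg/storage"
)

// stubHelper is a hypothetical in-memory HotRegionStorageHelper, similar in
// spirit to MockPackHotRegionInfo in the test changes above.
type stubHelper struct {
	reads, writes []storage.HistoryHotRegion
}

// GetHistoryHotRegions returns the buffered hot regions for the given RWType.
func (s *stubHelper) GetHistoryHotRegions(typ utils.RWType) ([]storage.HistoryHotRegion, error) {
	if typ == utils.Write {
		return s.writes, nil
	}
	return s.reads, nil
}

// IsServing reports whether this instance should flush hot region info.
func (s *stubHelper) IsServing() bool { return true }

// GetHotRegionsWriteInterval returns how often hot region info is persisted.
func (s *stubHelper) GetHotRegionsWriteInterval() time.Duration { return 10 * time.Minute }

// GetHotRegionsReservedDays returns how many days of hot region info are kept.
func (s *stubHelper) GetHotRegionsReservedDays() uint64 { return 7 }

func main() {
	// A nil *encryption.Manager is assumed to be acceptable here, as in the unit
	// tests; the PD and scheduling servers pass their real encryption key managers.
	hrs, err := storage.NewHotRegionsStorage(context.Background(), "/tmp/hot-region", nil, &stubHelper{})
	if err != nil {
		panic(err)
	}
	defer hrs.Close()
}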