Skip to content

Commit

Permalink
mcs: support history hot region in scheduling server
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Oct 9, 2023
1 parent 7dd26f9 commit d1ff6b9
Show file tree
Hide file tree
Showing 11 changed files with 254 additions and 146 deletions.
11 changes: 3 additions & 8 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/log"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/handler"
"github.com/tikv/pd/pkg/schedule/operator"
Expand Down Expand Up @@ -68,15 +67,11 @@ type Service struct {
}

type server struct {
server *scheserver.Server
}

func (s *server) GetCoordinator() *schedule.Coordinator {
return s.server.GetCoordinator()
*scheserver.Server
}

func (s *server) GetCluster() sche.SharedCluster {
return s.server.GetCluster()
return s.Server.GetCluster()
}

func createIndentRender() *render.Render {
Expand All @@ -98,7 +93,7 @@ func NewService(srv *scheserver.Service) *Service {
apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression))
apiHandlerEngine.Use(func(c *gin.Context) {
c.Set(multiservicesapi.ServiceContextKey, srv.Server)
c.Set(handlerKey, handler.NewHandler(&server{server: srv.Server}))
c.Set(handlerKey, handler.NewHandler(&server{srv.Server}))
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
Expand Down
10 changes: 10 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,16 @@ func (o *PersistConfig) GetHotRegionCacheHitsThreshold() int {
return int(o.GetScheduleConfig().HotRegionCacheHitsThreshold)
}

// GetHotRegionsWriteInterval returns the interval at which PD stores hot
// region information, read from HotRegionsWriteInterval in the schedule config.
func (o *PersistConfig) GetHotRegionsWriteInterval() time.Duration {
	scheduleCfg := o.GetScheduleConfig()
	return scheduleCfg.HotRegionsWriteInterval.Duration
}

// GetHotRegionsReservedDays returns the number of days that hot region
// information is kept, read from HotRegionsReservedDays in the schedule config.
func (o *PersistConfig) GetHotRegionsReservedDays() uint64 {
	scheduleCfg := o.GetScheduleConfig()
	return scheduleCfg.HotRegionsReservedDays
}

// GetMaxMovableHotPeerSize returns the max movable hot peer size.
func (o *PersistConfig) GetMaxMovableHotPeerSize() int64 {
return o.GetScheduleConfig().MaxMovableHotPeerSize
Expand Down
66 changes: 56 additions & 10 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/http"
"os"
"os/signal"
"path/filepath"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -37,6 +38,7 @@ import (
"github.com/spf13/cobra"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/encryption"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
Expand All @@ -46,8 +48,11 @@ import (
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/schedule"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/handler"
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/apiutil"
Expand Down Expand Up @@ -99,9 +104,11 @@ type Server struct {
serviceID *discovery.ServiceRegistryEntry
serviceRegister *discovery.ServiceRegister

cluster *Cluster
hbStreams *hbstream.HeartbeatStreams
storage *endpoint.StorageEndpoint
cluster *Cluster
hbStreams *hbstream.HeartbeatStreams
storage *endpoint.StorageEndpoint
hotRegionStorage *storage.HotRegionStorage
encryptionKeyManager *encryption.Manager

// for watching the PD API server meta info updates that are related to the scheduling.
configWatcher *config.Watcher
Expand Down Expand Up @@ -155,9 +162,10 @@ func (s *Server) Run() error {

func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context())
s.serverLoopWg.Add(2)
s.serverLoopWg.Add(3)
go s.primaryElectionLoop()
go s.updateAPIServerMemberLoop()
go s.encryptionKeyManagerLoop()
}

func (s *Server) updateAPIServerMemberLoop() {
Expand Down Expand Up @@ -239,6 +247,17 @@ func (s *Server) primaryElectionLoop() {
}
}

// encryptionKeyManagerLoop starts the encryption key manager's background
// loop, which monitors encryption key changes.
//
// It is launched as one of the server-loop goroutines: it marks
// serverLoopWg as done on return and logs any panic via logutil.LogPanic.
// The background loop is handed a context derived from serverLoopCtx, so it
// presumably returns once the server loop is cancelled (the blocking
// behavior of StartBackgroundLoop is not visible here — confirm).
func (s *Server) encryptionKeyManagerLoop() {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()

	ctx, cancel := context.WithCancel(s.serverLoopCtx)
	defer cancel()
	s.encryptionKeyManager.StartBackgroundLoop(ctx)
	// Fixed typo in the shutdown message: "exist" -> "exit".
	log.Info("server is closed, exit encryption key manager loop")
}

func (s *Server) campaignLeader() {
log.Info("start to campaign the primary/leader", zap.String("campaign-scheduling-primary-name", s.participant.Name()))
if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil {
Expand Down Expand Up @@ -324,6 +343,11 @@ func (s *Server) Close() {
if s.GetHTTPClient() != nil {
s.GetHTTPClient().CloseIdleConnections()
}

if err := s.hotRegionStorage.Close(); err != nil {
log.Error("close hot region storage meet error", errs.ZapError(err))
}

log.Info("scheduling server is closed")
}

Expand Down Expand Up @@ -367,6 +391,11 @@ func (s *Server) GetCoordinator() *schedule.Coordinator {
return s.GetCluster().GetCoordinator()
}

// GetEncryptionKeyManager exposes the server's encryption key manager.
// The value is whatever is currently stored in the encryptionKeyManager
// field, so it is nil until that field has been assigned.
func (s *Server) GetEncryptionKeyManager() *encryption.Manager {
	manager := s.encryptionKeyManager
	return manager
}

// ServerLoopWgDone decreases the server loop wait group.
func (s *Server) ServerLoopWgDone() {
s.serverLoopWg.Done()
Expand All @@ -392,6 +421,14 @@ func (s *Server) GetLeaderListenUrls() []string {
return s.participant.GetLeaderListenUrls()
}

// svr embeds *Server so that it can be passed to handler.NewHandler.
// It defines its own GetCluster method returning sche.SharedCluster, which
// takes precedence over the GetCluster promoted from the embedded Server.
type svr struct {
*Server
}

// GetCluster returns the embedded Server's cluster as a sche.SharedCluster.
//
// A nil result from the embedded Server is returned as an untyped nil
// instead of being converted directly: converting a nil concrete pointer to
// an interface produces a non-nil interface value, which would make
// `== nil` checks in callers silently pass.
// NOTE(review): assumes Server.GetCluster returns a nilable concrete type
// (the Server.cluster field is a *Cluster) — confirm.
func (s *svr) GetCluster() sche.SharedCluster {
	// Must call through s.Server explicitly; s.GetCluster() would recurse.
	cluster := s.Server.GetCluster()
	if cluster == nil {
		return nil
	}
	return cluster
}

func (s *Server) startServer() (err error) {
if s.clusterID, err = utils.InitClusterID(s.Context(), s.GetClient()); err != nil {
return err
Expand Down Expand Up @@ -427,6 +464,21 @@ func (s *Server) startServer() (err error) {
s.checkMembershipCh <- struct{}{}
<-serverReadyChan

s.encryptionKeyManager, err = encryption.NewManager(s.GetClient(), &s.cfg.Security.Encryption)
if err != nil {
return err
}
s.cluster.GetSchedulerConfig().GetHotRegionsWriteInterval()
s.cluster.GetSchedulerConfig().GetHotRegionsReservedDays()
s.participant.IsLeader()

h := handler.NewHandler(&svr{s})
s.hotRegionStorage, err = storage.NewHotRegionsStorage(
s.Context(), filepath.Join(s.cfg.DataDir, "hot-region"), s.encryptionKeyManager, h)
if err != nil {
return err
}

// Run callbacks
log.Info("triggering the start callback functions")
for _, cb := range s.GetStartCallbacks() {
Expand Down Expand Up @@ -489,12 +541,6 @@ func (s *Server) stopWatcher() {
s.metaWatcher.Close()
}

// GetPersistConfig returns the persist config.
// It's used to test.
func (s *Server) GetPersistConfig() *config.PersistConfig {
return s.persistConfig
}

// CreateServer creates the Server
func CreateServer(ctx context.Context, cfg *config.Config) *Server {
svr := &Server{
Expand Down
3 changes: 3 additions & 0 deletions pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,11 @@ type SharedConfigProvider interface {
IsWitnessAllowed() bool
IsPlacementRulesCacheEnabled() bool
SetHaltScheduling(bool, string)
GetHotRegionsWriteInterval() time.Duration
GetHotRegionsReservedDays() uint64

// for test purpose
SetPlacementRuleEnabled(bool)
SetPlacementRulesCacheEnabled(bool)
SetEnableWitness(bool)
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,25 @@ func (c *Coordinator) GetHotRegionsByType(typ utils.RWType) *statistics.StoreHot
return infos
}

// GetHotRegions returns hot region statistics for the given RWType.
//
// With no storeIDs (or when no statistics are available) the result of
// GetHotRegionsByType is returned unchanged. Otherwise a new
// StoreHotPeersInfos is built that contains one AsLeader and one AsPeer entry
// per requested store ID; a requested store that is absent from the source
// statistics still gets an entry, holding the zero value of the lookup.
func (c *Coordinator) GetHotRegions(typ utils.RWType, storeIDs ...uint64) *statistics.StoreHotPeersInfos {
	all := c.GetHotRegionsByType(typ)
	// Nothing to filter: either no filter was requested or there is no data.
	if len(storeIDs) == 0 || all == nil {
		return all
	}

	filtered := &statistics.StoreHotPeersInfos{
		AsLeader: statistics.StoreHotPeersStat{},
		AsPeer:   statistics.StoreHotPeersStat{},
	}
	for _, id := range storeIDs {
		filtered.AsLeader[id] = all.AsLeader[id]
		filtered.AsPeer[id] = all.AsPeer[id]
	}
	return filtered
}

// GetWaitGroup returns the wait group. Only for test purpose.
func (c *Coordinator) GetWaitGroup() *sync.WaitGroup {
return &c.wg
Expand Down
Loading

0 comments on commit d1ff6b9

Please sign in to comment.