Merge branch 'master' into delete-slow-stats
ti-chi-bot[bot] authored Oct 10, 2023
2 parents cf85819 + 2266c94 commit f6b9caa
Showing 18 changed files with 474 additions and 427 deletions.
548 changes: 254 additions & 294 deletions metrics/grafana/pd.json

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions pkg/mcs/resourcemanager/server/server.go
@@ -52,6 +52,8 @@ import (

var _ bs.Server = (*Server)(nil)

const serviceName = "Resource Manager"

// Server is the resource manager server, and it implements bs.Server.
type Server struct {
*server.BaseServer
@@ -168,6 +170,7 @@ func (s *Server) campaignLeader() {
defer resetLeaderOnce.Do(func() {
cancel()
s.participant.ResetLeader()
member.ServiceMemberGauge.WithLabelValues(serviceName).Set(0)
})

// Maintain the leadership; after this, the Resource Manager is ready to provide service.
@@ -180,6 +183,7 @@
}

s.participant.EnableLeader()
member.ServiceMemberGauge.WithLabelValues(serviceName).Set(1)
log.Info("resource manager primary is ready to serve", zap.String("resource-manager-primary-name", s.participant.Name()))

leaderTicker := time.NewTicker(utils.LeaderTickInterval)
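The two gauge updates bracket the leadership lifecycle: Set(1) lands right after EnableLeader, and Set(0) sits inside the deferred reset so the gauge cannot be left stale on any exit path. A minimal sketch of the same bracket, where campaign is a hypothetical helper that is not part of this patch:

    package example

    import "github.com/tikv/pd/pkg/member"

    // campaign is illustrative only: it shows the Set(1)/deferred Set(0)
    // bracket that both campaignLeader implementations in this commit use.
    func campaign(service string, serve func()) {
        member.ServiceMemberGauge.WithLabelValues(service).Set(1)       // became primary
        defer member.ServiceMemberGauge.WithLabelValues(service).Set(0) // role lost or server exiting
        serve() // block until leadership is lost
    }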
@@ -382,8 +386,8 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) {
// Flushing any buffered log entries
defer log.Sync()

versioninfo.Log("Resource Manager")
log.Info("Resource Manager config", zap.Reflect("config", cfg))
versioninfo.Log(serviceName)
log.Info("resource manager config", zap.Reflect("config", cfg))

grpcprometheus.EnableHandlingTimeHistogram()
metricutil.Push(&cfg.Metric)
106 changes: 104 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
@@ -47,6 +47,7 @@ type Cluster struct {
checkMembershipCh chan struct{}
apiServerLeader atomic.Value
clusterID uint64
running atomic.Bool
}

const regionLabelGCInterval = time.Hour
@@ -203,6 +204,14 @@ func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool {
return c.apiServerLeader.CompareAndSwap(old, new)
}

func trySend(notifier chan struct{}) {
select {
case notifier <- struct{}{}:
// If the channel already holds a notification, a check is pending, so this signal can be dropped.
default:
}
}
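trySend is the classic non-blocking send on a buffered channel of size one: the sender never blocks, and bursts of notifications coalesce into at most one pending signal. A self-contained sketch of the idiom, with illustrative names that are not from this patch:

    package main

    import "fmt"

    func notify(ch chan struct{}) {
        select {
        case ch <- struct{}{}: // signal delivered; one check will run
        default: // a signal is already pending and covers this one
        }
    }

    func main() {
        ch := make(chan struct{}, 1)
        notify(ch)
        notify(ch) // coalesced with the first signal
        <-ch
        fmt.Println("one check serves both notifications")
    }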

// updateScheduler listens on the schedulers updating notifier and manages scheduler creation and deletion.
func (c *Cluster) updateScheduler() {
defer logutil.LogPanic()
@@ -213,8 +222,11 @@ func (c *Cluster) updateScheduler() {
// Establish a notifier to listen for scheduler updates.
notifier := make(chan struct{}, 1)
// Make sure the check will be triggered once later.
-notifier <- struct{}{}
+trySend(notifier)
c.persistConfig.SetSchedulersUpdatingNotifier(notifier)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-c.ctx.Done():
@@ -224,6 +236,18 @@
// This is triggered by the watcher when the schedulers are updated.
}

if !c.running.Load() {
select {
case <-c.ctx.Done():
log.Info("cluster is closing, stop listening the schedulers updating notifier")
return
case <-ticker.C:
// retry
trySend(notifier)
continue
}
}

log.Info("schedulers updating notifier is triggered, try to update the scheduler")
var (
schedulersController = c.coordinator.GetSchedulersController()
@@ -394,15 +418,93 @@ func (c *Cluster) runUpdateStoreStats() {
}
}

// runCoordinator runs the main scheduling loop.
func (c *Cluster) runCoordinator() {
defer logutil.LogPanic()
defer c.wg.Done()
c.coordinator.RunUntilStop()
}

func (c *Cluster) runMetricsCollectionJob() {
defer logutil.LogPanic()
defer c.wg.Done()

ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-c.ctx.Done():
log.Info("metrics are reset")
c.resetMetrics()
log.Info("metrics collection job has been stopped")
return
case <-ticker.C:
c.collectMetrics()
}
}
}

func (c *Cluster) collectMetrics() {
statsMap := statistics.NewStoreStatisticsMap(c.persistConfig)
stores := c.GetStores()
for _, s := range stores {
statsMap.Observe(s)
statsMap.ObserveHotStat(s, c.hotStat.StoresStats)
}
statsMap.Collect()

c.coordinator.GetSchedulersController().CollectSchedulerMetrics()
c.coordinator.CollectHotSpotMetrics()
c.collectClusterMetrics()
}

func (c *Cluster) collectClusterMetrics() {
if c.regionStats == nil {
return
}
c.regionStats.Collect()
c.labelStats.Collect()
// collect hot cache metrics
c.hotStat.CollectMetrics()
}

func (c *Cluster) resetMetrics() {
statsMap := statistics.NewStoreStatisticsMap(c.persistConfig)
statsMap.Reset()

c.coordinator.GetSchedulersController().ResetSchedulerMetrics()
c.coordinator.ResetHotSpotMetrics()
c.resetClusterMetrics()
}

func (c *Cluster) resetClusterMetrics() {
if c.regionStats == nil {
return
}
c.regionStats.Reset()
c.labelStats.Reset()
// reset hot cache metrics
c.hotStat.ResetMetrics()
}

// StartBackgroundJobs starts background jobs.
func (c *Cluster) StartBackgroundJobs() {
-c.wg.Add(2)
+c.wg.Add(4)
go c.updateScheduler()
go c.runUpdateStoreStats()
go c.runCoordinator()
go c.runMetricsCollectionJob()
c.running.Store(true)
}

// StopBackgroundJobs stops background jobs.
func (c *Cluster) StopBackgroundJobs() {
if !c.running.Load() {
return
}
c.running.Store(false)
c.coordinator.Stop()
c.cancel()
c.wg.Wait()
}
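Taken together, the running flag, the one-second retry ticker in updateScheduler, and the early return in StopBackgroundJobs make the job lifecycle safe: notifications that arrive before the cluster is running are re-queued instead of lost, and stopping twice is a no-op. A condensed sketch of that contract under hypothetical names, with the real jobs elided:

    package example

    import (
        "context"
        "sync"
        "sync/atomic"
    )

    type lifecycle struct {
        ctx     context.Context
        cancel  context.CancelFunc
        wg      sync.WaitGroup
        running atomic.Bool
    }

    func newLifecycle() *lifecycle {
        ctx, cancel := context.WithCancel(context.Background())
        return &lifecycle{ctx: ctx, cancel: cancel}
    }

    func (l *lifecycle) Start(jobs ...func(context.Context)) {
        l.wg.Add(len(jobs))
        for _, job := range jobs {
            go func(run func(context.Context)) {
                defer l.wg.Done()
                run(l.ctx)
            }(job)
        }
        l.running.Store(true) // gates work, like the scheduler updates above
    }

    func (l *lifecycle) Stop() {
        if !l.running.Load() {
            return // mirrors StopBackgroundJobs: a second call is a no-op
        }
        l.running.Store(false)
        l.cancel()  // unblocks every <-ctx.Done() select
        l.wg.Wait() // wait for all jobs to drain
    }

As in the patch, this guards against repeated sequential Stop calls rather than fully concurrent ones, and the flag is flipped before cancel so pollers such as updateScheduler park before the context is torn down.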
17 changes: 16 additions & 1 deletion pkg/mcs/scheduling/server/config/config.go
@@ -499,6 +499,11 @@ func (o *PersistConfig) IsSchedulingHalted() bool {
return o.GetScheduleConfig().HaltScheduling
}

// GetStoresLimit gets the stores' limit.
func (o *PersistConfig) GetStoresLimit() map[uint64]sc.StoreLimitConfig {
return o.GetScheduleConfig().StoreLimit
}

// GetStoreLimitByType returns the limit of a store with a given type.
func (o *PersistConfig) GetStoreLimitByType(storeID uint64, typ storelimit.Type) (returned float64) {
limit := o.GetStoreLimit(storeID)
@@ -620,11 +625,21 @@ func (o *PersistConfig) GetRegionMaxSize() uint64 {
return o.GetStoreConfig().GetRegionMaxSize()
}

-// GetRegionMaxKeys returns the region split keys
+// GetRegionMaxKeys returns the max region keys
func (o *PersistConfig) GetRegionMaxKeys() uint64 {
return o.GetStoreConfig().GetRegionMaxKeys()
}

// GetRegionSplitSize returns the region split size in MB
func (o *PersistConfig) GetRegionSplitSize() uint64 {
return o.GetStoreConfig().GetRegionSplitSize()
}

// GetRegionSplitKeys returns the region split keys
func (o *PersistConfig) GetRegionSplitKeys() uint64 {
return o.GetStoreConfig().GetRegionSplitKeys()
}

// IsEnableRegionBucket returns true if the region bucket is enabled.
func (o *PersistConfig) IsEnableRegionBucket() bool {
return o.GetStoreConfig().IsEnableRegionBucket()
24 changes: 16 additions & 8 deletions pkg/mcs/scheduling/server/server.go
@@ -62,7 +62,11 @@ import (

var _ bs.Server = (*Server)(nil)

-const memberUpdateInterval = time.Minute
+const (
+	serviceName = "Scheduling Service"
+
+	memberUpdateInterval = time.Minute
+)

// Server is the scheduling server, and it implements bs.Server.
type Server struct {
@@ -255,6 +259,7 @@ func (s *Server) campaignLeader() {
defer resetLeaderOnce.Do(func() {
cancel()
s.participant.ResetLeader()
member.ServiceMemberGauge.WithLabelValues(serviceName).Set(0)
})

// Maintain the leadership; after this, the scheduling service is ready to provide service.
@@ -274,6 +279,7 @@
}
}()
s.participant.EnableLeader()
member.ServiceMemberGauge.WithLabelValues(serviceName).Set(1)
log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name()))

leaderTicker := time.NewTicker(utils.LeaderTickInterval)
@@ -456,16 +462,12 @@ func (s *Server) startCluster(context.Context) error {
}
s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController())
s.cluster.StartBackgroundJobs()
-go s.GetCoordinator().RunUntilStop()
return nil
}

func (s *Server) stopCluster() {
-s.GetCoordinator().Stop()
s.cluster.StopBackgroundJobs()
-s.ruleWatcher.Close()
-s.configWatcher.Close()
-s.metaWatcher.Close()
+s.stopWatcher()
}

func (s *Server) startWatcher() (err error) {
Expand All @@ -481,6 +483,12 @@ func (s *Server) startWatcher() (err error) {
return err
}

func (s *Server) stopWatcher() {
s.ruleWatcher.Close()
s.configWatcher.Close()
s.metaWatcher.Close()
}

// GetPersistConfig returns the persist config.
// It's used to test.
func (s *Server) GetPersistConfig() *config.PersistConfig {
@@ -531,8 +539,8 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) {
// Flushing any buffered log entries
defer log.Sync()

versioninfo.Log("Scheduling")
log.Info("Scheduling config", zap.Reflect("config", cfg))
versioninfo.Log(serviceName)
log.Info("scheduling service config", zap.Reflect("config", cfg))

grpcprometheus.EnableHandlingTimeHistogram()
metricutil.Push(&cfg.Metric)
6 changes: 4 additions & 2 deletions pkg/mcs/tso/server/server.go
@@ -58,6 +58,8 @@ import (
var _ bs.Server = (*Server)(nil)
var _ tso.ElectionMember = (*member.Participant)(nil)

const serviceName = "TSO Service"

// Server is the TSO server, and it implements bs.Server.
type Server struct {
*server.BaseServer
@@ -450,8 +452,8 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) {
// Flushing any buffered log entries
defer log.Sync()

versioninfo.Log("TSO")
log.Info("TSO config", zap.Reflect("config", cfg))
versioninfo.Log(serviceName)
log.Info("TSO service config", zap.Reflect("config", cfg))

grpcprometheus.EnableHandlingTimeHistogram()
metricutil.Push(&cfg.Metric)
32 changes: 32 additions & 0 deletions pkg/member/metrics.go
@@ -0,0 +1,32 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package member

import "github.com/prometheus/client_golang/prometheus"

var (
// ServiceMemberGauge is used to record the leader/primary of services.
ServiceMemberGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "service",
Subsystem: "member",
Name: "role",
Help: "The leader/primary of services",
}, []string{"service"})
)

func init() {
prometheus.MustRegister(ServiceMemberGauge)
}
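client_golang derives the exported metric name as namespace_subsystem_name, so this gauge appears as service_member_role with one series per service label: 1 marks the current leader/primary, 0 a member that does not hold the role. A short usage sketch mirroring the Set calls added in this commit:

    package example

    import "github.com/tikv/pd/pkg/member"

    func example() {
        // Exported series (per client_golang naming conventions):
        //   service_member_role{service="Resource Manager"} == 1 => current primary
        member.ServiceMemberGauge.WithLabelValues("Resource Manager").Set(1) // elected
        member.ServiceMemberGauge.WithLabelValues("Resource Manager").Set(0) // stepped down
    }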
3 changes: 3 additions & 0 deletions pkg/schedule/config/config_provider.go
@@ -47,6 +47,7 @@ type SchedulerConfigProvider interface {
SharedConfigProvider

IsSchedulingHalted() bool
GetStoresLimit() map[uint64]StoreLimitConfig

IsSchedulerDisabled(string) bool
AddSchedulerCfg(string, []string)
@@ -137,6 +138,8 @@ type ConfProvider interface {
type StoreConfigProvider interface {
GetRegionMaxSize() uint64
GetRegionMaxKeys() uint64
GetRegionSplitSize() uint64
GetRegionSplitKeys() uint64
CheckRegionSize(uint64, uint64) error
CheckRegionKeys(uint64, uint64) error
IsEnableRegionBucket() bool
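The new split-threshold getters round out StoreConfigProvider so consumers can reason about region splits, not just maximum sizes. A hypothetical consumer, not part of this patch, assuming the usual tikv/pd module path (GetRegionSplitSize is in MB, per the PersistConfig comment above):

    package example

    import sc "github.com/tikv/pd/pkg/schedule/config"

    // shouldSplit reports whether a region crosses either configured
    // split threshold. Illustrative only.
    func shouldSplit(cfg sc.StoreConfigProvider, sizeMB, keys uint64) bool {
        return sizeMB >= cfg.GetRegionSplitSize() || keys >= cfg.GetRegionSplitKeys()
    }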