Skip to content

Commit

Permalink
Merge branch 'master' of github.com:tikv/pd into sche-redirect2
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Sep 21, 2023
2 parents 51780f2 + 96ace89 commit 27b4458
Show file tree
Hide file tree
Showing 32 changed files with 461 additions and 164 deletions.
40 changes: 23 additions & 17 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,9 +682,14 @@ func (r *RegionInfo) isRegionRecreated() bool {
return r.GetRegionEpoch().GetVersion() == 1 && r.GetRegionEpoch().GetConfVer() == 1 && (len(r.GetStartKey()) != 0 || len(r.GetEndKey()) != 0)
}

// RegionChanged is a struct that records the changes of the region.
type RegionChanged struct {
IsNew, SaveKV, SaveCache, NeedSync bool
}

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool)
type RegionGuideFunc func(region, origin *RegionInfo) *RegionChanged

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
// nil means do not print the log.
Expand All @@ -697,18 +702,19 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
return func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) {
// Mark IsNew if the region in cache does not have leader.
return func(region, origin *RegionInfo) (changed *RegionChanged) {
changed = &RegionChanged{}
if origin == nil {
if log.GetLevel() <= zap.DebugLevel {
debug("insert new region",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
}
saveKV, saveCache, isNew = true, true, true
changed.SaveKV, changed.SaveCache, changed.IsNew = true, true, true
} else {
if !origin.IsFromHeartbeat() {
isNew = true
changed.IsNew = true
}
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
Expand All @@ -721,7 +727,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-version", r.GetVersion()),
)
}
saveKV, saveCache = true, true
changed.SaveKV, changed.SaveCache = true, true
}
if r.GetConfVer() > o.GetConfVer() {
if log.GetLevel() <= zap.InfoLevel {
Expand All @@ -732,11 +738,11 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-confver", r.GetConfVer()),
)
}
saveKV, saveCache = true, true
changed.SaveCache, changed.SaveKV = true, true
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
if origin.GetLeader().GetId() == 0 {
isNew = true
changed.IsNew = true
} else if log.GetLevel() <= zap.InfoLevel {
info("leader changed",
zap.Uint64("region-id", region.GetID()),
Expand All @@ -745,57 +751,57 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
)
}
// We check it first and do not return because the log is important for us to investigate,
saveCache, needSync = true, true
changed.SaveCache, changed.NeedSync = true, true
}
if len(region.GetPeers()) != len(origin.GetPeers()) {
saveKV, saveCache = true, true
changed.SaveCache, changed.SaveKV = true, true
return
}
if len(region.GetBuckets().GetKeys()) != len(origin.GetBuckets().GetKeys()) {
if log.GetLevel() <= zap.DebugLevel {
debug("bucket key changed", zap.Uint64("region-id", region.GetID()))
}
saveKV, saveCache = true, true
changed.SaveCache, changed.SaveKV = true, true
return
}
// Once flow has changed, will update the cache.
// Because keys and bytes are strongly related, only bytes are judged.
if region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() ||
region.GetRoundBytesRead() != origin.GetRoundBytesRead() ||
region.flowRoundDivisor < origin.flowRoundDivisor {
saveCache, needSync = true, true
changed.SaveCache, changed.NeedSync = true, true
return
}
if !SortedPeersStatsEqual(region.GetDownPeers(), origin.GetDownPeers()) {
if log.GetLevel() <= zap.DebugLevel {
debug("down-peers changed", zap.Uint64("region-id", region.GetID()))
}
saveCache, needSync = true, true
changed.SaveCache, changed.NeedSync = true, true
return
}
if !SortedPeersEqual(region.GetPendingPeers(), origin.GetPendingPeers()) {
if log.GetLevel() <= zap.DebugLevel {
debug("pending-peers changed", zap.Uint64("region-id", region.GetID()))
}
saveCache, needSync = true, true
changed.SaveCache, changed.NeedSync = true, true
return
}
if region.GetApproximateSize() != origin.GetApproximateSize() ||
region.GetApproximateKeys() != origin.GetApproximateKeys() {
saveCache = true
changed.SaveCache = true
return
}
if region.GetReplicationStatus().GetState() != replication_modepb.RegionReplicationState_UNKNOWN &&
(region.GetReplicationStatus().GetState() != origin.GetReplicationStatus().GetState() ||
region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) {
saveCache = true
changed.SaveCache = true
return
}
// Do not save to kv, because 1) flashback will be eventually set to
// false, 2) flashback changes almost all regions in a cluster.
// Saving kv may downgrade PD performance when there are many regions.
if region.IsFlashbackChanged(origin) {
saveCache = true
changed.SaveCache = true
return
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ func TestNeedSync(t *testing.T) {
for _, testCase := range testCases {
regionA := region.Clone(testCase.optionsA...)
regionB := region.Clone(testCase.optionsB...)
_, _, _, needSync := RegionGuide(regionA, regionB)
re.Equal(testCase.needSync, needSync)
changed := RegionGuide(regionA, regionB)
re.Equal(testCase.needSync, changed.NeedSync)
}
}

Expand Down
39 changes: 23 additions & 16 deletions pkg/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
initialMinSpace = 8 * units.GiB // 2^33=8GB
slowStoreThreshold = 80
awakenStoreInterval = 10 * time.Minute // 2 * slowScoreRecoveryTime
splitStoreWait = time.Minute

// EngineKey is the label key used to indicate engine.
EngineKey = "engine"
Expand All @@ -50,22 +51,23 @@ const (
type StoreInfo struct {
meta *metapb.Store
*storeStats
pauseLeaderTransfer bool // not allow to be used as source or target of transfer leader
slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it
slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it
leaderCount int
regionCount int
learnerCount int
witnessCount int
leaderSize int64
regionSize int64
pendingPeerCount int
lastPersistTime time.Time
leaderWeight float64
regionWeight float64
limiter storelimit.StoreLimit
minResolvedTS uint64
lastAwakenTime time.Time
pauseLeaderTransfer bool // not allow to be used as source or target of transfer leader
slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it
slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it
leaderCount int
regionCount int
learnerCount int
witnessCount int
leaderSize int64
regionSize int64
pendingPeerCount int
lastPersistTime time.Time
leaderWeight float64
regionWeight float64
limiter storelimit.StoreLimit
minResolvedTS uint64
lastAwakenTime time.Time
recentlySplitRegionsTime time.Time
}

// NewStoreInfo creates StoreInfo with meta data.
Expand Down Expand Up @@ -539,6 +541,11 @@ func (s *StoreInfo) NeedAwakenStore() bool {
return s.GetLastHeartbeatTS().Sub(s.lastAwakenTime) > awakenStoreInterval
}

// HasRecentlySplitRegions checks if there are some region are splitted in this store.
func (s *StoreInfo) HasRecentlySplitRegions() bool {
return time.Since(s.recentlySplitRegionsTime) < splitStoreWait
}

var (
// If a store's last heartbeat is storeDisconnectDuration ago, the store will
// be marked as disconnected state. The value should be greater than tikv's
Expand Down
7 changes: 7 additions & 0 deletions pkg/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,10 @@ func SetLastAwakenTime(lastAwaken time.Time) StoreCreateOption {
store.lastAwakenTime = lastAwaken
}
}

// SetRecentlySplitRegionsTime sets last split time for the store.
func SetRecentlySplitRegionsTime(recentlySplitRegionsTime time.Time) StoreCreateOption {
return func(store *StoreInfo) {
store.recentlySplitRegionsTime = recentlySplitRegionsTime
}
}
7 changes: 6 additions & 1 deletion pkg/mcs/resourcemanager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,16 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu
}
// Send the consumption to update the metrics.
isBackground := req.GetIsBackground()
isTiFlash := req.GetIsTiflash()
if isBackground && isTiFlash {
return errors.New("background and tiflash cannot be true at the same time")
}
s.manager.consumptionDispatcher <- struct {
resourceGroupName string
*rmpb.Consumption
isBackground bool
}{resourceGroupName, req.GetConsumptionSinceLastRequest(), isBackground}
isTiFlash bool
}{resourceGroupName, req.GetConsumptionSinceLastRequest(), isBackground, isTiFlash}
if isBackground {
continue
}
Expand Down
21 changes: 13 additions & 8 deletions pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Manager struct {
resourceGroupName string
*rmpb.Consumption
isBackground bool
isTiFlash bool
}
// record update time of each resource group
consumptionRecord map[string]time.Time
Expand All @@ -81,6 +82,7 @@ func NewManager[T ConfigProvider](srv bs.Server) *Manager {
resourceGroupName string
*rmpb.Consumption
isBackground bool
isTiFlash bool
}, defaultConsumptionChanSize),
consumptionRecord: make(map[string]time.Time),
}
Expand Down Expand Up @@ -361,20 +363,23 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
if consumption == nil {
continue
}
backgroundType := ""
ruLabelType := tidbTypeLabel
if consumptionInfo.isBackground {
backgroundType = backgroundTypeLabel
ruLabelType = backgroundTypeLabel
}
if consumptionInfo.isTiFlash {
ruLabelType = tiflashTypeLabel
}

var (
name = consumptionInfo.resourceGroupName
rruMetrics = readRequestUnitCost.WithLabelValues(name, backgroundType)
wruMetrics = writeRequestUnitCost.WithLabelValues(name, backgroundType)
rruMetrics = readRequestUnitCost.WithLabelValues(name, ruLabelType)
wruMetrics = writeRequestUnitCost.WithLabelValues(name, ruLabelType)
sqlLayerRuMetrics = sqlLayerRequestUnitCost.WithLabelValues(name)
readByteMetrics = readByteCost.WithLabelValues(name, backgroundType)
writeByteMetrics = writeByteCost.WithLabelValues(name, backgroundType)
kvCPUMetrics = kvCPUCost.WithLabelValues(name, backgroundType)
sqlCPUMetrics = sqlCPUCost.WithLabelValues(name, backgroundType)
readByteMetrics = readByteCost.WithLabelValues(name, ruLabelType)
writeByteMetrics = writeByteCost.WithLabelValues(name, ruLabelType)
kvCPUMetrics = kvCPUCost.WithLabelValues(name, ruLabelType)
sqlCPUMetrics = sqlCPUCost.WithLabelValues(name, ruLabelType)
readRequestCountMetrics = requestCount.WithLabelValues(name, readTypeLabel)
writeRequestCountMetrics = requestCount.WithLabelValues(name, writeTypeLabel)
)
Expand Down
2 changes: 2 additions & 0 deletions pkg/mcs/resourcemanager/server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
readTypeLabel = "read"
writeTypeLabel = "write"
backgroundTypeLabel = "background"
tiflashTypeLabel = "tiflash"
tidbTypeLabel = "tidb"
)

var (
Expand Down
10 changes: 5 additions & 5 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig,
checkMembershipCh: checkMembershipCh,
}
c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels())
err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel())
if err != nil {
cancel()
return nil, err
Expand Down Expand Up @@ -433,8 +433,8 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
isNew, _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin)
if !saveCache && !isNew {
changed := core.GenerateRegionGuideFunc(true)(region, origin)
if !changed.SaveCache && !changed.IsNew {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
Expand All @@ -444,7 +444,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
}

var overlaps []*core.RegionInfo
if saveCache {
if changed.SaveCache {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
// check its validation again here.
//
Expand All @@ -456,7 +456,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
cluster.HandleOverlaps(c, overlaps)
}

cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared())
cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, changed.IsNew, c.IsPrepared())
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {
func (mc *Cluster) initRuleManager() {
if mc.RuleManager == nil {
mc.RuleManager = placement.NewRuleManager(mc.GetStorage(), mc, mc.GetSharedConfig())
mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels)
mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels, mc.GetReplicationConfig().IsolationLevel)
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/schedule/checker/merge_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ var (
mergeCheckerPausedCounter = checkerCounter.WithLabelValues(mergeCheckerName, "paused")
mergeCheckerRecentlySplitCounter = checkerCounter.WithLabelValues(mergeCheckerName, "recently-split")
mergeCheckerRecentlyStartCounter = checkerCounter.WithLabelValues(mergeCheckerName, "recently-start")
mergeCheckerSkipUninitRegionCounter = checkerCounter.WithLabelValues(mergeCheckerName, "skip-uninit-region")
mergeCheckerNoLeaderCounter = checkerCounter.WithLabelValues(mergeCheckerName, "no-leader")
mergeCheckerNoNeedCounter = checkerCounter.WithLabelValues(mergeCheckerName, "no-need")
mergeCheckerSpecialPeerCounter = checkerCounter.WithLabelValues(mergeCheckerName, "special-peer")
mergeCheckerUnhealthyRegionCounter = checkerCounter.WithLabelValues(mergeCheckerName, "unhealthy-region")
mergeCheckerAbnormalReplicaCounter = checkerCounter.WithLabelValues(mergeCheckerName, "abnormal-replica")
mergeCheckerHotRegionCounter = checkerCounter.WithLabelValues(mergeCheckerName, "hot-region")
mergeCheckerNoTargetCounter = checkerCounter.WithLabelValues(mergeCheckerName, "no-target")
Expand Down Expand Up @@ -129,7 +129,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {

// when pd just started, it will load region meta from region storage,
if region.GetLeader() == nil {
mergeCheckerSkipUninitRegionCounter.Inc()
mergeCheckerNoLeaderCounter.Inc()
return nil
}

Expand All @@ -141,7 +141,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {

// skip region has down peers or pending peers
if !filter.IsRegionHealthy(region) {
mergeCheckerSpecialPeerCounter.Inc()
mergeCheckerUnhealthyRegionCounter.Inc()
return nil
}

Expand Down
Loading

0 comments on commit 27b4458

Please sign in to comment.