Skip to content

Commit

Permalink
*: add region heartbeat duration breakdown metrics
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch committed Mar 4, 2024
1 parent e264a61 commit e36b9c1
Show file tree
Hide file tree
Showing 11 changed files with 409 additions and 82 deletions.
222 changes: 222 additions & 0 deletions pkg/core/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
"time"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

var (
// HeartbeatBreakdownHandleDurationSum is the summary of the processing time of handle the heartbeat stage.
HeartbeatBreakdownHandleDurationSum = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "core",
Name: "heartbeat_breakdown_handle_duration_seconds_sum",
Help: "Bucketed histogram of processing time (s) of handle the heartbeat stage.",
}, []string{"name"})

// HeartbeatBreakdownHandleCount is the summary of the processing count of handle the heartbeat stage.
HeartbeatBreakdownHandleCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "core",
Name: "heartbeat_breakdown_handle_duration_seconds_count",
Help: "Bucketed histogram of processing count of handle the heartbeat stage.",
}, []string{"name"})

waitLockDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("WaitLock")
waitLockCount = HeartbeatBreakdownHandleCount.WithLabelValues("WaitLock")
preCheckDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("PreCheck")
preCheckCount = HeartbeatBreakdownHandleCount.WithLabelValues("PreCheck")
asyncHotStatsDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("AsyncHotStatsDuration")
asyncHotStatsCount = HeartbeatBreakdownHandleCount.WithLabelValues("AsyncHotStatsDuration")
regionGuideDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("RegionGuide")
regionGuideCount = HeartbeatBreakdownHandleCount.WithLabelValues("RegionGuide")
checkOverlapsDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("SaveCache_CheckOverlaps")
checkOverlapsCount = HeartbeatBreakdownHandleCount.WithLabelValues("SaveCache_CheckOverlaps")
validateRegionDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("SaveCache_InvalidRegion")
validateRegionCount = HeartbeatBreakdownHandleCount.WithLabelValues("SaveCache_InvalidRegion")
setRegionDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("SaveCache_SetRegion")
setRegionCount = HeartbeatBreakdownHandleCount.WithLabelValues("SaveCache_SetRegion")
updateSubTreeDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("SaveCache_UpdateSubTree")
updateSubTreeCount = HeartbeatBreakdownHandleCount.WithLabelValues("SaveCache_UpdateSubTree")
otherDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("Other")
otherCount = HeartbeatBreakdownHandleCount.WithLabelValues("Other")
)

func init() {
prometheus.MustRegister(HeartbeatBreakdownHandleDurationSum)
prometheus.MustRegister(HeartbeatBreakdownHandleCount)
}

type saveCacheStats struct {
startTime time.Time
lastCheckTime time.Time
checkOverlapsDuration time.Duration
validateRegionDuration time.Duration
setRegionDuration time.Duration
updateSubTreeDuration time.Duration
}

// RegionHeartbeatProcessTracer is used to trace the process of handling region heartbeat.
type RegionHeartbeatProcessTracer interface {
Begin()
OnPreCheckFinished()
OnAsyncHotStatsFinished()
OnRegionGuideFinished()
OnSaveCacheBegin()
OnSaveCacheFinished()
OnCheckOverlapsFinished()
OnValidateRegionFinished()
OnSetRegionFinished()
OnUpdateSubTreeFinished()
OnAllStageFinished()
LogFields() []zap.Field
}

type noopHeartbeatProcessTracer struct{}

// NewNoopHeartbeatProcessTracer returns a noop heartbeat process tracer.
func NewNoopHeartbeatProcessTracer() RegionHeartbeatProcessTracer {
return &noopHeartbeatProcessTracer{}
}

func (n *noopHeartbeatProcessTracer) Begin() {}
func (n *noopHeartbeatProcessTracer) OnPreCheckFinished() {}
func (n *noopHeartbeatProcessTracer) OnAsyncHotStatsFinished() {}
func (n *noopHeartbeatProcessTracer) OnRegionGuideFinished() {}
func (n *noopHeartbeatProcessTracer) OnSaveCacheBegin() {}
func (n *noopHeartbeatProcessTracer) OnSaveCacheFinished() {}
func (n *noopHeartbeatProcessTracer) OnCheckOverlapsFinished() {}
func (n *noopHeartbeatProcessTracer) OnValidateRegionFinished() {}
func (n *noopHeartbeatProcessTracer) OnSetRegionFinished() {}
func (n *noopHeartbeatProcessTracer) OnUpdateSubTreeFinished() {}
func (n *noopHeartbeatProcessTracer) OnAllStageFinished() {}
func (n *noopHeartbeatProcessTracer) LogFields() []zap.Field {
return nil
}

type regionHeartbeatProcessTracer struct {
startTime time.Time
lastCheckTime time.Time
preCheckDuration time.Duration
asyncHotStatsDuration time.Duration
regionGuideDuration time.Duration
saveCacheStats saveCacheStats
OtherDuration time.Duration
}

// NewHeartbeatProcessTracer returns a heartbeat process tracer.
func NewHeartbeatProcessTracer() RegionHeartbeatProcessTracer {
return &regionHeartbeatProcessTracer{}
}

func (h *regionHeartbeatProcessTracer) Begin() {
now := time.Now()
h.startTime = now
h.lastCheckTime = now
}

func (h *regionHeartbeatProcessTracer) OnPreCheckFinished() {
now := time.Now()
h.preCheckDuration = now.Sub(h.lastCheckTime)
h.lastCheckTime = now
preCheckDurationSum.Add(h.preCheckDuration.Seconds())
preCheckCount.Inc()
}

func (h *regionHeartbeatProcessTracer) OnAsyncHotStatsFinished() {
now := time.Now()
h.asyncHotStatsDuration = now.Sub(h.lastCheckTime)
h.lastCheckTime = now
asyncHotStatsDurationSum.Add(h.preCheckDuration.Seconds())
asyncHotStatsCount.Inc()
}

func (h *regionHeartbeatProcessTracer) OnRegionGuideFinished() {
now := time.Now()
h.regionGuideDuration = now.Sub(h.lastCheckTime)
h.lastCheckTime = now
regionGuideDurationSum.Add(h.regionGuideDuration.Seconds())
regionGuideCount.Inc()
}

func (h *regionHeartbeatProcessTracer) OnSaveCacheBegin() {
now := time.Now()
h.saveCacheStats.startTime = now
h.saveCacheStats.lastCheckTime = now
h.lastCheckTime = now
}

func (h *regionHeartbeatProcessTracer) OnSaveCacheFinished() {
// update the outer checkpoint time
h.lastCheckTime = time.Now()
}

func (h *regionHeartbeatProcessTracer) OnCheckOverlapsFinished() {
now := time.Now()
h.saveCacheStats.checkOverlapsDuration = now.Sub(h.lastCheckTime)
h.saveCacheStats.lastCheckTime = now
checkOverlapsDurationSum.Add(h.saveCacheStats.checkOverlapsDuration.Seconds())
checkOverlapsCount.Inc()
}

func (h *regionHeartbeatProcessTracer) OnValidateRegionFinished() {
now := time.Now()
h.saveCacheStats.validateRegionDuration = now.Sub(h.saveCacheStats.lastCheckTime)
h.saveCacheStats.lastCheckTime = now
validateRegionDurationSum.Add(h.saveCacheStats.validateRegionDuration.Seconds())
validateRegionCount.Inc()
}

func (h *regionHeartbeatProcessTracer) OnSetRegionFinished() {
now := time.Now()
h.saveCacheStats.setRegionDuration = now.Sub(h.saveCacheStats.lastCheckTime)
h.saveCacheStats.lastCheckTime = now
setRegionDurationSum.Add(h.saveCacheStats.setRegionDuration.Seconds())
setRegionCount.Inc()
}

func (h *regionHeartbeatProcessTracer) OnUpdateSubTreeFinished() {
now := time.Now()
h.saveCacheStats.updateSubTreeDuration = now.Sub(h.saveCacheStats.lastCheckTime)
h.saveCacheStats.lastCheckTime = now
updateSubTreeDurationSum.Add(h.saveCacheStats.updateSubTreeDuration.Seconds())
updateSubTreeCount.Inc()
}

func (h *regionHeartbeatProcessTracer) OnAllStageFinished() {
now := time.Now()
h.OtherDuration = now.Sub(h.lastCheckTime)
otherDurationSum.Add(h.OtherDuration.Seconds())
otherCount.Inc()
}

func (h *regionHeartbeatProcessTracer) LogFields() []zap.Field {
return []zap.Field{
zap.Duration("pre-check-duration", h.preCheckDuration),
zap.Duration("async-hot-stats-duration", h.asyncHotStatsDuration),
zap.Duration("region-guide-duration", h.regionGuideDuration),
zap.Duration("check-overlaps-duration", h.saveCacheStats.checkOverlapsDuration),
zap.Duration("validate-region-duration", h.saveCacheStats.validateRegionDuration),
zap.Duration("set-region-duration", h.saveCacheStats.setRegionDuration),
zap.Duration("update-sub-tree-duration", h.saveCacheStats.updateSubTreeDuration),
zap.Duration("other-duration", h.OtherDuration),
}
}
77 changes: 71 additions & 6 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,12 +824,49 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
}

// RWLockStats is a read-write lock with statistics.
type RWLockStats struct {
syncutil.RWMutex
totalWaitTime int64
lockCount int64
lastLockCount int64
lastTotalWaitTime int64
}

// Lock locks the lock and records the waiting time.
func (l *RWLockStats) Lock() {
startTime := time.Now()
l.RWMutex.Lock()
elapsed := time.Since(startTime).Nanoseconds()
atomic.AddInt64(&l.totalWaitTime, elapsed)
atomic.AddInt64(&l.lockCount, 1)
}

// Unlock unlocks the lock.
func (l *RWLockStats) Unlock() {
l.RWMutex.Unlock()
}

// RLock locks the lock for reading and records the waiting time.
func (l *RWLockStats) RLock() {
startTime := time.Now()
l.RWMutex.RLock()
elapsed := time.Since(startTime).Nanoseconds()
atomic.AddInt64(&l.totalWaitTime, elapsed)
atomic.AddInt64(&l.lockCount, 1)
}

// RUnlock unlocks the lock for reading.
func (l *RWLockStats) RUnlock() {
l.RWMutex.RUnlock()
}

// RegionsInfo for export
type RegionsInfo struct {
t syncutil.RWMutex
t RWLockStats
tree *regionTree
regions map[uint64]*regionItem // regionID -> regionInfo
st syncutil.RWMutex
st RWLockStats
subRegions map[uint64]*regionItem // regionID -> regionInfo
leaders map[uint64]*regionTree // storeID -> sub regionTree
followers map[uint64]*regionTree // storeID -> sub regionTree
Expand Down Expand Up @@ -896,33 +933,38 @@ func (r *RegionsInfo) PutRegion(region *RegionInfo) []*RegionInfo {
}

// PreCheckPutRegion checks if the region is valid to put.
func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*regionItem, error) {
origin, overlaps := r.GetRelevantRegions(region)
func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo, trace RegionHeartbeatProcessTracer) (*RegionInfo, []*regionItem, error) {
origin, overlaps := r.GetRelevantRegions(region, trace)
err := check(region, origin, overlaps)
return origin, overlaps, err
}

// AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put.
func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo, error) {
func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo, trace RegionHeartbeatProcessTracer) ([]*RegionInfo, error) {
r.t.Lock()
var ols []*regionItem
origin := r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
ols = r.tree.overlaps(&regionItem{RegionInfo: region})
}
trace.OnCheckOverlapsFinished()
err := check(region, origin, ols)
if err != nil {
r.t.Unlock()
trace.OnValidateRegionFinished()
return nil, err
}
trace.OnValidateRegionFinished()
origin, overlaps, rangeChanged := r.setRegionLocked(region, true, ols...)
r.t.Unlock()
trace.OnSetRegionFinished()
r.UpdateSubTree(region, origin, overlaps, rangeChanged)
trace.OnUpdateSubTreeFinished()
return overlaps, nil
}

// GetRelevantRegions returns the relevant regions for a given region.
func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*regionItem) {
func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo, trace RegionHeartbeatProcessTracer) (origin *RegionInfo, overlaps []*regionItem) {
r.t.RLock()
defer r.t.RUnlock()
origin = r.getRegionLocked(region.GetID())
Expand Down Expand Up @@ -1653,6 +1695,29 @@ func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 {
return size
}

const magicCount = 15 * time.Second

// CollectWaitLockMetrics collects the metrics of waiting time for lock
func (r *RegionsInfo) CollectWaitLockMetrics() {
sTotalTime := atomic.LoadInt64(&r.t.totalWaitTime)
stTotalTime := atomic.LoadInt64(&r.st.totalWaitTime)
sLockCount := atomic.LoadInt64(&r.t.lockCount)
stLockCount := atomic.LoadInt64(&r.st.lockCount)
lastTotalWaitTime := atomic.LoadInt64(&r.t.lastTotalWaitTime) + atomic.LoadInt64(&r.st.lastTotalWaitTime)
lastLockCount := atomic.LoadInt64(&r.t.lastLockCount) + atomic.LoadInt64(&r.st.lastLockCount)
totalLockCount := sLockCount + stLockCount
totalWaitTime := sTotalTime + stTotalTime
atomic.StoreInt64(&r.t.lastTotalWaitTime, sTotalTime)
atomic.StoreInt64(&r.t.lastLockCount, sLockCount)
atomic.StoreInt64(&r.st.lastTotalWaitTime, stTotalTime)
atomic.StoreInt64(&r.st.lastLockCount, stLockCount)
if lastTotalWaitTime == 0 || lastLockCount == 0 || totalLockCount-lastLockCount < 0 || totalLockCount-lastLockCount > int64(magicCount) {
return
}
waitLockDurationSum.Add(time.Duration(totalWaitTime - lastTotalWaitTime).Seconds())
waitLockCount.Add(float64(totalLockCount - lastLockCount))
}

// GetAdjacentRegions returns region's info that is adjacent with specific region
func (r *RegionsInfo) GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo) {
r.t.RLock()
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,9 @@ func TestSetRegionConcurrence(t *testing.T) {
regions := NewRegionsInfo()
region := NewTestRegionInfo(1, 1, []byte("a"), []byte("b"))
go func() {
regions.AtomicCheckAndPutRegion(region)
regions.AtomicCheckAndPutRegion(region, NewNoopHeartbeatProcessTracer())
}()
regions.AtomicCheckAndPutRegion(region)
regions.AtomicCheckAndPutRegion(region, NewNoopHeartbeatProcessTracer())
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/core/UpdateSubTree"))
}

Expand Down
Loading

0 comments on commit e36b9c1

Please sign in to comment.