Skip to content

Commit

Permalink
*: add region heartbeat duration breakdown metrics
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch committed Mar 4, 2024
1 parent 87f6923 commit 50d0754
Show file tree
Hide file tree
Showing 12 changed files with 363 additions and 98 deletions.
1 change: 1 addition & 0 deletions pkg/cluster/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package cluster
201 changes: 191 additions & 10 deletions pkg/core/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,204 @@

package core

import "github.com/prometheus/client_golang/prometheus"
import (
"time"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

var (
AcquireLockWaitDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
HeartbeatBreakdownHandleDurationSum = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "core",
Name: "heartbeat_breakdown_handle_duration_seconds_sum",
Help: "Bucketed histogram of processing time (s) of handle the heartbeat stage.",
}, []string{"name"})

HeartbeatBreakdownHandleCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "core",
Name: "acquire_lock_wait_duration_seconds",
Help: "Bucketed histogram of processing time (s) of wait the lock.",
Buckets: prometheus.ExponentialBuckets(0.000001, 2, 29), // 1us ~ 512s
Name: "heartbeat_breakdown_handle_duration_seconds_count",
Help: "Bucketed histogram of processing count of handle the heartbeat stage.",
}, []string{"name"})

scanRegionsLockWait = AcquireLockWaitDuration.WithLabelValues("ScanRegions")
getRelevantRegionsLockWait = AcquireLockWaitDuration.WithLabelValues("GetRelevantRegions")
atomicCheckAndPutRegionLockWait = AcquireLockWaitDuration.WithLabelValues("AtomicCheckAndPutRegion")
waitLockDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("WaitLock")
waitLockCount = HeartbeatBreakdownHandleCount.WithLabelValues("WaitLock")
preCheckDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("PreCheck")
preCheckCount = HeartbeatBreakdownHandleCount.WithLabelValues("PreCheck")
asyncHotStatsDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("AsyncHotStatsDuration")
asyncHotStatsCount = HeartbeatBreakdownHandleCount.WithLabelValues("AsyncHotStatsDuration")
regionGuideDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("RegionGuide")
regionGuideCount = HeartbeatBreakdownHandleCount.WithLabelValues("RegionGuide")
checkOverlapsDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("SaveCache_CheckOverlaps")
checkOverlapsCount = HeartbeatBreakdownHandleCount.WithLabelValues("SaveCache_CheckOverlaps")
validateRegionDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("SaveCache_InvalidRegion")
validateRegionCount = HeartbeatBreakdownHandleCount.WithLabelValues("SaveCache_InvalidRegion")
setRegionDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("SaveCache_SetRegion")
setRegionCount = HeartbeatBreakdownHandleCount.WithLabelValues("SaveCache_SetRegion")
updateSubTreeDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("SaveCache_UpdateSubTree")
updateSubTreeCount = HeartbeatBreakdownHandleCount.WithLabelValues("SaveCache_UpdateSubTree")
otherDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("Other")
otherCount = HeartbeatBreakdownHandleCount.WithLabelValues("Other")
)

func init() {
prometheus.MustRegister(AcquireLockWaitDuration)
prometheus.MustRegister(HeartbeatBreakdownHandleDurationSum)
prometheus.MustRegister(HeartbeatBreakdownHandleCount)
}

// saveCacheStats keeps the checkpoints and per-step durations recorded while
// a heartbeat is inside its save-cache stage.
type saveCacheStats struct {
	// Both are set to the same instant by OnSaveCacheBegin; lastCheckTime is
	// then advanced by each save-cache step callback.
	startTime, lastCheckTime time.Time

	// Recorded by OnCheckOverlapsFinished and OnValidateRegionFinished.
	checkOverlapsDuration, validateRegionDuration time.Duration

	// Recorded by OnSetRegionFinished and OnUpdateSubTreeFinished.
	setRegionDuration, updateSubTreeDuration time.Duration
}

// RegionHeartbeatProcessTracer is notified as each stage of handling a region
// heartbeat completes. Two implementations live in this file: one that records
// per-stage durations and one whose callbacks do nothing.
type RegionHeartbeatProcessTracer interface {
// Begin marks the start of handling one heartbeat.
Begin()
// OnPreCheckFinished is called when the pre-check stage is done.
OnPreCheckFinished()
// OnAsyncHotStatsFinished is called when the async hot-stats stage is done.
OnAsyncHotStatsFinished()
// OnRegionGuideFinished is called when the region-guide stage is done.
OnRegionGuideFinished()
// OnSaveCacheBegin and OnSaveCacheFinished bracket the save-cache stage;
// the four step callbacks below are reported in between.
OnSaveCacheBegin()
OnSaveCacheFinished()
// OnCheckOverlapsFinished is called after the overlap lookup.
OnCheckOverlapsFinished()
// OnValidateRegionFinished is called after the region has been validated.
OnValidateRegionFinished()
// OnSetRegionFinished is called after the region has been set.
OnSetRegionFinished()
// OnUpdateSubTreeFinished is called after the sub-tree update.
OnUpdateSubTreeFinished()
// OnAllStageFinished is called once handling is complete.
OnAllStageFinished()
// LogFields returns the recorded durations as zap fields; it may be nil.
LogFields() []zap.Field
}

// noopHeartbeatProcessTracer is a RegionHeartbeatProcessTracer whose callbacks
// all do nothing.
type noopHeartbeatProcessTracer struct{}

// NewNoopHeartbeatProcessTracer returns a tracer that ignores every callback.
func NewNoopHeartbeatProcessTracer() RegionHeartbeatProcessTracer {
	return new(noopHeartbeatProcessTracer)
}

// Begin does nothing.
func (*noopHeartbeatProcessTracer) Begin() {}

// OnPreCheckFinished does nothing.
func (*noopHeartbeatProcessTracer) OnPreCheckFinished() {}

// OnAsyncHotStatsFinished does nothing.
func (*noopHeartbeatProcessTracer) OnAsyncHotStatsFinished() {}

// OnRegionGuideFinished does nothing.
func (*noopHeartbeatProcessTracer) OnRegionGuideFinished() {}

// OnSaveCacheBegin does nothing.
func (*noopHeartbeatProcessTracer) OnSaveCacheBegin() {}

// OnSaveCacheFinished does nothing.
func (*noopHeartbeatProcessTracer) OnSaveCacheFinished() {}

// OnCheckOverlapsFinished does nothing.
func (*noopHeartbeatProcessTracer) OnCheckOverlapsFinished() {}

// OnValidateRegionFinished does nothing.
func (*noopHeartbeatProcessTracer) OnValidateRegionFinished() {}

// OnSetRegionFinished does nothing.
func (*noopHeartbeatProcessTracer) OnSetRegionFinished() {}

// OnUpdateSubTreeFinished does nothing.
func (*noopHeartbeatProcessTracer) OnUpdateSubTreeFinished() {}

// OnAllStageFinished does nothing.
func (*noopHeartbeatProcessTracer) OnAllStageFinished() {}

// LogFields always returns nil because nothing is recorded.
func (*noopHeartbeatProcessTracer) LogFields() []zap.Field { return nil }

// regionHeartbeatProcessTracer is the RegionHeartbeatProcessTracer that
// records how long each stage of one heartbeat took.
type regionHeartbeatProcessTracer struct {
	// startTime is set by Begin; lastCheckTime is the instant the most
	// recently reported stage finished.
	startTime, lastCheckTime time.Time

	// Durations of the stages that run before the save-cache stage.
	preCheckDuration, asyncHotStatsDuration, regionGuideDuration time.Duration

	// saveCacheStats holds the breakdown of the save-cache stage.
	saveCacheStats saveCacheStats

	// OtherDuration is the time between the last checkpoint and
	// OnAllStageFinished.
	OtherDuration time.Duration
}

// NewHeartbeatProcessTracer returns a tracer that records per-stage durations.
// Its fields start at their zero values; Begin sets the first checkpoint.
func NewHeartbeatProcessTracer() RegionHeartbeatProcessTracer {
	return new(regionHeartbeatProcessTracer)
}

// Begin records the current time as both the start time and the first
// checkpoint.
func (h *regionHeartbeatProcessTracer) Begin() {
	h.startTime = time.Now()
	h.lastCheckTime = h.startTime
}

// OnPreCheckFinished stores the time elapsed since the last checkpoint as the
// pre-check duration, advances the checkpoint, and updates the PreCheck
// counters.
func (h *regionHeartbeatProcessTracer) OnPreCheckFinished() {
	checkpoint := time.Now()
	elapsed := checkpoint.Sub(h.lastCheckTime)
	h.preCheckDuration, h.lastCheckTime = elapsed, checkpoint
	preCheckDurationSum.Add(elapsed.Seconds())
	preCheckCount.Inc()
}

// OnAsyncHotStatsFinished stores the time elapsed since the last checkpoint as
// the async hot-stats duration, advances the checkpoint, and updates the
// AsyncHotStatsDuration counters.
func (h *regionHeartbeatProcessTracer) OnAsyncHotStatsFinished() {
	now := time.Now()
	h.asyncHotStatsDuration = now.Sub(h.lastCheckTime)
	h.lastCheckTime = now
	// Report this stage's own duration; adding preCheckDuration here would
	// count the pre-check time twice and never export the hot-stats time.
	asyncHotStatsDurationSum.Add(h.asyncHotStatsDuration.Seconds())
	asyncHotStatsCount.Inc()
}

// OnRegionGuideFinished stores the time elapsed since the last checkpoint as
// the region-guide duration, advances the checkpoint, and updates the
// RegionGuide counters.
func (h *regionHeartbeatProcessTracer) OnRegionGuideFinished() {
	checkpoint := time.Now()
	elapsed := checkpoint.Sub(h.lastCheckTime)
	h.regionGuideDuration, h.lastCheckTime = elapsed, checkpoint
	regionGuideDurationSum.Add(elapsed.Seconds())
	regionGuideCount.Inc()
}

// OnSaveCacheBegin resets the save-cache start time and checkpoint, and the
// outer checkpoint, to the same current instant.
func (h *regionHeartbeatProcessTracer) OnSaveCacheBegin() {
	begin := time.Now()
	stats := &h.saveCacheStats
	stats.startTime, stats.lastCheckTime = begin, begin
	h.lastCheckTime = begin
}

// OnSaveCacheFinished moves the outer checkpoint to the current time so that
// the next outer stage is measured from the end of the save-cache stage.
func (h *regionHeartbeatProcessTracer) OnSaveCacheFinished() {
	finished := time.Now()
	h.lastCheckTime = finished
}

// OnCheckOverlapsFinished stores the check-overlaps duration, advances the
// save-cache checkpoint, and updates the SaveCache_CheckOverlaps counters.
func (h *regionHeartbeatProcessTracer) OnCheckOverlapsFinished() {
	checkpoint := time.Now()
	stats := &h.saveCacheStats
	// Measured from the outer checkpoint rather than the save-cache one;
	// OnSaveCacheBegin sets both to the same instant.
	stats.checkOverlapsDuration = checkpoint.Sub(h.lastCheckTime)
	stats.lastCheckTime = checkpoint
	checkOverlapsDurationSum.Add(stats.checkOverlapsDuration.Seconds())
	checkOverlapsCount.Inc()
}

// OnValidateRegionFinished stores the time elapsed since the save-cache
// checkpoint as the validate-region duration, advances that checkpoint, and
// updates the SaveCache_InvalidRegion counters.
func (h *regionHeartbeatProcessTracer) OnValidateRegionFinished() {
	checkpoint := time.Now()
	stats := &h.saveCacheStats
	elapsed := checkpoint.Sub(stats.lastCheckTime)
	stats.validateRegionDuration, stats.lastCheckTime = elapsed, checkpoint
	validateRegionDurationSum.Add(elapsed.Seconds())
	validateRegionCount.Inc()
}

// OnSetRegionFinished stores the time elapsed since the save-cache checkpoint
// as the set-region duration, advances that checkpoint, and updates the
// SaveCache_SetRegion counters.
func (h *regionHeartbeatProcessTracer) OnSetRegionFinished() {
	checkpoint := time.Now()
	stats := &h.saveCacheStats
	elapsed := checkpoint.Sub(stats.lastCheckTime)
	stats.setRegionDuration, stats.lastCheckTime = elapsed, checkpoint
	setRegionDurationSum.Add(elapsed.Seconds())
	setRegionCount.Inc()
}

// OnUpdateSubTreeFinished stores the time elapsed since the save-cache
// checkpoint as the update-sub-tree duration, advances that checkpoint, and
// updates the SaveCache_UpdateSubTree counters.
func (h *regionHeartbeatProcessTracer) OnUpdateSubTreeFinished() {
	checkpoint := time.Now()
	stats := &h.saveCacheStats
	elapsed := checkpoint.Sub(stats.lastCheckTime)
	stats.updateSubTreeDuration, stats.lastCheckTime = elapsed, checkpoint
	updateSubTreeDurationSum.Add(elapsed.Seconds())
	updateSubTreeCount.Inc()
}

// OnAllStageFinished stores the time elapsed since the last outer checkpoint
// as OtherDuration and updates the Other counters. The checkpoint itself is
// left unchanged.
func (h *regionHeartbeatProcessTracer) OnAllStageFinished() {
	h.OtherDuration = time.Since(h.lastCheckTime)
	otherDurationSum.Add(h.OtherDuration.Seconds())
	otherCount.Inc()
}

// LogFields returns the eight recorded durations as zap fields, outer stages
// first, then the save-cache steps, then the remainder.
func (h *regionHeartbeatProcessTracer) LogFields() []zap.Field {
	stats := &h.saveCacheStats
	fields := make([]zap.Field, 0, 8)
	fields = append(fields,
		zap.Duration("pre-check-duration", h.preCheckDuration),
		zap.Duration("async-hot-stats-duration", h.asyncHotStatsDuration),
		zap.Duration("region-guide-duration", h.regionGuideDuration),
	)
	fields = append(fields,
		zap.Duration("check-overlaps-duration", stats.checkOverlapsDuration),
		zap.Duration("validate-region-duration", stats.validateRegionDuration),
		zap.Duration("set-region-duration", stats.setRegionDuration),
		zap.Duration("update-sub-tree-duration", stats.updateSubTreeDuration),
	)
	return append(fields, zap.Duration("other-duration", h.OtherDuration))
}
67 changes: 55 additions & 12 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,12 +824,44 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
}

// RWLockStats is an RWMutex that also accumulates how long callers waited to
// acquire it and how many times it was acquired.
type RWLockStats struct {
	syncutil.RWMutex

	// Running totals, updated atomically by Lock and RLock. totalWaitTime is
	// in nanoseconds.
	totalWaitTime, lockCount int64

	// Totals as of the previous metrics collection.
	lastLockCount, lastTotalWaitTime int64
}

func (t *RWLockStats) Lock() {

Check failure on line 835 in pkg/core/region.go

View workflow job for this annotation

GitHub Actions / statics

ST1016: methods on the same type should have the same receiver name (seen 1x "t", 3x "s") (stylecheck)
startTime := time.Now()
t.RWMutex.Lock()
elapsed := time.Since(startTime).Nanoseconds()
atomic.AddInt64(&t.totalWaitTime, elapsed)
atomic.AddInt64(&t.lockCount, 1)
}

// Unlock releases the write lock on the embedded RWMutex. Releasing is not
// timed or counted.
func (s *RWLockStats) Unlock() {
s.RWMutex.Unlock()
}

// RLock acquires the read lock and adds the time spent waiting for it, in
// nanoseconds, to totalWaitTime; it also increments lockCount. Read and write
// acquisitions share the same two counters.
func (s *RWLockStats) RLock() {
	begin := time.Now()
	s.RWMutex.RLock()
	// A time.Duration is a nanosecond count, so the conversion is exact.
	waited := int64(time.Since(begin))
	atomic.AddInt64(&s.totalWaitTime, waited)
	atomic.AddInt64(&s.lockCount, 1)
}

// RUnlock releases a read lock on the embedded RWMutex. Releasing is not
// timed or counted.
func (s *RWLockStats) RUnlock() {
s.RWMutex.RUnlock()
}

// RegionsInfo for export
type RegionsInfo struct {
t syncutil.RWMutex
t RWLockStats
tree *regionTree
regions map[uint64]*regionItem // regionID -> regionInfo
st syncutil.RWMutex
st RWLockStats
subRegions map[uint64]*regionItem // regionID -> regionInfo
leaders map[uint64]*regionTree // storeID -> sub regionTree
followers map[uint64]*regionTree // storeID -> sub regionTree
Expand Down Expand Up @@ -896,38 +928,39 @@ func (r *RegionsInfo) PutRegion(region *RegionInfo) []*RegionInfo {
}

// PreCheckPutRegion checks if the region is valid to put.
func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*regionItem, error) {
origin, overlaps := r.GetRelevantRegions(region)
func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo, trace RegionHeartbeatProcessTracer) (*RegionInfo, []*regionItem, error) {
origin, overlaps := r.GetRelevantRegions(region, trace)
err := check(region, origin, overlaps)
return origin, overlaps, err
}

// AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put.
func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo, error) {
start := time.Now()
func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo, trace RegionHeartbeatProcessTracer) ([]*RegionInfo, error) {
r.t.Lock()
atomicCheckAndPutRegionLockWait.Observe(time.Since(start).Seconds())
var ols []*regionItem
origin := r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
ols = r.tree.overlaps(&regionItem{RegionInfo: region})
}
trace.OnCheckOverlapsFinished()
err := check(region, origin, ols)
if err != nil {
r.t.Unlock()
trace.OnValidateRegionFinished()
return nil, err
}
trace.OnValidateRegionFinished()
origin, overlaps, rangeChanged := r.setRegionLocked(region, true, ols...)
r.t.Unlock()
trace.OnSetRegionFinished()
r.UpdateSubTree(region, origin, overlaps, rangeChanged)
trace.OnUpdateSubTreeFinished()
return overlaps, nil
}

// GetRelevantRegions returns the relevant regions for a given region.
func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*regionItem) {
start := time.Now()
func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo, trace RegionHeartbeatProcessTracer) (origin *RegionInfo, overlaps []*regionItem) {
r.t.RLock()
getRelevantRegionsLockWait.Observe(time.Since(start).Seconds())
defer r.t.RUnlock()
origin = r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
Expand Down Expand Up @@ -1603,9 +1636,7 @@ func (r *RegionsInfo) GetRegionCount(startKey, endKey []byte) int {
// ScanRegions scans regions intersecting [start key, end key), returns at most
// `limit` regions. limit <= 0 means no limit.
func (r *RegionsInfo) ScanRegions(startKey, endKey []byte, limit int) []*RegionInfo {
start := time.Now()
r.t.RLock()
scanRegionsLockWait.Observe(time.Since(start).Seconds())
defer r.t.RUnlock()
var res []*RegionInfo
r.tree.scanRange(startKey, func(region *RegionInfo) bool {
Expand Down Expand Up @@ -1659,6 +1690,18 @@ func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 {
return size
}

// CollectWaitLockMetrics reports, under the WaitLock label, the lock wait time
// and number of acquisitions that the r.t and r.st locks accumulated since the
// previous call, and records the current totals as the baseline for the next
// call. The first call only establishes the baseline and reports nothing.
func (r *RegionsInfo) CollectWaitLockMetrics() {
	treeWait := atomic.LoadInt64(&r.t.totalWaitTime)
	treeCount := atomic.LoadInt64(&r.t.lockCount)
	subTreeWait := atomic.LoadInt64(&r.st.totalWaitTime)
	subTreeCount := atomic.LoadInt64(&r.st.lockCount)

	// Refresh the baseline on every call, including the ones skipped below.
	// If it were never written it would stay zero and nothing would ever be
	// reported.
	lastTreeWait := atomic.SwapInt64(&r.t.lastTotalWaitTime, treeWait)
	lastTreeCount := atomic.SwapInt64(&r.t.lastLockCount, treeCount)
	lastSubTreeWait := atomic.SwapInt64(&r.st.lastTotalWaitTime, subTreeWait)
	lastSubTreeCount := atomic.SwapInt64(&r.st.lastLockCount, subTreeCount)

	lastTotalWaitTime := lastTreeWait + lastSubTreeWait
	lastLockCount := lastTreeCount + lastSubTreeCount
	waitDelta := treeWait + subTreeWait - lastTotalWaitTime
	countDelta := treeCount + subTreeCount - lastLockCount

	// Skip the initial state (no baseline yet) and implausible deltas.
	// Negative deltas must never reach Counter.Add, which panics on them.
	// NOTE(review): the 15s bound is applied to the wait-time delta on the
	// assumption that it is a sanity cap in nanoseconds, not a cap on the
	// lock count — confirm.
	if lastTotalWaitTime == 0 || lastLockCount == 0 ||
		countDelta < 0 || waitDelta < 0 || waitDelta > int64(15*time.Second) {
		return
	}
	waitLockDurationSum.Add(time.Duration(waitDelta).Seconds())
	waitLockCount.Add(float64(countDelta))
}

// GetAdjacentRegions returns region's info that is adjacent with specific region
func (r *RegionsInfo) GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo) {
r.t.RLock()
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,9 @@ func TestSetRegionConcurrence(t *testing.T) {
regions := NewRegionsInfo()
region := NewTestRegionInfo(1, 1, []byte("a"), []byte("b"))
go func() {
regions.AtomicCheckAndPutRegion(region)
regions.AtomicCheckAndPutRegion(region, NewNoopHeartbeatProcessTracer())
}()
regions.AtomicCheckAndPutRegion(region)
regions.AtomicCheckAndPutRegion(region, NewNoopHeartbeatProcessTracer())
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/core/UpdateSubTree"))
}

Expand Down
Loading

0 comments on commit 50d0754

Please sign in to comment.