Skip to content

Commit

Permalink
puller(cdc): add metrics for slowest changefeed puller (#10054) (#10068)
Browse files Browse the repository at this point in the history
close #10053
  • Loading branch information
ti-chi-bot authored Nov 16, 2023
1 parent 1b69de9 commit 04685c5
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 12 deletions.
18 changes: 18 additions & 0 deletions cdc/scheduler/internal/v3/replication/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,21 @@ var (
Name: "slow_table_region_count",
Help: "The number of regions captured by the slowest table",
}, []string{"namespace", "changefeed"})

slowestTablePullerResolvedTs = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "scheduler",
Name: "slow_table_puller_resolved_ts",
Help: "Puller Slowest ResolvedTs",
}, []string{"namespace", "changefeed"})
slowestTablePullerResolvedTsLag = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "scheduler",
Name: "slow_table_puller_resolved_ts_lag",
Help: "Puller Slowest ResolvedTs lag",
}, []string{"namespace", "changefeed"})
)

// InitMetrics registers all metrics used in scheduler
Expand All @@ -144,4 +159,7 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(slowestTableStageCheckpointTsLagHistogramVec)
registry.MustRegister(slowestTableStageResolvedTsLagHistogramVec)
registry.MustRegister(slowestTableRegionGaugeVec)

registry.MustRegister(slowestTablePullerResolvedTs)
registry.MustRegister(slowestTablePullerResolvedTsLag)
}
39 changes: 29 additions & 10 deletions cdc/scheduler/internal/v3/replication/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ type Manager struct { //nolint:revive
maxTaskConcurrency int

changefeedID model.ChangeFeedID
slowestTableID tablepb.Span
slowestPuller tablepb.Span
slowestSink tablepb.Span
acceptAddTableTask int
acceptRemoveTableTask int
acceptMoveTableTask int
Expand Down Expand Up @@ -589,14 +590,16 @@ func (r *Manager) AdvanceCheckpoint(
}
}()

r.slowestPuller = tablepb.Span{}
r.slowestSink = tablepb.Span{}
var slowestPullerResolvedTs uint64 = math.MaxUint64

newCheckpointTs, newResolvedTs = math.MaxUint64, math.MaxUint64
slowestRange := tablepb.Span{}
cannotProceed := false
lastSpan := tablepb.Span{}
currentTables.Iter(func(tableID model.TableID, tableStart, tableEnd tablepb.Span) bool {
tableSpanFound, tableHasHole := false, false
tableSpanStartFound, tableSpanEndFound := false, false
lastSpan = tablepb.Span{}
lastSpan := tablepb.Span{}
r.spans.AscendRange(tableStart, tableEnd,
func(span tablepb.Span, table *ReplicationSet) bool {
if lastSpan.TableID != 0 && !bytes.Equal(lastSpan.EndKey, span.StartKey) {
Expand All @@ -620,11 +623,19 @@ func (r *Manager) AdvanceCheckpoint(
// Find the minimum checkpoint ts and resolved ts.
if newCheckpointTs > table.Checkpoint.CheckpointTs {
newCheckpointTs = table.Checkpoint.CheckpointTs
slowestRange = span
r.slowestSink = span
}
if newResolvedTs > table.Checkpoint.ResolvedTs {
newResolvedTs = table.Checkpoint.ResolvedTs
}

// Find the minimum puller resolved ts.
if pullerCkpt, ok := table.Stats.StageCheckpoints["puller-egress"]; ok {
if slowestPullerResolvedTs > pullerCkpt.ResolvedTs {
slowestPullerResolvedTs = pullerCkpt.ResolvedTs
r.slowestPuller = span
}
}
return true
})
if !tableSpanFound || !tableSpanStartFound || !tableSpanEndFound || tableHasHole {
Expand Down Expand Up @@ -657,9 +668,6 @@ func (r *Manager) AdvanceCheckpoint(
}
return checkpointCannotProceed, checkpointCannotProceed
}
if slowestRange.TableID != 0 {
r.slowestTableID = slowestRange
}

// If currentTables is empty, we should advance newResolvedTs to global barrier ts and
// advance newCheckpointTs to min table barrier ts.
Expand Down Expand Up @@ -745,9 +753,9 @@ func (r *Manager) CollectMetrics() {
cf := r.changefeedID
tableGauge.
WithLabelValues(cf.Namespace, cf.ID).Set(float64(r.spans.Len()))
if table, ok := r.spans.Get(r.slowestTableID); ok {
if table, ok := r.spans.Get(r.slowestSink); ok {
slowestTableIDGauge.
WithLabelValues(cf.Namespace, cf.ID).Set(float64(r.slowestTableID.TableID))
WithLabelValues(cf.Namespace, cf.ID).Set(float64(r.slowestSink.TableID))
slowestTableStateGauge.
WithLabelValues(cf.Namespace, cf.ID).Set(float64(table.State))
phyCkpTs := oracle.ExtractPhysical(table.Checkpoint.CheckpointTs)
Expand Down Expand Up @@ -829,6 +837,17 @@ func (r *Manager) CollectMetrics() {
WithLabelValues(cf.Namespace, cf.ID, ReplicationSetState(s).String()).
Set(float64(counter))
}

if table, ok := r.spans.Get(r.slowestSink); ok {
if pullerCkpt, ok := table.Stats.StageCheckpoints["puller-egress"]; ok {
phyCkptTs := oracle.ExtractPhysical(pullerCkpt.ResolvedTs)
slowestTablePullerResolvedTs.WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyCkptTs))

phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs)
lag := float64(phyCurrentTs-phyCkptTs) / 1e3
slowestTablePullerResolvedTsLag.WithLabelValues(cf.Namespace, cf.ID).Set(lag)
}
}
}

// CleanMetrics cleans metrics.
Expand Down
20 changes: 18 additions & 2 deletions metrics/grafana/ticdc.json
Original file line number Diff line number Diff line change
Expand Up @@ -3426,8 +3426,16 @@
"hide": false,
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{changefeed}}",
"legendFormat": "{{changefeed}}-barrier",
"refId": "C"
},
{
"exemplar": true,
"expr": "max(ticdc_scheduler_slow_table_puller_resolved_ts{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}) by (changefeed)",
"hide": false,
"interval": "",
"legendFormat": "{{changefeed}}-puller",
"refId": "B"
}
],
"thresholds": [],
Expand Down Expand Up @@ -3633,8 +3641,16 @@
"format": "time_series",
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{changefeed}}",
"legendFormat": "{{changefeed}}-barrier",
"refId": "C"
},
{
"exemplar": true,
"expr": "max(ticdc_scheduler_slow_table_puller_resolved_ts_lag{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}) by (changefeed)",
"hide": false,
"interval": "",
"legendFormat": "{{changefeed}}-puller",
"refId": "A"
}
],
"thresholds": [],
Expand Down

0 comments on commit 04685c5

Please sign in to comment.