diff --git a/cdc/scheduler/internal/v3/replication/metrics.go b/cdc/scheduler/internal/v3/replication/metrics.go
index ccd546f67fe..a427035eff5 100644
--- a/cdc/scheduler/internal/v3/replication/metrics.go
+++ b/cdc/scheduler/internal/v3/replication/metrics.go
@@ -125,6 +125,21 @@ var (
 			Name:      "slow_table_region_count",
 			Help:      "The number of regions captured by the slowest table",
 		}, []string{"namespace", "changefeed"})
+
+	slowestTablePullerResolvedTs = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "ticdc",
+			Subsystem: "scheduler",
+			Name:      "slow_table_puller_resolved_ts",
+			Help:      "The puller resolved ts of the slowest table",
+		}, []string{"namespace", "changefeed"})
+	slowestTablePullerResolvedTsLag = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "ticdc",
+			Subsystem: "scheduler",
+			Name:      "slow_table_puller_resolved_ts_lag",
+			Help:      "The puller resolved ts lag of the slowest table",
+		}, []string{"namespace", "changefeed"})
 )
 
 // InitMetrics registers all metrics used in scheduler
@@ -144,4 +159,7 @@ func InitMetrics(registry *prometheus.Registry) {
 	registry.MustRegister(slowestTableStageCheckpointTsLagHistogramVec)
 	registry.MustRegister(slowestTableStageResolvedTsLagHistogramVec)
 	registry.MustRegister(slowestTableRegionGaugeVec)
+
+	registry.MustRegister(slowestTablePullerResolvedTs)
+	registry.MustRegister(slowestTablePullerResolvedTsLag)
 }
diff --git a/cdc/scheduler/internal/v3/replication/replication_manager.go b/cdc/scheduler/internal/v3/replication/replication_manager.go
index 0ec7c7a671b..4228cc0ec75 100644
--- a/cdc/scheduler/internal/v3/replication/replication_manager.go
+++ b/cdc/scheduler/internal/v3/replication/replication_manager.go
@@ -147,7 +147,8 @@ type Manager struct { //nolint:revive
 	maxTaskConcurrency int
 	changefeedID       model.ChangeFeedID
 
-	slowestTableID        tablepb.Span
+	slowestPuller         tablepb.Span
+	slowestSink           tablepb.Span
 	acceptAddTableTask    int
 	acceptRemoveTableTask int
 	acceptMoveTableTask   int
@@ -589,14 +590,16 @@ func (r *Manager) AdvanceCheckpoint(
 		}
 	}()
 
+	r.slowestPuller = tablepb.Span{}
+	r.slowestSink = tablepb.Span{}
+	var slowestPullerResolvedTs uint64 = math.MaxUint64
+
 	newCheckpointTs, newResolvedTs = math.MaxUint64, math.MaxUint64
-	slowestRange := tablepb.Span{}
 	cannotProceed := false
-	lastSpan := tablepb.Span{}
 	currentTables.Iter(func(tableID model.TableID, tableStart, tableEnd tablepb.Span) bool {
 		tableSpanFound, tableHasHole := false, false
 		tableSpanStartFound, tableSpanEndFound := false, false
-		lastSpan = tablepb.Span{}
+		lastSpan := tablepb.Span{}
 		r.spans.AscendRange(tableStart, tableEnd,
 			func(span tablepb.Span, table *ReplicationSet) bool {
 				if lastSpan.TableID != 0 && !bytes.Equal(lastSpan.EndKey, span.StartKey) {
@@ -620,11 +623,19 @@ func (r *Manager) AdvanceCheckpoint(
 				// Find the minimum checkpoint ts and resolved ts.
 				if newCheckpointTs > table.Checkpoint.CheckpointTs {
 					newCheckpointTs = table.Checkpoint.CheckpointTs
-					slowestRange = span
+					r.slowestSink = span
 				}
 				if newResolvedTs > table.Checkpoint.ResolvedTs {
 					newResolvedTs = table.Checkpoint.ResolvedTs
 				}
+
+				// Find the minimum puller resolved ts.
+				if pullerCkpt, ok := table.Stats.StageCheckpoints["puller-egress"]; ok {
+					if slowestPullerResolvedTs > pullerCkpt.ResolvedTs {
+						slowestPullerResolvedTs = pullerCkpt.ResolvedTs
+						r.slowestPuller = span
+					}
+				}
 				return true
 			})
 		if !tableSpanFound || !tableSpanStartFound || !tableSpanEndFound || tableHasHole {
@@ -657,9 +668,6 @@ func (r *Manager) AdvanceCheckpoint(
 		}
 		return checkpointCannotProceed, checkpointCannotProceed
 	}
-	if slowestRange.TableID != 0 {
-		r.slowestTableID = slowestRange
-	}
 
 	// If currentTables is empty, we should advance newResolvedTs to global barrier ts and
 	// advance newCheckpointTs to min table barrier ts.
@@ -745,9 +753,9 @@ func (r *Manager) CollectMetrics() {
 	cf := r.changefeedID
 	tableGauge.
 		WithLabelValues(cf.Namespace, cf.ID).Set(float64(r.spans.Len()))
-	if table, ok := r.spans.Get(r.slowestTableID); ok {
+	if table, ok := r.spans.Get(r.slowestSink); ok {
 		slowestTableIDGauge.
-			WithLabelValues(cf.Namespace, cf.ID).Set(float64(r.slowestTableID.TableID))
+			WithLabelValues(cf.Namespace, cf.ID).Set(float64(r.slowestSink.TableID))
 		slowestTableStateGauge.
 			WithLabelValues(cf.Namespace, cf.ID).Set(float64(table.State))
 		phyCkpTs := oracle.ExtractPhysical(table.Checkpoint.CheckpointTs)
@@ -829,6 +837,18 @@ func (r *Manager) CollectMetrics() {
 			WithLabelValues(cf.Namespace, cf.ID, ReplicationSetState(s).String()).
 			Set(float64(counter))
 	}
+
+	// Report the slowest puller's resolved ts and its lag in seconds.
+	if table, ok := r.spans.Get(r.slowestPuller); ok {
+		if pullerCkpt, ok := table.Stats.StageCheckpoints["puller-egress"]; ok {
+			phyCkptTs := oracle.ExtractPhysical(pullerCkpt.ResolvedTs)
+			slowestTablePullerResolvedTs.WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyCkptTs))
+
+			phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs)
+			lag := float64(phyCurrentTs-phyCkptTs) / 1e3
+			slowestTablePullerResolvedTsLag.WithLabelValues(cf.Namespace, cf.ID).Set(lag)
+		}
+	}
 }
 
 // CleanMetrics cleans metrics.
diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json
index 6be8d5d5f0a..cdf9a324da5 100644
--- a/metrics/grafana/ticdc.json
+++ b/metrics/grafana/ticdc.json
@@ -3426,8 +3426,16 @@
             "hide": false,
             "interval": "",
             "intervalFactor": 1,
-            "legendFormat": "{{changefeed}}",
+            "legendFormat": "{{changefeed}}-barrier",
             "refId": "C"
+          },
+          {
+            "exemplar": true,
+            "expr": "max(ticdc_scheduler_slow_table_puller_resolved_ts{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}) by (changefeed)",
+            "hide": false,
+            "interval": "",
+            "legendFormat": "{{changefeed}}-puller",
+            "refId": "B"
           }
         ],
         "thresholds": [],
@@ -3633,8 +3641,16 @@
             "format": "time_series",
             "interval": "",
             "intervalFactor": 1,
-            "legendFormat": "{{changefeed}}",
+            "legendFormat": "{{changefeed}}-barrier",
             "refId": "C"
+          },
+          {
+            "exemplar": true,
+            "expr": "max(ticdc_scheduler_slow_table_puller_resolved_ts_lag{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}) by (changefeed)",
+            "hide": false,
+            "interval": "",
+            "legendFormat": "{{changefeed}}-puller",
+            "refId": "A"
           }
         ],
         "thresholds": [],