Skip to content

Commit

Permalink
fix conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <[email protected]>
  • Loading branch information
hicqu committed Nov 16, 2023
1 parent 8f7cadc commit b28debf
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 88 deletions.
89 changes: 15 additions & 74 deletions cdc/scheduler/internal/v3/replication/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,9 @@ type Manager struct { //nolint:revive
maxTaskConcurrency int

changefeedID model.ChangeFeedID
<<<<<<< HEAD
slowestTableID model.TableID
slowestPuller model.TableID
slowestSink model.TableID
slowTableHeap SetHeap
=======
slowestPuller tablepb.Span
slowestSink tablepb.Span
>>>>>>> bf206ea9a2 (puller(cdc): add metrics for slowest changefeed puller (#10054))
acceptAddTableTask int
acceptRemoveTableTask int
acceptMoveTableTask int
Expand Down Expand Up @@ -532,63 +528,14 @@ func (r *Manager) AdvanceCheckpoint(
}
}()

r.slowestPuller = tablepb.Span{}
r.slowestSink = tablepb.Span{}
r.slowestPuller = model.TableID(0)
r.slowestSink = model.TableID(0)
var slowestPullerResolvedTs uint64 = math.MaxUint64

newCheckpointTs, newResolvedTs = math.MaxUint64, math.MaxUint64
<<<<<<< HEAD
slowestTableID := int64(0)
for _, tableID := range currentTables {
table, ok := r.tables[tableID]
if !ok {
=======
cannotProceed := false
currentTables.Iter(func(tableID model.TableID, tableStart, tableEnd tablepb.Span) bool {
tableSpanFound, tableHasHole := false, false
tableSpanStartFound, tableSpanEndFound := false, false
lastSpan := tablepb.Span{}
r.spans.AscendRange(tableStart, tableEnd,
func(span tablepb.Span, table *ReplicationSet) bool {
if lastSpan.TableID != 0 && !bytes.Equal(lastSpan.EndKey, span.StartKey) {
log.Warn("schedulerv3: span hole detected, skip advance checkpoint",
zap.String("namespace", r.changefeedID.Namespace),
zap.String("changefeed", r.changefeedID.ID),
zap.String("lastSpan", lastSpan.String()),
zap.String("span", span.String()))
tableHasHole = true
return false
}
lastSpan = span
tableSpanFound = true
if bytes.Equal(span.StartKey, tableStart.StartKey) {
tableSpanStartFound = true
}
if bytes.Equal(span.EndKey, tableEnd.StartKey) {
tableSpanEndFound = true
}

// Find the minimum checkpoint ts and resolved ts.
if newCheckpointTs > table.Checkpoint.CheckpointTs {
newCheckpointTs = table.Checkpoint.CheckpointTs
r.slowestSink = span
}
if newResolvedTs > table.Checkpoint.ResolvedTs {
newResolvedTs = table.Checkpoint.ResolvedTs
}

// Find the minimum puller resolved ts.
if pullerCkpt, ok := table.Stats.StageCheckpoints["puller-egress"]; ok {
if slowestPullerResolvedTs > pullerCkpt.ResolvedTs {
slowestPullerResolvedTs = pullerCkpt.ResolvedTs
r.slowestPuller = span
}
}
return true
})
if !tableSpanFound || !tableSpanStartFound || !tableSpanEndFound || tableHasHole {
// Can not advance checkpoint there is a span missing.
>>>>>>> bf206ea9a2 (puller(cdc): add metrics for slowest changefeed puller (#10054))
now := time.Now()
if now.Sub(r.lastLogMissTime) > logMissingTableInterval {
// Can not advance checkpoint there is a table missing.
Expand All @@ -608,18 +555,19 @@ func (r *Manager) AdvanceCheckpoint(
// Find the minimum checkpoint ts and resolved ts.
if newCheckpointTs > table.Checkpoint.CheckpointTs {
newCheckpointTs = table.Checkpoint.CheckpointTs
slowestTableID = tableID
r.slowestSink = tableID
}
if newResolvedTs > table.Checkpoint.ResolvedTs {
newResolvedTs = table.Checkpoint.ResolvedTs
}
// Find the minimum puller resolved ts.
if pullerCkpt, ok := table.Stats.StageCheckpoints["puller-egress"]; ok {
if slowestPullerResolvedTs > pullerCkpt.ResolvedTs {
slowestPullerResolvedTs = pullerCkpt.ResolvedTs
r.slowestPuller = tableID
}
}
}
<<<<<<< HEAD
if slowestTableID != 0 {
r.slowestTableID = slowestTableID
}
=======
>>>>>>> bf206ea9a2 (puller(cdc): add metrics for slowest changefeed puller (#10054))

// If currentTables is empty, we should advance newResolvedTs to global barrier ts and
// advance newCheckpointTs to min table barrier ts.
Expand Down Expand Up @@ -708,17 +656,10 @@ func (r *Manager) logSlowTableInfo(currentTables []model.TableID, currentTime ti
func (r *Manager) CollectMetrics() {
cf := r.changefeedID
tableGauge.
<<<<<<< HEAD
WithLabelValues(cf.Namespace, cf.ID).Set(float64(len(r.tables)))
if table, ok := r.tables[r.slowestTableID]; ok {
slowestTableIDGauge.
WithLabelValues(cf.Namespace, cf.ID).Set(float64(r.slowestTableID))
=======
WithLabelValues(cf.Namespace, cf.ID).Set(float64(r.spans.Len()))
if table, ok := r.spans.Get(r.slowestSink); ok {
if table, ok := r.tables[r.slowestSink]; ok {
slowestTableIDGauge.
WithLabelValues(cf.Namespace, cf.ID).Set(float64(r.slowestSink.TableID))
>>>>>>> bf206ea9a2 (puller(cdc): add metrics for slowest changefeed puller (#10054))
WithLabelValues(cf.Namespace, cf.ID).Set(float64(r.slowestSink))
slowestTableStateGauge.
WithLabelValues(cf.Namespace, cf.ID).Set(float64(table.State))
phyCkpTs := oracle.ExtractPhysical(table.Checkpoint.CheckpointTs)
Expand Down Expand Up @@ -800,7 +741,7 @@ func (r *Manager) CollectMetrics() {
Set(float64(counter))
}

if table, ok := r.spans.Get(r.slowestSink); ok {
if table, ok := r.tables[r.slowestSink]; ok {
if pullerCkpt, ok := table.Stats.StageCheckpoints["puller-egress"]; ok {
phyCkptTs := oracle.ExtractPhysical(pullerCkpt.ResolvedTs)
slowestTablePullerResolvedTs.WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyCkptTs))
Expand Down
20 changes: 6 additions & 14 deletions metrics/grafana/ticdc.json
Original file line number Diff line number Diff line change
Expand Up @@ -3426,19 +3426,15 @@
"hide": false,
"interval": "",
"intervalFactor": 1,
<<<<<<< HEAD
"legendFormat": "{{changefeed}}",
=======
"legendFormat": "{{namespace}}-{{changefeed}}-barrier",
>>>>>>> bf206ea9a2 (puller(cdc): add metrics for slowest changefeed puller (#10054))
"legendFormat": "{{changefeed}}-barrier",
"refId": "C"
},
{
"exemplar": true,
"expr": "max(ticdc_scheduler_slow_table_puller_resolved_ts{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", namespace=~\"$namespace\", changefeed=~\"$changefeed\"}) by (namespace, changefeed)",
"expr": "max(ticdc_scheduler_slow_table_puller_resolved_ts{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}) by (changefeed)",
"hide": false,
"interval": "",
"legendFormat": "{{namespace}}-{{changefeed}}-puller",
"legendFormat": "{{changefeed}}-puller",
"refId": "B"
}
],
Expand Down Expand Up @@ -3644,19 +3640,15 @@
"format": "time_series",
"interval": "",
"intervalFactor": 1,
<<<<<<< HEAD
"legendFormat": "{{changefeed}}",
=======
"legendFormat": "{{namespace}}-{{changefeed}}-barrier",
>>>>>>> bf206ea9a2 (puller(cdc): add metrics for slowest changefeed puller (#10054))
"legendFormat": "{{changefeed}}-barrier",
"refId": "C"
},
{
"exemplar": true,
"expr": "max(ticdc_scheduler_slow_table_puller_resolved_ts_lag{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", namespace=~\"$namespace\", changefeed=~\"$changefeed\"}) by (namespace, changefeed)",
"expr": "max(ticdc_scheduler_slow_table_puller_resolved_ts_lag{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}) by (changefeed)",
"hide": false,
"interval": "",
"legendFormat": "{{namespace}}-{{changefeed}}-puller",
"legendFormat": "{{changefeed}}-puller",
"refId": "A"
}
],
Expand Down

0 comments on commit b28debf

Please sign in to comment.