Skip to content

Commit

Permalink
scheduler(ticdc): fix incorrect scheduling task counter (#9840) (#9848)…
Browse files Browse the repository at this point in the history
… (#9869)

close #9839
  • Loading branch information
ti-chi-bot authored Nov 17, 2023
1 parent f0f46b7 commit 2b59186
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 48 deletions.
2 changes: 1 addition & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1078,7 +1078,7 @@ func (p *processor) getTableName(ctx context.Context, tableID model.TableID) str
retry.WithIsRetryableErr(cerror.IsRetryableError))

if tableName == nil {
log.Warn("failed to get table name for metric")
log.Warn("failed to get table name for metric", zap.Any("tableID", tableID))
return strconv.Itoa(int(tableID))
}

Expand Down
45 changes: 45 additions & 0 deletions cdc/scheduler/internal/v3/replication/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package replication

import (
"container/heap"
"fmt"
"math"
"time"

Expand Down Expand Up @@ -49,25 +50,53 @@ type BurstBalance struct {
MoveTables []MoveTable
}

func (b BurstBalance) String() string {
if len(b.AddTables) != 0 {
return fmt.Sprintf("BurstBalance, add tables: %v", b.AddTables)
}
if len(b.RemoveTables) != 0 {
return fmt.Sprintf("BurstBalance, remove tables: %v", b.RemoveTables)
}
if len(b.MoveTables) != 0 {
return fmt.Sprintf("BurstBalance, move tables: %v", b.MoveTables)
}
return "BurstBalance, no tables"
}

// MoveTable is a schedule task for moving a table.
type MoveTable struct {
TableID model.TableID
DestCapture model.CaptureID
}

func (t MoveTable) String() string {
return fmt.Sprintf("MoveTable, tableID: %d, dest: %s",
t.TableID, t.DestCapture)
}

// AddTable is a schedule task for adding a table.
type AddTable struct {
TableID model.TableID
CaptureID model.CaptureID
CheckpointTs model.Ts
}

func (t AddTable) String() string {
return fmt.Sprintf("AddTable, tableID: %d, capture: %s, checkpointTs: %d",
t.TableID, t.CaptureID, t.CheckpointTs)
}

// RemoveTable is a schedule task for removing a table.
type RemoveTable struct {
TableID model.TableID
CaptureID model.CaptureID
}

func (t RemoveTable) String() string {
return fmt.Sprintf("RemoveTable, tableID: %d, capture: %s",
t.TableID, t.CaptureID)
}

// ScheduleTask is a schedule task that wraps add/move/remove table tasks.
type ScheduleTask struct { //nolint:revive
MoveTable *MoveTable
Expand All @@ -92,6 +121,22 @@ func (s *ScheduleTask) Name() string {
return "unknown"
}

func (s *ScheduleTask) String() string {
if s.MoveTable != nil {
return s.MoveTable.String()
}
if s.AddTable != nil {
return s.AddTable.String()
}
if s.RemoveTable != nil {
return s.RemoveTable.String()
}
if s.BurstBalance != nil {
return s.BurstBalance.String()
}
return ""
}

// Manager manages replications and running scheduling tasks.
type Manager struct { //nolint:revive
tables map[model.TableID]*ReplicationSet
Expand Down
Loading

0 comments on commit 2b59186

Please sign in to comment.