Skip to content

Commit

Permalink
remove useless field from the ddl manager.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Dec 6, 2023
1 parent fb83e92 commit 2760d4c
Showing 1 changed file with 14 additions and 26 deletions.
40 changes: 14 additions & 26 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,7 @@ type ddlManager struct {
redoMetaManager redo.MetaManager
// ddlSink is used to ddlSink DDL events to the downstream
ddlSink DDLSink
// tableCheckpoint store the tableCheckpoint of each table. We need to wait
// for the tableCheckpoint to reach the next ddl commitTs before executing the ddl
tableCheckpoint map[model.TableName]model.Ts

// pendingDDLs store the pending DDL events of all tables
// the DDL events in the same table are ordered by commitTs.
pendingDDLs map[model.TableName][]*model.DDLEvent
Expand All @@ -137,7 +135,6 @@ type ddlManager struct {
physicalTablesCache []model.TableID

BDRMode bool
sinkType model.DownstreamType
ddlResolvedTs model.Ts

// needBootstrap is true when the downstream is kafka
Expand Down Expand Up @@ -169,19 +166,16 @@ func newDDLManager(
zap.Stringer("sinkType", sinkType))

return &ddlManager{
changfeedID: changefeedID,
ddlSink: ddlSink,
ddlPuller: ddlPuller,
schema: schema,
redoDDLManager: redoManager,
redoMetaManager: redoMetaManager,
startTs: startTs,
checkpointTs: checkpointTs,
ddlResolvedTs: startTs,
BDRMode: bdrMode,
// use the passed sinkType after we support get resolvedTs from sink
sinkType: model.DB,
tableCheckpoint: make(map[model.TableName]model.Ts),
changfeedID: changefeedID,
ddlSink: ddlSink,
ddlPuller: ddlPuller,
schema: schema,
redoDDLManager: redoManager,
redoMetaManager: redoMetaManager,
startTs: startTs,
checkpointTs: checkpointTs,
ddlResolvedTs: startTs,
BDRMode: bdrMode,
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
errCh: make(chan error, 1),
needSendBootstrapEvent: needSendBootstrapEvent,
Expand All @@ -202,12 +196,12 @@ func (m *ddlManager) tick(
checkpointTs model.Ts,
) ([]model.TableID, *schedulepb.BarrierWithMinTs, error) {
if m.needSendBootstrapEvent {
ok, err := m.checkAndBootstrap(ctx)
finished, err := m.checkAndBootstrap(ctx)
if err != nil {
return nil, nil, err
}
if !ok {
return nil, nil, nil
if !finished {
return nil, schedulepb.NewBarrierWithMinTs(checkpointTs), nil
}
}

Expand Down Expand Up @@ -311,12 +305,6 @@ func (m *ddlManager) tick(
zap.Uint64("commitTs", nextDDL.CommitTs))
}

// TODO: Complete this logic, when sinkType is not DB,
// we should not block the execution of DDLs by the checkpointTs.
if m.sinkType != model.DB {
log.Panic("Downstream type is not DB, it never happens in current version")
}

if m.shouldExecDDL(nextDDL) {
if m.executingDDL == nil {
log.Info("execute a ddl event",
Expand Down

0 comments on commit 2760d4c

Please sign in to comment.