Skip to content

Commit

Permalink
fix redo.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Nov 2, 2023
1 parent eaf4c12 commit 9441e8a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 16 deletions.
12 changes: 7 additions & 5 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,12 @@ func NewDDLManager(
cfg *config.ConsistentConfig, ddlStartTs model.Ts,
) *ddlManager {
m := newLogManager(changefeedID, cfg, redo.RedoDDLLogFileType)
m.AddTable(0, ddlStartTs)
tableID := int64(0)
m.AddTable(tableID, ddlStartTs)
return &ddlManager{
logManager: m,
// The current fakeTableID is meaningless, find a meaningful id in the future.
fakeTableID: tableID,
}
}

Expand Down Expand Up @@ -105,8 +108,8 @@ type DMLManager interface {
}

// NewDMLManager creates a new dml Manager.
func NewDMLManager(changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig,
func NewDMLManager(
changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig,
) *dmlManager {
return &dmlManager{
logManager: newLogManager(changefeedID, cfg, redo.RedoRowLogFileType),
Expand Down Expand Up @@ -216,8 +219,7 @@ type logManager struct {
}

func newLogManager(
changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig, logType string,
changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, logType string,
) *logManager {
// return a disabled Manager if no consistent config or normal consistent level
if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) {
Expand Down
17 changes: 6 additions & 11 deletions cdc/redo/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/common"
"github.com/pingcap/tiflow/pkg/config"
Expand Down Expand Up @@ -75,8 +74,6 @@ type metaManager struct {
lastFlushTime time.Time
cfg *config.ConsistentConfig
metricFlushLogDuration prometheus.Observer

flushIntervalInMs int64
}

// NewDisabledMetaManager creates a disabled Meta Manager.
Expand All @@ -96,13 +93,12 @@ func NewMetaManager(
}

m := &metaManager{
captureID: contextutil.CaptureAddrFromCtx(ctx),
changeFeedID: contextutil.ChangefeedIDFromCtx(ctx),
uuidGenerator: uuid.NewGenerator(),
enabled: true,
cfg: cfg,
startTs: checkpoint,
flushIntervalInMs: cfg.FlushIntervalInMs,
changeFeedID: changefeedID,
captureID: config.GetGlobalServerConfig().AdvertiseAddr,
uuidGenerator: uuid.NewGenerator(),
enabled: true,
cfg: cfg,
startTs: checkpoint,
}
return m
}
Expand Down Expand Up @@ -266,7 +262,6 @@ func (m *metaManager) initMeta(ctx context.Context) error {
zap.String("changefeed", m.changeFeedID.ID),
zap.Uint64("checkpointTs", flushedMeta.CheckpointTs),
zap.Uint64("resolvedTs", flushedMeta.ResolvedTs))

return util.DeleteFilesInExtStorage(ctx, m.extStorage, toRemoveMetaFiles)
}

Expand Down

0 comments on commit 9441e8a

Please sign in to comment.