diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index b39cbf7a554..56cd8966f6e 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -319,7 +319,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* if c.redoMetaMgr.Enabled() { if !c.redoMetaMgr.Running() { - return 0, 0, nil + return nil } } @@ -557,13 +557,7 @@ LOOP2: ctx.Throw(c.ddlPuller.Run(cancelCtx)) }() - c.redoDDLMgr, err = redo.NewDDLManager(stdCtx, c.state.Info.Config.Consistent, ddlStartTs) - failpoint.Inject("ChangefeedNewRedoManagerError", func() { - err = errors.New("changefeed new redo manager injected error") - }) - if err != nil { - return err - } + c.redoDDLMgr = redo.NewDDLManager(c.id, c.state.Info.Config.Consistent, ddlStartTs) if c.redoDDLMgr.Enabled() { c.wg.Add(1) go func() { @@ -572,7 +566,7 @@ LOOP2: }() } - c.redoMetaMgr = redo.NewMetaManager(c.id, contextutil.CaptureAddrFromCtx(ctx), c.state.Info.Config.Consistent, checkpointTs) + c.redoMetaMgr = redo.NewMetaManager(c.id, c.state.Info.Config.Consistent, checkpointTs) if c.redoMetaMgr.Enabled() { c.wg.Add(1) go func() { @@ -734,7 +728,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) { } // when removing a paused changefeed, the redo manager is nil, create a new one if c.redoMetaMgr == nil { - c.redoMetaMgr = redo.NewMetaManager(c.id, contextutil.CaptureAddrFromCtx(ctx), c.state.Info.Config.Consistent, 0) + c.redoMetaMgr = redo.NewMetaManager(c.id, c.state.Info.Config.Consistent, 0) } err := c.redoMetaMgr.Cleanup(ctx) if err != nil { diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index a639e528b50..a9c78fb1998 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -246,7 +246,7 @@ func newLogManager( } // Run implements pkg/util.Runnable. -func (m *logManager) Run(ctx context.Context, _ ...chan<- error) error { +func (m *logManager) Run(ctx context.Context) error { failpoint.Inject("ChangefeedNewRedoManagerError", func() { failpoint.Return(errors.New("changefeed new redo manager injected error")) }) diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index d5ff11e9e70..b14d08db8b4 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -86,48 +86,10 @@ func NewDisabledMetaManager() *metaManager { } } -<<<<<<< HEAD -// NewMetaManagerWithInit creates a new Manager and initializes the meta. -func NewMetaManagerWithInit( - ctx context.Context, cfg *config.ConsistentConfig, startTs model.Ts, -) (*metaManager, error) { - m, err := NewMetaManager(ctx, cfg) - if err != nil { - return nil, err - } - - // There is no need to perform initialize operation if metaMgr is disabled - // or the scheme is blackhole. - if m.extStorage != nil { - m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram. - WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) - if err = m.preCleanupExtStorage(ctx); err != nil { - log.Warn("pre clean redo logs fail", - zap.String("namespace", m.changeFeedID.Namespace), - zap.String("changefeed", m.changeFeedID.ID), - zap.Error(err)) - return nil, err - } - if err = m.initMeta(ctx, startTs); err != nil { - log.Warn("init redo meta fail", - zap.String("namespace", m.changeFeedID.Namespace), - zap.String("changefeed", m.changeFeedID.ID), - zap.Error(err)) - return nil, err - } - } - - return m, nil -} - -// NewMetaManager creates a new meta Manager. -func NewMetaManager(ctx context.Context, cfg *config.ConsistentConfig) (*metaManager, error) { -======= // NewMetaManager creates a new meta Manager. func NewMetaManager( changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, checkpoint model.Ts, ) *metaManager { ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) // return a disabled Manager if no consistent config or normal consistent level if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) { return &metaManager{enabled: false} @@ -138,19 +100,9 @@ func NewMetaManager( changeFeedID: contextutil.ChangefeedIDFromCtx(ctx), uuidGenerator: uuid.NewGenerator(), enabled: true, -<<<<<<< HEAD - flushIntervalInMs: cfg.FlushIntervalInMs, -======= cfg: cfg, startTs: checkpoint, - flushIntervalInMs: cfg.MetaFlushIntervalInMs, - } - - if m.flushIntervalInMs < redo.MinFlushIntervalInMs { - log.Warn("redo flush interval is too small, use default value", - zap.Int64("interval", m.flushIntervalInMs)) - m.flushIntervalInMs = redo.DefaultMetaFlushIntervalInMs ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) + flushIntervalInMs: cfg.FlushIntervalInMs, } return m } @@ -203,16 +155,9 @@ func (m *metaManager) preStart(ctx context.Context) error { } // Run runs bgFlushMeta and bgGC. -<<<<<<< HEAD func (m *metaManager) Run(ctx context.Context) error { - if m.extStorage == nil { - log.Warn("extStorage of redo meta manager is nil, skip running") - return nil -======= -func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error { if err := m.preStart(ctx); err != nil { return err ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) } eg, egCtx := errgroup.WithContext(ctx) eg.Go(func() error {