diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 738f161897f..7ebe150c1f9 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -584,40 +584,21 @@ LOOP2: } c.observerLastTick = atomic.NewTime(time.Time{}) -<<<<<<< HEAD - stdCtx := contextutil.PutChangefeedIDInCtx(cancelCtx, c.id) - c.redoDDLMgr, err = redo.NewDDLManager(stdCtx, c.state.Info.Config.Consistent, ddlStartTs) - failpoint.Inject("ChangefeedNewRedoManagerError", func() { - err = errors.New("changefeed new redo manager injected error") - }) - if err != nil { - return err - } -======= - c.redoDDLMgr = redo.NewDDLManager(c.id, c.latestInfo.Config.Consistent, ddlStartTs) ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) + c.redoDDLMgr = redo.NewDDLManager(c.id, c.state.Info.Config.Consistent, ddlStartTs) if c.redoDDLMgr.Enabled() { c.wg.Add(1) go func() { defer c.wg.Done() - ctx.Throw(c.redoDDLMgr.Run(stdCtx)) + ctx.Throw(c.redoDDLMgr.Run(cancelCtx)) }() } -<<<<<<< HEAD - c.redoMetaMgr, err = redo.NewMetaManagerWithInit(stdCtx, - c.state.Info.Config.Consistent, checkpointTs) - if err != nil { - return err - } -======= - c.redoMetaMgr = redo.NewMetaManager(c.id, c.latestInfo.Config.Consistent, checkpointTs) ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) + c.redoMetaMgr = redo.NewMetaManager(c.id, c.state.Info.Config.Consistent, checkpointTs) if c.redoMetaMgr.Enabled() { c.wg.Add(1) go func() { defer c.wg.Done() - ctx.Throw(c.redoMetaMgr.Run(stdCtx)) + ctx.Throw(c.redoMetaMgr.Run(cancelCtx)) }() } log.Info("owner creates redo manager", @@ -779,19 +760,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) { } // when removing a paused changefeed, the redo manager is nil, create a new one if c.redoMetaMgr == nil { -<<<<<<< HEAD - redoMetaMgr, err := redo.NewMetaManager(ctx, c.state.Info.Config.Consistent) - if err != nil { - log.Info("owner creates redo manager for clean fail", - zap.String("namespace", c.id.Namespace), - zap.String("changefeed", c.id.ID), - zap.Error(err)) - return - } - c.redoMetaMgr = redoMetaMgr -======= - c.redoMetaMgr = redo.NewMetaManager(c.id, cfInfo.Config.Consistent, 0) ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) + c.redoMetaMgr = redo.NewMetaManager(c.id, c.state.Info.Config.Consistent, 0) } err := c.redoMetaMgr.Cleanup(ctx) if err != nil { diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index d06107e7053..8d9adb17a77 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -670,14 +670,7 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) { } p.changefeed.Info.Config.Sink.TiDBSourceID = sourceID -<<<<<<< HEAD - p.redo.r, err = redo.NewDMLManager(stdCtx, p.changefeed.Info.Config.Consistent) - if err != nil { - return err - } -======= p.redo.r = redo.NewDMLManager(p.changefeedID, p.latestInfo.Config.Consistent) ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) p.redo.name = "RedoManager" p.redo.spawn(stdCtx) diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index 2e574b7a3b2..5d76886d5fc 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -21,11 +21,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" -<<<<<<< HEAD - "github.com/pingcap/tidb/br/pkg/storage" - "github.com/pingcap/tiflow/cdc/contextutil" -======= ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/redo/common" @@ -70,19 +65,10 @@ func NewDisabledDDLManager() *ddlManager { // NewDDLManager creates a new ddl Manager. func NewDDLManager( -<<<<<<< HEAD - ctx context.Context, cfg *config.ConsistentConfig, ddlStartTs model.Ts, -) (*ddlManager, error) { - logManager, err := newLogManager(ctx, cfg, redo.RedoDDLLogFileType) - if err != nil { - return nil, err - } -======= changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, ddlStartTs model.Ts, ) *ddlManager { m := newLogManager(changefeedID, cfg, redo.RedoDDLLogFileType) ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) span := spanz.TableIDToComparableSpan(0) m.AddTable(span, ddlStartTs) return &ddlManager{ @@ -126,18 +112,11 @@ type DMLManager interface { } // NewDMLManager creates a new dml Manager. -<<<<<<< HEAD -func NewDMLManager(ctx context.Context, cfg *config.ConsistentConfig) (*dmlManager, error) { - logManager, err := newLogManager(ctx, cfg, redo.RedoRowLogFileType) - if err != nil { - return nil, err -======= -func NewDMLManager(changefeedID model.ChangeFeedID, - cfg *config.ConsistentConfig, +func NewDMLManager( + changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, ) *dmlManager { return &dmlManager{ logManager: newLogManager(changefeedID, cfg, redo.RedoRowLogFileType), ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) } } @@ -244,36 +223,14 @@ type logManager struct { } func newLogManager( -<<<<<<< HEAD - ctx context.Context, cfg *config.ConsistentConfig, logType string, -) (*logManager, error) { -======= changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, logType string, ) *logManager { ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) // return a disabled Manager if no consistent config or normal consistent level if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) { return &logManager{enabled: false} } -<<<<<<< HEAD - uri, err := storage.ParseRawURL(cfg.Storage) - if err != nil { - return nil, err - } - changefeedID := contextutil.ChangefeedIDFromCtx(ctx) - m := &logManager{ - enabled: true, - cfg: &writer.LogWriterConfig{ - ConsistentConfig: *cfg, - LogType: logType, - CaptureID: contextutil.CaptureAddrFromCtx(ctx), - ChangeFeedID: changefeedID, - URI: *uri, - UseExternalStorage: redo.IsExternalStorage(uri.Scheme), - MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte, -======= return &logManager{ enabled: true, cfg: &writer.LogWriterConfig{ @@ -282,7 +239,6 @@ func newLogManager( CaptureID: config.GetGlobalServerConfig().AdvertiseAddr, ChangeFeedID: changefeedID, MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte, ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) }, logBuffer: chann.NewAutoDrainChann[cacheEvents](), rtsMap: spanz.SyncMap{}, diff --git a/cdc/redo/manager_test.go b/cdc/redo/manager_test.go index a0706f24a38..14db8ffc251 100644 --- a/cdc/redo/manager_test.go +++ b/cdc/redo/manager_test.go @@ -120,23 +120,11 @@ func TestLogManagerInProcessor(t *testing.T) { FlushIntervalInMs: redo.MinFlushIntervalInMs, UseFileBackend: useFileBackend, } -<<<<<<< HEAD - dmlMgr, err := NewDMLManager(ctx, cfg) - require.NoError(t, err) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - dmlMgr.Run(ctx) - }() - -======= dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg) var eg errgroup.Group eg.Go(func() error { return dmlMgr.Run(ctx) }) ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) // check emit row changed events can move forward resolved ts spans := []tablepb.Span{ spanz.TableIDToComparableSpan(53), @@ -240,23 +228,12 @@ func TestLogManagerInOwner(t *testing.T) { UseFileBackend: useFileBackend, } startTs := model.Ts(10) -<<<<<<< HEAD - ddlMgr, err := NewDDLManager(ctx, cfg, startTs) - require.NoError(t, err) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - ddlMgr.Run(ctx) - }() -======= ddlMgr := NewDDLManager(model.DefaultChangeFeedID("test"), cfg, startTs) var eg errgroup.Group eg.Go(func() error { return ddlMgr.Run(ctx) }) ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) require.Equal(t, startTs, ddlMgr.GetResolvedTs()) ddl := &model.DDLEvent{StartTs: 100, CommitTs: 120, Query: "CREATE TABLE `TEST.T1`"} @@ -294,28 +271,11 @@ func TestLogManagerError(t *testing.T) { Storage: "blackhole-invalid://", FlushIntervalInMs: redo.MinFlushIntervalInMs, } -<<<<<<< HEAD - logMgr, err := NewDMLManager(ctx, cfg) - require.NoError(t, err) - err = logMgr.writer.Close() - require.NoError(t, err) - logMgr.writer = blackhole.NewInvalidLogWriter(logMgr.writer) - - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - err := logMgr.Run(ctx) - require.Regexp(t, ".*invalid black hole writer.*", err) - require.Regexp(t, ".*WriteLog.*", err) - }() -======= logMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg) var eg errgroup.Group eg.Go(func() error { return logMgr.Run(ctx) }) ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) testCases := []struct { span tablepb.Span @@ -363,14 +323,8 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) { FlushIntervalInMs: redo.MinFlushIntervalInMs, UseFileBackend: useFileBackend, } -<<<<<<< HEAD - dmlMgr, err := NewDMLManager(ctx, cfg) - require.Nil(b, err) - eg := errgroup.Group{} -======= dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg) var eg errgroup.Group ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) eg.Go(func() error { return dmlMgr.Run(ctx) }) diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index 851c534a406..6e29bc86425 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" - "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/redo/common" "github.com/pingcap/tiflow/pkg/config" @@ -75,8 +74,6 @@ type metaManager struct { lastFlushTime time.Time cfg *config.ConsistentConfig metricFlushLogDuration prometheus.Observer - - flushIntervalInMs int64 } // NewDisabledMetaManager creates a disabled Meta Manager. @@ -86,72 +83,25 @@ func NewDisabledMetaManager() *metaManager { } } -<<<<<<< HEAD -// NewMetaManagerWithInit creates a new Manager and initializes the meta. -func NewMetaManagerWithInit( - ctx context.Context, cfg *config.ConsistentConfig, startTs model.Ts, -) (*metaManager, error) { - m, err := NewMetaManager(ctx, cfg) - if err != nil { - return nil, err - } - - // There is no need to perform initialize operation if metaMgr is disabled - // or the scheme is blackhole. - if m.extStorage != nil { - m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram. - WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) - if err = m.preCleanupExtStorage(ctx); err != nil { - log.Warn("pre clean redo logs fail", - zap.String("namespace", m.changeFeedID.Namespace), - zap.String("changefeed", m.changeFeedID.ID), - zap.Error(err)) - return nil, err - } - if err = m.initMeta(ctx, startTs); err != nil { - log.Warn("init redo meta fail", - zap.String("namespace", m.changeFeedID.Namespace), - zap.String("changefeed", m.changeFeedID.ID), - zap.Error(err)) - return nil, err - } - } - - return m, nil -} - -// NewMetaManager creates a new meta Manager. -func NewMetaManager(ctx context.Context, cfg *config.ConsistentConfig) (*metaManager, error) { -======= // NewMetaManager creates a new meta Manager. func NewMetaManager( - changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, checkpoint model.Ts, + changefeedID model.ChangeFeedID, captureID string, + cfg *config.ConsistentConfig, checkpoint model.Ts, ) *metaManager { ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) // return a disabled Manager if no consistent config or normal consistent level if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) { return &metaManager{enabled: false} } m := &metaManager{ - captureID: contextutil.CaptureAddrFromCtx(ctx), - changeFeedID: contextutil.ChangefeedIDFromCtx(ctx), - uuidGenerator: uuid.NewGenerator(), - enabled: true, -<<<<<<< HEAD - flushIntervalInMs: cfg.FlushIntervalInMs, -======= - cfg: cfg, - startTs: checkpoint, - flushIntervalInMs: cfg.MetaFlushIntervalInMs, - } - - if m.flushIntervalInMs < redo.MinFlushIntervalInMs { - log.Warn("redo flush interval is too small, use default value", - zap.Int64("interval", m.flushIntervalInMs)) - m.flushIntervalInMs = redo.DefaultMetaFlushIntervalInMs ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) + captureID: captureID, + changeFeedID: changefeedID, + uuidGenerator: uuid.NewGenerator(), + enabled: true, + cfg: cfg, + startTs: checkpoint, } + return m } diff --git a/cdc/redo/meta_manager_test.go b/cdc/redo/meta_manager_test.go index b4b96aeaa8d..32254dd5cf5 100644 --- a/cdc/redo/meta_manager_test.go +++ b/cdc/redo/meta_manager_test.go @@ -78,13 +78,6 @@ func TestInitAndWriteMeta(t *testing.T) { Storage: uri.String(), FlushIntervalInMs: redo.MinFlushIntervalInMs, } -<<<<<<< HEAD - m, err := NewMetaManagerWithInit(ctx, cfg, startTs) - require.NoError(t, err) - require.Equal(t, startTs, m.metaCheckpointTs.getFlushed()) - require.Equal(t, uint64(11), m.metaResolvedTs.getFlushed()) - for _, fileName := range toReomoveFiles { -======= m := NewMetaManager(changefeedID, cfg, startTs) var eg errgroup.Group @@ -101,7 +94,6 @@ func TestInitAndWriteMeta(t *testing.T) { }, time.Second, 50*time.Millisecond) for _, fileName := range toRemoveFiles { ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) ret, err := extStorage.FileExists(ctx, fileName) require.NoError(t, err) require.False(t, ret, "file %s should be removed", fileName) @@ -164,12 +156,6 @@ func TestPreCleanupAndWriteMeta(t *testing.T) { Storage: uri.String(), FlushIntervalInMs: redo.MinFlushIntervalInMs, } -<<<<<<< HEAD - m, err := NewMetaManagerWithInit(ctx, cfg, startTs) - require.NoError(t, err) - require.Equal(t, startTs, m.metaCheckpointTs.getFlushed()) - require.Equal(t, startTs, m.metaResolvedTs.getFlushed()) -======= m := NewMetaManager(changefeedID, cfg, startTs) var eg errgroup.Group @@ -185,7 +171,6 @@ func TestPreCleanupAndWriteMeta(t *testing.T) { return startTs == m.metaResolvedTs.getFlushed() }, time.Second, 50*time.Millisecond) ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) for _, fileName := range toRemoveFiles { ret, err := extStorage.FileExists(ctx, fileName) require.NoError(t, err) @@ -301,14 +286,7 @@ func TestGCAndCleanup(t *testing.T) { Storage: uri.String(), FlushIntervalInMs: redo.MinFlushIntervalInMs, } -<<<<<<< HEAD - m, err := NewMetaManagerWithInit(ctx, cfg, startTs) - require.NoError(t, err) - require.Equal(t, startTs, m.metaCheckpointTs.getFlushed()) - require.Equal(t, startTs, m.metaResolvedTs.getFlushed()) -======= m := NewMetaManager(changefeedID, cfg, startTs) ->>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) var eg errgroup.Group eg.Go(func() error { diff --git a/cdc/redo/writer/memory/mem_log_writer.go b/cdc/redo/writer/memory/mem_log_writer.go index 66a41873046..0460fdbf08b 100644 --- a/cdc/redo/writer/memory/mem_log_writer.go +++ b/cdc/redo/writer/memory/mem_log_writer.go @@ -52,7 +52,6 @@ func NewLogWriter( return nil, errors.WrapError(errors.ErrRedoConfigInvalid, errors.New("invalid LogWriterConfig")) } - // "nfs" and "local" scheme are converted to "file" scheme if !cfg.UseExternalStorage { redo.FixLocalScheme(cfg.URI)