From 20018eb6688d93a57770eadc621fe565aea293e5 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Wed, 1 Nov 2023 04:48:37 -0500 Subject: [PATCH] This is an automated cherry-pick of #9887 Signed-off-by: ti-chi-bot --- cdc/owner/changefeed.go | 24 +++ cdc/processor/processor.go | 10 ++ cdc/processor/processor_test.go | 41 +++++ cdc/redo/manager.go | 88 +++++++-- cdc/redo/manager_test.go | 53 +++++- cdc/redo/meta_manager.go | 167 ++++++++++++++---- cdc/redo/meta_manager_test.go | 92 +++++++--- .../writer/blackhole/blackhole_log_writer.go | 37 ++-- cdc/redo/writer/factory/factory.go | 21 ++- cdc/redo/writer/file/file.go | 4 +- cdc/redo/writer/memory/encoding_worker.go | 2 +- cdc/redo/writer/memory/file_worker.go | 2 +- cdc/redo/writer/memory/mem_log_writer.go | 5 +- cdc/redo/writer/memory/mem_log_writer_test.go | 2 +- cdc/redo/writer/writer.go | 2 +- cdc/scheduler/internal/v3/agent/table.go | 7 + .../replication/replication_manager_test.go | 5 + cmd/kafka-consumer/main.go | 3 +- pkg/redo/config.go | 14 +- .../integration_tests/changefeed_error/run.sh | 2 +- 20 files changed, 465 insertions(+), 116 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 80a32391a07..cc405487611 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -317,6 +317,12 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* default: } + if c.redoMetaMgr.Enabled() { + if !c.redoMetaMgr.Running() { + return 0, 0, nil + } + } + // TODO: pass table checkpointTs when we support concurrent process ddl allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil) if err != nil { @@ -551,6 +557,7 @@ LOOP2: ctx.Throw(c.ddlPuller.Run(cancelCtx)) }() +<<<<<<< HEAD stdCtx := contextutil.PutChangefeedIDInCtx(cancelCtx, c.id) c.redoDDLMgr, err = redo.NewDDLManager(stdCtx, c.state.Info.Config.Consistent, ddlStartTs) failpoint.Inject("ChangefeedNewRedoManagerError", func() { @@ -559,6 +566,15 @@ LOOP2: if err != nil { return err } +======= + c.downstreamObserver, err = c.newDownstreamObserver(ctx, c.id, c.latestInfo.SinkURI, c.latestInfo.Config) + if err != nil { + return err + } + c.observerLastTick = atomic.NewTime(time.Time{}) + + c.redoDDLMgr = redo.NewDDLManager(c.id, c.latestInfo.Config.Consistent, ddlStartTs) +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) if c.redoDDLMgr.Enabled() { c.wg.Add(1) go func() { @@ -567,11 +583,15 @@ LOOP2: }() } +<<<<<<< HEAD c.redoMetaMgr, err = redo.NewMetaManagerWithInit(stdCtx, c.state.Info.Config.Consistent, checkpointTs) if err != nil { return err } +======= + c.redoMetaMgr = redo.NewMetaManager(c.id, c.latestInfo.Config.Consistent, checkpointTs) +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) if c.redoMetaMgr.Enabled() { c.wg.Add(1) go func() { @@ -733,6 +753,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) { } // when removing a paused changefeed, the redo manager is nil, create a new one if c.redoMetaMgr == nil { +<<<<<<< HEAD redoMetaMgr, err := redo.NewMetaManager(ctx, c.state.Info.Config.Consistent) if err != nil { log.Info("owner creates redo manager for clean fail", @@ -742,6 +763,9 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) { return } c.redoMetaMgr = redoMetaMgr +======= + c.redoMetaMgr = redo.NewMetaManager(c.id, cfInfo.Config.Consistent, 0) +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) } err := 
c.redoMetaMgr.Cleanup(ctx) if err != nil { diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 56f377ad8d0..91a6a59bfe4 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -782,6 +782,7 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error { return errors.Trace(err) } +<<<<<<< HEAD stdCtx := contextutil.PutChangefeedIDInCtx(ctx, p.changefeedID) stdCtx = contextutil.PutRoleInCtx(stdCtx, util.RoleProcessor) @@ -817,6 +818,15 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error { }() } log.Info("processor creates redo manager", +======= + p.redo.r = redo.NewDMLManager(p.changefeedID, p.latestInfo.Config.Consistent) + p.redo.name = "RedoManager" + p.redo.changefeedID = p.changefeedID + p.redo.spawn(prcCtx) + + sortEngine, err := p.globalVars.SortEngineFactory.Create(p.changefeedID) + log.Info("Processor creates sort engine", +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID)) diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index 3c6a02bddfe..de621c7d8b9 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -54,8 +54,49 @@ func newProcessor4Test( captureInfo, model.ChangeFeedID4Test("processor-test", "processor-test"), up, liveness, 0) p.lazyInit = func(ctx cdcContext.Context) error { +<<<<<<< HEAD p.agent = &mockAgent{executor: p} p.sinkV1 = mocksink.NewNormalMockSink() +======= + if p.initialized { + return nil + } + + if !enableRedo { + p.redo.r = redo.NewDisabledDMLManager() + } else { + tmpDir := t.TempDir() + redoDir := fmt.Sprintf("%s/%s", tmpDir, changefeedID) + dmlMgr := redo.NewDMLManager(changefeedID, &config.ConsistentConfig{ + Level: string(redoPkg.ConsistentLevelEventual), + MaxLogSize: redoPkg.DefaultMaxLogSize, + FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs, + MetaFlushIntervalInMs: redoPkg.DefaultMetaFlushIntervalInMs, + Storage: "file://" + redoDir, + UseFileBackend: false, + }) + p.redo.r = dmlMgr + } + p.redo.name = "RedoManager" + p.redo.changefeedID = changefeedID + p.redo.spawn(ctx) + + p.agent = &mockAgent{executor: p, liveness: liveness} + p.sinkManager.r, p.sourceManager.r, _ = sinkmanager.NewManagerWithMemEngine( + t, changefeedID, info, p.redo.r) + p.sinkManager.name = "SinkManager" + p.sinkManager.changefeedID = changefeedID + p.sinkManager.spawn(ctx) + p.sourceManager.name = "SourceManager" + p.sourceManager.changefeedID = changefeedID + p.sourceManager.spawn(ctx) + + // NOTICE: we have to bind the sourceManager to the sinkManager + // otherwise the sinkManager will not receive the resolvedTs. 
+ p.sourceManager.r.OnResolve(p.sinkManager.r.UpdateReceivedSorterResolvedTs) + + p.initialized = true +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) return nil } p.redoDMLMgr = redo.NewDisabledDMLManager() diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index 7abea626afb..b0aa0f41514 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -19,9 +19,13 @@ import ( "sync/atomic" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/log" +<<<<<<< HEAD "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/contextutil" +======= +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/redo/common" "github.com/pingcap/tiflow/cdc/redo/writer" @@ -62,6 +66,7 @@ func NewDisabledDDLManager() *ddlManager { // NewDDLManager creates a new ddl Manager. func NewDDLManager( +<<<<<<< HEAD ctx context.Context, cfg *config.ConsistentConfig, ddlStartTs model.Ts, ) (*ddlManager, error) { logManager, err := newLogManager(ctx, cfg, redo.RedoDDLLogFileType) @@ -75,6 +80,19 @@ func NewDDLManager( // The current fakeTableID is meaningless, find a meaningful id in the future. fakeTableID: tableID, }, nil +======= + changefeedID model.ChangeFeedID, + cfg *config.ConsistentConfig, ddlStartTs model.Ts, +) *ddlManager { + m := newLogManager(changefeedID, cfg, redo.RedoDDLLogFileType) + span := spanz.TableIDToComparableSpan(0) + m.AddTable(span, ddlStartTs) + return &ddlManager{ + logManager: m, + // The current fakeSpan is meaningless, find a meaningful span in the future. + fakeSpan: span, + } +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) } type ddlManager struct { @@ -111,12 +129,19 @@ type DMLManager interface { } // NewDMLManager creates a new dml Manager. +<<<<<<< HEAD func NewDMLManager(ctx context.Context, cfg *config.ConsistentConfig) (*dmlManager, error) { logManager, err := newLogManager(ctx, cfg, redo.RedoRowLogFileType) if err != nil { return nil, err +======= +func NewDMLManager(changefeedID model.ChangeFeedID, + cfg *config.ConsistentConfig, +) *dmlManager { + return &dmlManager{ + logManager: newLogManager(changefeedID, cfg, redo.RedoRowLogFileType), +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) } - return &dmlManager{logManager: logManager}, nil } // NewDisabledDMLManager creates a disabled dml Manager. 
@@ -222,13 +247,20 @@ type logManager struct { } func newLogManager( +<<<<<<< HEAD ctx context.Context, cfg *config.ConsistentConfig, logType string, ) (*logManager, error) { +======= + changefeedID model.ChangeFeedID, + cfg *config.ConsistentConfig, logType string, +) *logManager { +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) // return a disabled Manager if no consistent config or normal consistent level if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) { - return &logManager{enabled: false}, nil + return &logManager{enabled: false} } +<<<<<<< HEAD uri, err := storage.ParseRawURL(cfg.Storage) if err != nil { return nil, err @@ -244,6 +276,16 @@ func newLogManager( URI: *uri, UseExternalStorage: redo.IsExternalStorage(uri.Scheme), MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte, +======= + return &logManager{ + enabled: true, + cfg: &writer.LogWriterConfig{ + ConsistentConfig: *cfg, + LogType: logType, + CaptureID: config.GetGlobalServerConfig().AdvertiseAddr, + ChangeFeedID: changefeedID, + MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte, +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) }, logBuffer: chann.NewDrainableChann[cacheEvents](), metricWriteLogDuration: common.RedoWriteLogDurationHistogram. @@ -255,16 +297,34 @@ func newLogManager( metricRedoWorkerBusyRatio: common.RedoWorkerBusyRatio. WithLabelValues(changefeedID.Namespace, changefeedID.ID), } - - m.writer, err = factory.NewRedoLogWriter(ctx, m.cfg) - if err != nil { - return nil, err - } - return m, nil } +<<<<<<< HEAD func (m *logManager) Run(ctx context.Context) error { defer m.close() +======= +// Run implements pkg/util.Runnable. +func (m *logManager) Run(ctx context.Context, _ ...chan<- error) error { + failpoint.Inject("ChangefeedNewRedoManagerError", func() { + failpoint.Return(errors.New("changefeed new redo manager injected error")) + }) + if !m.Enabled() { + return nil + } + + defer m.close() + start := time.Now() + w, err := factory.NewRedoLogWriter(ctx, m.cfg) + if err != nil { + log.Error("redo: failed to create redo log writer", + zap.String("namespace", m.cfg.ChangeFeedID.Namespace), + zap.String("changefeed", m.cfg.ChangeFeedID.ID), + zap.Duration("duration", time.Since(start)), + zap.Error(err)) + return err + } + m.writer = w +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) return m.bgUpdateLog(ctx) } @@ -531,11 +591,13 @@ func (m *logManager) close() { atomic.StoreInt32(&m.closed, 1) m.logBuffer.CloseAndDrain() - if err := m.writer.Close(); err != nil { - log.Error("redo manager fails to close writer", - zap.String("namespace", m.cfg.ChangeFeedID.Namespace), - zap.String("changefeed", m.cfg.ChangeFeedID.ID), - zap.Error(err)) + if m.writer != nil { + if err := m.writer.Close(); err != nil && errors.Cause(err) != context.Canceled { + log.Error("redo manager fails to close writer", + zap.String("namespace", m.cfg.ChangeFeedID.Namespace), + zap.String("changefeed", m.cfg.ChangeFeedID.ID), + zap.Error(err)) + } } log.Info("redo manager closed", zap.String("namespace", m.cfg.ChangeFeedID.Namespace), diff --git a/cdc/redo/manager_test.go b/cdc/redo/manager_test.go index a84d5a5a4dc..ad9980b0043 100644 --- a/cdc/redo/manager_test.go +++ b/cdc/redo/manager_test.go @@ -25,7 +25,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/redo/writer" - "github.com/pingcap/tiflow/cdc/redo/writer/blackhole" "github.com/pingcap/tiflow/pkg/config" 
"github.com/pingcap/tiflow/pkg/redo" "github.com/stretchr/testify/require" @@ -119,6 +118,7 @@ func TestLogManagerInProcessor(t *testing.T) { FlushIntervalInMs: redo.MinFlushIntervalInMs, UseFileBackend: useFileBackend, } +<<<<<<< HEAD dmlMgr, err := NewDMLManager(ctx, cfg) require.NoError(t, err) wg := sync.WaitGroup{} @@ -128,6 +128,13 @@ func TestLogManagerInProcessor(t *testing.T) { dmlMgr.Run(ctx) }() +======= + dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg) + var eg errgroup.Group + eg.Go(func() error { + return dmlMgr.Run(ctx) + }) +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) // check emit row changed events can move forward resolved ts spans := []model.TableID{53, 55, 57, 59} @@ -195,7 +202,7 @@ func TestLogManagerInProcessor(t *testing.T) { checkResolvedTs(t, dmlMgr.logManager, flushResolvedTs) cancel() - wg.Wait() + require.ErrorIs(t, eg.Wait(), context.Canceled) } testWriteDMLs("blackhole://", true) @@ -226,6 +233,7 @@ func TestLogManagerInOwner(t *testing.T) { UseFileBackend: useFileBackend, } startTs := model.Ts(10) +<<<<<<< HEAD ddlMgr, err := NewDDLManager(ctx, cfg, startTs) require.NoError(t, err) wg := sync.WaitGroup{} @@ -234,10 +242,18 @@ func TestLogManagerInOwner(t *testing.T) { defer wg.Done() ddlMgr.Run(ctx) }() +======= + ddlMgr := NewDDLManager(model.DefaultChangeFeedID("test"), cfg, startTs) + + var eg errgroup.Group + eg.Go(func() error { + return ddlMgr.Run(ctx) + }) +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) require.Equal(t, startTs, ddlMgr.GetResolvedTs()) ddl := &model.DDLEvent{StartTs: 100, CommitTs: 120, Query: "CREATE TABLE `TEST.T1`"} - err = ddlMgr.EmitDDLEvent(ctx, ddl) + err := ddlMgr.EmitDDLEvent(ctx, ddl) require.NoError(t, err) require.Equal(t, startTs, ddlMgr.GetResolvedTs()) @@ -245,7 +261,7 @@ func TestLogManagerInOwner(t *testing.T) { checkResolvedTs(t, ddlMgr.logManager, ddl.CommitTs) cancel() - wg.Wait() + require.ErrorIs(t, eg.Wait(), context.Canceled) } testWriteDDLs("blackhole://", true) @@ -268,9 +284,10 @@ func TestLogManagerError(t *testing.T) { cfg := &config.ConsistentConfig{ Level: string(redo.ConsistentLevelEventual), MaxLogSize: redo.DefaultMaxLogSize, - Storage: "blackhole://", + Storage: "blackhole-invalid://", FlushIntervalInMs: redo.MinFlushIntervalInMs, } +<<<<<<< HEAD logMgr, err := NewDMLManager(ctx, cfg) require.NoError(t, err) err = logMgr.writer.Close() @@ -285,6 +302,13 @@ func TestLogManagerError(t *testing.T) { require.Regexp(t, ".*invalid black hole writer.*", err) require.Regexp(t, ".*WriteLog.*", err) }() +======= + logMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg) + var eg errgroup.Group + eg.Go(func() error { + return logMgr.Run(ctx) + }) +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) testCases := []struct { span model.TableID @@ -303,7 +327,10 @@ func TestLogManagerError(t *testing.T) { err := logMgr.emitRedoEvents(ctx, tc.span, nil, tc.rows...) 
require.NoError(t, err) } - wg.Wait() + + err := eg.Wait() + require.Regexp(t, ".*invalid black hole writer.*", err) + require.Regexp(t, ".*WriteLog.*", err) } func BenchmarkBlackhole(b *testing.B) { @@ -329,9 +356,14 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) { FlushIntervalInMs: redo.MinFlushIntervalInMs, UseFileBackend: useFileBackend, } +<<<<<<< HEAD dmlMgr, err := NewDMLManager(ctx, cfg) require.Nil(b, err) eg := errgroup.Group{} +======= + dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg) + var eg errgroup.Group +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) eg.Go(func() error { return dmlMgr.Run(ctx) }) @@ -357,8 +389,13 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) { wg.Add(1) go func(tableID model.TableID) { defer wg.Done() +<<<<<<< HEAD maxCommitTs := maxTsMap[tableID] rows := []*model.RowChangedEvent{} +======= + maxCommitTs := maxTsMap.GetV(span) + var rows []*model.RowChangedEvent +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) for i := 0; i < maxRowCount; i++ { if i%100 == 0 { // prepare new row change events @@ -399,6 +436,6 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) { time.Sleep(time.Millisecond * 500) } cancel() - err = eg.Wait() - require.ErrorIs(b, err, context.Canceled) + + require.ErrorIs(b, eg.Wait(), context.Canceled) } diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index 231f6db99e8..d5ff11e9e70 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/uuid" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -47,6 +48,9 @@ type MetaManager interface { // Cleanup deletes all redo logs, which are only called from the owner // when changefeed is deleted. Cleanup(ctx context.Context) error + + // Running return true if the meta manager is running or not. + Running() bool } type metaManager struct { @@ -54,6 +58,9 @@ type metaManager struct { changeFeedID model.ChangeFeedID enabled bool + // running means the meta manager now running normally. + running atomic.Bool + metaCheckpointTs statefulRts metaResolvedTs statefulRts @@ -63,9 +70,13 @@ type metaManager struct { uuidGenerator uuid.Generator preMetaFile string + startTs model.Ts + lastFlushTime time.Time - flushIntervalInMs int64 + cfg *config.ConsistentConfig metricFlushLogDuration prometheus.Observer + + flushIntervalInMs int64 } // NewDisabledMetaManager creates a disabled Meta Manager. @@ -75,6 +86,7 @@ func NewDisabledMetaManager() *metaManager { } } +<<<<<<< HEAD // NewMetaManagerWithInit creates a new Manager and initializes the meta. func NewMetaManagerWithInit( ctx context.Context, cfg *config.ConsistentConfig, startTs model.Ts, @@ -110,9 +122,15 @@ func NewMetaManagerWithInit( // NewMetaManager creates a new meta Manager. func NewMetaManager(ctx context.Context, cfg *config.ConsistentConfig) (*metaManager, error) { +======= +// NewMetaManager creates a new meta Manager. 
+func NewMetaManager( + changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, checkpoint model.Ts, +) *metaManager { +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) // return a disabled Manager if no consistent config or normal consistent level if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) { - return &metaManager{enabled: false}, nil + return &metaManager{enabled: false} } m := &metaManager{ @@ -120,46 +138,91 @@ func NewMetaManager(ctx context.Context, cfg *config.ConsistentConfig) (*metaMan changeFeedID: contextutil.ChangefeedIDFromCtx(ctx), uuidGenerator: uuid.NewGenerator(), enabled: true, +<<<<<<< HEAD flushIntervalInMs: cfg.FlushIntervalInMs, +======= + cfg: cfg, + startTs: checkpoint, + flushIntervalInMs: cfg.MetaFlushIntervalInMs, } - uri, err := storage.ParseRawURL(cfg.Storage) - if err != nil { - return nil, err - } - if redo.IsBlackholeStorage(uri.Scheme) { - return m, nil + if m.flushIntervalInMs < redo.MinFlushIntervalInMs { + log.Warn("redo flush interval is too small, use default value", + zap.Int64("interval", m.flushIntervalInMs)) + m.flushIntervalInMs = redo.DefaultMetaFlushIntervalInMs +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) } + return m +} +// Enabled returns whether this log manager is enabled +func (m *metaManager) Enabled() bool { + return m.enabled +} + +// Running return whether the meta manager is initialized, +// which means the external storage is accessible to the meta manager. +func (m *metaManager) Running() bool { + return m.running.Load() +} + +func (m *metaManager) preStart(ctx context.Context) error { + uri, err := storage.ParseRawURL(m.cfg.Storage) + if err != nil { + return err + } // "nfs" and "local" scheme are converted to "file" scheme redo.FixLocalScheme(uri) + extStorage, err := redo.InitExternalStorage(ctx, *uri) if err != nil { - return nil, err + return err } m.extStorage = extStorage - return m, nil -} -// Enabled returns whether this log manager is enabled -func (m *metaManager) Enabled() bool { - return m.enabled + m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram. + WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) + + err = m.preCleanupExtStorage(ctx) + if err != nil { + log.Warn("redo: pre clean redo logs fail", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Error(err)) + return err + } + err = m.initMeta(ctx) + if err != nil { + log.Warn("redo: init redo meta fail", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Error(err)) + return err + } + return nil } // Run runs bgFlushMeta and bgGC. 
+<<<<<<< HEAD func (m *metaManager) Run(ctx context.Context) error { if m.extStorage == nil { log.Warn("extStorage of redo meta manager is nil, skip running") return nil +======= +func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error { + if err := m.preStart(ctx); err != nil { + return err +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) } - eg, egCtx := errgroup.WithContext(ctx) eg.Go(func() error { - return m.bgFlushMeta(egCtx, m.flushIntervalInMs) + return m.bgFlushMeta(egCtx, m.cfg.FlushIntervalInMs) }) eg.Go(func() error { return m.bgGC(egCtx) }) + + m.running.Store(true) return eg.Wait() } @@ -188,9 +251,9 @@ func (m *metaManager) GetFlushedMeta() common.LogMeta { return common.LogMeta{CheckpointTs: checkpointTs, ResolvedTs: resolvedTs} } -// initMeta will read the meta file from external storage and initialize the meta -// field of metaManager. -func (m *metaManager) initMeta(ctx context.Context, startTs model.Ts) error { +// initMeta will read the meta file from external storage and +// use it to initialize the meta field of the metaManager. +func (m *metaManager) initMeta(ctx context.Context) error { select { case <-ctx.Done(): return errors.Trace(ctx.Err()) @@ -198,10 +261,14 @@ func (m *metaManager) initMeta(ctx context.Context, startTs model.Ts) error { } metas := []*common.LogMeta{ - {CheckpointTs: startTs, ResolvedTs: startTs}, + {CheckpointTs: m.startTs, ResolvedTs: m.startTs}, } var toRemoveMetaFiles []string err := m.extStorage.WalkDir(ctx, nil, func(path string, size int64) error { + log.Info("redo: meta manager walk dir", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.String("path", path), zap.Int64("size", size)) // TODO: use prefix to accelerate traverse operation if !strings.HasSuffix(path, redo.MetaEXT) { return nil @@ -209,22 +276,30 @@ func (m *metaManager) initMeta(ctx context.Context, startTs model.Ts) error { toRemoveMetaFiles = append(toRemoveMetaFiles, path) data, err := m.extStorage.ReadFile(ctx, path) - if err != nil && !util.IsNotExistInExtStorage(err) { - return err - } - if len(data) != 0 { - var meta common.LogMeta - _, err = meta.UnmarshalMsg(data) - if err != nil { + if err != nil { + log.Warn("redo: read meta file failed", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.String("path", path), zap.Error(err)) + if !util.IsNotExistInExtStorage(err) { return err } - metas = append(metas, &meta) + return nil + } + var meta common.LogMeta + _, err = meta.UnmarshalMsg(data) + if err != nil { + log.Error("redo: unmarshal meta data failed", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Error(err), zap.ByteString("data", data)) + return err } + metas = append(metas, &meta) return nil }) if err != nil { - return errors.WrapError(errors.ErrRedoMetaInitialize, - errors.Annotate(err, "read meta file fail")) + return errors.WrapError(errors.ErrRedoMetaInitialize, err) } var checkpointTs, resolvedTs uint64 @@ -237,9 +312,16 @@ func (m *metaManager) initMeta(ctx context.Context, startTs model.Ts) error { m.metaResolvedTs.unflushed = resolvedTs m.metaCheckpointTs.unflushed = checkpointTs if err := m.maybeFlushMeta(ctx); err != nil { - return errors.WrapError(errors.ErrRedoMetaInitialize, - errors.Annotate(err, "flush meta file fail")) + return errors.WrapError(errors.ErrRedoMetaInitialize, err) } + + flushedMeta := m.GetFlushedMeta() + 
log.Info("redo: meta manager flush init meta success", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Uint64("checkpointTs", flushedMeta.CheckpointTs), + zap.Uint64("resolvedTs", flushedMeta.ResolvedTs)) + return util.DeleteFilesInExtStorage(ctx, m.extStorage, toRemoveMetaFiles) } @@ -299,8 +381,19 @@ func (m *metaManager) shouldRemoved(path string, checkPointTs uint64) bool { // deleteAllLogs delete all redo logs and leave a deleted mark. func (m *metaManager) deleteAllLogs(ctx context.Context) error { + // when one changefeed with redo enabled gets deleted, it's extStorage should always be set to not nil + // otherwise it should have already meet panic during changefeed running time. + // the extStorage may be nil in the unit test, so just set the external storage to make unit test happy. if m.extStorage == nil { - return nil + uri, err := storage.ParseRawURL(m.cfg.Storage) + redo.FixLocalScheme(uri) + if err != nil { + return err + } + m.extStorage, err = redo.InitExternalStorage(ctx, *uri) + if err != nil { + return err + } } // Write deleted mark before clean any files. deleteMarker := getDeletedChangefeedMarker(m.changeFeedID) @@ -376,6 +469,10 @@ func (m *metaManager) flush(ctx context.Context, meta common.LogMeta) error { } metaFile := getMetafileName(m.captureID, m.changeFeedID, m.uuidGenerator) if err := m.extStorage.WriteFile(ctx, metaFile, data); err != nil { + log.Error("redo: meta manager flush meta write file failed", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Error(err)) return errors.WrapError(errors.ErrExternalStorageAPI, err) } @@ -386,6 +483,10 @@ func (m *metaManager) flush(ctx context.Context, meta common.LogMeta) error { } err := m.extStorage.DeleteFile(ctx, m.preMetaFile) if err != nil && !util.IsNotExistInExtStorage(err) { + log.Error("redo: meta manager flush meta delete file failed", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Error(err)) return errors.WrapError(errors.ErrExternalStorageAPI, err) } } diff --git a/cdc/redo/meta_manager_test.go b/cdc/redo/meta_manager_test.go index 3a13fdccb35..db487e7ec63 100644 --- a/cdc/redo/meta_manager_test.go +++ b/cdc/redo/meta_manager_test.go @@ -51,17 +51,18 @@ func TestInitAndWriteMeta(t *testing.T) { {CheckpointTs: 8, ResolvedTs: 9}, {CheckpointTs: 9, ResolvedTs: 11}, } - toReomoveFiles := []string{} + + var toRemoveFiles []string for _, meta := range metas { data, err := meta.MarshalMsg(nil) require.NoError(t, err) metaName := getMetafileName(captureID, changefeedID, uuid.NewGenerator()) err = extStorage.WriteFile(ctx, metaName, data) require.NoError(t, err) - toReomoveFiles = append(toReomoveFiles, metaName) + toRemoveFiles = append(toRemoveFiles, metaName) } - // err = extStorage.WriteFile(ctx, getDeletedChangefeedMarker(changefeedID), []byte{}) - notRemoveFiles := []string{} + + var notRemoveFiles []string require.NoError(t, err) for i := 0; i < 10; i++ { fileName := "dummy" + getChangefeedMatcher(changefeedID) + strconv.Itoa(i) @@ -77,11 +78,30 @@ func TestInitAndWriteMeta(t *testing.T) { Storage: uri.String(), FlushIntervalInMs: redo.MinFlushIntervalInMs, } +<<<<<<< HEAD m, err := NewMetaManagerWithInit(ctx, cfg, startTs) require.NoError(t, err) require.Equal(t, startTs, m.metaCheckpointTs.getFlushed()) require.Equal(t, uint64(11), m.metaResolvedTs.getFlushed()) for _, fileName := range toReomoveFiles { +======= + m := 
NewMetaManager(changefeedID, cfg, startTs) + + var eg errgroup.Group + eg.Go(func() error { + return m.Run(ctx) + }) + + require.Eventually(t, func() bool { + return startTs == m.metaCheckpointTs.getFlushed() + }, time.Second, 50*time.Millisecond) + + require.Eventually(t, func() bool { + return uint64(11) == m.metaResolvedTs.getFlushed() + }, time.Second, 50*time.Millisecond) + + for _, fileName := range toRemoveFiles { +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) ret, err := extStorage.FileExists(ctx, fileName) require.NoError(t, err) require.False(t, ret, "file %s should be removed", fileName) @@ -92,7 +112,10 @@ func TestInitAndWriteMeta(t *testing.T) { require.True(t, ret, "file %s should not be removed", fileName) } - testWriteMeta(t, m) + testWriteMeta(ctx, t, m) + + cancel() + require.ErrorIs(t, eg.Wait(), context.Canceled) } func TestPreCleanupAndWriteMeta(t *testing.T) { @@ -115,7 +138,7 @@ func TestPreCleanupAndWriteMeta(t *testing.T) { {CheckpointTs: 9, ResolvedTs: 11}, {CheckpointTs: 11, ResolvedTs: 12}, } - toRemoveFiles := []string{} + var toRemoveFiles []string for _, meta := range metas { data, err := meta.MarshalMsg(nil) require.NoError(t, err) @@ -141,24 +164,43 @@ func TestPreCleanupAndWriteMeta(t *testing.T) { Storage: uri.String(), FlushIntervalInMs: redo.MinFlushIntervalInMs, } +<<<<<<< HEAD m, err := NewMetaManagerWithInit(ctx, cfg, startTs) require.NoError(t, err) require.Equal(t, startTs, m.metaCheckpointTs.getFlushed()) require.Equal(t, startTs, m.metaResolvedTs.getFlushed()) +======= + m := NewMetaManager(changefeedID, cfg, startTs) + + var eg errgroup.Group + eg.Go(func() error { + return m.Run(ctx) + }) + + require.Eventually(t, func() bool { + return startTs == m.metaCheckpointTs.getFlushed() + }, time.Second, 50*time.Millisecond) + + require.Eventually(t, func() bool { + return startTs == m.metaResolvedTs.getFlushed() + }, time.Second, 50*time.Millisecond) + +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) for _, fileName := range toRemoveFiles { ret, err := extStorage.FileExists(ctx, fileName) require.NoError(t, err) require.False(t, ret, "file %s should be removed", fileName) } - testWriteMeta(t, m) -} + testWriteMeta(ctx, t, m) -func testWriteMeta(t *testing.T, m *metaManager) { - ctx, cancel := context.WithCancel(context.Background()) + cancel() + require.ErrorIs(t, eg.Wait(), context.Canceled) +} - checkMeta := func(targetCkpts, targetRts uint64) { +func testWriteMeta(ctx context.Context, t *testing.T, m *metaManager) { + checkMeta := func(targetCheckpointTs, targetResolvedTs uint64) { var checkpointTs, resolvedTs uint64 - metas := []*common.LogMeta{} + var metas []*common.LogMeta cnt := 0 m.extStorage.WalkDir(ctx, nil, func(path string, size int64) error { if !strings.HasSuffix(path, redo.MetaEXT) { @@ -175,14 +217,10 @@ func testWriteMeta(t *testing.T, m *metaManager) { }) require.Equal(t, 1, cnt) common.ParseMeta(metas, &checkpointTs, &resolvedTs) - require.Equal(t, targetCkpts, checkpointTs) - require.Equal(t, targetRts, resolvedTs) + require.Equal(t, targetCheckpointTs, checkpointTs) + require.Equal(t, targetResolvedTs, resolvedTs) } - eg := errgroup.Group{} - eg.Go(func() error { - return m.Run(ctx) - }) // test both regressed meta := m.GetFlushedMeta() m.UpdateMeta(1, 2) @@ -208,9 +246,6 @@ func testWriteMeta(t *testing.T, m *metaManager) { return m.metaCheckpointTs.getFlushed() == 16 }, time.Second, 50*time.Millisecond) checkMeta(16, 21) - - cancel() - require.ErrorIs(t, 
eg.Wait(), context.Canceled) } func TestGCAndCleanup(t *testing.T) { @@ -272,15 +307,28 @@ func TestGCAndCleanup(t *testing.T) { Storage: uri.String(), FlushIntervalInMs: redo.MinFlushIntervalInMs, } +<<<<<<< HEAD m, err := NewMetaManagerWithInit(ctx, cfg, startTs) require.NoError(t, err) require.Equal(t, startTs, m.metaCheckpointTs.getFlushed()) require.Equal(t, startTs, m.metaResolvedTs.getFlushed()) +======= + m := NewMetaManager(changefeedID, cfg, startTs) +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) - eg := errgroup.Group{} + var eg errgroup.Group eg.Go(func() error { return m.Run(ctx) }) + + require.Eventually(t, func() bool { + return startTs == m.metaCheckpointTs.getFlushed() + }, time.Second, 50*time.Millisecond) + + require.Eventually(t, func() bool { + return startTs == m.metaResolvedTs.getFlushed() + }, time.Second, 50*time.Millisecond) + checkGC(startTs) for i := startTs; i <= uint64(maxCommitTs); i++ { diff --git a/cdc/redo/writer/blackhole/blackhole_log_writer.go b/cdc/redo/writer/blackhole/blackhole_log_writer.go index 24cc03bd3df..9a877f262f3 100644 --- a/cdc/redo/writer/blackhole/blackhole_log_writer.go +++ b/cdc/redo/writer/blackhole/blackhole_log_writer.go @@ -26,14 +26,21 @@ var _ writer.RedoLogWriter = (*blackHoleWriter)(nil) // blackHoleSink defines a blackHole storage, it receives events and persists // without any latency -type blackHoleWriter struct{} +type blackHoleWriter struct { + invalid bool +} // NewLogWriter creates a blackHole writer -func NewLogWriter() *blackHoleWriter { - return &blackHoleWriter{} +func NewLogWriter(invalid bool) *blackHoleWriter { + return &blackHoleWriter{ + invalid: invalid, + } } func (bs *blackHoleWriter) WriteEvents(_ context.Context, events ...writer.RedoEvent) (err error) { + if bs.invalid { + return errors.New("[WriteLog] invalid black hole writer") + } if len(events) == 0 { return nil } @@ -45,30 +52,12 @@ func (bs *blackHoleWriter) WriteEvents(_ context.Context, events ...writer.RedoE } func (bs *blackHoleWriter) FlushLog(_ context.Context) error { + if bs.invalid { + return errors.New("[FlushLog] invalid black hole writer") + } return nil } func (bs *blackHoleWriter) Close() error { return nil } - -type invalidBlackHoleWriter struct { - *blackHoleWriter -} - -// NewInvalidLogWriter creates a invalid blackHole writer -func NewInvalidLogWriter(rl writer.RedoLogWriter) *invalidBlackHoleWriter { - return &invalidBlackHoleWriter{ - blackHoleWriter: rl.(*blackHoleWriter), - } -} - -func (ibs *invalidBlackHoleWriter) WriteEvents( - _ context.Context, _ ...writer.RedoEvent, -) (err error) { - return errors.New("[WriteLog] invalid black hole writer") -} - -func (ibs *invalidBlackHoleWriter) FlushLog(_ context.Context) error { - return errors.New("[FlushLog] invalid black hole writer") -} diff --git a/cdc/redo/writer/factory/factory.go b/cdc/redo/writer/factory/factory.go index 7684b3f06f0..796d803ae45 100644 --- a/cdc/redo/writer/factory/factory.go +++ b/cdc/redo/writer/factory/factory.go @@ -15,7 +15,9 @@ package factory import ( "context" + "strings" + "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/redo/writer" "github.com/pingcap/tiflow/cdc/redo/writer/blackhole" "github.com/pingcap/tiflow/cdc/redo/writer/file" @@ -28,14 +30,23 @@ import ( func NewRedoLogWriter( ctx context.Context, lwCfg *writer.LogWriterConfig, ) (writer.RedoLogWriter, error) { - scheme := lwCfg.URI.Scheme - if !redo.IsValidConsistentStorage(scheme) { - return nil, 
errors.ErrConsistentStorage.GenWithStackByArgs(scheme) + uri, err := storage.ParseRawURL(lwCfg.Storage) + if err != nil { + return nil, err } - if redo.IsBlackholeStorage(scheme) { - return blackhole.NewLogWriter(), nil + if !redo.IsValidConsistentStorage(uri.Scheme) { + return nil, errors.ErrConsistentStorage.GenWithStackByArgs(uri.Scheme) } + + lwCfg.URI = uri + lwCfg.UseExternalStorage = redo.IsExternalStorage(uri.Scheme) + + if redo.IsBlackholeStorage(uri.Scheme) { + invalid := strings.HasSuffix(uri.Scheme, "invalid") + return blackhole.NewLogWriter(invalid), nil + } + if lwCfg.UseFileBackend { return file.NewLogWriter(ctx, lwCfg) } diff --git a/cdc/redo/writer/file/file.go b/cdc/redo/writer/file/file.go index e895bffe8c6..e12a5df59de 100644 --- a/cdc/redo/writer/file/file.go +++ b/cdc/redo/writer/file/file.go @@ -86,7 +86,7 @@ func NewFileWriter( var extStorage storage.ExternalStorage if cfg.UseExternalStorage { var err error - extStorage, err = redo.InitExternalStorage(ctx, cfg.URI) + extStorage, err = redo.InitExternalStorage(ctx, *cfg.URI) if err != nil { return nil, err } @@ -129,7 +129,7 @@ func NewFileWriter( // if we use S3 as the remote storage, a file allocator can be leveraged to // pre-allocate files for us. // TODO: test whether this improvement can also be applied to NFS. - if cfg.UseExternalStorage { + if w.cfg.UseExternalStorage { w.allocator = fsutil.NewFileAllocator(cfg.Dir, cfg.LogType, cfg.MaxLogSizeInBytes) } diff --git a/cdc/redo/writer/memory/encoding_worker.go b/cdc/redo/writer/memory/encoding_worker.go index fa59c6dfe94..f2dfa102590 100644 --- a/cdc/redo/writer/memory/encoding_worker.go +++ b/cdc/redo/writer/memory/encoding_worker.go @@ -123,7 +123,7 @@ func newEncodingWorkerGroup(workerNum int) *encodingWorkerGroup { func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) { defer func() { close(e.closed) - if err != nil { + if err != nil && errors.Cause(err) != context.Canceled { log.Warn("redo fileWorkerGroup closed with error", zap.Error(err)) } }() diff --git a/cdc/redo/writer/memory/file_worker.go b/cdc/redo/writer/memory/file_worker.go index 771888a418d..45b366117c0 100644 --- a/cdc/redo/writer/memory/file_worker.go +++ b/cdc/redo/writer/memory/file_worker.go @@ -138,7 +138,7 @@ func (f *fileWorkerGroup) Run( ) (err error) { defer func() { f.close() - if err != nil { + if err != nil && errors.Cause(err) != context.Canceled { log.Warn("redo file workers closed with error", zap.Error(err)) } }() diff --git a/cdc/redo/writer/memory/mem_log_writer.go b/cdc/redo/writer/memory/mem_log_writer.go index a47a83e855b..66a41873046 100644 --- a/cdc/redo/writer/memory/mem_log_writer.go +++ b/cdc/redo/writer/memory/mem_log_writer.go @@ -52,13 +52,14 @@ func NewLogWriter( return nil, errors.WrapError(errors.ErrRedoConfigInvalid, errors.New("invalid LogWriterConfig")) } + // "nfs" and "local" scheme are converted to "file" scheme if !cfg.UseExternalStorage { - redo.FixLocalScheme(&cfg.URI) + redo.FixLocalScheme(cfg.URI) cfg.UseExternalStorage = redo.IsExternalStorage(cfg.URI.Scheme) } - extStorage, err := redo.InitExternalStorage(ctx, cfg.URI) + extStorage, err := redo.InitExternalStorage(ctx, *cfg.URI) if err != nil { return nil, err } diff --git a/cdc/redo/writer/memory/mem_log_writer_test.go b/cdc/redo/writer/memory/mem_log_writer_test.go index 8d1a03a7107..53943e52a29 100644 --- a/cdc/redo/writer/memory/mem_log_writer_test.go +++ b/cdc/redo/writer/memory/mem_log_writer_test.go @@ -68,7 +68,7 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) { 
LogType: redo.RedoDDLLogFileType, CaptureID: "test-capture", ChangeFeedID: model.DefaultChangeFeedID("test-changefeed"), - URI: *uri, + URI: uri, UseExternalStorage: true, MaxLogSizeInBytes: 10 * redo.Megabyte, } diff --git a/cdc/redo/writer/writer.go b/cdc/redo/writer/writer.go index 18779997b30..42ac64d4ff6 100644 --- a/cdc/redo/writer/writer.go +++ b/cdc/redo/writer/writer.go @@ -52,7 +52,7 @@ type LogWriterConfig struct { CaptureID model.CaptureID ChangeFeedID model.ChangeFeedID - URI url.URL + URI *url.URL UseExternalStorage bool Dir string MaxLogSizeInBytes int64 diff --git a/cdc/scheduler/internal/v3/agent/table.go b/cdc/scheduler/internal/v3/agent/table.go index 927d7c41028..c2b7a6d7f5e 100644 --- a/cdc/scheduler/internal/v3/agent/table.go +++ b/cdc/scheduler/internal/v3/agent/table.go @@ -309,8 +309,15 @@ func newTableManager( func (tm *tableManager) poll(ctx context.Context) ([]*schedulepb.Message, error) { result := make([]*schedulepb.Message, 0) +<<<<<<< HEAD for tableID, table := range tm.tables { message, err := table.poll(ctx) +======= + var err error + var toBeDropped []tablepb.Span + tm.tables.Ascend(func(span tablepb.Span, table *tableSpan) bool { + message, err1 := table.poll(ctx) +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) if err != nil { return result, errors.Trace(err) } diff --git a/cdc/scheduler/internal/v3/replication/replication_manager_test.go b/cdc/scheduler/internal/v3/replication/replication_manager_test.go index c5e7aad2c93..a5a0d228fca 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager_test.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager_test.go @@ -585,8 +585,13 @@ func (m *mockRedoMetaManager) Enabled() bool { return m.enable } +<<<<<<< HEAD func (m *mockRedoMetaManager) Run(ctx context.Context) error { return nil +======= +func (m *mockRedoMetaManager) Running() bool { + return true +>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887)) } func TestReplicationManagerAdvanceCheckpoint(t *testing.T) { diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 48582905789..bee9095f566 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -686,10 +686,11 @@ func (c *Consumer) appendDDL(ddl *model.DDLEvent) { defer c.ddlListMu.Unlock() // DDL CommitTs fallback, just crash it to indicate the bug. if c.ddlWithMaxCommitTs != nil && ddl.CommitTs < c.ddlWithMaxCommitTs.CommitTs { - log.Panic("DDL CommitTs < maxCommitTsDDL.CommitTs", + log.Warn("DDL CommitTs < maxCommitTsDDL.CommitTs", zap.Uint64("commitTs", ddl.CommitTs), zap.Uint64("maxCommitTs", c.ddlWithMaxCommitTs.CommitTs), zap.Any("DDL", ddl)) + return } // A rename tables DDL job contains multiple DDL events with same CommitTs. diff --git a/pkg/redo/config.go b/pkg/redo/config.go index fbcdb3f1eb3..82c2e6f3457 100644 --- a/pkg/redo/config.go +++ b/pkg/redo/config.go @@ -172,7 +172,7 @@ func FixLocalScheme(uri *url.URL) { // IsBlackholeStorage returns whether a blackhole storage is used. func IsBlackholeStorage(scheme string) bool { - return ConsistentStorage(scheme) == consistentStorageBlackhole + return strings.HasPrefix(scheme, string(consistentStorageBlackhole)) } // InitExternalStorage init an external storage. 
@@ -220,6 +220,18 @@ func ValidateStorage(uri *url.URL) error {
 		return errors.WrapError(errors.ErrStorageInitialize, errors.Annotate(err,
 			fmt.Sprintf("can't make dir for new redo log: %+v", uri)))
 	}
+
+	file := filepath.Join(uri.Path, "file.test")
+	if err := os.WriteFile(file, []byte(""), DefaultFileMode); err != nil {
+		return errors.WrapError(errors.ErrStorageInitialize, errors.Annotate(err,
+			fmt.Sprintf("can't write file for new redo log: %+v", uri)))
+	}
+
+	if _, err := os.ReadFile(file); err != nil {
+		return errors.WrapError(errors.ErrStorageInitialize, errors.Annotate(err,
+			fmt.Sprintf("can't read file for new redo log: %+v", uri)))
+	}
+	_ = os.Remove(file)
 	return nil
 }
diff --git a/tests/integration_tests/changefeed_error/run.sh b/tests/integration_tests/changefeed_error/run.sh
index abaf3d6b556..745515a5347 100755
--- a/tests/integration_tests/changefeed_error/run.sh
+++ b/tests/integration_tests/changefeed_error/run.sh
@@ -104,7 +104,7 @@ function run() {
 	cleanup_process $CDC_BINARY
 	# make sure initialize changefeed error will not stuck the owner
-	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/ChangefeedNewRedoManagerError=2*return(true)'
+	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/redo/ChangefeedNewRedoManagerError=2*return(true)'
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
 	changefeedid_3="changefeed-initialize-error"
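The hunks above all apply one pattern: NewDDLManager/NewDMLManager/NewMetaManager become cheap constructors that do no I/O, the slow external-storage setup moves into Run(ctx), and the owner checks Running() before relying on the manager, so a slow or broken storage backend can no longer block the owner's tick. The following is a minimal, self-contained sketch of that pattern only; all names in it are illustrative and are not the real tiflow API.

// Sketch of the "construct fast, initialize in Run, gate on Running()" pattern
// used by this patch. Names and timings are illustrative assumptions.
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// manager is constructed synchronously; no storage access happens here.
type manager struct {
	enabled bool
	running atomic.Bool
}

func newManager(enabled bool) *manager { return &manager{enabled: enabled} }

func (m *manager) Enabled() bool { return m.enabled }

// Running reports whether Run has finished its (possibly slow) initialization.
func (m *manager) Running() bool { return m.running.Load() }

// Run performs the blocking initialization (a stand-in for probing external
// storage, pre-cleanup and meta init) off the owner's critical path.
func (m *manager) Run(ctx context.Context) error {
	if !m.enabled {
		return nil
	}
	select {
	case <-time.After(100 * time.Millisecond): // simulated storage setup
	case <-ctx.Done():
		return ctx.Err()
	}
	m.running.Store(true)
	<-ctx.Done()
	return ctx.Err()
}

// tick mirrors the owner loop: when redo is enabled but not yet running,
// skip this round instead of blocking.
func tick(m *manager) {
	if m.Enabled() && !m.Running() {
		fmt.Println("redo manager not ready, skip tick")
		return
	}
	fmt.Println("advance checkpoint")
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	m := newManager(true)
	go func() { _ = m.Run(ctx) }()
	for i := 0; i < 3; i++ {
		tick(m)
		time.Sleep(60 * time.Millisecond)
	}
}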