From 87785f1c509b97b2690b8033914cd6da5332cafd Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 2 Nov 2023 18:38:10 +0800 Subject: [PATCH] redo(ticdc): fix redo initialization block the owner (#9887) (#9993) close pingcap/tiflow#9886 --- cdc/owner/changefeed.go | 31 +-- cdc/processor/processor.go | 5 +- cdc/processor/processor_test.go | 3 +- cdc/redo/manager.go | 93 ++++---- cdc/redo/manager_test.go | 72 +++---- cdc/redo/meta_manager.go | 198 +++++++++++------- cdc/redo/meta_manager_test.go | 96 +++++---- .../writer/blackhole/blackhole_log_writer.go | 37 ++-- cdc/redo/writer/factory/factory.go | 21 +- cdc/redo/writer/file/file.go | 4 +- cdc/redo/writer/memory/encoding_worker.go | 2 +- cdc/redo/writer/memory/file_worker.go | 2 +- cdc/redo/writer/memory/mem_log_writer.go | 4 +- cdc/redo/writer/memory/mem_log_writer_test.go | 2 +- cdc/redo/writer/writer.go | 2 +- cdc/scheduler/internal/v3/agent/table.go | 2 +- .../replication/replication_manager_test.go | 4 + cmd/kafka-consumer/main.go | 3 +- pkg/redo/config.go | 14 +- .../integration_tests/changefeed_error/run.sh | 2 +- 20 files changed, 324 insertions(+), 273 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index b261f689f57..35cfd6ba6f7 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -339,6 +339,12 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* default: } + if c.redoMetaMgr.Enabled() { + if !c.redoMetaMgr.Running() { + return nil + } + } + // TODO: pass table checkpointTs when we support concurrent process ddl allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil) if err != nil { @@ -582,13 +588,7 @@ LOOP2: } c.observerLastTick = atomic.NewTime(time.Time{}) - c.redoDDLMgr, err = redo.NewDDLManager(cancelCtx, c.id, c.state.Info.Config.Consistent, ddlStartTs) - failpoint.Inject("ChangefeedNewRedoManagerError", func() { - err = errors.New("changefeed new redo manager injected error") - }) - if err != nil { - return err - } + c.redoDDLMgr = redo.NewDDLManager(c.id, c.state.Info.Config.Consistent, ddlStartTs) if c.redoDDLMgr.Enabled() { c.wg.Add(1) go func() { @@ -597,12 +597,7 @@ LOOP2: }() } - c.redoMetaMgr, err = redo.NewMetaManagerWithInit(cancelCtx, - c.id, - c.state.Info.Config.Consistent, checkpointTs) - if err != nil { - return err - } + c.redoMetaMgr = redo.NewMetaManager(c.id, c.state.Info.Config.Consistent, checkpointTs) if c.redoMetaMgr.Enabled() { c.wg.Add(1) go func() { @@ -771,15 +766,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) { } // when removing a paused changefeed, the redo manager is nil, create a new one if c.redoMetaMgr == nil { - redoMetaMgr, err := redo.NewMetaManager(ctx, c.id, c.state.Info.Config.Consistent) - if err != nil { - log.Info("owner creates redo manager for clean fail", - zap.String("namespace", c.id.Namespace), - zap.String("changefeed", c.id.ID), - zap.Error(err)) - return - } - c.redoMetaMgr = redoMetaMgr + c.redoMetaMgr = redo.NewMetaManager(c.id, c.state.Info.Config.Consistent, 0) } err := c.redoMetaMgr.Cleanup(ctx) if err != nil { diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 3c279758881..58205b11008 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -605,10 +605,7 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) { } p.latestInfo.Config.Sink.TiDBSourceID = sourceID - p.redo.r, err = redo.NewDMLManager(prcCtx, p.changefeedID, p.latestInfo.Config.Consistent) - if err != nil { - return err 
- } + p.redo.r = redo.NewDMLManager(p.changefeedID, p.latestInfo.Config.Consistent) p.redo.name = "RedoManager" p.redo.changefeedID = p.changefeedID p.redo.spawn(prcCtx) diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index 4b2e006f6de..042e243a24a 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -71,14 +71,13 @@ func newProcessor4Test( } else { tmpDir := t.TempDir() redoDir := fmt.Sprintf("%s/%s", tmpDir, changefeedID) - dmlMgr, err := redo.NewDMLManager(ctx, changefeedID, &config.ConsistentConfig{ + dmlMgr := redo.NewDMLManager(changefeedID, &config.ConsistentConfig{ Level: string(redoPkg.ConsistentLevelEventual), MaxLogSize: redoPkg.DefaultMaxLogSize, FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs, Storage: "file://" + redoDir, UseFileBackend: false, }) - require.NoError(t, err) p.redo.r = dmlMgr } p.redo.name = "RedoManager" diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index 0fc95448916..1c6f2bd3610 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -19,8 +19,8 @@ import ( "sync/atomic" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/log" - "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/redo/common" @@ -65,20 +65,17 @@ func NewDisabledDDLManager() *ddlManager { // NewDDLManager creates a new ddl Manager. func NewDDLManager( - ctx context.Context, changefeedID model.ChangeFeedID, + changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, ddlStartTs model.Ts, -) (*ddlManager, error) { - logManager, err := newLogManager(ctx, changefeedID, cfg, redo.RedoDDLLogFileType) - if err != nil { - return nil, err - } +) *ddlManager { + m := newLogManager(changefeedID, cfg, redo.RedoDDLLogFileType) span := spanz.TableIDToComparableSpan(0) - logManager.AddTable(span, ddlStartTs) + m.AddTable(span, ddlStartTs) return &ddlManager{ - logManager: logManager, - // The current fakeSpan is meaningless, find a meaningful sapn in the future. + logManager: m, + // The current fakeSpan is meaningless, find a meaningful span in the future. fakeSpan: span, - }, nil + } } type ddlManager struct { @@ -115,14 +112,12 @@ type DMLManager interface { } // NewDMLManager creates a new dml Manager. -func NewDMLManager(ctx context.Context, changefeedID model.ChangeFeedID, +func NewDMLManager(changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, -) (*dmlManager, error) { - logManager, err := newLogManager(ctx, changefeedID, cfg, redo.RedoRowLogFileType) - if err != nil { - return nil, err +) *dmlManager { + return &dmlManager{ + logManager: newLogManager(changefeedID, cfg, redo.RedoRowLogFileType), } - return &dmlManager{logManager: logManager}, nil } // NewDisabledDMLManager creates a disabled dml Manager. 
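
The constructor change above is the heart of this fix: NewDDLManager and NewDMLManager no longer take a context or return an error, because all storage access is deferred to Run. A minimal caller-side sketch of the new pattern, assuming the names in this patch plus an errgroup-style caller (the real processor and owner wiring differs slightly):

	dmlMgr := redo.NewDMLManager(changefeedID, cfg.Consistent)
	var eg errgroup.Group
	eg.Go(func() error {
		// The log writer (and any external-storage dial) is created inside
		// Run, so an unreachable storage URI fails this background task
		// instead of blocking the goroutine that constructs the manager.
		return dmlMgr.Run(ctx)
	})
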
@@ -228,29 +223,22 @@ type logManager struct { } func newLogManager( - ctx context.Context, changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, logType string, -) (*logManager, error) { +) *logManager { // return a disabled Manager if no consistent config or normal consistent level if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) { - return &logManager{enabled: false}, nil + return &logManager{enabled: false} } - uri, err := storage.ParseRawURL(cfg.Storage) - if err != nil { - return nil, err - } - m := &logManager{ + return &logManager{ enabled: true, cfg: &writer.LogWriterConfig{ - ConsistentConfig: *cfg, - LogType: logType, - CaptureID: config.GetGlobalServerConfig().AdvertiseAddr, - ChangeFeedID: changefeedID, - URI: *uri, - UseExternalStorage: redo.IsExternalStorage(uri.Scheme), - MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte, + ConsistentConfig: *cfg, + LogType: logType, + CaptureID: config.GetGlobalServerConfig().AdvertiseAddr, + ChangeFeedID: changefeedID, + MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte, }, logBuffer: chann.NewAutoDrainChann[cacheEvents](), rtsMap: spanz.SyncMap{}, @@ -263,21 +251,30 @@ func newLogManager( metricRedoWorkerBusyRatio: common.RedoWorkerBusyRatio. WithLabelValues(changefeedID.Namespace, changefeedID.ID), } - - m.writer, err = factory.NewRedoLogWriter(ctx, m.cfg) - if err != nil { - return nil, err - } - return m, nil } // Run implements pkg/util.Runnable. func (m *logManager) Run(ctx context.Context, _ ...chan<- error) error { - if m.Enabled() { - defer m.close() - return m.bgUpdateLog(ctx) + failpoint.Inject("ChangefeedNewRedoManagerError", func() { + failpoint.Return(errors.New("changefeed new redo manager injected error")) + }) + if !m.Enabled() { + return nil } - return nil + + defer m.close() + start := time.Now() + w, err := factory.NewRedoLogWriter(ctx, m.cfg) + if err != nil { + log.Error("redo: failed to create redo log writer", + zap.String("namespace", m.cfg.ChangeFeedID.Namespace), + zap.String("changefeed", m.cfg.ChangeFeedID.ID), + zap.Duration("duration", time.Since(start)), + zap.Error(err)) + return err + } + m.writer = w + return m.bgUpdateLog(ctx) } // WaitForReady implements pkg/util.Runnable. 
@@ -549,11 +546,13 @@ func (m *logManager) close() { atomic.StoreInt32(&m.closed, 1) m.logBuffer.CloseAndDrain() - if err := m.writer.Close(); err != nil { - log.Error("redo manager fails to close writer", - zap.String("namespace", m.cfg.ChangeFeedID.Namespace), - zap.String("changefeed", m.cfg.ChangeFeedID.ID), - zap.Error(err)) + if m.writer != nil { + if err := m.writer.Close(); err != nil && errors.Cause(err) != context.Canceled { + log.Error("redo manager fails to close writer", + zap.String("namespace", m.cfg.ChangeFeedID.Namespace), + zap.String("changefeed", m.cfg.ChangeFeedID.ID), + zap.Error(err)) + } } log.Info("redo manager closed", zap.String("namespace", m.cfg.ChangeFeedID.Namespace), diff --git a/cdc/redo/manager_test.go b/cdc/redo/manager_test.go index 0a85e01ba8a..14db8ffc251 100644 --- a/cdc/redo/manager_test.go +++ b/cdc/redo/manager_test.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/redo/writer" - "github.com/pingcap/tiflow/cdc/redo/writer/blackhole" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/spanz" @@ -121,15 +120,11 @@ func TestLogManagerInProcessor(t *testing.T) { FlushIntervalInMs: redo.MinFlushIntervalInMs, UseFileBackend: useFileBackend, } - dmlMgr, err := NewDMLManager(ctx, model.DefaultChangeFeedID("test"), cfg) - require.NoError(t, err) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - dmlMgr.Run(ctx) - }() - + dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg) + var eg errgroup.Group + eg.Go(func() error { + return dmlMgr.Run(ctx) + }) // check emit row changed events can move forward resolved ts spans := []tablepb.Span{ spanz.TableIDToComparableSpan(53), @@ -202,7 +197,7 @@ func TestLogManagerInProcessor(t *testing.T) { checkResolvedTs(t, dmlMgr.logManager, flushResolvedTs) cancel() - wg.Wait() + require.ErrorIs(t, eg.Wait(), context.Canceled) } testWriteDMLs("blackhole://", true) @@ -233,18 +228,16 @@ func TestLogManagerInOwner(t *testing.T) { UseFileBackend: useFileBackend, } startTs := model.Ts(10) - ddlMgr, err := NewDDLManager(ctx, model.DefaultChangeFeedID("test"), cfg, startTs) - require.NoError(t, err) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - ddlMgr.Run(ctx) - }() + ddlMgr := NewDDLManager(model.DefaultChangeFeedID("test"), cfg, startTs) + + var eg errgroup.Group + eg.Go(func() error { + return ddlMgr.Run(ctx) + }) require.Equal(t, startTs, ddlMgr.GetResolvedTs()) ddl := &model.DDLEvent{StartTs: 100, CommitTs: 120, Query: "CREATE TABLE `TEST.T1`"} - err = ddlMgr.EmitDDLEvent(ctx, ddl) + err := ddlMgr.EmitDDLEvent(ctx, ddl) require.NoError(t, err) require.Equal(t, startTs, ddlMgr.GetResolvedTs()) @@ -252,7 +245,7 @@ func TestLogManagerInOwner(t *testing.T) { checkResolvedTs(t, ddlMgr.logManager, ddl.CommitTs) cancel() - wg.Wait() + require.ErrorIs(t, eg.Wait(), context.Canceled) } testWriteDDLs("blackhole://", true) @@ -275,23 +268,14 @@ func TestLogManagerError(t *testing.T) { cfg := &config.ConsistentConfig{ Level: string(redo.ConsistentLevelEventual), MaxLogSize: redo.DefaultMaxLogSize, - Storage: "blackhole://", + Storage: "blackhole-invalid://", FlushIntervalInMs: redo.MinFlushIntervalInMs, } - logMgr, err := NewDMLManager(ctx, model.DefaultChangeFeedID("test"), cfg) - require.NoError(t, err) - err = logMgr.writer.Close() - require.NoError(t, err) - logMgr.writer = blackhole.NewInvalidLogWriter(logMgr.writer) - - wg := 
sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - err := logMgr.Run(ctx) - require.Regexp(t, ".*invalid black hole writer.*", err) - require.Regexp(t, ".*WriteLog.*", err) - }() + logMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg) + var eg errgroup.Group + eg.Go(func() error { + return logMgr.Run(ctx) + }) testCases := []struct { span tablepb.Span @@ -310,7 +294,10 @@ func TestLogManagerError(t *testing.T) { err := logMgr.emitRedoEvents(ctx, tc.span, nil, tc.rows...) require.NoError(t, err) } - wg.Wait() + + err := eg.Wait() + require.Regexp(t, ".*invalid black hole writer.*", err) + require.Regexp(t, ".*WriteLog.*", err) } func BenchmarkBlackhole(b *testing.B) { @@ -336,9 +323,8 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) { FlushIntervalInMs: redo.MinFlushIntervalInMs, UseFileBackend: useFileBackend, } - dmlMgr, err := NewDMLManager(ctx, model.DefaultChangeFeedID("test"), cfg) - require.Nil(b, err) - eg := errgroup.Group{} + dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg) + var eg errgroup.Group eg.Go(func() error { return dmlMgr.Run(ctx) }) @@ -366,7 +352,7 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) { go func(span tablepb.Span) { defer wg.Done() maxCommitTs := maxTsMap.GetV(span) - rows := []*model.RowChangedEvent{} + var rows []*model.RowChangedEvent for i := 0; i < maxRowCount; i++ { if i%100 == 0 { // prepare new row change events @@ -409,6 +395,6 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) { time.Sleep(time.Millisecond * 500) } cancel() - err = eg.Wait() - require.ErrorIs(b, err, context.Canceled) + + require.ErrorIs(b, eg.Wait(), context.Canceled) } diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index e0aefe30df8..f1ad1acac72 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/uuid" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -46,6 +47,9 @@ type MetaManager interface { // Cleanup deletes all redo logs, which are only called from the owner // when changefeed is deleted. Cleanup(ctx context.Context) error + + // Running return true if the meta manager is running or not. + Running() bool } type metaManager struct { @@ -53,6 +57,9 @@ type metaManager struct { changeFeedID model.ChangeFeedID enabled bool + // running means the meta manager now running normally. + running atomic.Bool + metaCheckpointTs statefulRts metaResolvedTs statefulRts @@ -62,8 +69,10 @@ type metaManager struct { uuidGenerator uuid.Generator preMetaFile string + startTs model.Ts + lastFlushTime time.Time - flushIntervalInMs int64 + cfg *config.ConsistentConfig metricFlushLogDuration prometheus.Observer } @@ -74,94 +83,87 @@ func NewDisabledMetaManager() *metaManager { } } -// NewMetaManagerWithInit creates a new Manager and initializes the meta. -func NewMetaManagerWithInit( - ctx context.Context, changefeedID model.ChangeFeedID, - cfg *config.ConsistentConfig, startTs model.Ts, -) (*metaManager, error) { - m, err := NewMetaManager(ctx, changefeedID, cfg) - if err != nil { - return nil, err - } - - // There is no need to perform initialize operation if metaMgr is disabled - // or the scheme is blackhole. - if m.extStorage != nil { - m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram. 
- WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) - if err = m.preCleanupExtStorage(ctx); err != nil { - log.Warn("pre clean redo logs fail", - zap.String("namespace", m.changeFeedID.Namespace), - zap.String("changefeed", m.changeFeedID.ID), - zap.Error(err)) - return nil, err - } - if err = m.initMeta(ctx, startTs); err != nil { - log.Warn("init redo meta fail", - zap.String("namespace", m.changeFeedID.Namespace), - zap.String("changefeed", m.changeFeedID.ID), - zap.Error(err)) - return nil, err - } - } - - return m, nil -} - // NewMetaManager creates a new meta Manager. -func NewMetaManager(ctx context.Context, changefeedID model.ChangeFeedID, - cfg *config.ConsistentConfig, -) (*metaManager, error) { +func NewMetaManager( + changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, checkpoint model.Ts, +) *metaManager { // return a disabled Manager if no consistent config or normal consistent level if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) { - return &metaManager{enabled: false}, nil + return &metaManager{enabled: false} } m := &metaManager{ - captureID: config.GetGlobalServerConfig().AdvertiseAddr, - changeFeedID: changefeedID, - uuidGenerator: uuid.NewGenerator(), - enabled: true, - flushIntervalInMs: cfg.FlushIntervalInMs, - } + captureID: config.GetGlobalServerConfig().AdvertiseAddr, + changeFeedID: changefeedID, + uuidGenerator: uuid.NewGenerator(), + enabled: true, + cfg: cfg, + startTs: checkpoint, + } + return m +} + +// Enabled returns whether this log manager is enabled +func (m *metaManager) Enabled() bool { + return m.enabled +} + +// Running return whether the meta manager is initialized, +// which means the external storage is accessible to the meta manager. +func (m *metaManager) Running() bool { + return m.running.Load() +} - uri, err := storage.ParseRawURL(cfg.Storage) +func (m *metaManager) preStart(ctx context.Context) error { + uri, err := storage.ParseRawURL(m.cfg.Storage) if err != nil { - return nil, err - } - if redo.IsBlackholeStorage(uri.Scheme) { - return m, nil + return err } - // "nfs" and "local" scheme are converted to "file" scheme redo.FixLocalScheme(uri) + extStorage, err := redo.InitExternalStorage(ctx, *uri) if err != nil { - return nil, err + return err } m.extStorage = extStorage - return m, nil -} -// Enabled returns whether this log manager is enabled -func (m *metaManager) Enabled() bool { - return m.enabled + m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram. + WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) + + err = m.preCleanupExtStorage(ctx) + if err != nil { + log.Warn("redo: pre clean redo logs fail", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Error(err)) + return err + } + err = m.initMeta(ctx) + if err != nil { + log.Warn("redo: init redo meta fail", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Error(err)) + return err + } + return nil } // Run runs bgFlushMeta and bgGC. 
func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error { - if m.extStorage == nil { - log.Warn("extStorage of redo meta manager is nil, skip running") - return nil + if err := m.preStart(ctx); err != nil { + return err } - eg, egCtx := errgroup.WithContext(ctx) eg.Go(func() error { - return m.bgFlushMeta(egCtx, m.flushIntervalInMs) + return m.bgFlushMeta(egCtx, m.cfg.FlushIntervalInMs) }) eg.Go(func() error { return m.bgGC(egCtx) }) + + m.running.Store(true) return eg.Wait() } @@ -194,9 +196,9 @@ func (m *metaManager) GetFlushedMeta() common.LogMeta { return common.LogMeta{CheckpointTs: checkpointTs, ResolvedTs: resolvedTs} } -// initMeta will read the meta file from external storage and initialize the meta -// field of metaManager. -func (m *metaManager) initMeta(ctx context.Context, startTs model.Ts) error { +// initMeta will read the meta file from external storage and +// use it to initialize the meta field of the metaManager. +func (m *metaManager) initMeta(ctx context.Context) error { select { case <-ctx.Done(): return errors.Trace(ctx.Err()) @@ -204,10 +206,14 @@ func (m *metaManager) initMeta(ctx context.Context, startTs model.Ts) error { } metas := []*common.LogMeta{ - {CheckpointTs: startTs, ResolvedTs: startTs}, + {CheckpointTs: m.startTs, ResolvedTs: m.startTs}, } var toRemoveMetaFiles []string err := m.extStorage.WalkDir(ctx, nil, func(path string, size int64) error { + log.Info("redo: meta manager walk dir", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.String("path", path), zap.Int64("size", size)) // TODO: use prefix to accelerate traverse operation if !strings.HasSuffix(path, redo.MetaEXT) { return nil @@ -215,22 +221,30 @@ func (m *metaManager) initMeta(ctx context.Context, startTs model.Ts) error { toRemoveMetaFiles = append(toRemoveMetaFiles, path) data, err := m.extStorage.ReadFile(ctx, path) - if err != nil && !util.IsNotExistInExtStorage(err) { - return err - } - if len(data) != 0 { - var meta common.LogMeta - _, err = meta.UnmarshalMsg(data) - if err != nil { + if err != nil { + log.Warn("redo: read meta file failed", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.String("path", path), zap.Error(err)) + if !util.IsNotExistInExtStorage(err) { return err } - metas = append(metas, &meta) + return nil + } + var meta common.LogMeta + _, err = meta.UnmarshalMsg(data) + if err != nil { + log.Error("redo: unmarshal meta data failed", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Error(err), zap.ByteString("data", data)) + return err } + metas = append(metas, &meta) return nil }) if err != nil { - return errors.WrapError(errors.ErrRedoMetaInitialize, - errors.Annotate(err, "read meta file fail")) + return errors.WrapError(errors.ErrRedoMetaInitialize, err) } var checkpointTs, resolvedTs uint64 @@ -243,9 +257,16 @@ func (m *metaManager) initMeta(ctx context.Context, startTs model.Ts) error { m.metaResolvedTs.unflushed = resolvedTs m.metaCheckpointTs.unflushed = checkpointTs if err := m.maybeFlushMeta(ctx); err != nil { - return errors.WrapError(errors.ErrRedoMetaInitialize, - errors.Annotate(err, "flush meta file fail")) + return errors.WrapError(errors.ErrRedoMetaInitialize, err) } + + flushedMeta := m.GetFlushedMeta() + log.Info("redo: meta manager flush init meta success", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + 
zap.Uint64("checkpointTs", flushedMeta.CheckpointTs), + zap.Uint64("resolvedTs", flushedMeta.ResolvedTs)) + return util.DeleteFilesInExtStorage(ctx, m.extStorage, toRemoveMetaFiles) } @@ -305,8 +326,19 @@ func (m *metaManager) shouldRemoved(path string, checkPointTs uint64) bool { // deleteAllLogs delete all redo logs and leave a deleted mark. func (m *metaManager) deleteAllLogs(ctx context.Context) error { + // when one changefeed with redo enabled gets deleted, it's extStorage should always be set to not nil + // otherwise it should have already meet panic during changefeed running time. + // the extStorage may be nil in the unit test, so just set the external storage to make unit test happy. if m.extStorage == nil { - return nil + uri, err := storage.ParseRawURL(m.cfg.Storage) + redo.FixLocalScheme(uri) + if err != nil { + return err + } + m.extStorage, err = redo.InitExternalStorage(ctx, *uri) + if err != nil { + return err + } } // Write deleted mark before clean any files. deleteMarker := getDeletedChangefeedMarker(m.changeFeedID) @@ -382,6 +414,10 @@ func (m *metaManager) flush(ctx context.Context, meta common.LogMeta) error { } metaFile := getMetafileName(m.captureID, m.changeFeedID, m.uuidGenerator) if err := m.extStorage.WriteFile(ctx, metaFile, data); err != nil { + log.Error("redo: meta manager flush meta write file failed", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Error(err)) return errors.WrapError(errors.ErrExternalStorageAPI, err) } @@ -392,6 +428,10 @@ func (m *metaManager) flush(ctx context.Context, meta common.LogMeta) error { } err := m.extStorage.DeleteFile(ctx, m.preMetaFile) if err != nil && !util.IsNotExistInExtStorage(err) { + log.Error("redo: meta manager flush meta delete file failed", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Error(err)) return errors.WrapError(errors.ErrExternalStorageAPI, err) } } diff --git a/cdc/redo/meta_manager_test.go b/cdc/redo/meta_manager_test.go index cb110f5baae..675b96dcfd7 100644 --- a/cdc/redo/meta_manager_test.go +++ b/cdc/redo/meta_manager_test.go @@ -50,17 +50,18 @@ func TestInitAndWriteMeta(t *testing.T) { {CheckpointTs: 8, ResolvedTs: 9}, {CheckpointTs: 9, ResolvedTs: 11}, } - toReomoveFiles := []string{} + + var toRemoveFiles []string for _, meta := range metas { data, err := meta.MarshalMsg(nil) require.NoError(t, err) metaName := getMetafileName(captureID, changefeedID, uuid.NewGenerator()) err = extStorage.WriteFile(ctx, metaName, data) require.NoError(t, err) - toReomoveFiles = append(toReomoveFiles, metaName) + toRemoveFiles = append(toRemoveFiles, metaName) } - // err = extStorage.WriteFile(ctx, getDeletedChangefeedMarker(changefeedID), []byte{}) - notRemoveFiles := []string{} + + var notRemoveFiles []string require.NoError(t, err) for i := 0; i < 10; i++ { fileName := "dummy" + getChangefeedMatcher(changefeedID) + strconv.Itoa(i) @@ -76,11 +77,22 @@ func TestInitAndWriteMeta(t *testing.T) { Storage: uri.String(), FlushIntervalInMs: redo.MinFlushIntervalInMs, } - m, err := NewMetaManagerWithInit(ctx, changefeedID, cfg, startTs) - require.NoError(t, err) - require.Equal(t, startTs, m.metaCheckpointTs.getFlushed()) - require.Equal(t, uint64(11), m.metaResolvedTs.getFlushed()) - for _, fileName := range toReomoveFiles { + m := NewMetaManager(changefeedID, cfg, startTs) + + var eg errgroup.Group + eg.Go(func() error { + return m.Run(ctx) + }) + + require.Eventually(t, func() bool { + return 
startTs == m.metaCheckpointTs.getFlushed() + }, time.Second, 50*time.Millisecond) + + require.Eventually(t, func() bool { + return uint64(11) == m.metaResolvedTs.getFlushed() + }, time.Second, 50*time.Millisecond) + + for _, fileName := range toRemoveFiles { ret, err := extStorage.FileExists(ctx, fileName) require.NoError(t, err) require.False(t, ret, "file %s should be removed", fileName) @@ -91,7 +103,10 @@ func TestInitAndWriteMeta(t *testing.T) { require.True(t, ret, "file %s should not be removed", fileName) } - testWriteMeta(t, m) + testWriteMeta(ctx, t, m) + + cancel() + require.ErrorIs(t, eg.Wait(), context.Canceled) } func TestPreCleanupAndWriteMeta(t *testing.T) { @@ -114,7 +129,7 @@ func TestPreCleanupAndWriteMeta(t *testing.T) { {CheckpointTs: 9, ResolvedTs: 11}, {CheckpointTs: 11, ResolvedTs: 12}, } - toRemoveFiles := []string{} + var toRemoveFiles []string for _, meta := range metas { data, err := meta.MarshalMsg(nil) require.NoError(t, err) @@ -140,24 +155,36 @@ func TestPreCleanupAndWriteMeta(t *testing.T) { Storage: uri.String(), FlushIntervalInMs: redo.MinFlushIntervalInMs, } - m, err := NewMetaManagerWithInit(ctx, changefeedID, cfg, startTs) - require.NoError(t, err) - require.Equal(t, startTs, m.metaCheckpointTs.getFlushed()) - require.Equal(t, startTs, m.metaResolvedTs.getFlushed()) + m := NewMetaManager(changefeedID, cfg, startTs) + + var eg errgroup.Group + eg.Go(func() error { + return m.Run(ctx) + }) + + require.Eventually(t, func() bool { + return startTs == m.metaCheckpointTs.getFlushed() + }, time.Second, 50*time.Millisecond) + + require.Eventually(t, func() bool { + return startTs == m.metaResolvedTs.getFlushed() + }, time.Second, 50*time.Millisecond) + for _, fileName := range toRemoveFiles { ret, err := extStorage.FileExists(ctx, fileName) require.NoError(t, err) require.False(t, ret, "file %s should be removed", fileName) } - testWriteMeta(t, m) -} + testWriteMeta(ctx, t, m) -func testWriteMeta(t *testing.T, m *metaManager) { - ctx, cancel := context.WithCancel(context.Background()) + cancel() + require.ErrorIs(t, eg.Wait(), context.Canceled) +} - checkMeta := func(targetCkpts, targetRts uint64) { +func testWriteMeta(ctx context.Context, t *testing.T, m *metaManager) { + checkMeta := func(targetCheckpointTs, targetResolvedTs uint64) { var checkpointTs, resolvedTs uint64 - metas := []*common.LogMeta{} + var metas []*common.LogMeta cnt := 0 m.extStorage.WalkDir(ctx, nil, func(path string, size int64) error { if !strings.HasSuffix(path, redo.MetaEXT) { @@ -174,14 +201,10 @@ func testWriteMeta(t *testing.T, m *metaManager) { }) require.Equal(t, 1, cnt) common.ParseMeta(metas, &checkpointTs, &resolvedTs) - require.Equal(t, targetCkpts, checkpointTs) - require.Equal(t, targetRts, resolvedTs) + require.Equal(t, targetCheckpointTs, checkpointTs) + require.Equal(t, targetResolvedTs, resolvedTs) } - eg := errgroup.Group{} - eg.Go(func() error { - return m.Run(ctx) - }) // test both regressed meta := m.GetFlushedMeta() m.UpdateMeta(1, 2) @@ -207,9 +230,6 @@ func testWriteMeta(t *testing.T, m *metaManager) { return m.metaCheckpointTs.getFlushed() == 16 }, time.Second, 50*time.Millisecond) checkMeta(16, 21) - - cancel() - require.ErrorIs(t, eg.Wait(), context.Canceled) } func TestGCAndCleanup(t *testing.T) { @@ -265,15 +285,21 @@ func TestGCAndCleanup(t *testing.T) { Storage: uri.String(), FlushIntervalInMs: redo.MinFlushIntervalInMs, } - m, err := NewMetaManagerWithInit(ctx, changefeedID, cfg, startTs) - require.NoError(t, err) - require.Equal(t, startTs, 
m.metaCheckpointTs.getFlushed()) - require.Equal(t, startTs, m.metaResolvedTs.getFlushed()) + m := NewMetaManager(changefeedID, cfg, startTs) - eg := errgroup.Group{} + var eg errgroup.Group eg.Go(func() error { return m.Run(ctx) }) + + require.Eventually(t, func() bool { + return startTs == m.metaCheckpointTs.getFlushed() + }, time.Second, 50*time.Millisecond) + + require.Eventually(t, func() bool { + return startTs == m.metaResolvedTs.getFlushed() + }, time.Second, 50*time.Millisecond) + checkGC(startTs) for i := startTs; i <= uint64(maxCommitTs); i++ { diff --git a/cdc/redo/writer/blackhole/blackhole_log_writer.go b/cdc/redo/writer/blackhole/blackhole_log_writer.go index 24cc03bd3df..9a877f262f3 100644 --- a/cdc/redo/writer/blackhole/blackhole_log_writer.go +++ b/cdc/redo/writer/blackhole/blackhole_log_writer.go @@ -26,14 +26,21 @@ var _ writer.RedoLogWriter = (*blackHoleWriter)(nil) // blackHoleSink defines a blackHole storage, it receives events and persists // without any latency -type blackHoleWriter struct{} +type blackHoleWriter struct { + invalid bool +} // NewLogWriter creates a blackHole writer -func NewLogWriter() *blackHoleWriter { - return &blackHoleWriter{} +func NewLogWriter(invalid bool) *blackHoleWriter { + return &blackHoleWriter{ + invalid: invalid, + } } func (bs *blackHoleWriter) WriteEvents(_ context.Context, events ...writer.RedoEvent) (err error) { + if bs.invalid { + return errors.New("[WriteLog] invalid black hole writer") + } if len(events) == 0 { return nil } @@ -45,30 +52,12 @@ func (bs *blackHoleWriter) WriteEvents(_ context.Context, events ...writer.RedoE } func (bs *blackHoleWriter) FlushLog(_ context.Context) error { + if bs.invalid { + return errors.New("[FlushLog] invalid black hole writer") + } return nil } func (bs *blackHoleWriter) Close() error { return nil } - -type invalidBlackHoleWriter struct { - *blackHoleWriter -} - -// NewInvalidLogWriter creates a invalid blackHole writer -func NewInvalidLogWriter(rl writer.RedoLogWriter) *invalidBlackHoleWriter { - return &invalidBlackHoleWriter{ - blackHoleWriter: rl.(*blackHoleWriter), - } -} - -func (ibs *invalidBlackHoleWriter) WriteEvents( - _ context.Context, _ ...writer.RedoEvent, -) (err error) { - return errors.New("[WriteLog] invalid black hole writer") -} - -func (ibs *invalidBlackHoleWriter) FlushLog(_ context.Context) error { - return errors.New("[FlushLog] invalid black hole writer") -} diff --git a/cdc/redo/writer/factory/factory.go b/cdc/redo/writer/factory/factory.go index 7684b3f06f0..796d803ae45 100644 --- a/cdc/redo/writer/factory/factory.go +++ b/cdc/redo/writer/factory/factory.go @@ -15,7 +15,9 @@ package factory import ( "context" + "strings" + "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/redo/writer" "github.com/pingcap/tiflow/cdc/redo/writer/blackhole" "github.com/pingcap/tiflow/cdc/redo/writer/file" @@ -28,14 +30,23 @@ import ( func NewRedoLogWriter( ctx context.Context, lwCfg *writer.LogWriterConfig, ) (writer.RedoLogWriter, error) { - scheme := lwCfg.URI.Scheme - if !redo.IsValidConsistentStorage(scheme) { - return nil, errors.ErrConsistentStorage.GenWithStackByArgs(scheme) + uri, err := storage.ParseRawURL(lwCfg.Storage) + if err != nil { + return nil, err } - if redo.IsBlackholeStorage(scheme) { - return blackhole.NewLogWriter(), nil + if !redo.IsValidConsistentStorage(uri.Scheme) { + return nil, errors.ErrConsistentStorage.GenWithStackByArgs(uri.Scheme) } + + lwCfg.URI = uri + lwCfg.UseExternalStorage = redo.IsExternalStorage(uri.Scheme) + + if 
redo.IsBlackholeStorage(uri.Scheme) { + invalid := strings.HasSuffix(uri.Scheme, "invalid") + return blackhole.NewLogWriter(invalid), nil + } + if lwCfg.UseFileBackend { return file.NewLogWriter(ctx, lwCfg) } diff --git a/cdc/redo/writer/file/file.go b/cdc/redo/writer/file/file.go index e895bffe8c6..e12a5df59de 100644 --- a/cdc/redo/writer/file/file.go +++ b/cdc/redo/writer/file/file.go @@ -86,7 +86,7 @@ func NewFileWriter( var extStorage storage.ExternalStorage if cfg.UseExternalStorage { var err error - extStorage, err = redo.InitExternalStorage(ctx, cfg.URI) + extStorage, err = redo.InitExternalStorage(ctx, *cfg.URI) if err != nil { return nil, err } @@ -129,7 +129,7 @@ func NewFileWriter( // if we use S3 as the remote storage, a file allocator can be leveraged to // pre-allocate files for us. // TODO: test whether this improvement can also be applied to NFS. - if cfg.UseExternalStorage { + if w.cfg.UseExternalStorage { w.allocator = fsutil.NewFileAllocator(cfg.Dir, cfg.LogType, cfg.MaxLogSizeInBytes) } diff --git a/cdc/redo/writer/memory/encoding_worker.go b/cdc/redo/writer/memory/encoding_worker.go index 337379f3b50..57b15ce45f3 100644 --- a/cdc/redo/writer/memory/encoding_worker.go +++ b/cdc/redo/writer/memory/encoding_worker.go @@ -124,7 +124,7 @@ func newEncodingWorkerGroup(workerNum int) *encodingWorkerGroup { func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) { defer func() { close(e.closed) - if err != nil { + if err != nil && errors.Cause(err) != context.Canceled { log.Warn("redo fileWorkerGroup closed with error", zap.Error(err)) } }() diff --git a/cdc/redo/writer/memory/file_worker.go b/cdc/redo/writer/memory/file_worker.go index 771888a418d..45b366117c0 100644 --- a/cdc/redo/writer/memory/file_worker.go +++ b/cdc/redo/writer/memory/file_worker.go @@ -138,7 +138,7 @@ func (f *fileWorkerGroup) Run( ) (err error) { defer func() { f.close() - if err != nil { + if err != nil && errors.Cause(err) != context.Canceled { log.Warn("redo file workers closed with error", zap.Error(err)) } }() diff --git a/cdc/redo/writer/memory/mem_log_writer.go b/cdc/redo/writer/memory/mem_log_writer.go index a47a83e855b..0460fdbf08b 100644 --- a/cdc/redo/writer/memory/mem_log_writer.go +++ b/cdc/redo/writer/memory/mem_log_writer.go @@ -54,11 +54,11 @@ func NewLogWriter( } // "nfs" and "local" scheme are converted to "file" scheme if !cfg.UseExternalStorage { - redo.FixLocalScheme(&cfg.URI) + redo.FixLocalScheme(cfg.URI) cfg.UseExternalStorage = redo.IsExternalStorage(cfg.URI.Scheme) } - extStorage, err := redo.InitExternalStorage(ctx, cfg.URI) + extStorage, err := redo.InitExternalStorage(ctx, *cfg.URI) if err != nil { return nil, err } diff --git a/cdc/redo/writer/memory/mem_log_writer_test.go b/cdc/redo/writer/memory/mem_log_writer_test.go index 60ea189a875..bba18c8068f 100644 --- a/cdc/redo/writer/memory/mem_log_writer_test.go +++ b/cdc/redo/writer/memory/mem_log_writer_test.go @@ -61,7 +61,7 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) { LogType: redo.RedoDDLLogFileType, CaptureID: "test-capture", ChangeFeedID: model.DefaultChangeFeedID("test-changefeed"), - URI: *uri, + URI: uri, UseExternalStorage: true, MaxLogSizeInBytes: 10 * redo.Megabyte, } diff --git a/cdc/redo/writer/writer.go b/cdc/redo/writer/writer.go index 18779997b30..42ac64d4ff6 100644 --- a/cdc/redo/writer/writer.go +++ b/cdc/redo/writer/writer.go @@ -52,7 +52,7 @@ type LogWriterConfig struct { CaptureID model.CaptureID ChangeFeedID model.ChangeFeedID - URI url.URL + URI *url.URL 
UseExternalStorage bool Dir string MaxLogSizeInBytes int64 diff --git a/cdc/scheduler/internal/v3/agent/table.go b/cdc/scheduler/internal/v3/agent/table.go index 506bef29a54..e6a8525af98 100644 --- a/cdc/scheduler/internal/v3/agent/table.go +++ b/cdc/scheduler/internal/v3/agent/table.go @@ -311,7 +311,7 @@ func newTableSpanManager( func (tm *tableSpanManager) poll(ctx context.Context) ([]*schedulepb.Message, error) { result := make([]*schedulepb.Message, 0) var err error - toBeDropped := []tablepb.Span{} + var toBeDropped []tablepb.Span tm.tables.Ascend(func(span tablepb.Span, table *tableSpan) bool { message, err1 := table.poll(ctx) if err != nil { diff --git a/cdc/scheduler/internal/v3/replication/replication_manager_test.go b/cdc/scheduler/internal/v3/replication/replication_manager_test.go index 0584e62972c..632a0835aa3 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager_test.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager_test.go @@ -604,6 +604,10 @@ func (m *mockRedoMetaManager) Enabled() bool { return m.enable } +func (m *mockRedoMetaManager) Running() bool { + return true +} + func TestReplicationManagerAdvanceCheckpoint(t *testing.T) { t.Parallel() r := NewReplicationManager(1, model.ChangeFeedID{}) diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index b160d423b54..3a14b5e742d 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -773,10 +773,11 @@ func (c *Consumer) appendDDL(ddl *model.DDLEvent) { defer c.ddlListMu.Unlock() // DDL CommitTs fallback, just crash it to indicate the bug. if c.ddlWithMaxCommitTs != nil && ddl.CommitTs < c.ddlWithMaxCommitTs.CommitTs { - log.Panic("DDL CommitTs < maxCommitTsDDL.CommitTs", + log.Warn("DDL CommitTs < maxCommitTsDDL.CommitTs", zap.Uint64("commitTs", ddl.CommitTs), zap.Uint64("maxCommitTs", c.ddlWithMaxCommitTs.CommitTs), zap.Any("DDL", ddl)) + return } // A rename tables DDL job contains multiple DDL events with same CommitTs. diff --git a/pkg/redo/config.go b/pkg/redo/config.go index fbcdb3f1eb3..82c2e6f3457 100644 --- a/pkg/redo/config.go +++ b/pkg/redo/config.go @@ -172,7 +172,7 @@ func FixLocalScheme(uri *url.URL) { // IsBlackholeStorage returns whether a blackhole storage is used. func IsBlackholeStorage(scheme string) bool { - return ConsistentStorage(scheme) == consistentStorageBlackhole + return strings.HasPrefix(scheme, string(consistentStorageBlackhole)) } // InitExternalStorage init an external storage. 
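
IsBlackholeStorage now does a prefix match, which is what lets the factory above route the test-only scheme "blackhole-invalid" to a black-hole writer whose WriteEvents and FlushLog fail on purpose. A short sketch of the resulting classification, assuming testify-style assertions as used elsewhere in this patch:

	require.True(t, redo.IsBlackholeStorage("blackhole"))         // normal no-op writer
	require.True(t, redo.IsBlackholeStorage("blackhole-invalid")) // failing writer, used by TestLogManagerError
	require.False(t, redo.IsBlackholeStorage("s3"))               // real external storage
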
@@ -220,6 +220,18 @@ func ValidateStorage(uri *url.URL) error { return errors.WrapError(errors.ErrStorageInitialize, errors.Annotate(err, fmt.Sprintf("can't make dir for new redo log: %+v", uri))) } + + file := filepath.Join(uri.Path, "file.test") + if err := os.WriteFile(file, []byte(""), DefaultFileMode); err != nil { + return errors.WrapError(errors.ErrStorageInitialize, errors.Annotate(err, + fmt.Sprintf("can't write file for new redo log: %+v", uri))) + } + + if _, err := os.ReadFile(file); err != nil { + return errors.WrapError(errors.ErrStorageInitialize, errors.Annotate(err, + fmt.Sprintf("can't read file for new redo log: %+v", uri))) + } + _ = os.Remove(file) return nil } diff --git a/tests/integration_tests/changefeed_error/run.sh b/tests/integration_tests/changefeed_error/run.sh index 5065d147875..f48b72fbf4d 100755 --- a/tests/integration_tests/changefeed_error/run.sh +++ b/tests/integration_tests/changefeed_error/run.sh @@ -108,7 +108,7 @@ function run() { cleanup_process $CDC_BINARY # make sure initialize changefeed error will not stuck the owner - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/ChangefeedNewRedoManagerError=2*return(true)' + export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/redo/ChangefeedNewRedoManagerError=2*return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY changefeedid_3="changefeed-initialize-error"
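
Taken together, the owner no longer blocks on redo initialization: the meta manager reports readiness through Running(), and the changefeed tick simply returns until it flips to true. A condensed sketch of the gate added in cdc/owner/changefeed.go at the top of this patch, folded into a single condition:

	if c.redoMetaMgr.Enabled() && !c.redoMetaMgr.Running() {
		// Redo meta is still initializing external storage in the
		// background; skip this tick instead of blocking the owner.
		return nil
	}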