From c42cb1d0efa81afa50d43366b752b992eaa2dd83 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 13 Oct 2023 17:43:35 +0800 Subject: [PATCH 01/25] add some logs about read redo meta. --- cdc/redo/meta_manager.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index e0aefe30df8..307b666d013 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -208,6 +208,7 @@ func (m *metaManager) initMeta(ctx context.Context, startTs model.Ts) error { } var toRemoveMetaFiles []string err := m.extStorage.WalkDir(ctx, nil, func(path string, size int64) error { + log.Info("redo: meta manager walk dir", zap.String("path", path), zap.Int64("size", size)) // TODO: use prefix to accelerate traverse operation if !strings.HasSuffix(path, redo.MetaEXT) { return nil @@ -215,13 +216,18 @@ func (m *metaManager) initMeta(ctx context.Context, startTs model.Ts) error { toRemoveMetaFiles = append(toRemoveMetaFiles, path) data, err := m.extStorage.ReadFile(ctx, path) - if err != nil && !util.IsNotExistInExtStorage(err) { - return err + if err != nil { + log.Warn("redo: read meta file failed", zap.String("path", path), zap.Error(err)) + if !util.IsNotExistInExtStorage(err) { + return err + } + return nil } if len(data) != 0 { var meta common.LogMeta _, err = meta.UnmarshalMsg(data) if err != nil { + log.Error("redo: unmarshal meta data failed", zap.Error(err), zap.ByteString("data", data)) return err } metas = append(metas, &meta) From 93fc61c441f8e9f14b621a62f97a4a091614cc1e Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 16 Oct 2023 16:52:09 +0800 Subject: [PATCH 02/25] move init meta to the run method. --- cdc/owner/changefeed.go | 5 +-- cdc/redo/meta_manager.go | 62 +++++++++++++++-------------------- cdc/redo/meta_manager_test.go | 9 +++-- 3 files changed, 35 insertions(+), 41 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index b261f689f57..d40c1aed899 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -597,12 +597,13 @@ LOOP2: }() } - c.redoMetaMgr, err = redo.NewMetaManagerWithInit(cancelCtx, + redoMetaManager, err := redo.NewMetaManager(cancelCtx, c.id, - c.state.Info.Config.Consistent, checkpointTs) + c.state.Info.Config.Consistent) if err != nil { return err } + redoMetaManager.SetStartTs(checkpointTs) if c.redoMetaMgr.Enabled() { c.wg.Add(1) go func() { diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index 307b666d013..1d5588c0c83 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -62,6 +62,8 @@ type metaManager struct { uuidGenerator uuid.Generator preMetaFile string + startTs model.Ts + lastFlushTime time.Time flushIntervalInMs int64 metricFlushLogDuration prometheus.Observer @@ -74,40 +76,6 @@ func NewDisabledMetaManager() *metaManager { } } -// NewMetaManagerWithInit creates a new Manager and initializes the meta. -func NewMetaManagerWithInit( - ctx context.Context, changefeedID model.ChangeFeedID, - cfg *config.ConsistentConfig, startTs model.Ts, -) (*metaManager, error) { - m, err := NewMetaManager(ctx, changefeedID, cfg) - if err != nil { - return nil, err - } - - // There is no need to perform initialize operation if metaMgr is disabled - // or the scheme is blackhole. - if m.extStorage != nil { - m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram. - WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) - if err = m.preCleanupExtStorage(ctx); err != nil { - log.Warn("pre clean redo logs fail", - zap.String("namespace", m.changeFeedID.Namespace), - zap.String("changefeed", m.changeFeedID.ID), - zap.Error(err)) - return nil, err - } - if err = m.initMeta(ctx, startTs); err != nil { - log.Warn("init redo meta fail", - zap.String("namespace", m.changeFeedID.Namespace), - zap.String("changefeed", m.changeFeedID.ID), - zap.Error(err)) - return nil, err - } - } - - return m, nil -} - // NewMetaManager creates a new meta Manager. func NewMetaManager(ctx context.Context, changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, @@ -143,6 +111,11 @@ func NewMetaManager(ctx context.Context, changefeedID model.ChangeFeedID, return m, nil } +// SetStartTs sets the startTs of the redo log meta manager. +func (m *metaManager) SetStartTs(startTs model.Ts) { + m.startTs = startTs +} + // Enabled returns whether this log manager is enabled func (m *metaManager) Enabled() bool { return m.enabled @@ -155,6 +128,23 @@ func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error { return nil } + m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram. + WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) + if err := m.preCleanupExtStorage(ctx); err != nil { + log.Warn("pre clean redo logs fail", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Error(err)) + return err + } + if err := m.initMeta(ctx); err != nil { + log.Warn("init redo meta fail", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Error(err)) + return err + } + eg, egCtx := errgroup.WithContext(ctx) eg.Go(func() error { return m.bgFlushMeta(egCtx, m.flushIntervalInMs) @@ -196,7 +186,7 @@ func (m *metaManager) GetFlushedMeta() common.LogMeta { // initMeta will read the meta file from external storage and initialize the meta // field of metaManager. -func (m *metaManager) initMeta(ctx context.Context, startTs model.Ts) error { +func (m *metaManager) initMeta(ctx context.Context) error { select { case <-ctx.Done(): return errors.Trace(ctx.Err()) @@ -204,7 +194,7 @@ func (m *metaManager) initMeta(ctx context.Context, startTs model.Ts) error { } metas := []*common.LogMeta{ - {CheckpointTs: startTs, ResolvedTs: startTs}, + {CheckpointTs: m.startTs, ResolvedTs: m.startTs}, } var toRemoveMetaFiles []string err := m.extStorage.WalkDir(ctx, nil, func(path string, size int64) error { diff --git a/cdc/redo/meta_manager_test.go b/cdc/redo/meta_manager_test.go index cb110f5baae..678f241964f 100644 --- a/cdc/redo/meta_manager_test.go +++ b/cdc/redo/meta_manager_test.go @@ -76,8 +76,9 @@ func TestInitAndWriteMeta(t *testing.T) { Storage: uri.String(), FlushIntervalInMs: redo.MinFlushIntervalInMs, } - m, err := NewMetaManagerWithInit(ctx, changefeedID, cfg, startTs) + m, err := NewMetaManager(ctx, changefeedID, cfg) require.NoError(t, err) + m.SetStartTs(startTs) require.Equal(t, startTs, m.metaCheckpointTs.getFlushed()) require.Equal(t, uint64(11), m.metaResolvedTs.getFlushed()) for _, fileName := range toReomoveFiles { @@ -140,8 +141,9 @@ func TestPreCleanupAndWriteMeta(t *testing.T) { Storage: uri.String(), FlushIntervalInMs: redo.MinFlushIntervalInMs, } - m, err := NewMetaManagerWithInit(ctx, changefeedID, cfg, startTs) + m, err := NewMetaManager(ctx, changefeedID, cfg) require.NoError(t, err) + m.SetStartTs(startTs) require.Equal(t, startTs, m.metaCheckpointTs.getFlushed()) require.Equal(t, startTs, m.metaResolvedTs.getFlushed()) for _, fileName := range toRemoveFiles { @@ -265,8 +267,9 @@ func TestGCAndCleanup(t *testing.T) { Storage: uri.String(), FlushIntervalInMs: redo.MinFlushIntervalInMs, } - m, err := NewMetaManagerWithInit(ctx, changefeedID, cfg, startTs) + m, err := NewMetaManager(ctx, changefeedID, cfg) require.NoError(t, err) + m.SetStartTs(startTs) require.Equal(t, startTs, m.metaCheckpointTs.getFlushed()) require.Equal(t, startTs, m.metaResolvedTs.getFlushed()) From ef5bbd9e32bc370bd0c36b9e978d646413278780 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 18 Oct 2023 17:08:02 +0800 Subject: [PATCH 03/25] adjust meta manager. --- cdc/owner/changefeed.go | 19 ++--------- cdc/redo/meta_manager.go | 61 ++++++++++++++--------------------- cdc/redo/meta_manager_test.go | 23 ++++++------- 3 files changed, 36 insertions(+), 67 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 8d14299ea95..d5b7bc4cf87 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -623,14 +623,9 @@ LOOP2: }() } - redoMetaManager, err := redo.NewMetaManager(cancelCtx, - c.id, - c.latestInfo.Config.Consistent) - if err != nil { - return err - } - redoMetaManager.SetStartTs(checkpointTs) + c.redoMetaMgr = redo.NewMetaManager(c.id, c.latestInfo.Config.Consistent, checkpointTs) if c.redoMetaMgr.Enabled() { + // todo: how to initialize this resolved ts ? c.resolvedTs = c.redoMetaMgr.GetFlushedMeta().ResolvedTs c.wg.Add(1) go func() { @@ -799,15 +794,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) { } // when removing a paused changefeed, the redo manager is nil, create a new one if c.redoMetaMgr == nil { - redoMetaMgr, err := redo.NewMetaManager(ctx, c.id, cfInfo.Config.Consistent) - if err != nil { - log.Info("owner creates redo manager for clean fail", - zap.String("namespace", c.id.Namespace), - zap.String("changefeed", c.id.ID), - zap.Error(err)) - return - } - c.redoMetaMgr = redoMetaMgr + c.redoMetaMgr = redo.NewMetaManager(c.id, cfInfo.Config.Consistent, 0) } err := c.redoMetaMgr.Cleanup(ctx) if err != nil { diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index 1d5588c0c83..84db9924998 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -65,7 +65,7 @@ type metaManager struct { startTs model.Ts lastFlushTime time.Time - flushIntervalInMs int64 + cfg *config.ConsistentConfig metricFlushLogDuration prometheus.Observer } @@ -77,43 +77,22 @@ func NewDisabledMetaManager() *metaManager { } // NewMetaManager creates a new meta Manager. -func NewMetaManager(ctx context.Context, changefeedID model.ChangeFeedID, - cfg *config.ConsistentConfig, -) (*metaManager, error) { +func NewMetaManager( + changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, checkpoint model.Ts, +) *metaManager { // return a disabled Manager if no consistent config or normal consistent level if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) { - return &metaManager{enabled: false}, nil + return &metaManager{enabled: false} } - m := &metaManager{ - captureID: config.GetGlobalServerConfig().AdvertiseAddr, - changeFeedID: changefeedID, - uuidGenerator: uuid.NewGenerator(), - enabled: true, - flushIntervalInMs: cfg.FlushIntervalInMs, - } - - uri, err := storage.ParseRawURL(cfg.Storage) - if err != nil { - return nil, err - } - if redo.IsBlackholeStorage(uri.Scheme) { - return m, nil - } - - // "nfs" and "local" scheme are converted to "file" scheme - redo.FixLocalScheme(uri) - extStorage, err := redo.InitExternalStorage(ctx, *uri) - if err != nil { - return nil, err + return &metaManager{ + captureID: config.GetGlobalServerConfig().AdvertiseAddr, + changeFeedID: changefeedID, + uuidGenerator: uuid.NewGenerator(), + enabled: true, + cfg: cfg, + startTs: checkpoint, } - m.extStorage = extStorage - return m, nil -} - -// SetStartTs sets the startTs of the redo log meta manager. -func (m *metaManager) SetStartTs(startTs model.Ts) { - m.startTs = startTs } // Enabled returns whether this log manager is enabled @@ -123,10 +102,18 @@ func (m *metaManager) Enabled() bool { // Run runs bgFlushMeta and bgGC. func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error { - if m.extStorage == nil { - log.Warn("extStorage of redo meta manager is nil, skip running") - return nil + uri, err := storage.ParseRawURL(m.cfg.Storage) + if err != nil { + return err } + // "nfs" and "local" scheme are converted to "file" scheme + redo.FixLocalScheme(uri) + + extStorage, err := redo.InitExternalStorage(ctx, *uri) + if err != nil { + return err + } + m.extStorage = extStorage m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram. WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) @@ -147,7 +134,7 @@ func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error { eg, egCtx := errgroup.WithContext(ctx) eg.Go(func() error { - return m.bgFlushMeta(egCtx, m.flushIntervalInMs) + return m.bgFlushMeta(egCtx, m.cfg.FlushIntervalInMs) }) eg.Go(func() error { return m.bgGC(egCtx) diff --git a/cdc/redo/meta_manager_test.go b/cdc/redo/meta_manager_test.go index 678f241964f..d26c27ce308 100644 --- a/cdc/redo/meta_manager_test.go +++ b/cdc/redo/meta_manager_test.go @@ -50,17 +50,18 @@ func TestInitAndWriteMeta(t *testing.T) { {CheckpointTs: 8, ResolvedTs: 9}, {CheckpointTs: 9, ResolvedTs: 11}, } - toReomoveFiles := []string{} + + var toRemoveFiles []string for _, meta := range metas { data, err := meta.MarshalMsg(nil) require.NoError(t, err) metaName := getMetafileName(captureID, changefeedID, uuid.NewGenerator()) err = extStorage.WriteFile(ctx, metaName, data) require.NoError(t, err) - toReomoveFiles = append(toReomoveFiles, metaName) + toRemoveFiles = append(toRemoveFiles, metaName) } - // err = extStorage.WriteFile(ctx, getDeletedChangefeedMarker(changefeedID), []byte{}) - notRemoveFiles := []string{} + + var notRemoveFiles []string require.NoError(t, err) for i := 0; i < 10; i++ { fileName := "dummy" + getChangefeedMatcher(changefeedID) + strconv.Itoa(i) @@ -76,12 +77,10 @@ func TestInitAndWriteMeta(t *testing.T) { Storage: uri.String(), FlushIntervalInMs: redo.MinFlushIntervalInMs, } - m, err := NewMetaManager(ctx, changefeedID, cfg) - require.NoError(t, err) - m.SetStartTs(startTs) + m := NewMetaManager(changefeedID, cfg, startTs) require.Equal(t, startTs, m.metaCheckpointTs.getFlushed()) require.Equal(t, uint64(11), m.metaResolvedTs.getFlushed()) - for _, fileName := range toReomoveFiles { + for _, fileName := range toRemoveFiles { ret, err := extStorage.FileExists(ctx, fileName) require.NoError(t, err) require.False(t, ret, "file %s should be removed", fileName) @@ -141,9 +140,7 @@ func TestPreCleanupAndWriteMeta(t *testing.T) { Storage: uri.String(), FlushIntervalInMs: redo.MinFlushIntervalInMs, } - m, err := NewMetaManager(ctx, changefeedID, cfg) - require.NoError(t, err) - m.SetStartTs(startTs) + m := NewMetaManager(changefeedID, cfg, startTs) require.Equal(t, startTs, m.metaCheckpointTs.getFlushed()) require.Equal(t, startTs, m.metaResolvedTs.getFlushed()) for _, fileName := range toRemoveFiles { @@ -267,9 +264,7 @@ func TestGCAndCleanup(t *testing.T) { Storage: uri.String(), FlushIntervalInMs: redo.MinFlushIntervalInMs, } - m, err := NewMetaManager(ctx, changefeedID, cfg) - require.NoError(t, err) - m.SetStartTs(startTs) + m := NewMetaManager(changefeedID, cfg, startTs) require.Equal(t, startTs, m.metaCheckpointTs.getFlushed()) require.Equal(t, startTs, m.metaResolvedTs.getFlushed()) From c7c8af832b736310189b40efee365446c9f3a55e Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 18 Oct 2023 17:37:57 +0800 Subject: [PATCH 04/25] fix meta manager. --- cdc/redo/meta_manager.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index 84db9924998..f8d49b43079 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/uuid" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -53,6 +54,8 @@ type metaManager struct { changeFeedID model.ChangeFeedID enabled bool + initialized atomic.Bool + metaCheckpointTs statefulRts metaResolvedTs statefulRts @@ -100,6 +103,12 @@ func (m *metaManager) Enabled() bool { return m.enabled } +// Initialized return whether the meta manager is initialized, +// which means the external storage is accessible to the meta manager. +func (m *metaManager) Initialized() bool { + return m.initialized.Load() +} + // Run runs bgFlushMeta and bgGC. func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error { uri, err := storage.ParseRawURL(m.cfg.Storage) From c1655e3eb43ddbd4a6e6c1958b606e955d0c0209 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 19 Oct 2023 15:04:54 +0800 Subject: [PATCH 05/25] fix redo meta unit test. --- cdc/redo/meta_manager.go | 48 ++++++++++++++-------- cdc/redo/meta_manager_test.go | 76 ++++++++++++++++++++++++----------- 2 files changed, 84 insertions(+), 40 deletions(-) diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index f8d49b43079..b7f38d72e78 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -54,6 +54,8 @@ type metaManager struct { changeFeedID model.ChangeFeedID enabled bool + // initialized means the meta manager now works normally. + // todo: how to use this fields ? initialized atomic.Bool metaCheckpointTs statefulRts @@ -109,8 +111,7 @@ func (m *metaManager) Initialized() bool { return m.initialized.Load() } -// Run runs bgFlushMeta and bgGC. -func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error { +func (m *metaManager) preStart(ctx context.Context) error { uri, err := storage.ParseRawURL(m.cfg.Storage) if err != nil { return err @@ -126,6 +127,7 @@ func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error { m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram. WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) + if err := m.preCleanupExtStorage(ctx); err != nil { log.Warn("pre clean redo logs fail", zap.String("namespace", m.changeFeedID.Namespace), @@ -141,6 +143,14 @@ func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error { return err } + return nil +} + +// Run runs bgFlushMeta and bgGC. +func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error { + if err := m.preStart(ctx); err != nil { + return err + } eg, egCtx := errgroup.WithContext(ctx) eg.Go(func() error { return m.bgFlushMeta(egCtx, m.cfg.FlushIntervalInMs) @@ -148,6 +158,8 @@ func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error { eg.Go(func() error { return m.bgGC(egCtx) }) + + m.initialized.Store(true) return eg.Wait() } @@ -180,8 +192,8 @@ func (m *metaManager) GetFlushedMeta() common.LogMeta { return common.LogMeta{CheckpointTs: checkpointTs, ResolvedTs: resolvedTs} } -// initMeta will read the meta file from external storage and initialize the meta -// field of metaManager. +// initMeta will read the meta file from external storage and +// use it to initialize the meta field of the metaManager. func (m *metaManager) initMeta(ctx context.Context) error { select { case <-ctx.Done(): @@ -209,20 +221,17 @@ func (m *metaManager) initMeta(ctx context.Context) error { } return nil } - if len(data) != 0 { - var meta common.LogMeta - _, err = meta.UnmarshalMsg(data) - if err != nil { - log.Error("redo: unmarshal meta data failed", zap.Error(err), zap.ByteString("data", data)) - return err - } - metas = append(metas, &meta) + var meta common.LogMeta + _, err = meta.UnmarshalMsg(data) + if err != nil { + log.Error("redo: unmarshal meta data failed", zap.Error(err), zap.ByteString("data", data)) + return err } + metas = append(metas, &meta) return nil }) if err != nil { - return errors.WrapError(errors.ErrRedoMetaInitialize, - errors.Annotate(err, "read meta file fail")) + return errors.WrapError(errors.ErrRedoMetaInitialize, err) } var checkpointTs, resolvedTs uint64 @@ -235,8 +244,7 @@ func (m *metaManager) initMeta(ctx context.Context) error { m.metaResolvedTs.unflushed = resolvedTs m.metaCheckpointTs.unflushed = checkpointTs if err := m.maybeFlushMeta(ctx); err != nil { - return errors.WrapError(errors.ErrRedoMetaInitialize, - errors.Annotate(err, "flush meta file fail")) + return errors.WrapError(errors.ErrRedoMetaInitialize, err) } return util.DeleteFilesInExtStorage(ctx, m.extStorage, toRemoveMetaFiles) } @@ -374,6 +382,10 @@ func (m *metaManager) flush(ctx context.Context, meta common.LogMeta) error { } metaFile := getMetafileName(m.captureID, m.changeFeedID, m.uuidGenerator) if err := m.extStorage.WriteFile(ctx, metaFile, data); err != nil { + log.Error("redo: meta manager flush meta write file failed", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Error(err)) return errors.WrapError(errors.ErrExternalStorageAPI, err) } @@ -384,6 +396,10 @@ func (m *metaManager) flush(ctx context.Context, meta common.LogMeta) error { } err := m.extStorage.DeleteFile(ctx, m.preMetaFile) if err != nil && !util.IsNotExistInExtStorage(err) { + log.Error("redo: meta manager flush meta delete file failed", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Error(err)) return errors.WrapError(errors.ErrExternalStorageAPI, err) } } diff --git a/cdc/redo/meta_manager_test.go b/cdc/redo/meta_manager_test.go index d26c27ce308..de4844659e8 100644 --- a/cdc/redo/meta_manager_test.go +++ b/cdc/redo/meta_manager_test.go @@ -78,8 +78,20 @@ func TestInitAndWriteMeta(t *testing.T) { FlushIntervalInMs: redo.MinFlushIntervalInMs, } m := NewMetaManager(changefeedID, cfg, startTs) - require.Equal(t, startTs, m.metaCheckpointTs.getFlushed()) - require.Equal(t, uint64(11), m.metaResolvedTs.getFlushed()) + + var eg errgroup.Group + eg.Go(func() error { + return m.Run(ctx) + }) + + require.Eventually(t, func() bool { + return startTs == m.metaCheckpointTs.getFlushed() + }, time.Second, 50*time.Millisecond) + + require.Eventually(t, func() bool { + return uint64(11) == m.metaResolvedTs.getFlushed() + }, time.Second, 50*time.Millisecond) + for _, fileName := range toRemoveFiles { ret, err := extStorage.FileExists(ctx, fileName) require.NoError(t, err) @@ -91,7 +103,10 @@ func TestInitAndWriteMeta(t *testing.T) { require.True(t, ret, "file %s should not be removed", fileName) } - testWriteMeta(t, m) + testWriteMeta(t, ctx, m) + + cancel() + require.ErrorIs(t, eg.Wait(), context.Canceled) } func TestPreCleanupAndWriteMeta(t *testing.T) { @@ -114,7 +129,7 @@ func TestPreCleanupAndWriteMeta(t *testing.T) { {CheckpointTs: 9, ResolvedTs: 11}, {CheckpointTs: 11, ResolvedTs: 12}, } - toRemoveFiles := []string{} + var toRemoveFiles []string for _, meta := range metas { data, err := meta.MarshalMsg(nil) require.NoError(t, err) @@ -141,22 +156,35 @@ func TestPreCleanupAndWriteMeta(t *testing.T) { FlushIntervalInMs: redo.MinFlushIntervalInMs, } m := NewMetaManager(changefeedID, cfg, startTs) - require.Equal(t, startTs, m.metaCheckpointTs.getFlushed()) - require.Equal(t, startTs, m.metaResolvedTs.getFlushed()) + + var eg errgroup.Group + eg.Go(func() error { + return m.Run(ctx) + }) + + require.Eventually(t, func() bool { + return startTs == m.metaCheckpointTs.getFlushed() + }, time.Second, 50*time.Millisecond) + + require.Eventually(t, func() bool { + return startTs == m.metaResolvedTs.getFlushed() + }, time.Second, 50*time.Millisecond) + for _, fileName := range toRemoveFiles { ret, err := extStorage.FileExists(ctx, fileName) require.NoError(t, err) require.False(t, ret, "file %s should be removed", fileName) } - testWriteMeta(t, m) -} + testWriteMeta(t, ctx, m) -func testWriteMeta(t *testing.T, m *metaManager) { - ctx, cancel := context.WithCancel(context.Background()) + cancel() + require.ErrorIs(t, eg.Wait(), context.Canceled) +} - checkMeta := func(targetCkpts, targetRts uint64) { +func testWriteMeta(t *testing.T, ctx context.Context, m *metaManager) { + checkMeta := func(targetCheckpointTs, targetResolvedTs uint64) { var checkpointTs, resolvedTs uint64 - metas := []*common.LogMeta{} + var metas []*common.LogMeta cnt := 0 m.extStorage.WalkDir(ctx, nil, func(path string, size int64) error { if !strings.HasSuffix(path, redo.MetaEXT) { @@ -173,14 +201,10 @@ func testWriteMeta(t *testing.T, m *metaManager) { }) require.Equal(t, 1, cnt) common.ParseMeta(metas, &checkpointTs, &resolvedTs) - require.Equal(t, targetCkpts, checkpointTs) - require.Equal(t, targetRts, resolvedTs) + require.Equal(t, targetCheckpointTs, checkpointTs) + require.Equal(t, targetResolvedTs, resolvedTs) } - eg := errgroup.Group{} - eg.Go(func() error { - return m.Run(ctx) - }) // test both regressed meta := m.GetFlushedMeta() m.UpdateMeta(1, 2) @@ -206,9 +230,6 @@ func testWriteMeta(t *testing.T, m *metaManager) { return m.metaCheckpointTs.getFlushed() == 16 }, time.Second, 50*time.Millisecond) checkMeta(16, 21) - - cancel() - require.ErrorIs(t, eg.Wait(), context.Canceled) } func TestGCAndCleanup(t *testing.T) { @@ -265,13 +286,20 @@ func TestGCAndCleanup(t *testing.T) { FlushIntervalInMs: redo.MinFlushIntervalInMs, } m := NewMetaManager(changefeedID, cfg, startTs) - require.Equal(t, startTs, m.metaCheckpointTs.getFlushed()) - require.Equal(t, startTs, m.metaResolvedTs.getFlushed()) - eg := errgroup.Group{} + var eg errgroup.Group eg.Go(func() error { return m.Run(ctx) }) + + require.Eventually(t, func() bool { + return startTs == m.metaCheckpointTs.getFlushed() + }, time.Second, 50*time.Millisecond) + + require.Eventually(t, func() bool { + return startTs == m.metaResolvedTs.getFlushed() + }, time.Second, 50*time.Millisecond) + checkGC(startTs) for i := startTs; i <= uint64(maxCommitTs); i++ { From 3e3d06b74674dd65b14ff16ad436989326f1a3c4 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 19 Oct 2023 16:05:07 +0800 Subject: [PATCH 06/25] adjust redo log manager. --- cdc/owner/changefeed.go | 8 +- cdc/processor/processor.go | 5 +- cdc/redo/manager.go | 81 ++++++++++--------- cdc/redo/manager_test.go | 67 +++++++-------- cdc/redo/meta_manager_test.go | 6 +- cdc/redo/reader/file.go | 1 + cdc/redo/writer/factory/factory.go | 16 +++- cdc/redo/writer/file/file.go | 4 +- cdc/redo/writer/memory/mem_log_writer.go | 5 +- cdc/redo/writer/memory/mem_log_writer_test.go | 2 +- cdc/redo/writer/writer.go | 2 +- 11 files changed, 96 insertions(+), 101 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index d5b7bc4cf87..f646ffecb15 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -608,13 +608,7 @@ LOOP2: } c.observerLastTick = atomic.NewTime(time.Time{}) - c.redoDDLMgr, err = redo.NewDDLManager(cancelCtx, c.id, c.latestInfo.Config.Consistent, ddlStartTs) - failpoint.Inject("ChangefeedNewRedoManagerError", func() { - err = errors.New("changefeed new redo manager injected error") - }) - if err != nil { - return err - } + c.redoDDLMgr = redo.NewDDLManager(c.id, c.latestInfo.Config.Consistent, ddlStartTs) if c.redoDDLMgr.Enabled() { c.wg.Add(1) go func() { diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 5a148240f80..d906e8eddee 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -606,10 +606,7 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) { } p.latestInfo.Config.Sink.TiDBSourceID = sourceID - p.redo.r, err = redo.NewDMLManager(prcCtx, p.changefeedID, p.latestInfo.Config.Consistent) - if err != nil { - return err - } + p.redo.r = redo.NewDMLManager(p.changefeedID, p.latestInfo.Config.Consistent) p.redo.name = "RedoManager" p.redo.changefeedID = p.changefeedID p.redo.spawn(prcCtx) diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index 0fc95448916..74bad489423 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -20,7 +20,6 @@ import ( "time" "github.com/pingcap/log" - "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/redo/common" @@ -65,20 +64,17 @@ func NewDisabledDDLManager() *ddlManager { // NewDDLManager creates a new ddl Manager. func NewDDLManager( - ctx context.Context, changefeedID model.ChangeFeedID, + changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, ddlStartTs model.Ts, -) (*ddlManager, error) { - logManager, err := newLogManager(ctx, changefeedID, cfg, redo.RedoDDLLogFileType) - if err != nil { - return nil, err - } +) *ddlManager { + m := newLogManager(changefeedID, cfg, redo.RedoDDLLogFileType) span := spanz.TableIDToComparableSpan(0) - logManager.AddTable(span, ddlStartTs) + m.AddTable(span, ddlStartTs) return &ddlManager{ - logManager: logManager, - // The current fakeSpan is meaningless, find a meaningful sapn in the future. + logManager: m, + // The current fakeSpan is meaningless, find a meaningful span in the future. fakeSpan: span, - }, nil + } } type ddlManager struct { @@ -115,14 +111,12 @@ type DMLManager interface { } // NewDMLManager creates a new dml Manager. -func NewDMLManager(ctx context.Context, changefeedID model.ChangeFeedID, +func NewDMLManager(changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, -) (*dmlManager, error) { - logManager, err := newLogManager(ctx, changefeedID, cfg, redo.RedoRowLogFileType) - if err != nil { - return nil, err +) *dmlManager { + return &dmlManager{ + logManager: newLogManager(changefeedID, cfg, redo.RedoRowLogFileType), } - return &dmlManager{logManager: logManager}, nil } // NewDisabledDMLManager creates a disabled dml Manager. @@ -206,6 +200,8 @@ type logManager struct { cfg *writer.LogWriterConfig writer writer.RedoLogWriter + initialized atomic.Bool + rwlock sync.RWMutex // TODO: remove logBuffer and use writer directly after file logWriter is deprecated. logBuffer *chann.DrainableChann[cacheEvents] @@ -228,29 +224,22 @@ type logManager struct { } func newLogManager( - ctx context.Context, changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, logType string, -) (*logManager, error) { +) *logManager { // return a disabled Manager if no consistent config or normal consistent level if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) { - return &logManager{enabled: false}, nil + return &logManager{enabled: false} } - uri, err := storage.ParseRawURL(cfg.Storage) - if err != nil { - return nil, err - } - m := &logManager{ + return &logManager{ enabled: true, cfg: &writer.LogWriterConfig{ - ConsistentConfig: *cfg, - LogType: logType, - CaptureID: config.GetGlobalServerConfig().AdvertiseAddr, - ChangeFeedID: changefeedID, - URI: *uri, - UseExternalStorage: redo.IsExternalStorage(uri.Scheme), - MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte, + ConsistentConfig: *cfg, + LogType: logType, + CaptureID: config.GetGlobalServerConfig().AdvertiseAddr, + ChangeFeedID: changefeedID, + MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte, }, logBuffer: chann.NewAutoDrainChann[cacheEvents](), rtsMap: spanz.SyncMap{}, @@ -263,21 +252,35 @@ func newLogManager( metricRedoWorkerBusyRatio: common.RedoWorkerBusyRatio. WithLabelValues(changefeedID.Namespace, changefeedID.ID), } +} - m.writer, err = factory.NewRedoLogWriter(ctx, m.cfg) +// Initialized return true if the log manager is fully initialized, +// which means the external storage is accessible, and all running resource allocated. +func (m *logManager) Initialized() bool { + return m.initialized.Load() +} + +func (m *logManager) preStart(ctx context.Context) error { + w, err := factory.NewRedoLogWriter(ctx, m.cfg) if err != nil { - return nil, err + return err } - return m, nil + m.writer = w + m.initialized.Store(true) + return nil } // Run implements pkg/util.Runnable. func (m *logManager) Run(ctx context.Context, _ ...chan<- error) error { - if m.Enabled() { - defer m.close() - return m.bgUpdateLog(ctx) + if !m.Enabled() { + return nil } - return nil + + defer m.close() + if err := m.preStart(ctx); err != nil { + return err + } + return m.bgUpdateLog(ctx) } // WaitForReady implements pkg/util.Runnable. diff --git a/cdc/redo/manager_test.go b/cdc/redo/manager_test.go index 0a85e01ba8a..bb4cd22fdca 100644 --- a/cdc/redo/manager_test.go +++ b/cdc/redo/manager_test.go @@ -121,15 +121,11 @@ func TestLogManagerInProcessor(t *testing.T) { FlushIntervalInMs: redo.MinFlushIntervalInMs, UseFileBackend: useFileBackend, } - dmlMgr, err := NewDMLManager(ctx, model.DefaultChangeFeedID("test"), cfg) - require.NoError(t, err) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - dmlMgr.Run(ctx) - }() - + dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg) + var eg errgroup.Group + eg.Go(func() error { + return dmlMgr.Run(ctx) + }) // check emit row changed events can move forward resolved ts spans := []tablepb.Span{ spanz.TableIDToComparableSpan(53), @@ -202,7 +198,7 @@ func TestLogManagerInProcessor(t *testing.T) { checkResolvedTs(t, dmlMgr.logManager, flushResolvedTs) cancel() - wg.Wait() + require.ErrorIs(t, eg.Wait(), context.Canceled) } testWriteDMLs("blackhole://", true) @@ -233,18 +229,16 @@ func TestLogManagerInOwner(t *testing.T) { UseFileBackend: useFileBackend, } startTs := model.Ts(10) - ddlMgr, err := NewDDLManager(ctx, model.DefaultChangeFeedID("test"), cfg, startTs) - require.NoError(t, err) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - ddlMgr.Run(ctx) - }() + ddlMgr := NewDDLManager(model.DefaultChangeFeedID("test"), cfg, startTs) + + var eg errgroup.Group + eg.Go(func() error { + return ddlMgr.Run(ctx) + }) require.Equal(t, startTs, ddlMgr.GetResolvedTs()) ddl := &model.DDLEvent{StartTs: 100, CommitTs: 120, Query: "CREATE TABLE `TEST.T1`"} - err = ddlMgr.EmitDDLEvent(ctx, ddl) + err := ddlMgr.EmitDDLEvent(ctx, ddl) require.NoError(t, err) require.Equal(t, startTs, ddlMgr.GetResolvedTs()) @@ -252,7 +246,7 @@ func TestLogManagerInOwner(t *testing.T) { checkResolvedTs(t, ddlMgr.logManager, ddl.CommitTs) cancel() - wg.Wait() + require.ErrorIs(t, eg.Wait(), context.Canceled) } testWriteDDLs("blackhole://", true) @@ -278,20 +272,15 @@ func TestLogManagerError(t *testing.T) { Storage: "blackhole://", FlushIntervalInMs: redo.MinFlushIntervalInMs, } - logMgr, err := NewDMLManager(ctx, model.DefaultChangeFeedID("test"), cfg) - require.NoError(t, err) - err = logMgr.writer.Close() + logMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg) + err := logMgr.writer.Close() require.NoError(t, err) logMgr.writer = blackhole.NewInvalidLogWriter(logMgr.writer) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - err := logMgr.Run(ctx) - require.Regexp(t, ".*invalid black hole writer.*", err) - require.Regexp(t, ".*WriteLog.*", err) - }() + var eg errgroup.Group + eg.Go(func() error { + return logMgr.Run(ctx) + }) testCases := []struct { span tablepb.Span @@ -310,7 +299,10 @@ func TestLogManagerError(t *testing.T) { err := logMgr.emitRedoEvents(ctx, tc.span, nil, tc.rows...) require.NoError(t, err) } - wg.Wait() + + err = eg.Wait() + require.Regexp(t, ".*invalid black hole writer.*", err) + require.Regexp(t, ".*WriteLog.*", err) } func BenchmarkBlackhole(b *testing.B) { @@ -336,9 +328,8 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) { FlushIntervalInMs: redo.MinFlushIntervalInMs, UseFileBackend: useFileBackend, } - dmlMgr, err := NewDMLManager(ctx, model.DefaultChangeFeedID("test"), cfg) - require.Nil(b, err) - eg := errgroup.Group{} + dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg) + var eg errgroup.Group eg.Go(func() error { return dmlMgr.Run(ctx) }) @@ -366,7 +357,7 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) { go func(span tablepb.Span) { defer wg.Done() maxCommitTs := maxTsMap.GetV(span) - rows := []*model.RowChangedEvent{} + var rows []*model.RowChangedEvent for i := 0; i < maxRowCount; i++ { if i%100 == 0 { // prepare new row change events @@ -409,6 +400,6 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) { time.Sleep(time.Millisecond * 500) } cancel() - err = eg.Wait() - require.ErrorIs(b, err, context.Canceled) + + require.ErrorIs(b, eg.Wait(), context.Canceled) } diff --git a/cdc/redo/meta_manager_test.go b/cdc/redo/meta_manager_test.go index de4844659e8..675b96dcfd7 100644 --- a/cdc/redo/meta_manager_test.go +++ b/cdc/redo/meta_manager_test.go @@ -103,7 +103,7 @@ func TestInitAndWriteMeta(t *testing.T) { require.True(t, ret, "file %s should not be removed", fileName) } - testWriteMeta(t, ctx, m) + testWriteMeta(ctx, t, m) cancel() require.ErrorIs(t, eg.Wait(), context.Canceled) @@ -175,13 +175,13 @@ func TestPreCleanupAndWriteMeta(t *testing.T) { require.NoError(t, err) require.False(t, ret, "file %s should be removed", fileName) } - testWriteMeta(t, ctx, m) + testWriteMeta(ctx, t, m) cancel() require.ErrorIs(t, eg.Wait(), context.Canceled) } -func testWriteMeta(t *testing.T, ctx context.Context, m *metaManager) { +func testWriteMeta(ctx context.Context, t *testing.T, m *metaManager) { checkMeta := func(targetCheckpointTs, targetResolvedTs uint64) { var checkpointTs, resolvedTs uint64 var metas []*common.LogMeta diff --git a/cdc/redo/reader/file.go b/cdc/redo/reader/file.go index 07744a884c5..0ed90284e06 100644 --- a/cdc/redo/reader/file.go +++ b/cdc/redo/reader/file.go @@ -232,6 +232,7 @@ func sortAndWriteFile( fileName string, cfg *readerConfig, ) error { sortedName := getSortedFileName(fileName) + // todo: add uri writerCfg := &writer.LogWriterConfig{ Dir: cfg.dir, MaxLogSizeInBytes: math.MaxInt32, diff --git a/cdc/redo/writer/factory/factory.go b/cdc/redo/writer/factory/factory.go index 7684b3f06f0..5bf01c6c8bc 100644 --- a/cdc/redo/writer/factory/factory.go +++ b/cdc/redo/writer/factory/factory.go @@ -16,6 +16,7 @@ package factory import ( "context" + "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/redo/writer" "github.com/pingcap/tiflow/cdc/redo/writer/blackhole" "github.com/pingcap/tiflow/cdc/redo/writer/file" @@ -28,12 +29,19 @@ import ( func NewRedoLogWriter( ctx context.Context, lwCfg *writer.LogWriterConfig, ) (writer.RedoLogWriter, error) { - scheme := lwCfg.URI.Scheme - if !redo.IsValidConsistentStorage(scheme) { - return nil, errors.ErrConsistentStorage.GenWithStackByArgs(scheme) + uri, err := storage.ParseRawURL(lwCfg.Storage) + if err != nil { + return nil, err } - if redo.IsBlackholeStorage(scheme) { + if !redo.IsValidConsistentStorage(uri.Scheme) { + return nil, errors.ErrConsistentStorage.GenWithStackByArgs(uri.Scheme) + } + + lwCfg.URI = uri + lwCfg.UseExternalStorage = redo.IsExternalStorage(uri.Scheme) + + if redo.IsBlackholeStorage(uri.Scheme) { return blackhole.NewLogWriter(), nil } if lwCfg.UseFileBackend { diff --git a/cdc/redo/writer/file/file.go b/cdc/redo/writer/file/file.go index e895bffe8c6..e12a5df59de 100644 --- a/cdc/redo/writer/file/file.go +++ b/cdc/redo/writer/file/file.go @@ -86,7 +86,7 @@ func NewFileWriter( var extStorage storage.ExternalStorage if cfg.UseExternalStorage { var err error - extStorage, err = redo.InitExternalStorage(ctx, cfg.URI) + extStorage, err = redo.InitExternalStorage(ctx, *cfg.URI) if err != nil { return nil, err } @@ -129,7 +129,7 @@ func NewFileWriter( // if we use S3 as the remote storage, a file allocator can be leveraged to // pre-allocate files for us. // TODO: test whether this improvement can also be applied to NFS. - if cfg.UseExternalStorage { + if w.cfg.UseExternalStorage { w.allocator = fsutil.NewFileAllocator(cfg.Dir, cfg.LogType, cfg.MaxLogSizeInBytes) } diff --git a/cdc/redo/writer/memory/mem_log_writer.go b/cdc/redo/writer/memory/mem_log_writer.go index a47a83e855b..66a41873046 100644 --- a/cdc/redo/writer/memory/mem_log_writer.go +++ b/cdc/redo/writer/memory/mem_log_writer.go @@ -52,13 +52,14 @@ func NewLogWriter( return nil, errors.WrapError(errors.ErrRedoConfigInvalid, errors.New("invalid LogWriterConfig")) } + // "nfs" and "local" scheme are converted to "file" scheme if !cfg.UseExternalStorage { - redo.FixLocalScheme(&cfg.URI) + redo.FixLocalScheme(cfg.URI) cfg.UseExternalStorage = redo.IsExternalStorage(cfg.URI.Scheme) } - extStorage, err := redo.InitExternalStorage(ctx, cfg.URI) + extStorage, err := redo.InitExternalStorage(ctx, *cfg.URI) if err != nil { return nil, err } diff --git a/cdc/redo/writer/memory/mem_log_writer_test.go b/cdc/redo/writer/memory/mem_log_writer_test.go index 60ea189a875..bba18c8068f 100644 --- a/cdc/redo/writer/memory/mem_log_writer_test.go +++ b/cdc/redo/writer/memory/mem_log_writer_test.go @@ -61,7 +61,7 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) { LogType: redo.RedoDDLLogFileType, CaptureID: "test-capture", ChangeFeedID: model.DefaultChangeFeedID("test-changefeed"), - URI: *uri, + URI: uri, UseExternalStorage: true, MaxLogSizeInBytes: 10 * redo.Megabyte, } diff --git a/cdc/redo/writer/writer.go b/cdc/redo/writer/writer.go index 18779997b30..42ac64d4ff6 100644 --- a/cdc/redo/writer/writer.go +++ b/cdc/redo/writer/writer.go @@ -52,7 +52,7 @@ type LogWriterConfig struct { CaptureID model.CaptureID ChangeFeedID model.ChangeFeedID - URI url.URL + URI *url.URL UseExternalStorage bool Dir string MaxLogSizeInBytes int64 From 767f96d622aff532e5c2ddef2a1069af1f21e03b Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 19 Oct 2023 16:34:04 +0800 Subject: [PATCH 07/25] fix unit test. --- cdc/processor/processor_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index 4b2e006f6de..042e243a24a 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -71,14 +71,13 @@ func newProcessor4Test( } else { tmpDir := t.TempDir() redoDir := fmt.Sprintf("%s/%s", tmpDir, changefeedID) - dmlMgr, err := redo.NewDMLManager(ctx, changefeedID, &config.ConsistentConfig{ + dmlMgr := redo.NewDMLManager(changefeedID, &config.ConsistentConfig{ Level: string(redoPkg.ConsistentLevelEventual), MaxLogSize: redoPkg.DefaultMaxLogSize, FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs, Storage: "file://" + redoDir, UseFileBackend: false, }) - require.NoError(t, err) p.redo.r = dmlMgr } p.redo.name = "RedoManager" From ee674b6c1fb354933af84644410cbb38f64b82e5 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 19 Oct 2023 17:40:50 +0800 Subject: [PATCH 08/25] set resolved ts after the redo meta manager is running. --- cdc/api/v2/api_helpers.go | 12 ++++++++++++ cdc/owner/changefeed.go | 9 +++++++-- cdc/redo/manager.go | 28 +++++++++------------------- cdc/redo/meta_manager.go | 25 +++++++++++++++++-------- pkg/redo/config.go | 4 ++++ 5 files changed, 49 insertions(+), 29 deletions(-) diff --git a/cdc/api/v2/api_helpers.go b/cdc/api/v2/api_helpers.go index f84037747f6..2da4238aa43 100644 --- a/cdc/api/v2/api_helpers.go +++ b/cdc/api/v2/api_helpers.go @@ -23,6 +23,7 @@ import ( "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/storage" tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tiflow/cdc/controller" "github.com/pingcap/tiflow/cdc/entry" @@ -34,6 +35,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/filter" + "github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/txnutil/gc" @@ -231,6 +233,16 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig( } } + if redo.IsConsistentEnabled(replicaCfg.Consistent.Level) { + uri, err := storage.ParseRawURL(replicaCfg.Consistent.Storage) + if err != nil { + return nil, errors.Trace(err) + } + if err := redo.ValidateStorage(uri); err != nil { + return nil, errors.Trace(err) + } + } + // verify sink if err := validator.Validate(ctx, model.ChangeFeedID{Namespace: cfg.Namespace, ID: cfg.ID}, diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index f646ffecb15..322ef9cb2de 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -369,6 +369,13 @@ func (c *changefeed) tick(ctx cdcContext.Context, default: } + if c.redoMetaMgr.Enabled() { + if !c.redoMetaMgr.Running() { + return 0, 0, nil + } + c.resolvedTs = c.redoMetaMgr.GetFlushedMeta().ResolvedTs + } + // TODO: pass table checkpointTs when we support concurrent process ddl allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil) if err != nil { @@ -619,8 +626,6 @@ LOOP2: c.redoMetaMgr = redo.NewMetaManager(c.id, c.latestInfo.Config.Consistent, checkpointTs) if c.redoMetaMgr.Enabled() { - // todo: how to initialize this resolved ts ? - c.resolvedTs = c.redoMetaMgr.GetFlushedMeta().ResolvedTs c.wg.Add(1) go func() { defer c.wg.Done() diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index 74bad489423..e1e2427bb08 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -200,8 +200,6 @@ type logManager struct { cfg *writer.LogWriterConfig writer writer.RedoLogWriter - initialized atomic.Bool - rwlock sync.RWMutex // TODO: remove logBuffer and use writer directly after file logWriter is deprecated. logBuffer *chann.DrainableChann[cacheEvents] @@ -254,22 +252,6 @@ func newLogManager( } } -// Initialized return true if the log manager is fully initialized, -// which means the external storage is accessible, and all running resource allocated. -func (m *logManager) Initialized() bool { - return m.initialized.Load() -} - -func (m *logManager) preStart(ctx context.Context) error { - w, err := factory.NewRedoLogWriter(ctx, m.cfg) - if err != nil { - return err - } - m.writer = w - m.initialized.Store(true) - return nil -} - // Run implements pkg/util.Runnable. func (m *logManager) Run(ctx context.Context, _ ...chan<- error) error { if !m.Enabled() { @@ -277,9 +259,17 @@ func (m *logManager) Run(ctx context.Context, _ ...chan<- error) error { } defer m.close() - if err := m.preStart(ctx); err != nil { + start := time.Now() + w, err := factory.NewRedoLogWriter(ctx, m.cfg) + if err != nil { + log.Error("redo: failed to create redo log writer", + zap.String("namespace", m.cfg.ChangeFeedID.Namespace), + zap.String("changefeed", m.cfg.ChangeFeedID.ID), + zap.Duration("duration", time.Since(start)), + zap.Error(err)) return err } + m.writer = w return m.bgUpdateLog(ctx) } diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index b7f38d72e78..9946ae63e4e 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -47,6 +47,9 @@ type MetaManager interface { // Cleanup deletes all redo logs, which are only called from the owner // when changefeed is deleted. Cleanup(ctx context.Context) error + + // Running return true if the meta manager is running or not. + Running() bool } type metaManager struct { @@ -54,9 +57,8 @@ type metaManager struct { changeFeedID model.ChangeFeedID enabled bool - // initialized means the meta manager now works normally. - // todo: how to use this fields ? - initialized atomic.Bool + // running means the meta manager now running normally. + running atomic.Bool metaCheckpointTs statefulRts metaResolvedTs statefulRts @@ -105,10 +107,10 @@ func (m *metaManager) Enabled() bool { return m.enabled } -// Initialized return whether the meta manager is initialized, +// Running return whether the meta manager is initialized, // which means the external storage is accessible to the meta manager. -func (m *metaManager) Initialized() bool { - return m.initialized.Load() +func (m *metaManager) Running() bool { + return m.running.Load() } func (m *metaManager) preStart(ctx context.Context) error { @@ -142,7 +144,6 @@ func (m *metaManager) preStart(ctx context.Context) error { zap.Error(err)) return err } - return nil } @@ -159,7 +160,7 @@ func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error { return m.bgGC(egCtx) }) - m.initialized.Store(true) + m.running.Store(true) return eg.Wait() } @@ -246,6 +247,14 @@ func (m *metaManager) initMeta(ctx context.Context) error { if err := m.maybeFlushMeta(ctx); err != nil { return errors.WrapError(errors.ErrRedoMetaInitialize, err) } + + flushedMeta := m.GetFlushedMeta() + log.Info("redo: meta manager flush init meta success", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Uint64("checkpointTs", flushedMeta.CheckpointTs), + zap.Uint64("resolvedTs", flushedMeta.ResolvedTs)) + return util.DeleteFilesInExtStorage(ctx, m.extStorage, toRemoveMetaFiles) } diff --git a/pkg/redo/config.go b/pkg/redo/config.go index fbcdb3f1eb3..dd2e3ec0997 100644 --- a/pkg/redo/config.go +++ b/pkg/redo/config.go @@ -220,6 +220,10 @@ func ValidateStorage(uri *url.URL) error { return errors.WrapError(errors.ErrStorageInitialize, errors.Annotate(err, fmt.Sprintf("can't make dir for new redo log: %+v", uri))) } + + // todo: add read and write permission check + // read, write, list, mkdir, rmdir + return nil } From 91b075a23b2321978345ebd4d2f15c6ea97d242b Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 19 Oct 2023 18:02:25 +0800 Subject: [PATCH 09/25] fix scheduler. --- .../internal/v3/replication/replication_manager_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cdc/scheduler/internal/v3/replication/replication_manager_test.go b/cdc/scheduler/internal/v3/replication/replication_manager_test.go index 6cbfab08396..a19ef082dd7 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager_test.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager_test.go @@ -613,6 +613,10 @@ type mockRedoMetaManager struct { enable bool } +func (m *mockRedoMetaManager) Running() bool { + return true +} + func (m *mockRedoMetaManager) UpdateMeta(checkpointTs, resolvedTs model.Ts) { } From 3c1b288de0d1125ab1e6d29cecd238985da8f891 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 20 Oct 2023 11:53:56 +0800 Subject: [PATCH 10/25] fix one unit test. --- cdc/redo/manager_test.go | 9 +---- .../writer/blackhole/blackhole_log_writer.go | 37 +++++++------------ cdc/redo/writer/factory/factory.go | 5 ++- pkg/redo/config.go | 2 +- 4 files changed, 20 insertions(+), 33 deletions(-) diff --git a/cdc/redo/manager_test.go b/cdc/redo/manager_test.go index bb4cd22fdca..14db8ffc251 100644 --- a/cdc/redo/manager_test.go +++ b/cdc/redo/manager_test.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/redo/writer" - "github.com/pingcap/tiflow/cdc/redo/writer/blackhole" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/spanz" @@ -269,14 +268,10 @@ func TestLogManagerError(t *testing.T) { cfg := &config.ConsistentConfig{ Level: string(redo.ConsistentLevelEventual), MaxLogSize: redo.DefaultMaxLogSize, - Storage: "blackhole://", + Storage: "blackhole-invalid://", FlushIntervalInMs: redo.MinFlushIntervalInMs, } logMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg) - err := logMgr.writer.Close() - require.NoError(t, err) - logMgr.writer = blackhole.NewInvalidLogWriter(logMgr.writer) - var eg errgroup.Group eg.Go(func() error { return logMgr.Run(ctx) @@ -300,7 +295,7 @@ func TestLogManagerError(t *testing.T) { require.NoError(t, err) } - err = eg.Wait() + err := eg.Wait() require.Regexp(t, ".*invalid black hole writer.*", err) require.Regexp(t, ".*WriteLog.*", err) } diff --git a/cdc/redo/writer/blackhole/blackhole_log_writer.go b/cdc/redo/writer/blackhole/blackhole_log_writer.go index 24cc03bd3df..9a877f262f3 100644 --- a/cdc/redo/writer/blackhole/blackhole_log_writer.go +++ b/cdc/redo/writer/blackhole/blackhole_log_writer.go @@ -26,14 +26,21 @@ var _ writer.RedoLogWriter = (*blackHoleWriter)(nil) // blackHoleSink defines a blackHole storage, it receives events and persists // without any latency -type blackHoleWriter struct{} +type blackHoleWriter struct { + invalid bool +} // NewLogWriter creates a blackHole writer -func NewLogWriter() *blackHoleWriter { - return &blackHoleWriter{} +func NewLogWriter(invalid bool) *blackHoleWriter { + return &blackHoleWriter{ + invalid: invalid, + } } func (bs *blackHoleWriter) WriteEvents(_ context.Context, events ...writer.RedoEvent) (err error) { + if bs.invalid { + return errors.New("[WriteLog] invalid black hole writer") + } if len(events) == 0 { return nil } @@ -45,30 +52,12 @@ func (bs *blackHoleWriter) WriteEvents(_ context.Context, events ...writer.RedoE } func (bs *blackHoleWriter) FlushLog(_ context.Context) error { + if bs.invalid { + return errors.New("[FlushLog] invalid black hole writer") + } return nil } func (bs *blackHoleWriter) Close() error { return nil } - -type invalidBlackHoleWriter struct { - *blackHoleWriter -} - -// NewInvalidLogWriter creates a invalid blackHole writer -func NewInvalidLogWriter(rl writer.RedoLogWriter) *invalidBlackHoleWriter { - return &invalidBlackHoleWriter{ - blackHoleWriter: rl.(*blackHoleWriter), - } -} - -func (ibs *invalidBlackHoleWriter) WriteEvents( - _ context.Context, _ ...writer.RedoEvent, -) (err error) { - return errors.New("[WriteLog] invalid black hole writer") -} - -func (ibs *invalidBlackHoleWriter) FlushLog(_ context.Context) error { - return errors.New("[FlushLog] invalid black hole writer") -} diff --git a/cdc/redo/writer/factory/factory.go b/cdc/redo/writer/factory/factory.go index 5bf01c6c8bc..796d803ae45 100644 --- a/cdc/redo/writer/factory/factory.go +++ b/cdc/redo/writer/factory/factory.go @@ -15,6 +15,7 @@ package factory import ( "context" + "strings" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/redo/writer" @@ -42,8 +43,10 @@ func NewRedoLogWriter( lwCfg.UseExternalStorage = redo.IsExternalStorage(uri.Scheme) if redo.IsBlackholeStorage(uri.Scheme) { - return blackhole.NewLogWriter(), nil + invalid := strings.HasSuffix(uri.Scheme, "invalid") + return blackhole.NewLogWriter(invalid), nil } + if lwCfg.UseFileBackend { return file.NewLogWriter(ctx, lwCfg) } diff --git a/pkg/redo/config.go b/pkg/redo/config.go index dd2e3ec0997..cd019696400 100644 --- a/pkg/redo/config.go +++ b/pkg/redo/config.go @@ -172,7 +172,7 @@ func FixLocalScheme(uri *url.URL) { // IsBlackholeStorage returns whether a blackhole storage is used. func IsBlackholeStorage(scheme string) bool { - return ConsistentStorage(scheme) == consistentStorageBlackhole + return strings.HasPrefix(scheme, string(consistentStorageBlackhole)) } // InitExternalStorage init an external storage. From 53ea3e5cb03a128abed9c63f5ab1060d7db1a36e Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 20 Oct 2023 14:12:42 +0800 Subject: [PATCH 11/25] fix redo unit tests. --- cdc/owner/changefeed_test.go | 1 + cdc/redo/meta_manager.go | 13 ++++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index ffb4725f0c5..14585660e64 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -594,6 +594,7 @@ func testChangefeedReleaseResource( CfID: cf.id, Type: model.AdminRemove, }) + cf.isReleased = false // changefeed tick will release resources cf.Tick(ctx, state.Info, state.Status, captures) diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index 9946ae63e4e..65c262d6514 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -314,8 +314,19 @@ func (m *metaManager) shouldRemoved(path string, checkPointTs uint64) bool { // deleteAllLogs delete all redo logs and leave a deleted mark. func (m *metaManager) deleteAllLogs(ctx context.Context) error { + // when one changefeed with redo enabled gets deleted, it's extStorage should always be set to not nil + // otherwise it should have already meet panic during changefeed running time. + // the extStorage may be nil in the unit test, so just set the external storage to make unit test happy. if m.extStorage == nil { - return nil + uri, err := storage.ParseRawURL(m.cfg.Storage) + redo.FixLocalScheme(uri) + if err != nil { + return err + } + m.extStorage, err = redo.InitExternalStorage(ctx, *uri) + if err != nil { + return err + } } // Write deleted mark before clean any files. deleteMarker := getDeletedChangefeedMarker(m.changeFeedID) From 99f29346200755092774a794b3c087b08b7eda7d Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 20 Oct 2023 15:44:23 +0800 Subject: [PATCH 12/25] fix consumer panic since meet previous DDL --- cmd/kafka-consumer/main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index b160d423b54..3a14b5e742d 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -773,10 +773,11 @@ func (c *Consumer) appendDDL(ddl *model.DDLEvent) { defer c.ddlListMu.Unlock() // DDL CommitTs fallback, just crash it to indicate the bug. if c.ddlWithMaxCommitTs != nil && ddl.CommitTs < c.ddlWithMaxCommitTs.CommitTs { - log.Panic("DDL CommitTs < maxCommitTsDDL.CommitTs", + log.Warn("DDL CommitTs < maxCommitTsDDL.CommitTs", zap.Uint64("commitTs", ddl.CommitTs), zap.Uint64("maxCommitTs", c.ddlWithMaxCommitTs.CommitTs), zap.Any("DDL", ddl)) + return } // A rename tables DDL job contains multiple DDL events with same CommitTs. From 57890c07b3f24a718b43c875b657ae91a6f9dcb6 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 20 Oct 2023 16:32:51 +0800 Subject: [PATCH 13/25] also return the failpoint error. --- cdc/redo/manager.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index e1e2427bb08..1069a73bfac 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -19,6 +19,7 @@ import ( "sync/atomic" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" @@ -254,6 +255,9 @@ func newLogManager( // Run implements pkg/util.Runnable. func (m *logManager) Run(ctx context.Context, _ ...chan<- error) error { + failpoint.Inject("ChangefeedNewRedoManagerError", func() { + failpoint.Return(errors.New("changefeed new redo manager injected error")) + }) if !m.Enabled() { return nil } From 0eda809a1a6359324af0212927d42bd6f70385ae Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 20 Oct 2023 17:38:35 +0800 Subject: [PATCH 14/25] fix redo failpoint error. --- tests/integration_tests/changefeed_error/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration_tests/changefeed_error/run.sh b/tests/integration_tests/changefeed_error/run.sh index 5065d147875..f48b72fbf4d 100755 --- a/tests/integration_tests/changefeed_error/run.sh +++ b/tests/integration_tests/changefeed_error/run.sh @@ -108,7 +108,7 @@ function run() { cleanup_process $CDC_BINARY # make sure initialize changefeed error will not stuck the owner - export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/ChangefeedNewRedoManagerError=2*return(true)' + export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/redo/ChangefeedNewRedoManagerError=2*return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY changefeedid_3="changefeed-initialize-error" From 2c86265e057a0d383b3604940aa25b7c26d1343d Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 20 Oct 2023 18:02:00 +0800 Subject: [PATCH 15/25] also consider whether the storage location is write read able. --- cdc/api/v2/api_helpers.go | 12 ------------ pkg/redo/config.go | 13 +++++++++++-- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/cdc/api/v2/api_helpers.go b/cdc/api/v2/api_helpers.go index 2da4238aa43..f84037747f6 100644 --- a/cdc/api/v2/api_helpers.go +++ b/cdc/api/v2/api_helpers.go @@ -23,7 +23,6 @@ import ( "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/log" - "github.com/pingcap/tidb/br/pkg/storage" tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tiflow/cdc/controller" "github.com/pingcap/tiflow/cdc/entry" @@ -35,7 +34,6 @@ import ( "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/filter" - "github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/txnutil/gc" @@ -233,16 +231,6 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig( } } - if redo.IsConsistentEnabled(replicaCfg.Consistent.Level) { - uri, err := storage.ParseRawURL(replicaCfg.Consistent.Storage) - if err != nil { - return nil, errors.Trace(err) - } - if err := redo.ValidateStorage(uri); err != nil { - return nil, errors.Trace(err) - } - } - // verify sink if err := validator.Validate(ctx, model.ChangeFeedID{Namespace: cfg.Namespace, ID: cfg.ID}, diff --git a/pkg/redo/config.go b/pkg/redo/config.go index cd019696400..c5ba1716bf7 100644 --- a/pkg/redo/config.go +++ b/pkg/redo/config.go @@ -215,15 +215,24 @@ func ValidateStorage(uri *url.URL) error { return err } + // todo: the default dir mode is set as 755, is this ok? err := os.MkdirAll(uri.Path, DefaultDirMode) if err != nil { return errors.WrapError(errors.ErrStorageInitialize, errors.Annotate(err, fmt.Sprintf("can't make dir for new redo log: %+v", uri))) } - // todo: add read and write permission check - // read, write, list, mkdir, rmdir + file := filepath.Join(uri.Path, "file.test") + if err := os.WriteFile(file, []byte(""), DefaultFileMode); err != nil { + return errors.WrapError(errors.ErrStorageInitialize, errors.Annotate(err, + fmt.Sprintf("can't write file for new redo log: %+v", uri))) + } + if _, err := os.ReadFile(file); err != nil { + return errors.WrapError(errors.ErrStorageInitialize, errors.Annotate(err, + fmt.Sprintf("can't read file for new redo log: %+v", uri))) + } + _ = os.Remove(file) return nil } From ce4968265584fd3405389ccc06d572b7bde85af7 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 20 Oct 2023 18:03:52 +0800 Subject: [PATCH 16/25] remove todo. --- cdc/redo/reader/file.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cdc/redo/reader/file.go b/cdc/redo/reader/file.go index 0ed90284e06..07744a884c5 100644 --- a/cdc/redo/reader/file.go +++ b/cdc/redo/reader/file.go @@ -232,7 +232,6 @@ func sortAndWriteFile( fileName string, cfg *readerConfig, ) error { sortedName := getSortedFileName(fileName) - // todo: add uri writerCfg := &writer.LogWriterConfig{ Dir: cfg.dir, MaxLogSizeInBytes: math.MaxInt32, From 0a7b94d2d710c4adf2d56cc4776e6b581e6e6b33 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 23 Oct 2023 16:05:23 +0800 Subject: [PATCH 17/25] fix redo. --- cdc/redo/meta_manager.go | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index 65c262d6514..a3c6c3f6b8b 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -130,15 +130,17 @@ func (m *metaManager) preStart(ctx context.Context) error { m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram. WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID) - if err := m.preCleanupExtStorage(ctx); err != nil { - log.Warn("pre clean redo logs fail", + err = m.preCleanupExtStorage(ctx) + if err != nil { + log.Warn("redo: pre clean redo logs fail", zap.String("namespace", m.changeFeedID.Namespace), zap.String("changefeed", m.changeFeedID.ID), zap.Error(err)) return err } - if err := m.initMeta(ctx); err != nil { - log.Warn("init redo meta fail", + err = m.initMeta(ctx) + if err != nil { + log.Warn("redo: init redo meta fail", zap.String("namespace", m.changeFeedID.Namespace), zap.String("changefeed", m.changeFeedID.ID), zap.Error(err)) @@ -207,7 +209,10 @@ func (m *metaManager) initMeta(ctx context.Context) error { } var toRemoveMetaFiles []string err := m.extStorage.WalkDir(ctx, nil, func(path string, size int64) error { - log.Info("redo: meta manager walk dir", zap.String("path", path), zap.Int64("size", size)) + log.Info("redo: meta manager walk dir", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.String("path", path), zap.Int64("size", size)) // TODO: use prefix to accelerate traverse operation if !strings.HasSuffix(path, redo.MetaEXT) { return nil @@ -216,7 +221,10 @@ func (m *metaManager) initMeta(ctx context.Context) error { data, err := m.extStorage.ReadFile(ctx, path) if err != nil { - log.Warn("redo: read meta file failed", zap.String("path", path), zap.Error(err)) + log.Warn("redo: read meta file failed", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.String("path", path), zap.Error(err)) if !util.IsNotExistInExtStorage(err) { return err } @@ -225,7 +233,10 @@ func (m *metaManager) initMeta(ctx context.Context) error { var meta common.LogMeta _, err = meta.UnmarshalMsg(data) if err != nil { - log.Error("redo: unmarshal meta data failed", zap.Error(err), zap.ByteString("data", data)) + log.Error("redo: unmarshal meta data failed", + zap.String("namespace", m.changeFeedID.Namespace), + zap.String("changefeed", m.changeFeedID.ID), + zap.Error(err), zap.ByteString("data", data)) return err } metas = append(metas, &meta) From f832f2bbf84d38690e6bb3ce0daa89d6bbce080b Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 25 Oct 2023 16:22:01 +0800 Subject: [PATCH 18/25] add logs to help debug. --- cdc/scheduler/internal/v3/agent/agent.go | 8 ++++++++ cdc/scheduler/internal/v3/agent/table.go | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/cdc/scheduler/internal/v3/agent/agent.go b/cdc/scheduler/internal/v3/agent/agent.go index 79b640ea6e0..5d761362f79 100644 --- a/cdc/scheduler/internal/v3/agent/agent.go +++ b/cdc/scheduler/internal/v3/agent/agent.go @@ -279,6 +279,14 @@ func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) (*schedule status.State = tablepb.TableStateStopping } result = append(result, status) + + log.Info("schedulerv3: agent collect table status", + zap.String("namespace", a.ChangeFeedID.Namespace), + zap.String("changefeed", a.ChangeFeedID.ID), + zap.Int64("tableID", span.TableID), + zap.Uint64("checkpoint", status.Checkpoint.CheckpointTs), + zap.Uint64("resolved", status.Checkpoint.ResolvedTs), + zap.Any("state", status.State)) return true }) for _, span := range request.GetSpans() { diff --git a/cdc/scheduler/internal/v3/agent/table.go b/cdc/scheduler/internal/v3/agent/table.go index be1f7ec929b..af69d954534 100644 --- a/cdc/scheduler/internal/v3/agent/table.go +++ b/cdc/scheduler/internal/v3/agent/table.go @@ -311,7 +311,7 @@ func newTableSpanManager( func (tm *tableSpanManager) poll(ctx context.Context) ([]*schedulepb.Message, error) { result := make([]*schedulepb.Message, 0) var err error - toBeDropped := []tablepb.Span{} + var toBeDropped []tablepb.Span tm.tables.Ascend(func(span tablepb.Span, table *tableSpan) bool { message, err1 := table.poll(ctx) if err != nil { From eeb4a7e2c71b22e4af4c5ebafc67ca70be4140ab Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 26 Oct 2023 17:30:50 +0800 Subject: [PATCH 19/25] revert set resolved ts. --- cdc/owner/changefeed.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index a1225c5e74d..8cb782a772d 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -369,13 +369,6 @@ func (c *changefeed) tick(ctx cdcContext.Context, default: } - if c.redoMetaMgr.Enabled() { - if !c.redoMetaMgr.Running() { - return 0, 0, nil - } - c.resolvedTs = c.redoMetaMgr.GetFlushedMeta().ResolvedTs - } - // TODO: pass table checkpointTs when we support concurrent process ddl allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil) if err != nil { From 42eebf2138ff2bab0633030bdfce7c16301dc54c Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 26 Oct 2023 17:41:54 +0800 Subject: [PATCH 20/25] remove debug log. --- cdc/scheduler/internal/v3/agent/agent.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/cdc/scheduler/internal/v3/agent/agent.go b/cdc/scheduler/internal/v3/agent/agent.go index 5d761362f79..79b640ea6e0 100644 --- a/cdc/scheduler/internal/v3/agent/agent.go +++ b/cdc/scheduler/internal/v3/agent/agent.go @@ -279,14 +279,6 @@ func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) (*schedule status.State = tablepb.TableStateStopping } result = append(result, status) - - log.Info("schedulerv3: agent collect table status", - zap.String("namespace", a.ChangeFeedID.Namespace), - zap.String("changefeed", a.ChangeFeedID.ID), - zap.Int64("tableID", span.TableID), - zap.Uint64("checkpoint", status.Checkpoint.CheckpointTs), - zap.Uint64("resolved", status.Checkpoint.ResolvedTs), - zap.Any("state", status.State)) return true }) for _, span := range request.GetSpans() { From 4305ebd88792e748dd6a66c26ac840f226ccfa0f Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 26 Oct 2023 18:21:53 +0800 Subject: [PATCH 21/25] fix --- cdc/owner/changefeed_test.go | 1 - cdc/redo/manager.go | 2 +- cdc/redo/writer/memory/encoding_worker.go | 2 +- cdc/redo/writer/memory/file_worker.go | 2 +- pkg/redo/config.go | 3 +-- 5 files changed, 4 insertions(+), 6 deletions(-) diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index 14585660e64..ffb4725f0c5 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -594,7 +594,6 @@ func testChangefeedReleaseResource( CfID: cf.id, Type: model.AdminRemove, }) - cf.isReleased = false // changefeed tick will release resources cf.Tick(ctx, state.Info, state.Status, captures) diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index 1069a73bfac..5efe7034a65 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -546,7 +546,7 @@ func (m *logManager) close() { atomic.StoreInt32(&m.closed, 1) m.logBuffer.CloseAndDrain() - if err := m.writer.Close(); err != nil { + if err := m.writer.Close(); err != nil && errors.Cause(err) != context.Canceled { log.Error("redo manager fails to close writer", zap.String("namespace", m.cfg.ChangeFeedID.Namespace), zap.String("changefeed", m.cfg.ChangeFeedID.ID), diff --git a/cdc/redo/writer/memory/encoding_worker.go b/cdc/redo/writer/memory/encoding_worker.go index 337379f3b50..57b15ce45f3 100644 --- a/cdc/redo/writer/memory/encoding_worker.go +++ b/cdc/redo/writer/memory/encoding_worker.go @@ -124,7 +124,7 @@ func newEncodingWorkerGroup(workerNum int) *encodingWorkerGroup { func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) { defer func() { close(e.closed) - if err != nil { + if err != nil && errors.Cause(err) != context.Canceled { log.Warn("redo fileWorkerGroup closed with error", zap.Error(err)) } }() diff --git a/cdc/redo/writer/memory/file_worker.go b/cdc/redo/writer/memory/file_worker.go index 771888a418d..45b366117c0 100644 --- a/cdc/redo/writer/memory/file_worker.go +++ b/cdc/redo/writer/memory/file_worker.go @@ -138,7 +138,7 @@ func (f *fileWorkerGroup) Run( ) (err error) { defer func() { f.close() - if err != nil { + if err != nil && errors.Cause(err) != context.Canceled { log.Warn("redo file workers closed with error", zap.Error(err)) } }() diff --git a/pkg/redo/config.go b/pkg/redo/config.go index c5ba1716bf7..fc4b6203a47 100644 --- a/pkg/redo/config.go +++ b/pkg/redo/config.go @@ -214,8 +214,7 @@ func ValidateStorage(uri *url.URL) error { _, err := initExternalStorageForTest(ctx, *uri) return err } - - // todo: the default dir mode is set as 755, is this ok? + err := os.MkdirAll(uri.Path, DefaultDirMode) if err != nil { return errors.WrapError(errors.ErrStorageInitialize, errors.Annotate(err, From 2477544f111c703b7278e3b0e9777e8cf81acf1e Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 27 Oct 2023 12:02:42 +0800 Subject: [PATCH 22/25] fix make fmt. --- pkg/redo/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/redo/config.go b/pkg/redo/config.go index fc4b6203a47..82c2e6f3457 100644 --- a/pkg/redo/config.go +++ b/pkg/redo/config.go @@ -214,7 +214,7 @@ func ValidateStorage(uri *url.URL) error { _, err := initExternalStorageForTest(ctx, *uri) return err } - + err := os.MkdirAll(uri.Path, DefaultDirMode) if err != nil { return errors.WrapError(errors.ErrStorageInitialize, errors.Annotate(err, From b497ac96069db7cbcdbaffac6ba7c1abea4ff252 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 27 Oct 2023 16:13:59 +0800 Subject: [PATCH 23/25] fix review comment. --- cdc/redo/manager.go | 12 +++++++----- cdc/redo/meta_manager.go | 15 --------------- .../v3/replication/replication_manager_test.go | 4 ---- 3 files changed, 7 insertions(+), 24 deletions(-) diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index 5efe7034a65..1c6f2bd3610 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -546,11 +546,13 @@ func (m *logManager) close() { atomic.StoreInt32(&m.closed, 1) m.logBuffer.CloseAndDrain() - if err := m.writer.Close(); err != nil && errors.Cause(err) != context.Canceled { - log.Error("redo manager fails to close writer", - zap.String("namespace", m.cfg.ChangeFeedID.Namespace), - zap.String("changefeed", m.cfg.ChangeFeedID.ID), - zap.Error(err)) + if m.writer != nil { + if err := m.writer.Close(); err != nil && errors.Cause(err) != context.Canceled { + log.Error("redo manager fails to close writer", + zap.String("namespace", m.cfg.ChangeFeedID.Namespace), + zap.String("changefeed", m.cfg.ChangeFeedID.ID), + zap.Error(err)) + } } log.Info("redo manager closed", zap.String("namespace", m.cfg.ChangeFeedID.Namespace), diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index a3c6c3f6b8b..b0366954c65 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -30,7 +30,6 @@ import ( "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/uuid" "github.com/prometheus/client_golang/prometheus" - "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -47,9 +46,6 @@ type MetaManager interface { // Cleanup deletes all redo logs, which are only called from the owner // when changefeed is deleted. Cleanup(ctx context.Context) error - - // Running return true if the meta manager is running or not. - Running() bool } type metaManager struct { @@ -57,9 +53,6 @@ type metaManager struct { changeFeedID model.ChangeFeedID enabled bool - // running means the meta manager now running normally. - running atomic.Bool - metaCheckpointTs statefulRts metaResolvedTs statefulRts @@ -107,12 +100,6 @@ func (m *metaManager) Enabled() bool { return m.enabled } -// Running return whether the meta manager is initialized, -// which means the external storage is accessible to the meta manager. -func (m *metaManager) Running() bool { - return m.running.Load() -} - func (m *metaManager) preStart(ctx context.Context) error { uri, err := storage.ParseRawURL(m.cfg.Storage) if err != nil { @@ -161,8 +148,6 @@ func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error { eg.Go(func() error { return m.bgGC(egCtx) }) - - m.running.Store(true) return eg.Wait() } diff --git a/cdc/scheduler/internal/v3/replication/replication_manager_test.go b/cdc/scheduler/internal/v3/replication/replication_manager_test.go index 733e5df64cb..0584e62972c 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager_test.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager_test.go @@ -586,10 +586,6 @@ type mockRedoMetaManager struct { enable bool } -func (m *mockRedoMetaManager) Running() bool { - return true -} - func (m *mockRedoMetaManager) UpdateMeta(checkpointTs, resolvedTs model.Ts) { } From 5a6465692c7bdc031a311f87a415cae2a96c7bf1 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 27 Oct 2023 16:24:54 +0800 Subject: [PATCH 24/25] still use running to skip tick. --- cdc/owner/changefeed.go | 6 ++++++ cdc/redo/meta_manager.go | 15 +++++++++++++++ .../v3/replication/replication_manager_test.go | 4 ++++ 3 files changed, 25 insertions(+) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 8cb782a772d..1dd0739111f 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -369,6 +369,12 @@ func (c *changefeed) tick(ctx cdcContext.Context, default: } + if c.redoMetaMgr.Enabled() { + if !c.redoMetaMgr.Running() { + return 0, 0, nil + } + } + // TODO: pass table checkpointTs when we support concurrent process ddl allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil) if err != nil { diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index b0366954c65..a3c6c3f6b8b 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/uuid" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -46,6 +47,9 @@ type MetaManager interface { // Cleanup deletes all redo logs, which are only called from the owner // when changefeed is deleted. Cleanup(ctx context.Context) error + + // Running return true if the meta manager is running or not. + Running() bool } type metaManager struct { @@ -53,6 +57,9 @@ type metaManager struct { changeFeedID model.ChangeFeedID enabled bool + // running means the meta manager now running normally. + running atomic.Bool + metaCheckpointTs statefulRts metaResolvedTs statefulRts @@ -100,6 +107,12 @@ func (m *metaManager) Enabled() bool { return m.enabled } +// Running return whether the meta manager is initialized, +// which means the external storage is accessible to the meta manager. +func (m *metaManager) Running() bool { + return m.running.Load() +} + func (m *metaManager) preStart(ctx context.Context) error { uri, err := storage.ParseRawURL(m.cfg.Storage) if err != nil { @@ -148,6 +161,8 @@ func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error { eg.Go(func() error { return m.bgGC(egCtx) }) + + m.running.Store(true) return eg.Wait() } diff --git a/cdc/scheduler/internal/v3/replication/replication_manager_test.go b/cdc/scheduler/internal/v3/replication/replication_manager_test.go index 0584e62972c..632a0835aa3 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager_test.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager_test.go @@ -604,6 +604,10 @@ func (m *mockRedoMetaManager) Enabled() bool { return m.enable } +func (m *mockRedoMetaManager) Running() bool { + return true +} + func TestReplicationManagerAdvanceCheckpoint(t *testing.T) { t.Parallel() r := NewReplicationManager(1, model.ChangeFeedID{}) From ce0df300887f877beefe1b012e28b0f79b9f4844 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 31 Oct 2023 15:36:01 +0800 Subject: [PATCH 25/25] fix ddl start ts not initialized --- cdc/redo/meta_manager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index 30d734f1c6e..eb969aaa0c1 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -100,6 +100,7 @@ func NewMetaManager( uuidGenerator: uuid.NewGenerator(), enabled: true, cfg: cfg, + startTs: checkpoint, flushIntervalInMs: cfg.MetaFlushIntervalInMs, }