diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index 21c2e2457ab..f43f12d78d3 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -435,6 +435,7 @@ func (m *logManager) flushLog( log.Debug("Flush redo log", zap.String("namespace", m.cfg.ChangeFeedID.Namespace), zap.String("changefeed", m.cfg.ChangeFeedID.ID), + zap.String("logType", m.cfg.LogType), zap.Any("tableRtsMap", tableRtsMap)) err := m.withLock(func(m *logManager) error { return m.writer.FlushLog(ctx) diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index f1ad1acac72..5fc437c659d 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -74,6 +74,8 @@ type metaManager struct { lastFlushTime time.Time cfg *config.ConsistentConfig metricFlushLogDuration prometheus.Observer + + flushIntervalInMs int64 } // NewDisabledMetaManager creates a disabled Meta Manager. @@ -93,12 +95,19 @@ func NewMetaManager( } m := &metaManager{ - captureID: config.GetGlobalServerConfig().AdvertiseAddr, - changeFeedID: changefeedID, - uuidGenerator: uuid.NewGenerator(), - enabled: true, - cfg: cfg, - startTs: checkpoint, + captureID: config.GetGlobalServerConfig().AdvertiseAddr, + changeFeedID: changefeedID, + uuidGenerator: uuid.NewGenerator(), + enabled: true, + cfg: cfg, + startTs: checkpoint, + flushIntervalInMs: cfg.MetaFlushIntervalInMs, + } + + if m.flushIntervalInMs < redo.MinFlushIntervalInMs { + log.Warn("redo flush interval is too small, use default value", + zap.Int64("interval", m.flushIntervalInMs)) + m.flushIntervalInMs = redo.DefaultMetaFlushIntervalInMs } return m } @@ -157,7 +166,7 @@ func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error { } eg, egCtx := errgroup.WithContext(ctx) eg.Go(func() error { - return m.bgFlushMeta(egCtx, m.cfg.FlushIntervalInMs) + return m.bgFlushMeta(egCtx) }) eg.Go(func() error { return m.bgGC(egCtx) @@ -458,8 +467,8 @@ func (m *metaManager) Cleanup(ctx context.Context) error { return m.deleteAllLogs(ctx) } -func (m *metaManager) bgFlushMeta(egCtx context.Context, flushIntervalInMs int64) (err error) { - ticker := time.NewTicker(time.Duration(flushIntervalInMs) * time.Millisecond) +func (m *metaManager) bgFlushMeta(egCtx context.Context) (err error) { + ticker := time.NewTicker(time.Duration(m.flushIntervalInMs) * time.Millisecond) defer func() { ticker.Stop() log.Info("redo metaManager bgFlushMeta exits",