Skip to content

Commit

Permalink
redo(ticdc): fix meta manager wrong flush interval bug (#10031) (#10032)
Browse files Browse the repository at this point in the history
close #10026
  • Loading branch information
ti-chi-bot authored Nov 9, 2023
1 parent a4f5874 commit beb8ddb
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
1 change: 1 addition & 0 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ func (m *logManager) flushLog(
log.Debug("Flush redo log",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.String("logType", m.cfg.LogType),
zap.Any("tableRtsMap", tableRtsMap))
err := m.withLock(func(m *logManager) error {
return m.writer.FlushLog(ctx)
Expand Down
27 changes: 18 additions & 9 deletions cdc/redo/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type metaManager struct {
lastFlushTime time.Time
cfg *config.ConsistentConfig
metricFlushLogDuration prometheus.Observer

flushIntervalInMs int64
}

// NewDisabledMetaManager creates a disabled Meta Manager.
Expand All @@ -93,12 +95,19 @@ func NewMetaManager(
}

m := &metaManager{
captureID: config.GetGlobalServerConfig().AdvertiseAddr,
changeFeedID: changefeedID,
uuidGenerator: uuid.NewGenerator(),
enabled: true,
cfg: cfg,
startTs: checkpoint,
captureID: config.GetGlobalServerConfig().AdvertiseAddr,
changeFeedID: changefeedID,
uuidGenerator: uuid.NewGenerator(),
enabled: true,
cfg: cfg,
startTs: checkpoint,
flushIntervalInMs: cfg.MetaFlushIntervalInMs,
}

if m.flushIntervalInMs < redo.MinFlushIntervalInMs {
log.Warn("redo flush interval is too small, use default value",
zap.Int64("interval", m.flushIntervalInMs))
m.flushIntervalInMs = redo.DefaultMetaFlushIntervalInMs
}
return m
}
Expand Down Expand Up @@ -157,7 +166,7 @@ func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error {
}
eg, egCtx := errgroup.WithContext(ctx)
eg.Go(func() error {
return m.bgFlushMeta(egCtx, m.cfg.FlushIntervalInMs)
return m.bgFlushMeta(egCtx)
})
eg.Go(func() error {
return m.bgGC(egCtx)
Expand Down Expand Up @@ -458,8 +467,8 @@ func (m *metaManager) Cleanup(ctx context.Context) error {
return m.deleteAllLogs(ctx)
}

func (m *metaManager) bgFlushMeta(egCtx context.Context, flushIntervalInMs int64) (err error) {
ticker := time.NewTicker(time.Duration(flushIntervalInMs) * time.Millisecond)
func (m *metaManager) bgFlushMeta(egCtx context.Context) (err error) {
ticker := time.NewTicker(time.Duration(m.flushIntervalInMs) * time.Millisecond)
defer func() {
ticker.Stop()
log.Info("redo metaManager bgFlushMeta exits",
Expand Down

0 comments on commit beb8ddb

Please sign in to comment.