Skip to content

Commit

Permalink
redo(ticdc): use meta flush interval in redo ddl manager (#9999)
Browse files Browse the repository at this point in the history
ref #9960, close #9998
  • Loading branch information
CharlesCheung96 committed Nov 6, 2023
1 parent 999e90b commit 09b6870
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 24 deletions.
17 changes: 12 additions & 5 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,15 @@ func (m *logManager) Run(ctx context.Context, _ ...chan<- error) error {
return err
}
m.writer = w
return m.bgUpdateLog(ctx)
return m.bgUpdateLog(ctx, m.getFlushDuration())
}

func (m *logManager) getFlushDuration() time.Duration {
flushIntervalInMs := m.cfg.FlushIntervalInMs
if m.cfg.LogType == redo.RedoDDLLogFileType {
flushIntervalInMs = m.cfg.MetaFlushIntervalInMs
}
return time.Duration(flushIntervalInMs) * time.Millisecond
}

// WaitForReady implements pkg/util.Runnable.
Expand Down Expand Up @@ -483,15 +491,14 @@ func (m *logManager) onResolvedTsMsg(span tablepb.Span, resolvedTs model.Ts) {
}
}

func (m *logManager) bgUpdateLog(ctx context.Context) error {
func (m *logManager) bgUpdateLog(ctx context.Context, flushDuration time.Duration) error {
m.releaseMemoryCbs = make([]func(), 0, 1024)
flushIntervalInMs := m.cfg.FlushIntervalInMs
ticker := time.NewTicker(time.Duration(flushIntervalInMs) * time.Millisecond)
ticker := time.NewTicker(flushDuration)
defer ticker.Stop()
log.Info("redo manager bgUpdateLog is running",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Int64("flushIntervalInMs", flushIntervalInMs))
zap.Duration("flushIntervalInMs", flushDuration))

var err error
// logErrCh is used to retrieve errors from log flushing goroutines.
Expand Down
42 changes: 23 additions & 19 deletions cdc/redo/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,12 @@ func TestLogManagerInProcessor(t *testing.T) {
testWriteDMLs := func(storage string, useFileBackend bool) {
ctx, cancel := context.WithCancel(ctx)
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: storage,
FlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: storage,
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
}
dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg)
var eg errgroup.Group
Expand Down Expand Up @@ -221,11 +222,12 @@ func TestLogManagerInOwner(t *testing.T) {
testWriteDDLs := func(storage string, useFileBackend bool) {
ctx, cancel := context.WithCancel(ctx)
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: storage,
FlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: storage,
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs,
UseFileBackend: useFileBackend,
}
startTs := model.Ts(10)
ddlMgr := NewDDLManager(model.DefaultChangeFeedID("test"), cfg, startTs)
Expand Down Expand Up @@ -266,10 +268,11 @@ func TestLogManagerError(t *testing.T) {
defer cancel()

cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: "blackhole-invalid://",
FlushIntervalInMs: redo.MinFlushIntervalInMs,
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: "blackhole-invalid://",
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
}
logMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg)
var eg errgroup.Group
Expand Down Expand Up @@ -317,11 +320,12 @@ func BenchmarkFileWriter(b *testing.B) {
func runBenchTest(b *testing.B, storage string, useFileBackend bool) {
ctx, cancel := context.WithCancel(context.Background())
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: storage,
FlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: storage,
FlushIntervalInMs: redo.MinFlushIntervalInMs,
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
}
dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg)
var eg errgroup.Group
Expand Down

0 comments on commit 09b6870

Please sign in to comment.