Skip to content

Commit

Permalink
redo(ticdc): fix redo initialization block the owner (#9887) (#9991)
Browse files Browse the repository at this point in the history
close #9886
  • Loading branch information
ti-chi-bot authored Nov 14, 2023
1 parent e4a2fbc commit 21033a4
Show file tree
Hide file tree
Showing 20 changed files with 295 additions and 228 deletions.
35 changes: 11 additions & 24 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,12 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
default:
}

if c.redoMetaMgr.Enabled() {
if !c.redoMetaMgr.Running() {
return nil
}
}

// TODO: pass table checkpointTs when we support concurrent process ddl
allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil)
if err != nil {
Expand Down Expand Up @@ -551,32 +557,21 @@ LOOP2:
ctx.Throw(c.ddlPuller.Run(cancelCtx))
}()

stdCtx := contextutil.PutChangefeedIDInCtx(cancelCtx, c.id)
c.redoDDLMgr, err = redo.NewDDLManager(stdCtx, c.state.Info.Config.Consistent, ddlStartTs)
failpoint.Inject("ChangefeedNewRedoManagerError", func() {
err = errors.New("changefeed new redo manager injected error")
})
if err != nil {
return err
}
c.redoDDLMgr = redo.NewDDLManager(c.id, c.state.Info.Config.Consistent, ddlStartTs)
if c.redoDDLMgr.Enabled() {
c.wg.Add(1)
go func() {
defer c.wg.Done()
ctx.Throw(c.redoDDLMgr.Run(stdCtx))
ctx.Throw(c.redoDDLMgr.Run(cancelCtx))
}()
}

c.redoMetaMgr, err = redo.NewMetaManagerWithInit(stdCtx,
c.state.Info.Config.Consistent, checkpointTs)
if err != nil {
return err
}
c.redoMetaMgr = redo.NewMetaManager(c.id, c.state.Info.Config.Consistent, checkpointTs)
if c.redoMetaMgr.Enabled() {
c.wg.Add(1)
go func() {
defer c.wg.Done()
ctx.Throw(c.redoMetaMgr.Run(stdCtx))
ctx.Throw(c.redoMetaMgr.Run(cancelCtx))
}()
}
log.Info("owner creates redo manager",
Expand Down Expand Up @@ -733,15 +728,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) {
}
// when removing a paused changefeed, the redo manager is nil, create a new one
if c.redoMetaMgr == nil {
redoMetaMgr, err := redo.NewMetaManager(ctx, c.state.Info.Config.Consistent)
if err != nil {
log.Info("owner creates redo manager for clean fail",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Error(err))
return
}
c.redoMetaMgr = redoMetaMgr
c.redoMetaMgr = redo.NewMetaManager(c.id, c.state.Info.Config.Consistent, 0)
}
err := c.redoMetaMgr.Cleanup(ctx)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion cdc/processor/pipeline/table_actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ func TestTableActorInterface(t *testing.T) {
require.Equal(t, model.Ts(5), table.ResolvedTs())
ctx, cancel := context.WithCancel(context.Background())
eg, egCtx := errgroup.WithContext(ctx)
table.redoDMLMgr, _ = redo.NewDMLManager(ctx, &config.ConsistentConfig{
changefeedID := model.DefaultChangeFeedID("test")
table.redoDMLMgr = redo.NewDMLManager(changefeedID, &config.ConsistentConfig{
Level: string(redoCfg.ConsistentLevelEventual),
FlushIntervalInMs: redoCfg.MinFlushIntervalInMs,
Storage: fmt.Sprintf("file://tmp/%s", t.TempDir()),
Expand Down
5 changes: 1 addition & 4 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,10 +805,7 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
conf := config.GetGlobalServerConfig()
p.pullBasedSinking = conf.Debug.EnablePullBasedSink

p.redoDMLMgr, err = redo.NewDMLManager(stdCtx, p.changefeed.Info.Config.Consistent)
if err != nil {
return err
}
p.redoDMLMgr = redo.NewDMLManager(p.changefeedID, p.changefeed.Info.Config.Consistent)
if p.redoDMLMgr.Enabled() {
p.wg.Add(1)
go func() {
Expand Down
80 changes: 37 additions & 43 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/common"
"github.com/pingcap/tiflow/cdc/redo/writer"
Expand Down Expand Up @@ -62,19 +61,17 @@ func NewDisabledDDLManager() *ddlManager {

// NewDDLManager creates a new ddl Manager.
func NewDDLManager(
ctx context.Context, cfg *config.ConsistentConfig, ddlStartTs model.Ts,
) (*ddlManager, error) {
logManager, err := newLogManager(ctx, cfg, redo.RedoDDLLogFileType)
if err != nil {
return nil, err
}
changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig, ddlStartTs model.Ts,
) *ddlManager {
m := newLogManager(changefeedID, cfg, redo.RedoDDLLogFileType)
tableID := int64(0)
logManager.AddTable(tableID, ddlStartTs)
m.AddTable(tableID, ddlStartTs)
return &ddlManager{
logManager: logManager,
logManager: m,
// The current fakeTableID is meaningless, find a meaningful id in the future.
fakeTableID: tableID,
}, nil
}
}

type ddlManager struct {
Expand Down Expand Up @@ -111,12 +108,12 @@ type DMLManager interface {
}

// NewDMLManager creates a new dml Manager.
func NewDMLManager(ctx context.Context, cfg *config.ConsistentConfig) (*dmlManager, error) {
logManager, err := newLogManager(ctx, cfg, redo.RedoRowLogFileType)
if err != nil {
return nil, err
func NewDMLManager(
changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig,
) *dmlManager {
return &dmlManager{
logManager: newLogManager(changefeedID, cfg, redo.RedoRowLogFileType),
}
return &dmlManager{logManager: logManager}, nil
}

// NewDisabledDMLManager creates a disabled dml Manager.
Expand Down Expand Up @@ -222,28 +219,21 @@ type logManager struct {
}

func newLogManager(
ctx context.Context, cfg *config.ConsistentConfig, logType string,
) (*logManager, error) {
changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, logType string,
) *logManager {
// return a disabled Manager if no consistent config or normal consistent level
if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) {
return &logManager{enabled: false}, nil
return &logManager{enabled: false}
}

uri, err := storage.ParseRawURL(cfg.Storage)
if err != nil {
return nil, err
}
changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
m := &logManager{
return &logManager{
enabled: true,
cfg: &writer.LogWriterConfig{
ConsistentConfig: *cfg,
LogType: logType,
CaptureID: contextutil.CaptureAddrFromCtx(ctx),
ChangeFeedID: changefeedID,
URI: *uri,
UseExternalStorage: redo.IsExternalStorage(uri.Scheme),
MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte,
ConsistentConfig: *cfg,
LogType: logType,
CaptureID: config.GetGlobalServerConfig().AdvertiseAddr,
ChangeFeedID: changefeedID,
MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte,
},
logBuffer: chann.NewDrainableChann[cacheEvents](),
metricWriteLogDuration: common.RedoWriteLogDurationHistogram.
Expand All @@ -255,15 +245,17 @@ func newLogManager(
metricRedoWorkerBusyRatio: common.RedoWorkerBusyRatio.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
}

m.writer, err = factory.NewRedoLogWriter(ctx, m.cfg)
if err != nil {
return nil, err
}
return m, nil
}

// Run implements pkg/util.Runnable.
func (m *logManager) Run(ctx context.Context) error {
failpoint.Inject("ChangefeedNewRedoManagerError", func() {
failpoint.Return(errors.New("changefeed new redo manager injected error"))
})
if !m.Enabled() {
return nil
}

defer m.close()
start := time.Now()
w, err := factory.NewRedoLogWriter(ctx, m.cfg)
Expand Down Expand Up @@ -549,11 +541,13 @@ func (m *logManager) close() {
atomic.StoreInt32(&m.closed, 1)

m.logBuffer.CloseAndDrain()
if err := m.writer.Close(); err != nil {
log.Error("redo manager fails to close writer",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Error(err))
if m.writer != nil {
if err := m.writer.Close(); err != nil && errors.Cause(err) != context.Canceled {
log.Error("redo manager fails to close writer",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Error(err))
}
}
log.Info("redo manager closed",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
Expand Down
9 changes: 4 additions & 5 deletions cdc/redo/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) {
MetaFlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
}
dmlMgr, err := NewDMLManager(ctx, cfg)
require.Nil(b, err)
eg := errgroup.Group{}
dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg)
var eg errgroup.Group
eg.Go(func() error {
return dmlMgr.Run(ctx)
})
Expand Down Expand Up @@ -124,6 +123,6 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) {
time.Sleep(time.Millisecond * 500)
}
cancel()
err = eg.Wait()
require.ErrorIs(b, err, context.Canceled)

require.ErrorIs(b, eg.Wait(), context.Canceled)
}
Loading

0 comments on commit 21033a4

Please sign in to comment.