Skip to content

Commit

Permalink
redo(ticdc): fix redo initialization block the owner (#9887)
Browse files Browse the repository at this point in the history
close #9886
  • Loading branch information
3AceShowHand authored Nov 1, 2023
1 parent 3dc797b commit 684d117
Show file tree
Hide file tree
Showing 20 changed files with 322 additions and 267 deletions.
31 changes: 9 additions & 22 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,12 @@ func (c *changefeed) tick(ctx cdcContext.Context,
default:
}

if c.redoMetaMgr.Enabled() {
if !c.redoMetaMgr.Running() {
return 0, 0, nil
}
}

// TODO: pass table checkpointTs when we support concurrent process ddl
allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil)
if err != nil {
Expand Down Expand Up @@ -610,13 +616,7 @@ LOOP2:
}
c.observerLastTick = atomic.NewTime(time.Time{})

c.redoDDLMgr, err = redo.NewDDLManager(cancelCtx, c.id, c.latestInfo.Config.Consistent, ddlStartTs)
failpoint.Inject("ChangefeedNewRedoManagerError", func() {
err = errors.New("changefeed new redo manager injected error")
})
if err != nil {
return err
}
c.redoDDLMgr = redo.NewDDLManager(c.id, c.latestInfo.Config.Consistent, ddlStartTs)
if c.redoDDLMgr.Enabled() {
c.wg.Add(1)
go func() {
Expand All @@ -625,12 +625,7 @@ LOOP2:
}()
}

c.redoMetaMgr, err = redo.NewMetaManagerWithInit(cancelCtx,
c.id,
c.latestInfo.Config.Consistent, checkpointTs)
if err != nil {
return err
}
c.redoMetaMgr = redo.NewMetaManager(c.id, c.latestInfo.Config.Consistent, checkpointTs)
if c.redoMetaMgr.Enabled() {
c.wg.Add(1)
go func() {
Expand Down Expand Up @@ -798,15 +793,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) {
}
// when removing a paused changefeed, the redo manager is nil, create a new one
if c.redoMetaMgr == nil {
redoMetaMgr, err := redo.NewMetaManager(ctx, c.id, cfInfo.Config.Consistent)
if err != nil {
log.Info("owner creates redo manager for clean fail",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Error(err))
return
}
c.redoMetaMgr = redoMetaMgr
c.redoMetaMgr = redo.NewMetaManager(c.id, cfInfo.Config.Consistent, 0)
}
err := c.redoMetaMgr.Cleanup(ctx)
if err != nil {
Expand Down
5 changes: 1 addition & 4 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,10 +605,7 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
}
p.latestInfo.Config.Sink.TiDBSourceID = sourceID

p.redo.r, err = redo.NewDMLManager(prcCtx, p.changefeedID, p.latestInfo.Config.Consistent)
if err != nil {
return err
}
p.redo.r = redo.NewDMLManager(p.changefeedID, p.latestInfo.Config.Consistent)
p.redo.name = "RedoManager"
p.redo.changefeedID = p.changefeedID
p.redo.spawn(prcCtx)
Expand Down
3 changes: 1 addition & 2 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,14 @@ func newProcessor4Test(
} else {
tmpDir := t.TempDir()
redoDir := fmt.Sprintf("%s/%s", tmpDir, changefeedID)
dmlMgr, err := redo.NewDMLManager(ctx, changefeedID, &config.ConsistentConfig{
dmlMgr := redo.NewDMLManager(changefeedID, &config.ConsistentConfig{
Level: string(redoPkg.ConsistentLevelEventual),
MaxLogSize: redoPkg.DefaultMaxLogSize,
FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs,
MetaFlushIntervalInMs: redoPkg.DefaultMetaFlushIntervalInMs,
Storage: "file://" + redoDir,
UseFileBackend: false,
})
require.NoError(t, err)
p.redo.r = dmlMgr
}
p.redo.name = "RedoManager"
Expand Down
93 changes: 46 additions & 47 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/redo/common"
Expand Down Expand Up @@ -65,20 +65,17 @@ func NewDisabledDDLManager() *ddlManager {

// NewDDLManager creates a new ddl Manager.
func NewDDLManager(
ctx context.Context, changefeedID model.ChangeFeedID,
changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig, ddlStartTs model.Ts,
) (*ddlManager, error) {
logManager, err := newLogManager(ctx, changefeedID, cfg, redo.RedoDDLLogFileType)
if err != nil {
return nil, err
}
) *ddlManager {
m := newLogManager(changefeedID, cfg, redo.RedoDDLLogFileType)
span := spanz.TableIDToComparableSpan(0)
logManager.AddTable(span, ddlStartTs)
m.AddTable(span, ddlStartTs)
return &ddlManager{
logManager: logManager,
// The current fakeSpan is meaningless, find a meaningful sapn in the future.
logManager: m,
// The current fakeSpan is meaningless, find a meaningful span in the future.
fakeSpan: span,
}, nil
}
}

type ddlManager struct {
Expand Down Expand Up @@ -115,14 +112,12 @@ type DMLManager interface {
}

// NewDMLManager creates a new dml Manager.
func NewDMLManager(ctx context.Context, changefeedID model.ChangeFeedID,
func NewDMLManager(changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig,
) (*dmlManager, error) {
logManager, err := newLogManager(ctx, changefeedID, cfg, redo.RedoRowLogFileType)
if err != nil {
return nil, err
) *dmlManager {
return &dmlManager{
logManager: newLogManager(changefeedID, cfg, redo.RedoRowLogFileType),
}
return &dmlManager{logManager: logManager}, nil
}

// NewDisabledDMLManager creates a disabled dml Manager.
Expand Down Expand Up @@ -228,29 +223,22 @@ type logManager struct {
}

func newLogManager(
ctx context.Context,
changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig, logType string,
) (*logManager, error) {
) *logManager {
// return a disabled Manager if no consistent config or normal consistent level
if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) {
return &logManager{enabled: false}, nil
return &logManager{enabled: false}
}

uri, err := storage.ParseRawURL(cfg.Storage)
if err != nil {
return nil, err
}
m := &logManager{
return &logManager{
enabled: true,
cfg: &writer.LogWriterConfig{
ConsistentConfig: *cfg,
LogType: logType,
CaptureID: config.GetGlobalServerConfig().AdvertiseAddr,
ChangeFeedID: changefeedID,
URI: *uri,
UseExternalStorage: redo.IsExternalStorage(uri.Scheme),
MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte,
ConsistentConfig: *cfg,
LogType: logType,
CaptureID: config.GetGlobalServerConfig().AdvertiseAddr,
ChangeFeedID: changefeedID,
MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte,
},
logBuffer: chann.NewAutoDrainChann[cacheEvents](),
rtsMap: spanz.SyncMap{},
Expand All @@ -263,21 +251,30 @@ func newLogManager(
metricRedoWorkerBusyRatio: common.RedoWorkerBusyRatio.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
}

m.writer, err = factory.NewRedoLogWriter(ctx, m.cfg)
if err != nil {
return nil, err
}
return m, nil
}

// Run implements pkg/util.Runnable.
func (m *logManager) Run(ctx context.Context, _ ...chan<- error) error {
if m.Enabled() {
defer m.close()
return m.bgUpdateLog(ctx)
failpoint.Inject("ChangefeedNewRedoManagerError", func() {
failpoint.Return(errors.New("changefeed new redo manager injected error"))
})
if !m.Enabled() {
return nil
}
return nil

defer m.close()
start := time.Now()
w, err := factory.NewRedoLogWriter(ctx, m.cfg)
if err != nil {
log.Error("redo: failed to create redo log writer",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Duration("duration", time.Since(start)),
zap.Error(err))
return err
}
m.writer = w
return m.bgUpdateLog(ctx)
}

// WaitForReady implements pkg/util.Runnable.
Expand Down Expand Up @@ -549,11 +546,13 @@ func (m *logManager) close() {
atomic.StoreInt32(&m.closed, 1)

m.logBuffer.CloseAndDrain()
if err := m.writer.Close(); err != nil {
log.Error("redo manager fails to close writer",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Error(err))
if m.writer != nil {
if err := m.writer.Close(); err != nil && errors.Cause(err) != context.Canceled {
log.Error("redo manager fails to close writer",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Error(err))
}
}
log.Info("redo manager closed",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
Expand Down
72 changes: 29 additions & 43 deletions cdc/redo/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/redo/writer"
"github.com/pingcap/tiflow/cdc/redo/writer/blackhole"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/spanz"
Expand Down Expand Up @@ -121,15 +120,11 @@ func TestLogManagerInProcessor(t *testing.T) {
FlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
}
dmlMgr, err := NewDMLManager(ctx, model.DefaultChangeFeedID("test"), cfg)
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
dmlMgr.Run(ctx)
}()

dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg)
var eg errgroup.Group
eg.Go(func() error {
return dmlMgr.Run(ctx)
})
// check emit row changed events can move forward resolved ts
spans := []tablepb.Span{
spanz.TableIDToComparableSpan(53),
Expand Down Expand Up @@ -202,7 +197,7 @@ func TestLogManagerInProcessor(t *testing.T) {
checkResolvedTs(t, dmlMgr.logManager, flushResolvedTs)

cancel()
wg.Wait()
require.ErrorIs(t, eg.Wait(), context.Canceled)
}

testWriteDMLs("blackhole://", true)
Expand Down Expand Up @@ -233,26 +228,24 @@ func TestLogManagerInOwner(t *testing.T) {
UseFileBackend: useFileBackend,
}
startTs := model.Ts(10)
ddlMgr, err := NewDDLManager(ctx, model.DefaultChangeFeedID("test"), cfg, startTs)
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
ddlMgr.Run(ctx)
}()
ddlMgr := NewDDLManager(model.DefaultChangeFeedID("test"), cfg, startTs)

var eg errgroup.Group
eg.Go(func() error {
return ddlMgr.Run(ctx)
})

require.Equal(t, startTs, ddlMgr.GetResolvedTs())
ddl := &model.DDLEvent{StartTs: 100, CommitTs: 120, Query: "CREATE TABLE `TEST.T1`"}
err = ddlMgr.EmitDDLEvent(ctx, ddl)
err := ddlMgr.EmitDDLEvent(ctx, ddl)
require.NoError(t, err)
require.Equal(t, startTs, ddlMgr.GetResolvedTs())

ddlMgr.UpdateResolvedTs(ctx, ddl.CommitTs)
checkResolvedTs(t, ddlMgr.logManager, ddl.CommitTs)

cancel()
wg.Wait()
require.ErrorIs(t, eg.Wait(), context.Canceled)
}

testWriteDDLs("blackhole://", true)
Expand All @@ -275,23 +268,14 @@ func TestLogManagerError(t *testing.T) {
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: "blackhole://",
Storage: "blackhole-invalid://",
FlushIntervalInMs: redo.MinFlushIntervalInMs,
}
logMgr, err := NewDMLManager(ctx, model.DefaultChangeFeedID("test"), cfg)
require.NoError(t, err)
err = logMgr.writer.Close()
require.NoError(t, err)
logMgr.writer = blackhole.NewInvalidLogWriter(logMgr.writer)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
err := logMgr.Run(ctx)
require.Regexp(t, ".*invalid black hole writer.*", err)
require.Regexp(t, ".*WriteLog.*", err)
}()
logMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg)
var eg errgroup.Group
eg.Go(func() error {
return logMgr.Run(ctx)
})

testCases := []struct {
span tablepb.Span
Expand All @@ -310,7 +294,10 @@ func TestLogManagerError(t *testing.T) {
err := logMgr.emitRedoEvents(ctx, tc.span, nil, tc.rows...)
require.NoError(t, err)
}
wg.Wait()

err := eg.Wait()
require.Regexp(t, ".*invalid black hole writer.*", err)
require.Regexp(t, ".*WriteLog.*", err)
}

func BenchmarkBlackhole(b *testing.B) {
Expand All @@ -336,9 +323,8 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) {
FlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
}
dmlMgr, err := NewDMLManager(ctx, model.DefaultChangeFeedID("test"), cfg)
require.Nil(b, err)
eg := errgroup.Group{}
dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg)
var eg errgroup.Group
eg.Go(func() error {
return dmlMgr.Run(ctx)
})
Expand Down Expand Up @@ -366,7 +352,7 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) {
go func(span tablepb.Span) {
defer wg.Done()
maxCommitTs := maxTsMap.GetV(span)
rows := []*model.RowChangedEvent{}
var rows []*model.RowChangedEvent
for i := 0; i < maxRowCount; i++ {
if i%100 == 0 {
// prepare new row change events
Expand Down Expand Up @@ -409,6 +395,6 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) {
time.Sleep(time.Millisecond * 500)
}
cancel()
err = eg.Wait()
require.ErrorIs(b, err, context.Canceled)

require.ErrorIs(b, eg.Wait(), context.Canceled)
}
Loading

0 comments on commit 684d117

Please sign in to comment.