Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redo(ticdc): fix redo initialization block the owner #9887

Merged
merged 30 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
c42cb1d
add some logs about read redo meta.
3AceShowHand Oct 13, 2023
93fc61c
move init meta to the run method.
3AceShowHand Oct 16, 2023
9a8222a
Merge branch 'master' into redo-block-owner
3AceShowHand Oct 18, 2023
ef5bbd9
adjust meta manager.
3AceShowHand Oct 18, 2023
c7c8af8
fix meta manager.
3AceShowHand Oct 18, 2023
c1655e3
fix redo meta unit test.
3AceShowHand Oct 19, 2023
3e3d06b
adjust redo log manager.
3AceShowHand Oct 19, 2023
767f96d
fix unit test.
3AceShowHand Oct 19, 2023
ee674b6
set resolved ts after the redo meta manager is running.
3AceShowHand Oct 19, 2023
91b075a
fix scheduler.
3AceShowHand Oct 19, 2023
3c1b288
fix one unit test.
3AceShowHand Oct 20, 2023
53ea3e5
fix redo unit tests.
3AceShowHand Oct 20, 2023
99f2934
fix consumer panic since meet previous DDL
3AceShowHand Oct 20, 2023
57890c0
also return the failpoint error.
3AceShowHand Oct 20, 2023
0eda809
fix redo failpoint error.
3AceShowHand Oct 20, 2023
2c86265
also consider whether the storage location is write read able.
3AceShowHand Oct 20, 2023
ce49682
remove todo.
3AceShowHand Oct 20, 2023
8b22cfe
Merge branch 'master' into redo-block-owner
3AceShowHand Oct 23, 2023
0a7b94d
fix redo.
3AceShowHand Oct 23, 2023
f832f2b
add logs to help debug.
3AceShowHand Oct 25, 2023
f0f751e
Merge branch 'master' into redo-block-owner
3AceShowHand Oct 25, 2023
97cabf3
Merge branch 'master' into redo-block-owner
3AceShowHand Oct 26, 2023
eeb4a7e
revert set resolved ts.
3AceShowHand Oct 26, 2023
42eebf2
remove debug log.
3AceShowHand Oct 26, 2023
4305ebd
fix
3AceShowHand Oct 26, 2023
2477544
fix make fmt.
3AceShowHand Oct 27, 2023
b497ac9
fix review comment.
3AceShowHand Oct 27, 2023
5a64656
still use running to skip tick.
3AceShowHand Oct 27, 2023
e7fc85d
Merge branch 'master' into redo-block-owner
3AceShowHand Oct 31, 2023
ce0df30
fix ddl start ts not initialized
3AceShowHand Oct 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 9 additions & 22 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,12 @@ func (c *changefeed) tick(ctx cdcContext.Context,
default:
}

if c.redoMetaMgr.Enabled() {
if !c.redoMetaMgr.Running() {
return 0, 0, nil
}
}

// TODO: pass table checkpointTs when we support concurrent process ddl
allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil)
if err != nil {
Expand Down Expand Up @@ -610,13 +616,7 @@ LOOP2:
}
c.observerLastTick = atomic.NewTime(time.Time{})

c.redoDDLMgr, err = redo.NewDDLManager(cancelCtx, c.id, c.latestInfo.Config.Consistent, ddlStartTs)
failpoint.Inject("ChangefeedNewRedoManagerError", func() {
err = errors.New("changefeed new redo manager injected error")
})
if err != nil {
return err
}
c.redoDDLMgr = redo.NewDDLManager(c.id, c.latestInfo.Config.Consistent, ddlStartTs)
if c.redoDDLMgr.Enabled() {
c.wg.Add(1)
go func() {
Expand All @@ -625,12 +625,7 @@ LOOP2:
}()
}

c.redoMetaMgr, err = redo.NewMetaManagerWithInit(cancelCtx,
c.id,
c.latestInfo.Config.Consistent, checkpointTs)
if err != nil {
return err
}
c.redoMetaMgr = redo.NewMetaManager(c.id, c.latestInfo.Config.Consistent, checkpointTs)
if c.redoMetaMgr.Enabled() {
c.wg.Add(1)
go func() {
Expand Down Expand Up @@ -798,15 +793,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) {
}
// when removing a paused changefeed, the redo manager is nil, create a new one
if c.redoMetaMgr == nil {
redoMetaMgr, err := redo.NewMetaManager(ctx, c.id, cfInfo.Config.Consistent)
if err != nil {
log.Info("owner creates redo manager for clean fail",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Error(err))
return
}
c.redoMetaMgr = redoMetaMgr
c.redoMetaMgr = redo.NewMetaManager(c.id, cfInfo.Config.Consistent, 0)
}
err := c.redoMetaMgr.Cleanup(ctx)
if err != nil {
Expand Down
5 changes: 1 addition & 4 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,10 +605,7 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
}
p.latestInfo.Config.Sink.TiDBSourceID = sourceID

p.redo.r, err = redo.NewDMLManager(prcCtx, p.changefeedID, p.latestInfo.Config.Consistent)
if err != nil {
return err
}
p.redo.r = redo.NewDMLManager(p.changefeedID, p.latestInfo.Config.Consistent)
p.redo.name = "RedoManager"
p.redo.changefeedID = p.changefeedID
p.redo.spawn(prcCtx)
Expand Down
3 changes: 1 addition & 2 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,13 @@ func newProcessor4Test(
} else {
tmpDir := t.TempDir()
redoDir := fmt.Sprintf("%s/%s", tmpDir, changefeedID)
dmlMgr, err := redo.NewDMLManager(ctx, changefeedID, &config.ConsistentConfig{
dmlMgr := redo.NewDMLManager(changefeedID, &config.ConsistentConfig{
Level: string(redoPkg.ConsistentLevelEventual),
MaxLogSize: redoPkg.DefaultMaxLogSize,
FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs,
Storage: "file://" + redoDir,
UseFileBackend: false,
})
require.NoError(t, err)
p.redo.r = dmlMgr
}
p.redo.name = "RedoManager"
Expand Down
93 changes: 46 additions & 47 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/redo/common"
Expand Down Expand Up @@ -65,20 +65,17 @@ func NewDisabledDDLManager() *ddlManager {

// NewDDLManager creates a new ddl Manager.
func NewDDLManager(
ctx context.Context, changefeedID model.ChangeFeedID,
changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig, ddlStartTs model.Ts,
) (*ddlManager, error) {
logManager, err := newLogManager(ctx, changefeedID, cfg, redo.RedoDDLLogFileType)
if err != nil {
return nil, err
}
) *ddlManager {
m := newLogManager(changefeedID, cfg, redo.RedoDDLLogFileType)
span := spanz.TableIDToComparableSpan(0)
logManager.AddTable(span, ddlStartTs)
m.AddTable(span, ddlStartTs)
return &ddlManager{
logManager: logManager,
// The current fakeSpan is meaningless, find a meaningful sapn in the future.
logManager: m,
// The current fakeSpan is meaningless, find a meaningful span in the future.
fakeSpan: span,
}, nil
}
}

type ddlManager struct {
Expand Down Expand Up @@ -115,14 +112,12 @@ type DMLManager interface {
}

// NewDMLManager creates a new dml Manager.
func NewDMLManager(ctx context.Context, changefeedID model.ChangeFeedID,
func NewDMLManager(changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig,
) (*dmlManager, error) {
logManager, err := newLogManager(ctx, changefeedID, cfg, redo.RedoRowLogFileType)
if err != nil {
return nil, err
) *dmlManager {
return &dmlManager{
logManager: newLogManager(changefeedID, cfg, redo.RedoRowLogFileType),
}
return &dmlManager{logManager: logManager}, nil
}

// NewDisabledDMLManager creates a disabled dml Manager.
Expand Down Expand Up @@ -228,29 +223,22 @@ type logManager struct {
}

func newLogManager(
ctx context.Context,
changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig, logType string,
) (*logManager, error) {
) *logManager {
// return a disabled Manager if no consistent config or normal consistent level
if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) {
return &logManager{enabled: false}, nil
return &logManager{enabled: false}
}

uri, err := storage.ParseRawURL(cfg.Storage)
if err != nil {
return nil, err
}
m := &logManager{
return &logManager{
enabled: true,
cfg: &writer.LogWriterConfig{
ConsistentConfig: *cfg,
LogType: logType,
CaptureID: config.GetGlobalServerConfig().AdvertiseAddr,
ChangeFeedID: changefeedID,
URI: *uri,
UseExternalStorage: redo.IsExternalStorage(uri.Scheme),
MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte,
ConsistentConfig: *cfg,
LogType: logType,
CaptureID: config.GetGlobalServerConfig().AdvertiseAddr,
ChangeFeedID: changefeedID,
MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte,
},
logBuffer: chann.NewAutoDrainChann[cacheEvents](),
rtsMap: spanz.SyncMap{},
Expand All @@ -263,21 +251,30 @@ func newLogManager(
metricRedoWorkerBusyRatio: common.RedoWorkerBusyRatio.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
}

m.writer, err = factory.NewRedoLogWriter(ctx, m.cfg)
if err != nil {
return nil, err
}
return m, nil
}

// Run implements pkg/util.Runnable.
func (m *logManager) Run(ctx context.Context, _ ...chan<- error) error {
if m.Enabled() {
defer m.close()
return m.bgUpdateLog(ctx)
failpoint.Inject("ChangefeedNewRedoManagerError", func() {
failpoint.Return(errors.New("changefeed new redo manager injected error"))
})
if !m.Enabled() {
return nil
}
return nil

defer m.close()
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
start := time.Now()
w, err := factory.NewRedoLogWriter(ctx, m.cfg)
if err != nil {
log.Error("redo: failed to create redo log writer",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Duration("duration", time.Since(start)),
zap.Error(err))
return err
}
m.writer = w
return m.bgUpdateLog(ctx)
}

// WaitForReady implements pkg/util.Runnable.
Expand Down Expand Up @@ -549,11 +546,13 @@ func (m *logManager) close() {
atomic.StoreInt32(&m.closed, 1)

m.logBuffer.CloseAndDrain()
if err := m.writer.Close(); err != nil {
log.Error("redo manager fails to close writer",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Error(err))
if m.writer != nil {
if err := m.writer.Close(); err != nil && errors.Cause(err) != context.Canceled {
log.Error("redo manager fails to close writer",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Error(err))
}
}
log.Info("redo manager closed",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
Expand Down
72 changes: 29 additions & 43 deletions cdc/redo/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/redo/writer"
"github.com/pingcap/tiflow/cdc/redo/writer/blackhole"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/spanz"
Expand Down Expand Up @@ -121,15 +120,11 @@ func TestLogManagerInProcessor(t *testing.T) {
FlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
}
dmlMgr, err := NewDMLManager(ctx, model.DefaultChangeFeedID("test"), cfg)
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
dmlMgr.Run(ctx)
}()

dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg)
var eg errgroup.Group
eg.Go(func() error {
return dmlMgr.Run(ctx)
})
// check emit row changed events can move forward resolved ts
spans := []tablepb.Span{
spanz.TableIDToComparableSpan(53),
Expand Down Expand Up @@ -202,7 +197,7 @@ func TestLogManagerInProcessor(t *testing.T) {
checkResolvedTs(t, dmlMgr.logManager, flushResolvedTs)

cancel()
wg.Wait()
require.ErrorIs(t, eg.Wait(), context.Canceled)
}

testWriteDMLs("blackhole://", true)
Expand Down Expand Up @@ -233,26 +228,24 @@ func TestLogManagerInOwner(t *testing.T) {
UseFileBackend: useFileBackend,
}
startTs := model.Ts(10)
ddlMgr, err := NewDDLManager(ctx, model.DefaultChangeFeedID("test"), cfg, startTs)
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
ddlMgr.Run(ctx)
}()
ddlMgr := NewDDLManager(model.DefaultChangeFeedID("test"), cfg, startTs)

var eg errgroup.Group
eg.Go(func() error {
return ddlMgr.Run(ctx)
})

require.Equal(t, startTs, ddlMgr.GetResolvedTs())
ddl := &model.DDLEvent{StartTs: 100, CommitTs: 120, Query: "CREATE TABLE `TEST.T1`"}
err = ddlMgr.EmitDDLEvent(ctx, ddl)
err := ddlMgr.EmitDDLEvent(ctx, ddl)
require.NoError(t, err)
require.Equal(t, startTs, ddlMgr.GetResolvedTs())

ddlMgr.UpdateResolvedTs(ctx, ddl.CommitTs)
checkResolvedTs(t, ddlMgr.logManager, ddl.CommitTs)

cancel()
wg.Wait()
require.ErrorIs(t, eg.Wait(), context.Canceled)
}

testWriteDDLs("blackhole://", true)
Expand All @@ -275,23 +268,14 @@ func TestLogManagerError(t *testing.T) {
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: "blackhole://",
Storage: "blackhole-invalid://",
FlushIntervalInMs: redo.MinFlushIntervalInMs,
}
logMgr, err := NewDMLManager(ctx, model.DefaultChangeFeedID("test"), cfg)
require.NoError(t, err)
err = logMgr.writer.Close()
require.NoError(t, err)
logMgr.writer = blackhole.NewInvalidLogWriter(logMgr.writer)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
err := logMgr.Run(ctx)
require.Regexp(t, ".*invalid black hole writer.*", err)
require.Regexp(t, ".*WriteLog.*", err)
}()
logMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg)
var eg errgroup.Group
eg.Go(func() error {
return logMgr.Run(ctx)
})

testCases := []struct {
span tablepb.Span
Expand All @@ -310,7 +294,10 @@ func TestLogManagerError(t *testing.T) {
err := logMgr.emitRedoEvents(ctx, tc.span, nil, tc.rows...)
require.NoError(t, err)
}
wg.Wait()

err := eg.Wait()
require.Regexp(t, ".*invalid black hole writer.*", err)
require.Regexp(t, ".*WriteLog.*", err)
}

func BenchmarkBlackhole(b *testing.B) {
Expand All @@ -336,9 +323,8 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) {
FlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
}
dmlMgr, err := NewDMLManager(ctx, model.DefaultChangeFeedID("test"), cfg)
require.Nil(b, err)
eg := errgroup.Group{}
dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg)
var eg errgroup.Group
eg.Go(func() error {
return dmlMgr.Run(ctx)
})
Expand Down Expand Up @@ -366,7 +352,7 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) {
go func(span tablepb.Span) {
defer wg.Done()
maxCommitTs := maxTsMap.GetV(span)
rows := []*model.RowChangedEvent{}
var rows []*model.RowChangedEvent
for i := 0; i < maxRowCount; i++ {
if i%100 == 0 {
// prepare new row change events
Expand Down Expand Up @@ -409,6 +395,6 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) {
time.Sleep(time.Millisecond * 500)
}
cancel()
err = eg.Wait()
require.ErrorIs(b, err, context.Canceled)

require.ErrorIs(b, eg.Wait(), context.Canceled)
}
Loading
Loading