Skip to content

Commit

Permalink
This is an automated cherry-pick of #9887
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
3AceShowHand authored and ti-chi-bot committed Nov 1, 2023
1 parent 4febedd commit 20018eb
Show file tree
Hide file tree
Showing 20 changed files with 465 additions and 116 deletions.
24 changes: 24 additions & 0 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,12 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
default:
}

if c.redoMetaMgr.Enabled() {
if !c.redoMetaMgr.Running() {
return 0, 0, nil
}
}

// TODO: pass table checkpointTs when we support concurrent process ddl
allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil)
if err != nil {
Expand Down Expand Up @@ -551,6 +557,7 @@ LOOP2:
ctx.Throw(c.ddlPuller.Run(cancelCtx))
}()

<<<<<<< HEAD
stdCtx := contextutil.PutChangefeedIDInCtx(cancelCtx, c.id)
c.redoDDLMgr, err = redo.NewDDLManager(stdCtx, c.state.Info.Config.Consistent, ddlStartTs)
failpoint.Inject("ChangefeedNewRedoManagerError", func() {
Expand All @@ -559,6 +566,15 @@ LOOP2:
if err != nil {
return err
}
=======
c.downstreamObserver, err = c.newDownstreamObserver(ctx, c.id, c.latestInfo.SinkURI, c.latestInfo.Config)
if err != nil {
return err
}
c.observerLastTick = atomic.NewTime(time.Time{})

c.redoDDLMgr = redo.NewDDLManager(c.id, c.latestInfo.Config.Consistent, ddlStartTs)
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
if c.redoDDLMgr.Enabled() {
c.wg.Add(1)
go func() {
Expand All @@ -567,11 +583,15 @@ LOOP2:
}()
}

<<<<<<< HEAD
c.redoMetaMgr, err = redo.NewMetaManagerWithInit(stdCtx,
c.state.Info.Config.Consistent, checkpointTs)
if err != nil {
return err
}
=======
c.redoMetaMgr = redo.NewMetaManager(c.id, c.latestInfo.Config.Consistent, checkpointTs)
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
if c.redoMetaMgr.Enabled() {
c.wg.Add(1)
go func() {
Expand Down Expand Up @@ -733,6 +753,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) {
}
// when removing a paused changefeed, the redo manager is nil, create a new one
if c.redoMetaMgr == nil {
<<<<<<< HEAD
redoMetaMgr, err := redo.NewMetaManager(ctx, c.state.Info.Config.Consistent)
if err != nil {
log.Info("owner creates redo manager for clean fail",
Expand All @@ -742,6 +763,9 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) {
return
}
c.redoMetaMgr = redoMetaMgr
=======
c.redoMetaMgr = redo.NewMetaManager(c.id, cfInfo.Config.Consistent, 0)
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
}
err := c.redoMetaMgr.Cleanup(ctx)
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,7 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
return errors.Trace(err)
}

<<<<<<< HEAD
stdCtx := contextutil.PutChangefeedIDInCtx(ctx, p.changefeedID)
stdCtx = contextutil.PutRoleInCtx(stdCtx, util.RoleProcessor)

Expand Down Expand Up @@ -817,6 +818,15 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
}()
}
log.Info("processor creates redo manager",
=======
p.redo.r = redo.NewDMLManager(p.changefeedID, p.latestInfo.Config.Consistent)
p.redo.name = "RedoManager"
p.redo.changefeedID = p.changefeedID
p.redo.spawn(prcCtx)

sortEngine, err := p.globalVars.SortEngineFactory.Create(p.changefeedID)
log.Info("Processor creates sort engine",
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID))

Expand Down
41 changes: 41 additions & 0 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,49 @@ func newProcessor4Test(
captureInfo,
model.ChangeFeedID4Test("processor-test", "processor-test"), up, liveness, 0)
p.lazyInit = func(ctx cdcContext.Context) error {
<<<<<<< HEAD
p.agent = &mockAgent{executor: p}
p.sinkV1 = mocksink.NewNormalMockSink()
=======
if p.initialized {
return nil
}

if !enableRedo {
p.redo.r = redo.NewDisabledDMLManager()
} else {
tmpDir := t.TempDir()
redoDir := fmt.Sprintf("%s/%s", tmpDir, changefeedID)
dmlMgr := redo.NewDMLManager(changefeedID, &config.ConsistentConfig{
Level: string(redoPkg.ConsistentLevelEventual),
MaxLogSize: redoPkg.DefaultMaxLogSize,
FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs,
MetaFlushIntervalInMs: redoPkg.DefaultMetaFlushIntervalInMs,
Storage: "file://" + redoDir,
UseFileBackend: false,
})
p.redo.r = dmlMgr
}
p.redo.name = "RedoManager"
p.redo.changefeedID = changefeedID
p.redo.spawn(ctx)

p.agent = &mockAgent{executor: p, liveness: liveness}
p.sinkManager.r, p.sourceManager.r, _ = sinkmanager.NewManagerWithMemEngine(
t, changefeedID, info, p.redo.r)
p.sinkManager.name = "SinkManager"
p.sinkManager.changefeedID = changefeedID
p.sinkManager.spawn(ctx)
p.sourceManager.name = "SourceManager"
p.sourceManager.changefeedID = changefeedID
p.sourceManager.spawn(ctx)

// NOTICE: we have to bind the sourceManager to the sinkManager
// otherwise the sinkManager will not receive the resolvedTs.
p.sourceManager.r.OnResolve(p.sinkManager.r.UpdateReceivedSorterResolvedTs)

p.initialized = true
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
return nil
}
p.redoDMLMgr = redo.NewDisabledDMLManager()
Expand Down
88 changes: 75 additions & 13 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
<<<<<<< HEAD

Check failure on line 24 in cdc/redo/manager.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

expected 'STRING', found '<<'

Check failure on line 24 in cdc/redo/manager.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

expected 'STRING', found '<<'
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/contextutil"
=======
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/common"
"github.com/pingcap/tiflow/cdc/redo/writer"
Expand Down Expand Up @@ -62,6 +66,7 @@ func NewDisabledDDLManager() *ddlManager {

// NewDDLManager creates a new ddl Manager.
func NewDDLManager(
<<<<<<< HEAD
ctx context.Context, cfg *config.ConsistentConfig, ddlStartTs model.Ts,
) (*ddlManager, error) {
logManager, err := newLogManager(ctx, cfg, redo.RedoDDLLogFileType)
Expand All @@ -75,6 +80,19 @@ func NewDDLManager(
// The current fakeTableID is meaningless, find a meaningful id in the future.
fakeTableID: tableID,
}, nil
=======
changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig, ddlStartTs model.Ts,
) *ddlManager {
m := newLogManager(changefeedID, cfg, redo.RedoDDLLogFileType)
span := spanz.TableIDToComparableSpan(0)
m.AddTable(span, ddlStartTs)
return &ddlManager{
logManager: m,
// The current fakeSpan is meaningless, find a meaningful span in the future.
fakeSpan: span,
}
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
}

type ddlManager struct {
Expand Down Expand Up @@ -111,12 +129,19 @@ type DMLManager interface {
}

// NewDMLManager creates a new dml Manager.
<<<<<<< HEAD
func NewDMLManager(ctx context.Context, cfg *config.ConsistentConfig) (*dmlManager, error) {
logManager, err := newLogManager(ctx, cfg, redo.RedoRowLogFileType)
if err != nil {
return nil, err
=======
func NewDMLManager(changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig,
) *dmlManager {
return &dmlManager{
logManager: newLogManager(changefeedID, cfg, redo.RedoRowLogFileType),
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
}
return &dmlManager{logManager: logManager}, nil
}

// NewDisabledDMLManager creates a disabled dml Manager.
Expand Down Expand Up @@ -222,13 +247,20 @@ type logManager struct {
}

func newLogManager(
<<<<<<< HEAD
ctx context.Context, cfg *config.ConsistentConfig, logType string,
) (*logManager, error) {
=======
changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig, logType string,
) *logManager {
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
// return a disabled Manager if no consistent config or normal consistent level
if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) {
return &logManager{enabled: false}, nil
return &logManager{enabled: false}
}

<<<<<<< HEAD
uri, err := storage.ParseRawURL(cfg.Storage)
if err != nil {
return nil, err
Expand All @@ -244,6 +276,16 @@ func newLogManager(
URI: *uri,
UseExternalStorage: redo.IsExternalStorage(uri.Scheme),
MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte,
=======
return &logManager{
enabled: true,
cfg: &writer.LogWriterConfig{
ConsistentConfig: *cfg,
LogType: logType,
CaptureID: config.GetGlobalServerConfig().AdvertiseAddr,
ChangeFeedID: changefeedID,
MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte,
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
},
logBuffer: chann.NewDrainableChann[cacheEvents](),
metricWriteLogDuration: common.RedoWriteLogDurationHistogram.
Expand All @@ -255,16 +297,34 @@ func newLogManager(
metricRedoWorkerBusyRatio: common.RedoWorkerBusyRatio.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
}

m.writer, err = factory.NewRedoLogWriter(ctx, m.cfg)
if err != nil {
return nil, err
}
return m, nil
}

<<<<<<< HEAD
func (m *logManager) Run(ctx context.Context) error {
defer m.close()
=======
// Run implements pkg/util.Runnable.
func (m *logManager) Run(ctx context.Context, _ ...chan<- error) error {
failpoint.Inject("ChangefeedNewRedoManagerError", func() {
failpoint.Return(errors.New("changefeed new redo manager injected error"))
})
if !m.Enabled() {
return nil
}

defer m.close()
start := time.Now()
w, err := factory.NewRedoLogWriter(ctx, m.cfg)
if err != nil {
log.Error("redo: failed to create redo log writer",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Duration("duration", time.Since(start)),
zap.Error(err))
return err
}
m.writer = w
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
return m.bgUpdateLog(ctx)
}

Expand Down Expand Up @@ -531,11 +591,13 @@ func (m *logManager) close() {
atomic.StoreInt32(&m.closed, 1)

m.logBuffer.CloseAndDrain()
if err := m.writer.Close(); err != nil {
log.Error("redo manager fails to close writer",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Error(err))
if m.writer != nil {
if err := m.writer.Close(); err != nil && errors.Cause(err) != context.Canceled {
log.Error("redo manager fails to close writer",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Error(err))
}
}
log.Info("redo manager closed",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
Expand Down
Loading

0 comments on commit 20018eb

Please sign in to comment.