Skip to content

Commit

Permalink
fix conflicts.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Nov 2, 2023
1 parent f4582af commit 713ae93
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 217 deletions.
41 changes: 5 additions & 36 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,40 +584,21 @@ LOOP2:
}
c.observerLastTick = atomic.NewTime(time.Time{})

<<<<<<< HEAD
stdCtx := contextutil.PutChangefeedIDInCtx(cancelCtx, c.id)
c.redoDDLMgr, err = redo.NewDDLManager(stdCtx, c.state.Info.Config.Consistent, ddlStartTs)
failpoint.Inject("ChangefeedNewRedoManagerError", func() {
err = errors.New("changefeed new redo manager injected error")
})
if err != nil {
return err
}
=======
c.redoDDLMgr = redo.NewDDLManager(c.id, c.latestInfo.Config.Consistent, ddlStartTs)
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
c.redoDDLMgr = redo.NewDDLManager(c.id, c.state.Info.Config.Consistent, ddlStartTs)
if c.redoDDLMgr.Enabled() {
c.wg.Add(1)
go func() {
defer c.wg.Done()
ctx.Throw(c.redoDDLMgr.Run(stdCtx))
ctx.Throw(c.redoDDLMgr.Run(cancelCtx))
}()
}

<<<<<<< HEAD
c.redoMetaMgr, err = redo.NewMetaManagerWithInit(stdCtx,
c.state.Info.Config.Consistent, checkpointTs)
if err != nil {
return err
}
=======
c.redoMetaMgr = redo.NewMetaManager(c.id, c.latestInfo.Config.Consistent, checkpointTs)
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
c.redoMetaMgr = redo.NewMetaManager(c.id, c.state.Info.Config.Consistent, checkpointTs)

Check failure on line 596 in cdc/owner/changefeed.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

not enough arguments in call to redo.NewMetaManager

Check failure on line 596 in cdc/owner/changefeed.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

not enough arguments in call to redo.NewMetaManager
if c.redoMetaMgr.Enabled() {
c.wg.Add(1)
go func() {
defer c.wg.Done()
ctx.Throw(c.redoMetaMgr.Run(stdCtx))
ctx.Throw(c.redoMetaMgr.Run(cancelCtx))
}()
}
log.Info("owner creates redo manager",
Expand Down Expand Up @@ -779,19 +760,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) {
}
// when removing a paused changefeed, the redo manager is nil, create a new one
if c.redoMetaMgr == nil {
<<<<<<< HEAD
redoMetaMgr, err := redo.NewMetaManager(ctx, c.state.Info.Config.Consistent)
if err != nil {
log.Info("owner creates redo manager for clean fail",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Error(err))
return
}
c.redoMetaMgr = redoMetaMgr
=======
c.redoMetaMgr = redo.NewMetaManager(c.id, cfInfo.Config.Consistent, 0)
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
c.redoMetaMgr = redo.NewMetaManager(c.id, c.state.Info.Config.Consistent, 0)

Check failure on line 763 in cdc/owner/changefeed.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

not enough arguments in call to redo.NewMetaManager

Check failure on line 763 in cdc/owner/changefeed.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

not enough arguments in call to redo.NewMetaManager
}
err := c.redoMetaMgr.Cleanup(ctx)
if err != nil {
Expand Down
7 changes: 0 additions & 7 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,14 +670,7 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
}
p.changefeed.Info.Config.Sink.TiDBSourceID = sourceID

<<<<<<< HEAD
p.redo.r, err = redo.NewDMLManager(stdCtx, p.changefeed.Info.Config.Consistent)
if err != nil {
return err
}
=======
p.redo.r = redo.NewDMLManager(p.changefeedID, p.latestInfo.Config.Consistent)

Check failure on line 673 in cdc/processor/processor.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

p.latestInfo undefined (type *processor has no field or method latestInfo)

Check failure on line 673 in cdc/processor/processor.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

p.latestInfo undefined (type *processor has no field or method latestInfo)
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
p.redo.name = "RedoManager"
p.redo.spawn(stdCtx)

Expand Down
48 changes: 2 additions & 46 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
<<<<<<< HEAD
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/contextutil"
=======
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/redo/common"
Expand Down Expand Up @@ -70,19 +65,10 @@ func NewDisabledDDLManager() *ddlManager {

// NewDDLManager creates a new ddl Manager.
func NewDDLManager(
<<<<<<< HEAD
ctx context.Context, cfg *config.ConsistentConfig, ddlStartTs model.Ts,
) (*ddlManager, error) {
logManager, err := newLogManager(ctx, cfg, redo.RedoDDLLogFileType)
if err != nil {
return nil, err
}
=======
changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig, ddlStartTs model.Ts,
) *ddlManager {
m := newLogManager(changefeedID, cfg, redo.RedoDDLLogFileType)
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
span := spanz.TableIDToComparableSpan(0)
m.AddTable(span, ddlStartTs)
return &ddlManager{
Expand Down Expand Up @@ -126,18 +112,11 @@ type DMLManager interface {
}

// NewDMLManager creates a new dml Manager.
<<<<<<< HEAD
func NewDMLManager(ctx context.Context, cfg *config.ConsistentConfig) (*dmlManager, error) {
logManager, err := newLogManager(ctx, cfg, redo.RedoRowLogFileType)
if err != nil {
return nil, err
=======
func NewDMLManager(changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig,
func NewDMLManager(
changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig,
) *dmlManager {
return &dmlManager{
logManager: newLogManager(changefeedID, cfg, redo.RedoRowLogFileType),
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
}
}

Expand Down Expand Up @@ -244,36 +223,14 @@ type logManager struct {
}

func newLogManager(
<<<<<<< HEAD
ctx context.Context, cfg *config.ConsistentConfig, logType string,
) (*logManager, error) {
=======
changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig, logType string,
) *logManager {
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
// return a disabled Manager if no consistent config or normal consistent level
if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) {
return &logManager{enabled: false}
}

<<<<<<< HEAD
uri, err := storage.ParseRawURL(cfg.Storage)
if err != nil {
return nil, err
}
changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
m := &logManager{
enabled: true,
cfg: &writer.LogWriterConfig{
ConsistentConfig: *cfg,
LogType: logType,
CaptureID: contextutil.CaptureAddrFromCtx(ctx),
ChangeFeedID: changefeedID,
URI: *uri,
UseExternalStorage: redo.IsExternalStorage(uri.Scheme),
MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte,
=======
return &logManager{
enabled: true,
cfg: &writer.LogWriterConfig{
Expand All @@ -282,7 +239,6 @@ func newLogManager(
CaptureID: config.GetGlobalServerConfig().AdvertiseAddr,
ChangeFeedID: changefeedID,
MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte,
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
},
logBuffer: chann.NewAutoDrainChann[cacheEvents](),
rtsMap: spanz.SyncMap{},
Expand Down
46 changes: 0 additions & 46 deletions cdc/redo/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,23 +120,11 @@ func TestLogManagerInProcessor(t *testing.T) {
FlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
}
<<<<<<< HEAD
dmlMgr, err := NewDMLManager(ctx, cfg)
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
dmlMgr.Run(ctx)
}()

=======
dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg)
var eg errgroup.Group
eg.Go(func() error {
return dmlMgr.Run(ctx)
})
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
// check emit row changed events can move forward resolved ts
spans := []tablepb.Span{
spanz.TableIDToComparableSpan(53),
Expand Down Expand Up @@ -240,23 +228,12 @@ func TestLogManagerInOwner(t *testing.T) {
UseFileBackend: useFileBackend,
}
startTs := model.Ts(10)
<<<<<<< HEAD
ddlMgr, err := NewDDLManager(ctx, cfg, startTs)
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
ddlMgr.Run(ctx)
}()
=======
ddlMgr := NewDDLManager(model.DefaultChangeFeedID("test"), cfg, startTs)

var eg errgroup.Group
eg.Go(func() error {
return ddlMgr.Run(ctx)
})
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))

require.Equal(t, startTs, ddlMgr.GetResolvedTs())
ddl := &model.DDLEvent{StartTs: 100, CommitTs: 120, Query: "CREATE TABLE `TEST.T1`"}
Expand Down Expand Up @@ -294,28 +271,11 @@ func TestLogManagerError(t *testing.T) {
Storage: "blackhole-invalid://",
FlushIntervalInMs: redo.MinFlushIntervalInMs,
}
<<<<<<< HEAD
logMgr, err := NewDMLManager(ctx, cfg)
require.NoError(t, err)
err = logMgr.writer.Close()
require.NoError(t, err)
logMgr.writer = blackhole.NewInvalidLogWriter(logMgr.writer)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
err := logMgr.Run(ctx)
require.Regexp(t, ".*invalid black hole writer.*", err)
require.Regexp(t, ".*WriteLog.*", err)
}()
=======
logMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg)
var eg errgroup.Group
eg.Go(func() error {
return logMgr.Run(ctx)
})
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))

testCases := []struct {
span tablepb.Span
Expand Down Expand Up @@ -363,14 +323,8 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) {
FlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
}
<<<<<<< HEAD
dmlMgr, err := NewDMLManager(ctx, cfg)
require.Nil(b, err)
eg := errgroup.Group{}
=======
dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg)
var eg errgroup.Group
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
eg.Go(func() error {
return dmlMgr.Run(ctx)
})
Expand Down
68 changes: 9 additions & 59 deletions cdc/redo/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/common"
"github.com/pingcap/tiflow/pkg/config"
Expand Down Expand Up @@ -75,8 +74,6 @@ type metaManager struct {
lastFlushTime time.Time
cfg *config.ConsistentConfig
metricFlushLogDuration prometheus.Observer

flushIntervalInMs int64
}

// NewDisabledMetaManager creates a disabled Meta Manager.
Expand All @@ -86,72 +83,25 @@ func NewDisabledMetaManager() *metaManager {
}
}

<<<<<<< HEAD
// NewMetaManagerWithInit creates a new Manager and initializes the meta.
func NewMetaManagerWithInit(
ctx context.Context, cfg *config.ConsistentConfig, startTs model.Ts,
) (*metaManager, error) {
m, err := NewMetaManager(ctx, cfg)
if err != nil {
return nil, err
}

// There is no need to perform initialize operation if metaMgr is disabled
// or the scheme is blackhole.
if m.extStorage != nil {
m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram.
WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
if err = m.preCleanupExtStorage(ctx); err != nil {
log.Warn("pre clean redo logs fail",
zap.String("namespace", m.changeFeedID.Namespace),
zap.String("changefeed", m.changeFeedID.ID),
zap.Error(err))
return nil, err
}
if err = m.initMeta(ctx, startTs); err != nil {
log.Warn("init redo meta fail",
zap.String("namespace", m.changeFeedID.Namespace),
zap.String("changefeed", m.changeFeedID.ID),
zap.Error(err))
return nil, err
}
}

return m, nil
}

// NewMetaManager creates a new meta Manager.
func NewMetaManager(ctx context.Context, cfg *config.ConsistentConfig) (*metaManager, error) {
=======
// NewMetaManager creates a new meta Manager.
func NewMetaManager(
changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, checkpoint model.Ts,
changefeedID model.ChangeFeedID, captureID string,
cfg *config.ConsistentConfig, checkpoint model.Ts,
) *metaManager {
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
// return a disabled Manager if no consistent config or normal consistent level
if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) {
return &metaManager{enabled: false}
}

m := &metaManager{
captureID: contextutil.CaptureAddrFromCtx(ctx),
changeFeedID: contextutil.ChangefeedIDFromCtx(ctx),
uuidGenerator: uuid.NewGenerator(),
enabled: true,
<<<<<<< HEAD
flushIntervalInMs: cfg.FlushIntervalInMs,
=======
cfg: cfg,
startTs: checkpoint,
flushIntervalInMs: cfg.MetaFlushIntervalInMs,
}

if m.flushIntervalInMs < redo.MinFlushIntervalInMs {
log.Warn("redo flush interval is too small, use default value",
zap.Int64("interval", m.flushIntervalInMs))
m.flushIntervalInMs = redo.DefaultMetaFlushIntervalInMs
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
captureID: captureID,
changeFeedID: changefeedID,
uuidGenerator: uuid.NewGenerator(),
enabled: true,
cfg: cfg,
startTs: checkpoint,
}

return m
}

Expand Down
Loading

0 comments on commit 713ae93

Please sign in to comment.