Skip to content

Commit

Permalink
fix conflicts.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Nov 2, 2023
1 parent a6c8fa9 commit eaf4c12
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 67 deletions.
14 changes: 4 additions & 10 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*

if c.redoMetaMgr.Enabled() {
if !c.redoMetaMgr.Running() {
return 0, 0, nil
return nil
}
}

Expand Down Expand Up @@ -557,13 +557,7 @@ LOOP2:
ctx.Throw(c.ddlPuller.Run(cancelCtx))
}()

c.redoDDLMgr, err = redo.NewDDLManager(stdCtx, c.state.Info.Config.Consistent, ddlStartTs)
failpoint.Inject("ChangefeedNewRedoManagerError", func() {
err = errors.New("changefeed new redo manager injected error")
})
if err != nil {
return err
}
c.redoDDLMgr = redo.NewDDLManager(c.id, c.state.Info.Config.Consistent, ddlStartTs)
if c.redoDDLMgr.Enabled() {
c.wg.Add(1)
go func() {
Expand All @@ -572,7 +566,7 @@ LOOP2:
}()
}

c.redoMetaMgr = redo.NewMetaManager(c.id, contextutil.CaptureAddrFromCtx(ctx), c.state.Info.Config.Consistent, checkpointTs)
c.redoMetaMgr = redo.NewMetaManager(c.id, c.state.Info.Config.Consistent, checkpointTs)
if c.redoMetaMgr.Enabled() {
c.wg.Add(1)
go func() {
Expand Down Expand Up @@ -734,7 +728,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) {
}
// when removing a paused changefeed, the redo manager is nil, create a new one
if c.redoMetaMgr == nil {
c.redoMetaMgr = redo.NewMetaManager(c.id, contextutil.CaptureAddrFromCtx(ctx), c.state.Info.Config.Consistent, 0)
c.redoMetaMgr = redo.NewMetaManager(c.id, c.state.Info.Config.Consistent, 0)
}
err := c.redoMetaMgr.Cleanup(ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func newLogManager(
}

// Run implements pkg/util.Runnable.
func (m *logManager) Run(ctx context.Context, _ ...chan<- error) error {
func (m *logManager) Run(ctx context.Context) error {
failpoint.Inject("ChangefeedNewRedoManagerError", func() {
failpoint.Return(errors.New("changefeed new redo manager injected error"))
})
Expand Down
57 changes: 1 addition & 56 deletions cdc/redo/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,48 +86,10 @@ func NewDisabledMetaManager() *metaManager {
}
}

<<<<<<< HEAD
// NewMetaManagerWithInit creates a new Manager and initializes the meta.
func NewMetaManagerWithInit(
ctx context.Context, cfg *config.ConsistentConfig, startTs model.Ts,
) (*metaManager, error) {
m, err := NewMetaManager(ctx, cfg)
if err != nil {
return nil, err
}

// There is no need to perform initialize operation if metaMgr is disabled
// or the scheme is blackhole.
if m.extStorage != nil {
m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram.
WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
if err = m.preCleanupExtStorage(ctx); err != nil {
log.Warn("pre clean redo logs fail",
zap.String("namespace", m.changeFeedID.Namespace),
zap.String("changefeed", m.changeFeedID.ID),
zap.Error(err))
return nil, err
}
if err = m.initMeta(ctx, startTs); err != nil {
log.Warn("init redo meta fail",
zap.String("namespace", m.changeFeedID.Namespace),
zap.String("changefeed", m.changeFeedID.ID),
zap.Error(err))
return nil, err
}
}

return m, nil
}

// NewMetaManager creates a new meta Manager.
func NewMetaManager(ctx context.Context, cfg *config.ConsistentConfig) (*metaManager, error) {
=======
// NewMetaManager creates a new meta Manager.
func NewMetaManager(
changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig, checkpoint model.Ts,
) *metaManager {
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
// return a disabled Manager if no consistent config or normal consistent level
if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) {
return &metaManager{enabled: false}
Expand All @@ -138,19 +100,9 @@ func NewMetaManager(
changeFeedID: contextutil.ChangefeedIDFromCtx(ctx),

Check failure on line 100 in cdc/redo/meta_manager.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

undefined: ctx

Check failure on line 100 in cdc/redo/meta_manager.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

undefined: ctx
uuidGenerator: uuid.NewGenerator(),
enabled: true,
<<<<<<< HEAD
flushIntervalInMs: cfg.FlushIntervalInMs,
=======
cfg: cfg,
startTs: checkpoint,
flushIntervalInMs: cfg.MetaFlushIntervalInMs,
}

if m.flushIntervalInMs < redo.MinFlushIntervalInMs {
log.Warn("redo flush interval is too small, use default value",
zap.Int64("interval", m.flushIntervalInMs))
m.flushIntervalInMs = redo.DefaultMetaFlushIntervalInMs
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
flushIntervalInMs: cfg.FlushIntervalInMs,
}
return m
}
Expand Down Expand Up @@ -203,16 +155,9 @@ func (m *metaManager) preStart(ctx context.Context) error {
}

// Run runs bgFlushMeta and bgGC.
<<<<<<< HEAD
func (m *metaManager) Run(ctx context.Context) error {
if m.extStorage == nil {
log.Warn("extStorage of redo meta manager is nil, skip running")
return nil
=======
func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error {
if err := m.preStart(ctx); err != nil {
return err
>>>>>>> 684d117c67 (redo(ticdc): fix redo initialization block the owner (#9887))
}
eg, egCtx := errgroup.WithContext(ctx)
eg.Go(func() error {
Expand Down

0 comments on commit eaf4c12

Please sign in to comment.