Skip to content

Commit

Permalink
fix redo manager.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Nov 2, 2023
1 parent a9dee7c commit 4a8b5ba
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 6 deletions.
4 changes: 2 additions & 2 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ LOOP2:
}()
}

c.redoMetaMgr = redo.NewMetaManager(c.id, contextutil.CaptureAddrFromCtx(ctx), c.state.Info.Config.Consistent, checkpointTs)
c.redoMetaMgr = redo.NewMetaManager(c.id, c.state.Info.Config.Consistent, checkpointTs)
if c.redoMetaMgr.Enabled() {
c.wg.Add(1)
go func() {
Expand Down Expand Up @@ -760,7 +760,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) {
}
// when removing a paused changefeed, the redo manager is nil, create a new one
if c.redoMetaMgr == nil {
c.redoMetaMgr = redo.NewMetaManager(c.id, contextutil.CaptureAddrFromCtx(ctx), c.state.Info.Config.Consistent, 0)
c.redoMetaMgr = redo.NewMetaManager(c.id, c.state.Info.Config.Consistent, 0)
}
err := c.redoMetaMgr.Cleanup(ctx)
if err != nil {
Expand Down
6 changes: 2 additions & 4 deletions cdc/redo/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func NewDisabledMetaManager() *metaManager {

// NewMetaManager creates a new meta Manager.
func NewMetaManager(
changefeedID model.ChangeFeedID, captureID string,
changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig, checkpoint model.Ts,
) *metaManager {
// return a disabled Manager if no consistent config or normal consistent level
Expand All @@ -94,7 +94,7 @@ func NewMetaManager(
}

m := &metaManager{
captureID: captureID,
captureID: config.GetGlobalServerConfig().AdvertiseAddr,
changeFeedID: changefeedID,
uuidGenerator: uuid.NewGenerator(),
enabled: true,
Expand Down Expand Up @@ -123,7 +123,6 @@ func (m *metaManager) preStart(ctx context.Context) error {
}
// "nfs" and "local" scheme are converted to "file" scheme
redo.FixLocalScheme(uri)

extStorage, err := redo.InitExternalStorage(ctx, *uri)
if err != nil {
return err
Expand Down Expand Up @@ -164,7 +163,6 @@ func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error {
eg.Go(func() error {
return m.bgGC(egCtx)
})

m.running.Store(true)
return eg.Wait()
}
Expand Down

0 comments on commit 4a8b5ba

Please sign in to comment.