Skip to content

Commit

Permalink
fix review comment.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Oct 27, 2023
1 parent 2477544 commit b497ac9
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 24 deletions.
12 changes: 7 additions & 5 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,11 +546,13 @@ func (m *logManager) close() {
atomic.StoreInt32(&m.closed, 1)

m.logBuffer.CloseAndDrain()
if err := m.writer.Close(); err != nil && errors.Cause(err) != context.Canceled {
log.Error("redo manager fails to close writer",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Error(err))
if m.writer != nil {
if err := m.writer.Close(); err != nil && errors.Cause(err) != context.Canceled {
log.Error("redo manager fails to close writer",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Error(err))
}
}
log.Info("redo manager closed",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
Expand Down
15 changes: 0 additions & 15 deletions cdc/redo/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/uuid"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand All @@ -47,19 +46,13 @@ type MetaManager interface {
// Cleanup deletes all redo logs, which are only called from the owner
// when changefeed is deleted.
Cleanup(ctx context.Context) error

// Running return true if the meta manager is running or not.
Running() bool
}

type metaManager struct {
captureID model.CaptureID
changeFeedID model.ChangeFeedID
enabled bool

// running means the meta manager now running normally.
running atomic.Bool

metaCheckpointTs statefulRts
metaResolvedTs statefulRts

Expand Down Expand Up @@ -107,12 +100,6 @@ func (m *metaManager) Enabled() bool {
return m.enabled
}

// Running return whether the meta manager is initialized,
// which means the external storage is accessible to the meta manager.
func (m *metaManager) Running() bool {
return m.running.Load()
}

func (m *metaManager) preStart(ctx context.Context) error {
uri, err := storage.ParseRawURL(m.cfg.Storage)
if err != nil {
Expand Down Expand Up @@ -161,8 +148,6 @@ func (m *metaManager) Run(ctx context.Context, _ ...chan<- error) error {
eg.Go(func() error {
return m.bgGC(egCtx)
})

m.running.Store(true)
return eg.Wait()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,10 +586,6 @@ type mockRedoMetaManager struct {
enable bool
}

func (m *mockRedoMetaManager) Running() bool {
return true
}

func (m *mockRedoMetaManager) UpdateMeta(checkpointTs, resolvedTs model.Ts) {
}

Expand Down

0 comments on commit b497ac9

Please sign in to comment.