Skip to content

Commit

Permalink
update code
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan committed Nov 22, 2023
1 parent 2898d74 commit 73bc607
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 17 deletions.
36 changes: 21 additions & 15 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,7 @@ func (c *changefeed) Tick(ctx cdcContext.Context,
}
return nil
})
checkpointTs, minTableBarrierTs, maxLastSyncTime, err := c.tick(ctx, captures)
c.lastSyncTime = maxLastSyncTime
checkpointTs, minTableBarrierTs, err := c.tick(ctx, captures)

// The tick duration is recorded only if changefeed has completed initialization
if c.initialized {
Expand Down Expand Up @@ -352,50 +351,50 @@ func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context,
// tick returns the checkpointTs and minTableBarrierTs.
func (c *changefeed) tick(ctx cdcContext.Context,
captures map[model.CaptureID]*model.CaptureInfo,
) (model.Ts, model.Ts, model.Ts, error) {
) (model.Ts, model.Ts, error) {
adminJobPending := c.feedStateManager.Tick(c.resolvedTs, c.latestStatus, c.latestInfo)
preCheckpointTs := c.latestInfo.GetCheckpointTs(c.latestStatus)
// checkStaleCheckpointTs must be called before `feedStateManager.ShouldRunning()`
// to ensure all changefeeds, no matter whether they are running or not, will be checked.
if err := c.checkStaleCheckpointTs(ctx, c.latestInfo, preCheckpointTs); err != nil {
return 0, 0, 0, errors.Trace(err)
return 0, 0, errors.Trace(err)
}

if !c.feedStateManager.ShouldRunning() {
c.isRemoved = c.feedStateManager.ShouldRemoved()
c.releaseResources(ctx)
return 0, 0, 0, nil
return 0, 0, nil
}

if adminJobPending {
return 0, 0, 0, nil
return 0, 0, nil
}

if err := c.initialize(ctx); err != nil {
return 0, 0, 0, errors.Trace(err)
return 0, 0, errors.Trace(err)
}

select {
case err := <-c.errCh:
return 0, 0, 0, errors.Trace(err)
return 0, 0, errors.Trace(err)
default:
}

if c.redoMetaMgr.Enabled() {
if !c.redoMetaMgr.Running() {
return 0, 0, 0, nil
return 0, 0, nil
}
}

// TODO: pass table checkpointTs when we support concurrent process ddl
allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil)
if err != nil {
return 0, 0, 0, errors.Trace(err)
return 0, 0, errors.Trace(err)
}

err = c.handleBarrier(ctx, c.latestInfo, c.latestStatus, barrier)
if err != nil {
return 0, 0, 0, errors.Trace(err)
return 0, 0, errors.Trace(err)
}

log.Debug("owner handles barrier",
Expand All @@ -411,14 +410,21 @@ func (c *changefeed) tick(ctx cdcContext.Context,
// This condition implies that the DDL resolved-ts has not yet reached checkpointTs,
// which implies that it would be premature to schedule tables or to update status.
// So we return here.
return 0, 0, 0, nil
return 0, 0, nil
}

newCheckpointTs, newResolvedTs, newLastSyncTime, err := c.scheduler.Tick(
ctx, preCheckpointTs, allPhysicalTables, captures,
barrier)
if err != nil {
return 0, 0, 0, errors.Trace(err)
return 0, 0, errors.Trace(err)
}
if c.lastSyncTime > newLastSyncTime {
c.lastSyncTime = newLastSyncTime
} else {
log.Warn("lastSyncTime should not be greater than newLastSyncTime",
zap.Uint64("c.lastSyncTime", c.lastSyncTime),
zap.Uint64("newLastSyncTime", newLastSyncTime))
}

pdTime := c.upstream.PDClock.CurrentTime()
Expand All @@ -432,7 +438,7 @@ func (c *changefeed) tick(ctx cdcContext.Context,
// advance the watermarks for now.
c.updateMetrics(currentTs, c.latestStatus.CheckpointTs, c.resolvedTs)
}
return 0, 0, 0, nil
return 0, 0, nil
}

log.Debug("owner prepares to update status",
Expand Down Expand Up @@ -465,7 +471,7 @@ func (c *changefeed) tick(ctx cdcContext.Context,
c.updateMetrics(currentTs, newCheckpointTs, c.resolvedTs)
c.tickDownstreamObserver(ctx)

return newCheckpointTs, barrier.MinTableBarrierTs, newLastSyncTime, nil
return newCheckpointTs, barrier.MinTableBarrierTs, nil
}

func (c *changefeed) initialize(ctx cdcContext.Context) (err error) {
Expand Down
4 changes: 2 additions & 2 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ func (o *ownerImpl) handleJobs(ctx context.Context) {

func (o *ownerImpl) handleQueries(query *Query) error {
switch query.Tp {
case QueryChangeFeedSyncedStatus:
case QueryChangeFeedStatuses:
cfReactor, ok := o.changefeeds[query.ChangeFeedID]
if !ok {
query.Data = nil
Expand All @@ -597,7 +597,7 @@ func (o *ownerImpl) handleQueries(query *Query) error {
ret.ResolvedTs = cfReactor.resolvedTs
ret.CheckpointTs = cfReactor.latestStatus.CheckpointTs
query.Data = ret
case QueryChangeFeedStatuses:
case QueryChangeFeedSyncedStatus:
cfReactor, ok := o.changefeeds[query.ChangeFeedID]
if !ok {
query.Data = nil
Expand Down

0 comments on commit 73bc607

Please sign in to comment.