Skip to content

Commit

Permalink
ddl_puller.go(ticdc): fix DDLs are ignored when schema versions are o…
Browse files Browse the repository at this point in the history
…ut of order (#11733)

close #11714
  • Loading branch information
wlwilliamx authored Nov 15, 2024
1 parent 68c22f3 commit b38183b
Showing 1 changed file with 1 addition and 4 deletions.
5 changes: 1 addition & 4 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ type ddlJobPullerImpl struct {
kvStorage tidbkv.Storage
schemaStorage entry.SchemaStorage
resolvedTs uint64
schemaVersion int64
filter filter.Filter
// ddlTableInfo is initialized when receive the first concurrent DDL job.
ddlTableInfo *entry.DDLTableInfo
Expand Down Expand Up @@ -317,8 +316,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
return false, nil
}

if job.BinlogInfo.FinishedTS <= p.getResolvedTs() ||
job.BinlogInfo.SchemaVersion <= p.schemaVersion {
if job.BinlogInfo.FinishedTS <= p.getResolvedTs() {
log.Info("ddl job finishedTs less than puller resolvedTs,"+
"discard the ddl job",
zap.String("namespace", p.changefeedID.Namespace),
Expand Down Expand Up @@ -480,7 +478,6 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
errors.Trace(err), job.Query, job.StartTS, job.StartTS)
}
p.setResolvedTs(job.BinlogInfo.FinishedTS)
p.schemaVersion = job.BinlogInfo.SchemaVersion

return p.checkIneligibleTableDDL(snap, job)
}
Expand Down

0 comments on commit b38183b

Please sign in to comment.