Skip to content

Commit

Permalink
refactor ddl puller handle jobs.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Dec 13, 2023
1 parent 5140f07 commit d9a80e2
Showing 1 changed file with 14 additions and 18 deletions.
32 changes: 14 additions & 18 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"sync/atomic"
"time"

"github.com/benbjohnson/clock"
"github.com/pingcap/errors"
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
Expand All @@ -32,6 +31,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/puller/memorysorter"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
Expand Down Expand Up @@ -635,9 +635,6 @@ type ddlPullerImpl struct {
cancel context.CancelFunc

changefeedID model.ChangeFeedID

clock clock.Clock
lastResolvedTsAdvancedTime time.Time
}

// NewDDLPuller return a puller for DDL Event
Expand All @@ -661,19 +658,11 @@ func NewDDLPuller(ctx context.Context,
ddlJobPuller: puller,
resolvedTS: startTs,
cancel: func() {},
clock: clock.New(),
changefeedID: changefeed,
}
}

func (h *ddlPullerImpl) handleDDLJobEntry(jobEntry *model.DDLJobEntry) {
if jobEntry.OpType == model.OpTypeResolved {
if jobEntry.CRTs > atomic.LoadUint64(&h.resolvedTS) {
h.lastResolvedTsAdvancedTime = h.clock.Now()
atomic.StoreUint64(&h.resolvedTS, jobEntry.CRTs)
}
}
job := jobEntry.Job
func (h *ddlPullerImpl) add2Pending(job *timodel.Job) {
if job.ID == h.lastDDLJobID {
log.Warn("ignore duplicated DDL job",
zap.String("namespace", h.changefeedID.Namespace),
Expand Down Expand Up @@ -702,24 +691,31 @@ func (h *ddlPullerImpl) Run(ctx context.Context) error {
g.Go(func() error { return h.ddlJobPuller.Run(ctx) })

g.Go(func() error {
ticker := h.clock.Ticker(ddlPullerStuckWarnDuration)
cc := clock.NewMock()
ticker := cc.Ticker(ddlPullerStuckWarnDuration)
defer ticker.Stop()
h.lastResolvedTsAdvancedTime = h.clock.Now()
lastResolvedTsAdvancedTime := cc.Now()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
duration := h.clock.Since(h.lastResolvedTsAdvancedTime)
duration := cc.Since(lastResolvedTsAdvancedTime)
if duration > ddlPullerStuckWarnDuration {
log.Warn("ddl puller resolved ts has not advanced",
zap.String("namespace", h.changefeedID.Namespace),
zap.String("changefeed", h.changefeedID.ID),
zap.Duration("duration", duration),
zap.Uint64("resolvedTs", atomic.LoadUint64(&h.resolvedTS)))
}
case e := <-h.ddlJobPuller.Output():
h.handleDDLJobEntry(e)
case entry := <-h.ddlJobPuller.Output():
if entry.OpType == model.OpTypeResolved {
if entry.CRTs > atomic.LoadUint64(&h.resolvedTS) {
atomic.StoreUint64(&h.resolvedTS, entry.CRTs)
lastResolvedTsAdvancedTime = cc.Now()
}
}
h.add2Pending(entry.Job)
}
}
})
Expand Down

0 comments on commit d9a80e2

Please sign in to comment.