diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index 2b55481b80b..2a1e7edf493 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -115,17 +115,16 @@ func (p *ddlJobPullerImpl) handleRawKVEntry(ctx context.Context, ddlRawKV *model if err != nil { return errors.Trace(err) } - if job == nil { - return nil - } - skip, err := p.handleJob(job) - if err != nil { - return cerror.WrapError(cerror.ErrHandleDDLFailed, - err, job.Query, job.StartTS, job.StartTS) - } - if skip { - return nil + if job != nil { + skip, err := p.handleJob(job) + if err != nil { + return cerror.WrapError(cerror.ErrHandleDDLFailed, + err, job.Query, job.StartTS, job.StartTS) + } + if skip { + return nil + } } jobEntry := &model.DDLJobEntry{ @@ -169,7 +168,9 @@ func (p *ddlJobPullerImpl) run(ctx context.Context) error { func (p *ddlJobPullerImpl) runMultiplexing(ctx context.Context) error { eg, ctx := errgroup.WithContext(ctx) - eg.Go(func() error { return p.multiplexingPuller.Run(ctx) }) + eg.Go(func() error { + return p.multiplexingPuller.Run(ctx) + }) eg.Go(func() error { for { var ddlRawKV *model.RawKVEntry @@ -694,7 +695,7 @@ func (h *ddlPullerImpl) Run(ctx context.Context) error { g.Go(func() error { return h.ddlJobPuller.Run(ctx) }) g.Go(func() error { - cc := clock.NewMock() + cc := clock.New() ticker := cc.Ticker(ddlPullerStuckWarnDuration) defer ticker.Stop() lastResolvedTsAdvancedTime := cc.Now() @@ -711,15 +712,15 @@ func (h *ddlPullerImpl) Run(ctx context.Context) error { zap.Duration("duration", duration), zap.Uint64("resolvedTs", atomic.LoadUint64(&h.resolvedTS))) } - case entry := <-h.ddlJobPuller.Output(): - if entry.OpType == model.OpTypeResolved { - if entry.CRTs > atomic.LoadUint64(&h.resolvedTS) { - atomic.StoreUint64(&h.resolvedTS, entry.CRTs) + case e := <-h.ddlJobPuller.Output(): + if e.OpType == model.OpTypeResolved { + if e.CRTs > atomic.LoadUint64(&h.resolvedTS) { + atomic.StoreUint64(&h.resolvedTS, e.CRTs) lastResolvedTsAdvancedTime = cc.Now() continue } } - h.add2Pending(entry.Job) + h.add2Pending(e.Job) } } })