Skip to content

Commit

Permalink
adjust log.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Dec 14, 2023
1 parent 694fca2 commit e37eb66
Showing 1 changed file with 18 additions and 17 deletions.
35 changes: 18 additions & 17 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,16 @@ func (p *ddlJobPullerImpl) handleRawKVEntry(ctx context.Context, ddlRawKV *model
if err != nil {
return errors.Trace(err)
}
if job == nil {
return nil
}

skip, err := p.handleJob(job)
if err != nil {
return cerror.WrapError(cerror.ErrHandleDDLFailed,
err, job.Query, job.StartTS, job.StartTS)
}
if skip {
return nil
if job != nil {
skip, err := p.handleJob(job)
if err != nil {
return cerror.WrapError(cerror.ErrHandleDDLFailed,
err, job.Query, job.StartTS, job.StartTS)
}
if skip {
return nil
}
}

jobEntry := &model.DDLJobEntry{
Expand Down Expand Up @@ -169,7 +168,9 @@ func (p *ddlJobPullerImpl) run(ctx context.Context) error {

func (p *ddlJobPullerImpl) runMultiplexing(ctx context.Context) error {
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error { return p.multiplexingPuller.Run(ctx) })
eg.Go(func() error {
return p.multiplexingPuller.Run(ctx)
})
eg.Go(func() error {
for {
var ddlRawKV *model.RawKVEntry
Expand Down Expand Up @@ -694,7 +695,7 @@ func (h *ddlPullerImpl) Run(ctx context.Context) error {
g.Go(func() error { return h.ddlJobPuller.Run(ctx) })

g.Go(func() error {
cc := clock.NewMock()
cc := clock.New()
ticker := cc.Ticker(ddlPullerStuckWarnDuration)
defer ticker.Stop()
lastResolvedTsAdvancedTime := cc.Now()
Expand All @@ -711,15 +712,15 @@ func (h *ddlPullerImpl) Run(ctx context.Context) error {
zap.Duration("duration", duration),
zap.Uint64("resolvedTs", atomic.LoadUint64(&h.resolvedTS)))
}
case entry := <-h.ddlJobPuller.Output():
if entry.OpType == model.OpTypeResolved {
if entry.CRTs > atomic.LoadUint64(&h.resolvedTS) {
atomic.StoreUint64(&h.resolvedTS, entry.CRTs)
case e := <-h.ddlJobPuller.Output():
if e.OpType == model.OpTypeResolved {
if e.CRTs > atomic.LoadUint64(&h.resolvedTS) {
atomic.StoreUint64(&h.resolvedTS, e.CRTs)
lastResolvedTsAdvancedTime = cc.Now()
continue
}
}
h.add2Pending(entry.Job)
h.add2Pending(e.Job)
}
}
})
Expand Down

0 comments on commit e37eb66

Please sign in to comment.