diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 5c0b3363905..b6dd3085521 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -314,22 +314,21 @@ func ParseDDLJob(tblInfo *model.TableInfo, rawKV *model.RawKVEntry, id int64) (* // parseJob unmarshal the job from "v". func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) { - job := &timodel.Job{} - err := json.Unmarshal(v, job) + var job timodel.Job + err := json.Unmarshal(v, &job) if err != nil { return nil, errors.Trace(err) } - log.Debug("get new DDL job", zap.String("detail", job.String())) if !job.IsDone() { return nil, nil } // FinishedTS is only set when the job is synced, // but we can use the entry's ts here job.StartTS = startTs - // Since ddl in stateDone is not contain the FinishedTS, + // Since ddl in stateDone doesn't contain the FinishedTS, // we need to set it as the txn's commit ts. job.BinlogInfo.FinishedTS = CRTs - return job, nil + return &job, nil } func datum2Column( diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index 0cd21fff0b4..73c00d9eef0 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -74,8 +74,8 @@ func NewSchemaStorage( ) (SchemaStorage, error) { var ( snap *schema.Snapshot - err error version int64 + err error ) if meta == nil { snap = schema.NewEmptySnapshot(forceReplicate) @@ -89,19 +89,14 @@ func NewSchemaStorage( return nil, errors.Trace(err) } } - if err != nil { - return nil, errors.Trace(err) - } - - schema := &schemaStorageImpl{ + return &schemaStorageImpl{ snaps: []*schema.Snapshot{snap}, resolvedTs: startTs, forceReplicate: forceReplicate, id: id, schemaVersion: version, role: role, - } - return schema, nil + }, nil } // getSnapshot returns the snapshot which currentTs is less than(but most close to) @@ -189,7 +184,7 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error { // We use schemaVersion to check if an already-executed DDL job is processed for a second time. // Unexecuted DDL jobs should have largest schemaVersions. if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() || job.BinlogInfo.SchemaVersion <= s.schemaVersion { - log.Info("ignore foregone DDL", + log.Info("schemaStorage: ignore foregone DDL", zap.String("namespace", s.id.Namespace), zap.String("changefeed", s.id.ID), zap.String("DDL", job.Query), @@ -205,26 +200,29 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error { snap = schema.NewEmptySnapshot(s.forceReplicate) } if err := snap.HandleDDL(job); err != nil { - log.Error("handle DDL failed", + log.Error("schemaStorage: update snapshot by the DDL job failed", zap.String("namespace", s.id.Namespace), zap.String("changefeed", s.id.ID), - zap.String("DDL", job.Query), - zap.Stringer("job", job), zap.Error(err), - zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), - zap.String("role", s.role.String())) + zap.String("schema", job.SchemaName), + zap.String("table", job.TableName), + zap.String("query", job.Query), + zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS), + zap.String("role", s.role.String()), + zap.Error(err)) return errors.Trace(err) } - log.Info("handle DDL", - zap.String("namespace", s.id.Namespace), - zap.String("changefeed", s.id.ID), - zap.String("DDL", job.Query), - zap.Stringer("job", job), - zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), - zap.String("role", s.role.String())) - s.snaps = append(s.snaps, snap) s.schemaVersion = job.BinlogInfo.SchemaVersion s.AdvanceResolvedTs(job.BinlogInfo.FinishedTS) + log.Info("schemaStorage: update snapshot by the DDL job", + zap.String("namespace", s.id.Namespace), + zap.String("changefeed", s.id.ID), + zap.String("schema", job.SchemaName), + zap.String("table", job.TableName), + zap.String("query", job.Query), + zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS), + zap.Uint64("schemaVersion", uint64(s.schemaVersion)), + zap.String("role", s.role.String())) return nil } diff --git a/cdc/entry/schema_storage_test.go b/cdc/entry/schema_storage_test.go index f6316582ad7..4a2829e5a23 100644 --- a/cdc/entry/schema_storage_test.go +++ b/cdc/entry/schema_storage_test.go @@ -881,10 +881,10 @@ func TestSchemaStorage(t *testing.T) { require.Nil(t, err) schemaStorage, err := NewSchemaStorage(nil, 0, false, model.DefaultChangeFeedID("dummy"), util.RoleTester, f) - require.Nil(t, err) + require.NoError(t, err) for _, job := range jobs { err := schemaStorage.HandleDDLJob(job) - require.Nil(t, err) + require.NoError(t, err) } for _, job := range jobs { diff --git a/cdc/model/owner.go b/cdc/model/owner.go index 3848541af96..3253c5faf13 100644 --- a/cdc/model/owner.go +++ b/cdc/model/owner.go @@ -74,7 +74,6 @@ type DDLJobEntry struct { Job *timodel.Job OpType OpType CRTs uint64 - Err error } // TaskPosition records the process information of a capture diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index fd29e8ee648..e6e534bcbab 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -711,21 +711,16 @@ func (p *processor) initDDLHandler(ctx context.Context) error { } meta := kv.GetSnapshotMeta(p.upstream.KVStorage, ddlStartTs) - f, err := filter.NewFilter(p.latestInfo.Config, "") - if err != nil { - return errors.Trace(err) - } schemaStorage, err := entry.NewSchemaStorage(meta, ddlStartTs, - forceReplicate, p.changefeedID, util.RoleProcessor, f) + forceReplicate, p.changefeedID, util.RoleProcessor, p.filter) if err != nil { return errors.Trace(err) } serverCfg := config.GetGlobalServerConfig() - changefeedID := model.DefaultChangeFeedID(p.changefeedID.ID + "_processor_ddl_puller") ddlPuller := puller.NewDDLJobPuller( - ctx, p.upstream, ddlStartTs, serverCfg, changefeedID, schemaStorage, f, + ctx, p.upstream, ddlStartTs, serverCfg, changefeedID, schemaStorage, p.filter, ) p.ddlHandler.r = &ddlHandler{puller: ddlPuller, schemaStorage: schemaStorage} return nil @@ -1007,10 +1002,6 @@ func (d *ddlHandler) Run(ctx context.Context, _ ...chan<- error) error { if jobEntry.OpType == model.OpTypeResolved { d.schemaStorage.AdvanceResolvedTs(jobEntry.CRTs) } - err := jobEntry.Err - if err != nil { - return errors.Trace(err) - } } }) return g.Wait() diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index 2492e1a1153..8f62fb021ff 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -20,7 +20,6 @@ import ( "sync/atomic" "time" - "github.com/benbjohnson/clock" "github.com/pingcap/errors" "github.com/pingcap/log" tidbkv "github.com/pingcap/tidb/pkg/kv" @@ -32,6 +31,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/puller/memorysorter" + "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/filter" @@ -120,25 +120,26 @@ func (p *ddlJobPullerImpl) handleRawKVEntry(ctx context.Context, ddlRawKV *model skip, err := p.handleJob(job) if err != nil { return cerror.WrapError(cerror.ErrHandleDDLFailed, - err, job.String(), job.Query, job.StartTS, job.StartTS) + err, job.Query, job.StartTS, job.StartTS) } - log.Info("handle ddl job", - zap.String("namespace", p.changefeedID.Namespace), - zap.String("changefeed", p.changefeedID.ID), - zap.String("query", job.Query), - zap.Stringer("job", job), - zap.Uint64("startTs", job.StartTS), - zap.Bool("skip", skip)) if skip { return nil } + log.Info("a new ddl job is received", + zap.String("namespace", p.changefeedID.Namespace), + zap.String("changefeed", p.changefeedID.ID), + zap.String("schema", job.SchemaName), + zap.String("table", job.TableName), + zap.Uint64("startTs", job.StartTS), + zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS), + zap.String("query", job.Query), + zap.Any("job", job)) } jobEntry := &model.DDLJobEntry{ Job: job, OpType: ddlRawKV.OpType, CRTs: ddlRawKV.CRTs, - Err: err, } select { case <-ctx.Done(): @@ -170,7 +171,9 @@ func (p *ddlJobPullerImpl) run(ctx context.Context) error { func (p *ddlJobPullerImpl) runMultiplexing(ctx context.Context) error { eg, ctx := errgroup.WithContext(ctx) - eg.Go(func() error { return p.multiplexingPuller.Run(ctx) }) + eg.Go(func() error { + return p.multiplexingPuller.Run(ctx) + }) eg.Go(func() error { for { var ddlRawKV *model.RawKVEntry @@ -384,6 +387,21 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { return false, nil } + if job.BinlogInfo.FinishedTS <= p.getResolvedTs() || + job.BinlogInfo.SchemaVersion <= p.schemaVersion { + log.Info("ddl job finishedTs less than puller resolvedTs,"+ + "discard the ddl job", + zap.String("namespace", p.changefeedID.Namespace), + zap.String("changefeed", p.changefeedID.ID), + zap.String("schema", job.SchemaName), + zap.String("table", job.TableName), + zap.Uint64("startTs", job.StartTS), + zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS), + zap.String("query", job.Query), + zap.Uint64("pullerResolvedTs", p.getResolvedTs())) + return true, nil + } + defer func() { if skip && err == nil { log.Info("ddl job schema or table does not match, discard it", @@ -392,7 +410,8 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { zap.String("schema", job.SchemaName), zap.String("table", job.TableName), zap.String("query", job.Query), - zap.Stringer("job", job)) + zap.Uint64("startTs", job.StartTS), + zap.Uint64("finishTs", job.BinlogInfo.FinishedTS)) } if err != nil { log.Warn("handle ddl job failed", @@ -401,35 +420,22 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { zap.String("schema", job.SchemaName), zap.String("table", job.TableName), zap.String("query", job.Query), - zap.Stringer("job", job), + zap.Uint64("startTs", job.StartTS), + zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), zap.Error(err)) } }() - if job.BinlogInfo.FinishedTS <= p.getResolvedTs() || - job.BinlogInfo.SchemaVersion <= p.schemaVersion { - log.Info("ddl job finishedTs less than puller resolvedTs,"+ - "discard the ddl job", - zap.Uint64("jobFinishedTS", job.BinlogInfo.FinishedTS), - zap.Uint64("pullerResolvedTs", p.getResolvedTs()), - zap.String("namespace", p.changefeedID.Namespace), - zap.String("changefeed", p.changefeedID.ID), - zap.String("schema", job.SchemaName), - zap.String("table", job.TableName), - zap.String("query", job.Query), - zap.String("job", job.String())) - return true, nil - } - snap := p.schemaStorage.GetLastSnapshot() - if err := snap.FillSchemaName(job); err != nil { + if err = snap.FillSchemaName(job); err != nil { log.Info("failed to fill schema name for ddl job", zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), zap.String("schema", job.SchemaName), zap.String("table", job.TableName), zap.String("query", job.Query), - zap.Stringer("job", job), + zap.Uint64("startTs", job.StartTS), + zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), zap.Error(err)) discard, fErr := p.filter. ShouldDiscardDDL(job.StartTS, job.Type, job.SchemaName, job.TableName, job.Query) @@ -446,16 +452,18 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { case timodel.ActionRenameTables: skip, err = p.handleRenameTables(job) if err != nil { + log.Warn("handle rename tables ddl job failed", + zap.String("namespace", p.changefeedID.Namespace), + zap.String("changefeed", p.changefeedID.ID), + zap.String("schema", job.SchemaName), + zap.String("table", job.TableName), + zap.String("query", job.Query), + zap.Uint64("startTs", job.StartTS), + zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), + zap.Error(err)) return true, errors.Trace(err) } case timodel.ActionRenameTable: - log.Info("rename table ddl job", - zap.Int64("newSchemaID", job.SchemaID), - zap.String("newSchemaName", job.SchemaName), - zap.Int64("tableID", job.TableID), - zap.String("oldTableName", job.BinlogInfo.TableInfo.Name.O), - zap.String("newTableName", job.TableName), - ) oldTable, ok := snap.PhysicalTableByID(job.TableID) if !ok { // 1. If we can not find the old table, and the new table name is in filter rule, return error. @@ -467,38 +475,44 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { if !discard { return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query) } - skip = true - } else { - log.Info("rename table ddl job", - zap.String("oldTableName", oldTable.TableName.Table), - zap.String("oldSchemaName", oldTable.TableName.Schema), + log.Warn("skip rename table ddl since cannot found the old table info", zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.String("schema", job.SchemaName), - zap.String("table", job.TableName), - zap.String("query", job.Query), - zap.Stringer("job", job)) - // since we can find the old table, we must can find the old schema. - // 2. If we can find the preTableInfo, we filter it by the old table name. - skipByOldTableName, err := p.filter.ShouldDiscardDDL(job.StartTS, - job.Type, oldTable.TableName.Schema, oldTable.TableName.Table, job.Query) - if err != nil { - return true, errors.Trace(err) - } - skipByNewTableName, err := p.filter.ShouldDiscardDDL(job.StartTS, - job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O, job.Query) - if err != nil { - return true, errors.Trace(err) - } - // 3. If its old table name is not in filter rule, and its new table name in filter rule, return error. - if skipByOldTableName && !skipByNewTableName { + zap.Int64("tableID", job.TableID), + zap.Int64("newSchemaID", job.SchemaID), + zap.String("newSchemaName", job.SchemaName), + zap.String("oldTableName", job.BinlogInfo.TableInfo.Name.O), + zap.String("newTableName", job.TableName)) + return true, nil + } + // since we can find the old table, it must be able to find the old schema. + // 2. If we can find the preTableInfo, we filter it by the old table name. + skipByOldTableName, err := p.filter.ShouldDiscardDDL(job.StartTS, + job.Type, oldTable.TableName.Schema, oldTable.TableName.Table, job.Query) + if err != nil { + return true, errors.Trace(err) + } + skipByNewTableName, err := p.filter.ShouldDiscardDDL(job.StartTS, + job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O, job.Query) + if err != nil { + return true, errors.Trace(err) + } + // 3. If its old table name is not in filter rule, and its new table name in filter rule, return error. + if skipByOldTableName { + if !skipByNewTableName { return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query) } - if skipByOldTableName && skipByNewTableName { - skip = true - return true, nil - } + return true, nil } + + log.Info("ddl puller receive rename table ddl job", + zap.String("namespace", p.changefeedID.Namespace), + zap.String("changefeed", p.changefeedID.ID), + zap.String("schema", job.SchemaName), + zap.String("table", job.TableName), + zap.String("query", job.Query), + zap.Uint64("startTs", job.StartTS), + zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS)) default: // nil means it is a schema ddl job, it's no need to fill the table name. if job.BinlogInfo.TableInfo != nil { @@ -517,20 +531,11 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { err = p.schemaStorage.HandleDDLJob(job) if err != nil { - log.Error("handle ddl job failed", - zap.String("namespace", p.changefeedID.Namespace), - zap.String("changefeed", p.changefeedID.ID), - zap.String("query", job.Query), - zap.String("schema", job.SchemaName), - zap.String("table", job.BinlogInfo.TableInfo.Name.O), - zap.String("job", job.String()), - zap.Error(err)) return true, errors.Trace(err) } p.setResolvedTs(job.BinlogInfo.FinishedTS) p.schemaVersion = job.BinlogInfo.SchemaVersion - return false, nil } @@ -654,9 +659,6 @@ type ddlPullerImpl struct { cancel context.CancelFunc changefeedID model.ChangeFeedID - - clock clock.Clock - lastResolvedTsAdvancedTime time.Time } // NewDDLPuller return a puller for DDL Event @@ -680,48 +682,40 @@ func NewDDLPuller(ctx context.Context, ddlJobPuller: puller, resolvedTS: startTs, cancel: func() {}, - clock: clock.New(), changefeedID: changefeed, } } -func (h *ddlPullerImpl) handleDDLJobEntry(jobEntry *model.DDLJobEntry) error { - if jobEntry.OpType == model.OpTypeResolved { - if jobEntry.CRTs > atomic.LoadUint64(&h.resolvedTS) { - h.lastResolvedTsAdvancedTime = h.clock.Now() - atomic.StoreUint64(&h.resolvedTS, jobEntry.CRTs) - } - return nil - } - job, err := jobEntry.Job, jobEntry.Err - if err != nil { - return errors.Trace(err) - } +func (h *ddlPullerImpl) addToPending(job *timodel.Job) { if job == nil { - return nil + return } - log.Info("[ddl] handleDDLJobEntry", zap.String("job", job.String())) if job.ID == h.lastDDLJobID { log.Warn("ignore duplicated DDL job", zap.String("namespace", h.changefeedID.Namespace), zap.String("changefeed", h.changefeedID.ID), + zap.String("schema", job.SchemaName), + zap.String("table", job.TableName), + zap.String("query", job.Query), - zap.Int64("jobID", job.ID), - zap.Any("job", job)) - return nil + zap.Uint64("startTs", job.StartTS), + zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), + zap.Int64("jobID", job.ID)) + return } - log.Info("receive new ddl job", - zap.String("namespace", h.changefeedID.Namespace), - zap.String("changefeed", h.changefeedID.ID), - zap.String("query", job.Query), - zap.Int64("jobID", job.ID), - zap.Any("job", job)) - h.mu.Lock() defer h.mu.Unlock() h.pendingDDLJobs = append(h.pendingDDLJobs, job) h.lastDDLJobID = job.ID - return nil + log.Info("ddl puller receives new pending job", + zap.String("namespace", h.changefeedID.Namespace), + zap.String("changefeed", h.changefeedID.ID), + zap.String("schema", job.SchemaName), + zap.String("table", job.TableName), + zap.String("query", job.Query), + zap.Uint64("startTs", job.StartTS), + zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), + zap.Int64("jobID", job.ID)) } // Run the ddl puller to receive DDL events @@ -733,15 +727,16 @@ func (h *ddlPullerImpl) Run(ctx context.Context) error { g.Go(func() error { return h.ddlJobPuller.Run(ctx) }) g.Go(func() error { - ticker := h.clock.Ticker(ddlPullerStuckWarnDuration) + cc := clock.New() + ticker := cc.Ticker(ddlPullerStuckWarnDuration) defer ticker.Stop() - h.lastResolvedTsAdvancedTime = h.clock.Now() + lastResolvedTsAdvancedTime := cc.Now() for { select { case <-ctx.Done(): return ctx.Err() case <-ticker.C: - duration := h.clock.Since(h.lastResolvedTsAdvancedTime) + duration := cc.Since(lastResolvedTsAdvancedTime) if duration > ddlPullerStuckWarnDuration { log.Warn("ddl puller resolved ts has not advanced", zap.String("namespace", h.changefeedID.Namespace), @@ -750,9 +745,14 @@ func (h *ddlPullerImpl) Run(ctx context.Context) error { zap.Uint64("resolvedTs", atomic.LoadUint64(&h.resolvedTS))) } case e := <-h.ddlJobPuller.Output(): - if err := h.handleDDLJobEntry(e); err != nil { - return errors.Trace(err) + if e.OpType == model.OpTypeResolved { + if e.CRTs > atomic.LoadUint64(&h.resolvedTS) { + atomic.StoreUint64(&h.resolvedTS, e.CRTs) + lastResolvedTsAdvancedTime = cc.Now() + continue + } } + h.addToPending(e.Job) } } }) diff --git a/cdc/puller/ddl_puller_test.go b/cdc/puller/ddl_puller_test.go index ae7eb91b642..67b5e16bb69 100644 --- a/cdc/puller/ddl_puller_test.go +++ b/cdc/puller/ddl_puller_test.go @@ -23,9 +23,7 @@ import ( "sync" "sync/atomic" "testing" - "time" - "github.com/benbjohnson/clock" "github.com/pingcap/errors" "github.com/pingcap/log" timodel "github.com/pingcap/tidb/pkg/parser/model" @@ -734,8 +732,6 @@ func TestResolvedTsStuck(t *testing.T) { ) require.Nil(t, err) p := NewDDLPuller(ctx, up, startTs, ctx.ChangefeedVars().ID, schemaStorage, f) - mockClock := clock.NewMock() - p.(*ddlPullerImpl).clock = mockClock p.(*ddlPullerImpl).ddlJobPuller, _ = newMockDDLJobPuller(t, mockPuller, false) var wg sync.WaitGroup @@ -760,18 +756,6 @@ func TestResolvedTsStuck(t *testing.T) { waitResolvedTsGrowing(t, p, 30) require.Equal(t, 0, logs.Len()) - mockClock.Add(2 * ddlPullerStuckWarnDuration) - for i := 0; i < 20; i++ { - mockClock.Add(time.Second) - if logs.Len() > 0 { - break - } - time.Sleep(10 * time.Millisecond) - if i == 19 { - t.Fatal("warning log not printed") - } - } - mockPuller.appendResolvedTs(40) waitResolvedTsGrowing(t, p, 40) } diff --git a/errors.toml b/errors.toml index ac264a03eac..22d605f878e 100755 --- a/errors.toml +++ b/errors.toml @@ -348,7 +348,7 @@ get tikv grpc context failed ["CDC:ErrHandleDDLFailed"] error = ''' -handle ddl failed, job: %s, query: %s, startTs: %d. If you want to skip this DDL and continue with replication, you can manually execute this DDL downstream. Afterwards, add `ignore-txn-start-ts=[%d]` to the changefeed in the filter configuration. +handle ddl failed, query: %s, startTs: %d. If you want to skip this DDL and continue with replication, you can manually execute this DDL downstream. Afterwards, add `ignore-txn-start-ts=[%d]` to the changefeed in the filter configuration. ''' ["CDC:ErrIllegalSorterParameter"] diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index 28265aeb043..2d185396b4e 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -935,7 +935,7 @@ var ( ) ErrHandleDDLFailed = errors.Normalize( - "handle ddl failed, job: %s, query: %s, startTs: %d. "+ + "handle ddl failed, query: %s, startTs: %d. "+ "If you want to skip this DDL and continue with replication, "+ "you can manually execute this DDL downstream. Afterwards, "+ "add `ignore-txn-start-ts=[%d]` to the changefeed in the filter configuration.",