Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

puller(ticdc): refactor ddl puller remove verbose code and reduce logs #10297

Merged
merged 17 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,22 +314,21 @@ func ParseDDLJob(tblInfo *model.TableInfo, rawKV *model.RawKVEntry, id int64) (*

// parseJob unmarshal the job from "v".
func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) {
job := &timodel.Job{}
err := json.Unmarshal(v, job)
var job timodel.Job
err := json.Unmarshal(v, &job)
if err != nil {
return nil, errors.Trace(err)
}
log.Debug("get new DDL job", zap.String("detail", job.String()))
if !job.IsDone() {
return nil, nil
}
// FinishedTS is only set when the job is synced,
// but we can use the entry's ts here
job.StartTS = startTs
// Since ddl in stateDone is not contain the FinishedTS,
// Since ddl in stateDone doesn't contain the FinishedTS,
// we need to set it as the txn's commit ts.
job.BinlogInfo.FinishedTS = CRTs
return job, nil
return &job, nil
}

func datum2Column(
Expand Down
42 changes: 20 additions & 22 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func NewSchemaStorage(
) (SchemaStorage, error) {
var (
snap *schema.Snapshot
err error
version int64
err error
Copy link
Contributor

@CharlesCheung96 CharlesCheung96 Dec 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
err error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cannot be removed, otherwise the snap become not used, report by the linter.

)
if meta == nil {
snap = schema.NewEmptySnapshot(forceReplicate)
Expand All @@ -89,19 +89,14 @@ func NewSchemaStorage(
return nil, errors.Trace(err)
}
}
if err != nil {
return nil, errors.Trace(err)
}

schema := &schemaStorageImpl{
return &schemaStorageImpl{
snaps: []*schema.Snapshot{snap},
resolvedTs: startTs,
forceReplicate: forceReplicate,
id: id,
schemaVersion: version,
role: role,
}
return schema, nil
}, nil
}

// getSnapshot returns the snapshot which currentTs is less than(but most close to)
Expand Down Expand Up @@ -189,7 +184,7 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error {
// We use schemaVersion to check if an already-executed DDL job is processed for a second time.
// Unexecuted DDL jobs should have largest schemaVersions.
if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() || job.BinlogInfo.SchemaVersion <= s.schemaVersion {
log.Info("ignore foregone DDL",
log.Info("schemaStorage: ignore foregone DDL",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.String("DDL", job.Query),
Expand All @@ -205,26 +200,29 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error {
snap = schema.NewEmptySnapshot(s.forceReplicate)
}
if err := snap.HandleDDL(job); err != nil {
log.Error("handle DDL failed",
log.Error("schemaStorage: update snapshot by the DDL job failed",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.String("DDL", job.Query),
zap.Stringer("job", job), zap.Error(err),
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
zap.String("role", s.role.String()))
zap.String("schema", job.SchemaName),
zap.String("table", job.TableName),
zap.String("query", job.Query),
zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS),
zap.String("role", s.role.String()),
zap.Error(err))
return errors.Trace(err)
}
log.Info("handle DDL",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.String("DDL", job.Query),
zap.Stringer("job", job),
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
zap.String("role", s.role.String()))

s.snaps = append(s.snaps, snap)
s.schemaVersion = job.BinlogInfo.SchemaVersion
s.AdvanceResolvedTs(job.BinlogInfo.FinishedTS)
log.Info("schemaStorage: update snapshot by the DDL job",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.String("schema", job.SchemaName),
zap.String("table", job.TableName),
zap.String("query", job.Query),
zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS),
zap.Uint64("schemaVersion", uint64(s.schemaVersion)),
zap.String("role", s.role.String()))
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,10 +881,10 @@ func TestSchemaStorage(t *testing.T) {
require.Nil(t, err)

schemaStorage, err := NewSchemaStorage(nil, 0, false, model.DefaultChangeFeedID("dummy"), util.RoleTester, f)
require.Nil(t, err)
require.NoError(t, err)
for _, job := range jobs {
err := schemaStorage.HandleDDLJob(job)
require.Nil(t, err)
require.NoError(t, err)
}

for _, job := range jobs {
Expand Down
1 change: 0 additions & 1 deletion cdc/model/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ type DDLJobEntry struct {
Job *timodel.Job
OpType OpType
CRTs uint64
Err error
}

// TaskPosition records the process information of a capture
Expand Down
13 changes: 2 additions & 11 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,21 +711,16 @@ func (p *processor) initDDLHandler(ctx context.Context) error {
}

meta := kv.GetSnapshotMeta(p.upstream.KVStorage, ddlStartTs)
f, err := filter.NewFilter(p.latestInfo.Config, "")
if err != nil {
return errors.Trace(err)
}
schemaStorage, err := entry.NewSchemaStorage(meta, ddlStartTs,
forceReplicate, p.changefeedID, util.RoleProcessor, f)
forceReplicate, p.changefeedID, util.RoleProcessor, p.filter)
if err != nil {
return errors.Trace(err)
}

serverCfg := config.GetGlobalServerConfig()

changefeedID := model.DefaultChangeFeedID(p.changefeedID.ID + "_processor_ddl_puller")
ddlPuller := puller.NewDDLJobPuller(
ctx, p.upstream, ddlStartTs, serverCfg, changefeedID, schemaStorage, f,
ctx, p.upstream, ddlStartTs, serverCfg, changefeedID, schemaStorage, p.filter,
)
p.ddlHandler.r = &ddlHandler{puller: ddlPuller, schemaStorage: schemaStorage}
return nil
Expand Down Expand Up @@ -1007,10 +1002,6 @@ func (d *ddlHandler) Run(ctx context.Context, _ ...chan<- error) error {
if jobEntry.OpType == model.OpTypeResolved {
d.schemaStorage.AdvanceResolvedTs(jobEntry.CRTs)
}
err := jobEntry.Err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this error won't be checked, because it's already checked when the error happens.

if err != nil {
return errors.Trace(err)
}
}
})
return g.Wait()
Expand Down
Loading
Loading