changefeed(ticdc): refactor initialize the ddl puller and schema storage related steps #10199

Merged: 9 commits merged on Dec 6, 2023
15 changes: 5 additions & 10 deletions cdc/entry/schema_storage_test.go
@@ -691,8 +691,7 @@ func TestCreateSnapFromMeta(t *testing.T) {
 	tk.MustExec("create table test2.simple_test5 (a bigint)")
 	ver, err := store.CurrentVersion(oracle.GlobalTxnScope)
 	require.Nil(t, err)
-	meta, err := kv.GetSnapshotMeta(store, ver.Ver)
-	require.Nil(t, err)
+	meta := kv.GetSnapshotMeta(store, ver.Ver)
 	f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "")
 	require.Nil(t, err)
 	snap, err := schema.NewSnapshotFromMeta(meta, ver.Ver, false, f)
@@ -729,14 +728,12 @@ func TestExplicitTables(t *testing.T) {
 	tk.MustExec("create table test2.simple_test5 (a varchar(20))")
 	ver2, err := store.CurrentVersion(oracle.GlobalTxnScope)
 	require.Nil(t, err)
-	meta1, err := kv.GetSnapshotMeta(store, ver1.Ver)
-	require.Nil(t, err)
+	meta1 := kv.GetSnapshotMeta(store, ver1.Ver)
 	f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "")
 	require.Nil(t, err)
 	snap1, err := schema.NewSnapshotFromMeta(meta1, ver1.Ver, true /* forceReplicate */, f)
 	require.Nil(t, err)
-	meta2, err := kv.GetSnapshotMeta(store, ver2.Ver)
-	require.Nil(t, err)
+	meta2 := kv.GetSnapshotMeta(store, ver2.Ver)
 	snap2, err := schema.NewSnapshotFromMeta(meta2, ver2.Ver, false /* forceReplicate */, f)
 	require.Nil(t, err)
 	snap3, err := schema.NewSnapshotFromMeta(meta2, ver2.Ver, true /* forceReplicate */, f)
@@ -892,8 +889,7 @@ func TestSchemaStorage(t *testing.T) {
 
 	for _, job := range jobs {
 		ts := job.BinlogInfo.FinishedTS
-		meta, err := kv.GetSnapshotMeta(store, ts)
-		require.Nil(t, err)
+		meta := kv.GetSnapshotMeta(store, ts)
 		snapFromMeta, err := schema.NewSnapshotFromMeta(meta, ts, false, f)
 		require.Nil(t, err)
 		snapFromSchemaStore, err := schemaStorage.GetSnapshot(ctx, ts)
@@ -973,8 +969,7 @@ func TestHandleKey(t *testing.T) {
 	tk.MustExec("create table test.simple_test3 (id bigint, age int)")
 	ver, err := store.CurrentVersion(oracle.GlobalTxnScope)
 	require.Nil(t, err)
-	meta, err := kv.GetSnapshotMeta(store, ver.Ver)
-	require.Nil(t, err)
+	meta := kv.GetSnapshotMeta(store, ver.Ver)
 	f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "")
 	require.Nil(t, err)
 	snap, err := schema.NewSnapshotFromMeta(meta, ver.Ver, false, f)
5 changes: 1 addition & 4 deletions cdc/entry/validator.go
@@ -33,10 +33,7 @@ func VerifyTables(
 	eligibleTables []model.TableName,
 	err error,
 ) {
-	meta, err := kv.GetSnapshotMeta(storage, startTs)
-	if err != nil {
-		return nil, nil, nil, errors.Trace(err)
-	}
+	meta := kv.GetSnapshotMeta(storage, startTs)
 	snap, err := schema.NewSingleSnapshotFromMeta(meta, startTs, false /* explicitTables */, f)
 	if err != nil {
 		return nil, nil, nil, errors.Trace(err)
5 changes: 2 additions & 3 deletions cdc/kv/store_op.go
@@ -27,10 +27,9 @@ import (
 )
 
 // GetSnapshotMeta returns tidb meta information
-// TODO: Simplify the signature of this function
-func GetSnapshotMeta(tiStore tidbkv.Storage, ts uint64) (*meta.Meta, error) {
+func GetSnapshotMeta(tiStore tidbkv.Storage, ts uint64) *meta.Meta {
 	snapshot := tiStore.GetSnapshot(tidbkv.NewVersion(ts))
-	return meta.NewSnapshotMeta(snapshot), nil
+	return meta.NewSnapshotMeta(snapshot)
 }
 
 // CreateTiStore creates a tikv storage client
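
For reference, a minimal sketch (not part of this diff) of the call-site pattern once GetSnapshotMeta stops returning an error; the package name, the example function, and the import paths are illustrative assumptions:

package example

import (
	tidbkv "github.com/pingcap/tidb/kv" // assumed import path; may differ by TiDB version

	"github.com/pingcap/tiflow/cdc/kv"
)

// buildMeta sketches the simplified caller: there is no error value to check,
// so the call-plus-error-check pattern collapses into a single assignment.
func buildMeta(storage tidbkv.Storage, ts uint64) {
	// Before: meta, err := kv.GetSnapshotMeta(storage, ts); if err != nil { ... }
	meta := kv.GetSnapshotMeta(storage, ts) // *meta.Meta, returned directly
	_ = meta                                // would typically feed schema.NewSnapshotFromMeta
}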
33 changes: 10 additions & 23 deletions cdc/owner/changefeed.go
@@ -136,13 +136,12 @@ type changefeed struct {
 	observerLastTick *atomic.Time
 
 	newDDLPuller func(ctx context.Context,
-		replicaConfig *config.ReplicaConfig,
 		up *upstream.Upstream,
 		startTs uint64,
 		changefeed model.ChangeFeedID,
 		schemaStorage entry.SchemaStorage,
 		filter filter.Filter,
-	) (puller.DDLPuller, error)
+	) puller.DDLPuller
 
 	newSink func(
 		changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo,
@@ -209,13 +208,12 @@ func newChangefeed4Test(
 	cfStatus *model.ChangeFeedStatus,
 	cfstateManager FeedStateManager, up *upstream.Upstream,
 	newDDLPuller func(ctx context.Context,
-		replicaConfig *config.ReplicaConfig,
 		up *upstream.Upstream,
 		startTs uint64,
 		changefeed model.ChangeFeedID,
 		schemaStorage entry.SchemaStorage,
 		filter filter.Filter,
-	) (puller.DDLPuller, error),
+	) puller.DDLPuller,
 	newSink func(
 		changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo,
 		reportError func(err error), reportWarning func(err error),
@@ -331,11 +329,10 @@ func (c *changefeed) handleWarning(err error) {
 	})
 }
 
-func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context,
-	cfInfo *model.ChangeFeedInfo,
-	checkpointTs uint64,
+func (c *changefeed) checkStaleCheckpointTs(
+	ctx cdcContext.Context, checkpointTs uint64,
 ) error {
-	if cfInfo.NeedBlockGC() {
+	if c.latestInfo.NeedBlockGC() {
 		failpoint.Inject("InjectChangefeedFastFailError", func() error {
 			return cerror.ErrStartTsBeforeGC.FastGen("InjectChangefeedFastFailError")
 		})
@@ -355,7 +352,7 @@ func (c *changefeed) tick(ctx cdcContext.Context,
 	preCheckpointTs := c.latestInfo.GetCheckpointTs(c.latestStatus)
 	// checkStaleCheckpointTs must be called before `feedStateManager.ShouldRunning()`
 	// to ensure all changefeeds, no matter whether they are running or not, will be checked.
-	if err := c.checkStaleCheckpointTs(ctx, c.latestInfo, preCheckpointTs); err != nil {
+	if err := c.checkStaleCheckpointTs(ctx, preCheckpointTs); err != nil {
 		return 0, 0, errors.Trace(err)
 	}
 
@@ -385,8 +382,7 @@ func (c *changefeed) tick(ctx cdcContext.Context,
 		}
 	}
 
-	// TODO: pass table checkpointTs when we support concurrent process ddl
-	allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil)
+	allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs)
 	if err != nil {
 		return 0, 0, errors.Trace(err)
 	}
@@ -568,7 +564,7 @@ LOOP2:
 	}
 	c.barriers.Update(finishBarrier, c.latestInfo.GetTargetTs())
 
-	filter, err := filter.NewFilter(c.latestInfo.Config, "")
+	f, err := filter.NewFilter(c.latestInfo.Config, "")
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -577,7 +573,7 @@
 		ddlStartTs,
 		c.latestInfo.Config,
 		c.id,
-		filter)
+		f)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -604,16 +600,7 @@
 	})
 	c.ddlSink.run(cancelCtx)
 
-	c.ddlPuller, err = c.newDDLPuller(cancelCtx,
-		c.latestInfo.Config,
-		c.upstream, ddlStartTs,
-		c.id,
-		c.schema,
-		filter)
-	if err != nil {
-		return errors.Trace(err)
-	}
-
+	c.ddlPuller = c.newDDLPuller(cancelCtx, c.upstream, ddlStartTs, c.id, c.schema, f)
 	c.wg.Add(1)
 	go func() {
 		defer c.wg.Done()
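
To summarize the shape change in one place, here is a hedged sketch of the constructor hook after this refactor, written as a named function type for clarity; the real field on changefeed is an anonymous func field, and the import paths below are assumptions:

package example

import (
	"context"

	"github.com/pingcap/tiflow/cdc/entry"
	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/cdc/puller"
	"github.com/pingcap/tiflow/pkg/filter"
	"github.com/pingcap/tiflow/pkg/upstream"
)

// ddlPullerFactory mirrors the newDDLPuller hook after this refactor:
// the replicaConfig parameter and the error return value are both gone,
// so callers receive the puller directly.
type ddlPullerFactory func(
	ctx context.Context,
	up *upstream.Upstream,
	startTs uint64,
	changefeed model.ChangeFeedID,
	schemaStorage entry.SchemaStorage,
	filter filter.Filter,
) puller.DDLPuller

A test can satisfy this shape with a stub that simply returns a mock puller, which is what the changefeed_test.go change below does.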
9 changes: 4 additions & 5 deletions cdc/owner/changefeed_test.go
@@ -211,14 +211,13 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T,
 		state.Info, state.Status, newFeedStateManager(up, state), up,
 		// new ddl puller
 		func(ctx context.Context,
-			replicaConfig *config.ReplicaConfig,
 			up *upstream.Upstream,
 			startTs uint64,
 			changefeed model.ChangeFeedID,
 			schemaStorage entry.SchemaStorage,
 			filter filter.Filter,
-		) (puller.DDLPuller, error) {
-			return &mockDDLPuller{resolvedTs: startTs - 1, schemaStorage: schemaStorage}, nil
+		) puller.DDLPuller {
+			return &mockDDLPuller{resolvedTs: startTs - 1, schemaStorage: schemaStorage}
 		},
 		// new ddl ddlSink
 		func(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
@@ -641,7 +640,7 @@ func TestBarrierAdvance(t *testing.T) {
 		if i == 1 {
 			cf.ddlManager.ddlResolvedTs += 10
 		}
-		_, barrier, err := cf.ddlManager.tick(ctx, state.Status.CheckpointTs, nil)
+		_, barrier, err := cf.ddlManager.tick(ctx, state.Status.CheckpointTs)
 
 		require.Nil(t, err)
 
@@ -668,7 +667,7 @@
 
 	// Then the last tick barrier must be advanced correctly.
 	cf.ddlManager.ddlResolvedTs += 1000000000000
-	_, barrier, err = cf.ddlManager.tick(ctx, state.Status.CheckpointTs+10, nil)
+	_, barrier, err = cf.ddlManager.tick(ctx, state.Status.CheckpointTs+10)
 	require.Nil(t, err)
 	err = cf.handleBarrier(ctx, state.Info, state.Status, barrier)
 