diff --git a/cdcv2/metadata/metadata.go b/cdcv2/metadata/metadata.go index dbc6efdd234..06f512c19bb 100644 --- a/cdcv2/metadata/metadata.go +++ b/cdcv2/metadata/metadata.go @@ -24,7 +24,11 @@ import ( // Querier is used to query information from metadata storage. type Querier interface { // GetChangefeeds queries some or all changefeeds. - GetChangefeeds(...ChangefeedUUID) ([]*ChangefeedInfo, error) + GetChangefeed(...ChangefeedUUID) ([]*ChangefeedInfo, error) + // GetChangefeedState queries some or all changefeed states. + GetChangefeedState(...ChangefeedUUID) ([]*ChangefeedState, error) + // GetChangefeedProgress queries some or all changefeed progresses. + GetChangefeedProgress(...ChangefeedUUID) (map[ChangefeedUUID]ChangefeedProgress, error) } // -------------------- About owner schedule -------------------- // @@ -78,7 +82,10 @@ type CaptureObservation interface { // OwnerChanges fetch owner modifications. OwnerChanges() <-chan ScheduledChangefeed - // PostOwnerRemoved when an owner exits, inform the metadata storage. + // OnOwnerLaunched create an owner observation for a changefeed owner. + OnOwnerLaunched(cf ChangefeedUUID) OwnerObservation + + // PostOwnerRemoved inform the metadata storage when an owner exits. PostOwnerRemoved(cf ChangefeedUUID, taskPosition ChangefeedProgress) error } @@ -116,7 +123,7 @@ type ControllerObservation interface { // All intrefaces are thread-safe and shares one same Context. type OwnerObservation interface { // Self returns the changefeed info of the owner. - Self() *ChangefeedInfo + Self() ChangefeedUUID // UpdateChangefeed updates changefeed metadata, must be called on a paused one. UpdateChangefeed(*ChangefeedInfo) error diff --git a/cdcv2/metadata/model.go b/cdcv2/metadata/model.go index 67f73f190b0..10c35cfb632 100644 --- a/cdcv2/metadata/model.go +++ b/cdcv2/metadata/model.go @@ -148,16 +148,16 @@ func (s SchedState) toString() string { } // nolint -func (s SchedState) fromString(str string) error { +func (s *SchedState) fromString(str string) error { switch str { case "Launched": - s = SchedLaunched + *s = SchedLaunched case "Removing": - s = SchedRemoving + *s = SchedRemoving case "Removed": - s = SchedRemoved + *s = SchedRemoved default: - s = SchedInvalid + *s = SchedInvalid return errors.New("unreachable") } return nil diff --git a/cdcv2/metadata/sql/client.go b/cdcv2/metadata/sql/client.go index 39155823f64..885f7bdd8a1 100644 --- a/cdcv2/metadata/sql/client.go +++ b/cdcv2/metadata/sql/client.go @@ -26,6 +26,65 @@ import ( "gorm.io/gorm" ) +const defaultMaxExecTime = 5 * time.Second + +type client[T TxnContext] interface { + LeaderChecker[T] + checker[T] +} + +type clientImpl[T TxnContext] struct { + LeaderChecker[T] + checker[T] + + options *clientOptions +} + +func newClient[T TxnContext]( + leaderChecker LeaderChecker[T], + checker checker[T], + opts ...ClientOptionFunc, +) client[T] { + options := setClientOptions(opts...) + + return &clientImpl[T]{ + LeaderChecker: leaderChecker, + checker: checker, + options: options, + } +} + +func (c *clientImpl[T]) withTimeout(ctx context.Context) (context.Context, context.CancelFunc) { + if c.options.maxExecTime <= 0 { + return ctx, nil + } + return context.WithTimeout(ctx, c.options.maxExecTime) +} + +func (c *clientImpl[T]) Txn(ctx context.Context, fn TxnAction[T]) error { + ctx, cancel := c.withTimeout(ctx) + if cancel != nil { + defer cancel() + } + return c.checker.Txn(ctx, fn) +} + +func (c *clientImpl[T]) TxnWithOwnerLock(ctx context.Context, uuid metadata.ChangefeedUUID, fn TxnAction[T]) error { + ctx, cancel := c.withTimeout(ctx) + if cancel != nil { + defer cancel() + } + return c.checker.TxnWithOwnerLock(ctx, uuid, fn) +} + +func (c *clientImpl[T]) TxnWithLeaderLock(ctx context.Context, leaderID string, fn func(T) error) error { + ctx, cancel := c.withTimeout(ctx) + if cancel != nil { + defer cancel() + } + return c.LeaderChecker.TxnWithLeaderLock(ctx, leaderID, fn) +} + // TxnContext is a type set that can be used as the transaction context. type TxnContext interface { *gorm.DB | *sql.Tx @@ -55,6 +114,12 @@ type LeaderChecker[T TxnContext] interface { type checker[T TxnContext] interface { Txn(ctx context.Context, fn TxnAction[T]) error TxnWithOwnerLock(ctx context.Context, uuid metadata.ChangefeedUUID, fn TxnAction[T]) error + + upstreamClient[T] + changefeedInfoClient[T] + changefeedStateClient[T] + scheduleClient[T] + progressClient[T] } // TODO(CharlesCheung): only update changed fields to reduce the pressure on io and database. @@ -85,6 +150,7 @@ type changefeedStateClient[T TxnContext] interface { queryChangefeedStates(tx T) ([]*ChangefeedStateDO, error) queryChangefeedStatesByUpdateAt(tx T, lastUpdateAt time.Time) ([]*ChangefeedStateDO, error) queryChangefeedStateByUUID(tx T, uuid uint64) (*ChangefeedStateDO, error) + queryChangefeedStateByUUIDs(tx T, uuid ...uint64) ([]*ChangefeedStateDO, error) queryChangefeedStateByUUIDWithLock(tx T, uuid uint64) (*ChangefeedStateDO, error) } @@ -111,16 +177,29 @@ type progressClient[T TxnContext] interface { queryProgressByCaptureIDsWithLock(tx T, ids []string) ([]*ProgressDO, error) } -type client[T TxnContext] interface { - checker[T] - upstreamClient[T] - changefeedInfoClient[T] - changefeedStateClient[T] - scheduleClient[T] - progressClient[T] +type clientOptions struct { + maxExecTime time.Duration } -const defaultMaxExecTime = 5 * time.Second +func setClientOptions(opts ...ClientOptionFunc) *clientOptions { + o := &clientOptions{ + maxExecTime: defaultMaxExecTime, + } + for _, opt := range opts { + opt(o) + } + return o +} + +// ClientOptionFunc is the option function for the client. +type ClientOptionFunc func(*clientOptions) + +// WithMaxExecTime sets the maximum execution time of the client. +func WithMaxExecTime(d time.Duration) ClientOptionFunc { + return func(o *clientOptions) { + o.maxExecTime = d + } +} // TODO(CharlesCheung): implement a cache layer to reduce the pressure on io and database. // nolint:unused diff --git a/cdcv2/metadata/sql/client_orm.go b/cdcv2/metadata/sql/client_orm.go index 297f7feaab8..d8643c53242 100644 --- a/cdcv2/metadata/sql/client_orm.go +++ b/cdcv2/metadata/sql/client_orm.go @@ -27,9 +27,8 @@ import ( ) var ( - _ client[*gorm.DB] = &ormClient{} + _ checker[*gorm.DB] = &ormClient{} - _ checker[*gorm.DB] = &ormClient{} _ upstreamClient[*gorm.DB] = &ormClient{} _ changefeedInfoClient[*gorm.DB] = &ormClient{} _ changefeedStateClient[*gorm.DB] = &ormClient{} @@ -56,16 +55,14 @@ func (c *ormClient) Txn(ctx context.Context, fn ormTxnAction) error { } // TxnWithOwnerLock executes the given transaction action in a transaction with owner lock. -func (c *ormClient) TxnWithOwnerLock(ctx context.Context, uuid uint64, fn ormTxnAction) error { +func (c *ormClient) TxnWithOwnerLock(ctx context.Context, uuid metadata.ChangefeedUUID, fn ormTxnAction) error { return c.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { - var pr ProgressDO - ret := tx.Select("owner"). - // TODO(charledCheung): use a variable to replace the hard-coded owner state. - Where("changefeed_uuid = ? and owner = ? and owner_state != removed", uuid, c.selfID). + sc := &ScheduleDO{} + ret := tx.Where("changefeed_uuid = ? and owner = ? and owner_state != ?", uuid, c.selfID, metadata.SchedRemoved). Clauses(clause.Locking{ Strength: "SHARE", Table: clause.Table{Name: clause.CurrentTable}, - }).Limit(1).Find(&pr) + }).Limit(1).Find(&sc) if err := handleSingleOpErr(ret, 1, "TxnWithOwnerLock"); err != nil { return errors.Trace(err) } @@ -229,7 +226,8 @@ func (c *ormClient) queryChangefeedInfosByUUIDs(tx *gorm.DB, uuids ...uint64) ([ infos := []*ChangefeedInfoDO{} ret := tx.Where("uuid in (?)", uuids).Find(&infos) if err := handleSingleOpErr(ret, int64(len(uuids)), "QueryChangefeedInfosByUUIDs"); err != nil { - return nil, errors.Trace(err) + // TODO: optimize the behavior when some uuids are not found. + return infos, errors.Trace(err) } return infos, nil } @@ -326,6 +324,18 @@ func (c *ormClient) queryChangefeedStateByUUID(tx *gorm.DB, uuid uint64) (*Chang return state, nil } +// queryChangefeedStateByUUIDs implements the changefeedStateClient interface. +// nolint:unused +func (c *ormClient) queryChangefeedStateByUUIDs(tx *gorm.DB, uuids ...uint64) ([]*ChangefeedStateDO, error) { + states := []*ChangefeedStateDO{} + ret := tx.Where("changefeed_uuid in (?)", uuids).Find(&states) + if err := handleSingleOpErr(ret, int64(len(uuids)), "QueryChangefeedInfosByUUIDs"); err != nil { + // TODO: optimize the behavior when some uuids are not found. + return states, errors.Trace(err) + } + return states, nil +} + // queryChangefeedStateByUUIDWithLock implements the changefeedStateClient interface. // nolint:unused func (c *ormClient) queryChangefeedStateByUUIDWithLock(tx *gorm.DB, uuid uint64) (*ChangefeedStateDO, error) { diff --git a/cdcv2/metadata/sql/observation.go b/cdcv2/metadata/sql/observation.go index e703cec1e47..e1cdba99fb3 100644 --- a/cdcv2/metadata/sql/observation.go +++ b/cdcv2/metadata/sql/observation.go @@ -23,7 +23,7 @@ import ( "sync" "time" - "github.com/ngaut/log" + "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdcv2/metadata" ormUtil "github.com/pingcap/tiflow/engine/pkg/orm" @@ -37,6 +37,7 @@ import ( ) var ( + _ metadata.Querier = &CaptureOb[*gorm.DB]{} _ metadata.CaptureObservation = &CaptureOb[*gorm.DB]{} _ metadata.ControllerObservation = &ControllerOb[*gorm.DB]{} _ metadata.OwnerObservation = &OwnerOb[*gorm.DB]{} @@ -47,11 +48,11 @@ type CaptureOb[T TxnContext] struct { // election related fields. metadata.Elector selfInfo *model.CaptureInfo - // TODO(CharlesCheung): handle ctx properly. - egCtx context.Context + // TODO(CharlesCheung): handle ctx properly. + egCtx context.Context client client[T] - leaderChecker LeaderChecker[T] + uuidGenerator uuidGenerator tasks *entity[metadata.ChangefeedUUID, *ScheduleDO] @@ -62,7 +63,7 @@ type CaptureOb[T TxnContext] struct { // NewCaptureObservation creates a capture observation. func NewCaptureObservation( - backendDB *sql.DB, selfInfo *model.CaptureInfo, + backendDB *sql.DB, selfInfo *model.CaptureInfo, opts ...ClientOptionFunc, ) (*CaptureOb[*gorm.DB], error) { db, err := ormUtil.NewGormDB(backendDB, "mysql") if err != nil { @@ -72,14 +73,15 @@ func NewCaptureObservation( if err != nil { return nil, errors.Trace(err) } + ormClient := NewORMClient(selfInfo.ID, db) if err := AutoMigrate(db); err != nil { return nil, errors.Trace(err) } return &CaptureOb[*gorm.DB]{ selfInfo: selfInfo, - client: NewORMClient(selfInfo.ID, db), - leaderChecker: electionStorage, + client: newClient(electionStorage, ormClient, opts...), + uuidGenerator: NewUUIDGenerator("orm", db), tasks: newEntity[metadata.ChangefeedUUID, *ScheduleDO](defaultMaxExecTime), Elector: metadata.NewElector(selfInfo, electionStorage), ownerChanges: chann.NewAutoDrainChann[metadata.ScheduledChangefeed](), @@ -136,7 +138,7 @@ func (c *CaptureOb[T]) onTakeControl( controllerCallback func(context.Context, metadata.ControllerObservation) error, ) func(context.Context) error { return func(ctx context.Context) error { - controllerOb := newControllerObservation(c.leaderChecker, c.client, c.selfInfo, c.getAllCaptures) + controllerOb := newControllerObservation(c.client, c.uuidGenerator, c.selfInfo, c.getAllCaptures) eg, egCtx := errgroup.WithContext(ctx) eg.Go(func() error { @@ -203,6 +205,11 @@ func (c *CaptureOb[T]) OwnerChanges() <-chan metadata.ScheduledChangefeed { return c.ownerChanges.Out() } +// OnOwnerLaunched is called when the owner of a changefeed is launched. +func (c *CaptureOb[T]) OnOwnerLaunched(cf metadata.ChangefeedUUID) metadata.OwnerObservation { + return newOwnerObservation[T](c, cf) +} + // PostOwnerRemoved is called when the owner of a changefeed is removed. func (c *CaptureOb[T]) PostOwnerRemoved(cf metadata.ChangefeedUUID, taskPosition metadata.ChangefeedProgress) error { sc := c.tasks.get(cf) @@ -222,10 +229,14 @@ func (c *CaptureOb[T]) ProcessorChanges() <-chan metadata.ScheduledChangefeed { return c.processorChanges.Out() } -// GetChangefeeds returns the changefeeds with the given UUIDs. -func (c *CaptureOb[T]) GetChangefeeds(cfs ...metadata.ChangefeedUUID) (infos []*metadata.ChangefeedInfo, err error) { +// GetChangefeed returns the changefeeds with the given UUIDs. +func (c *CaptureOb[T]) GetChangefeed(cfs ...metadata.ChangefeedUUID) (infos []*metadata.ChangefeedInfo, err error) { var cfDOs []*ChangefeedInfoDO err = c.client.Txn(c.egCtx, func(tx T) error { + if len(cfs) == 0 { + cfDOs, err = c.client.queryChangefeedInfos(tx) + return err + } cfDOs, err = c.client.queryChangefeedInfosByUUIDs(tx, cfs...) return err }) @@ -239,6 +250,77 @@ func (c *CaptureOb[T]) GetChangefeeds(cfs ...metadata.ChangefeedUUID) (infos []* return } +// GetChangefeedState returns the state of the changefeed with the given UUID. +func (c *CaptureOb[T]) GetChangefeedState(cfs ...metadata.ChangefeedUUID) (states []*metadata.ChangefeedState, err error) { + var cfDOs []*ChangefeedStateDO + err = c.client.Txn(c.egCtx, func(tx T) error { + if len(cfs) == 0 { + cfDOs, err = c.client.queryChangefeedStates(tx) + return err + } + cfDOs, err = c.client.queryChangefeedStateByUUIDs(tx, cfs...) + return err + }) + if err != nil { + return nil, errors.Trace(err) + } + + for _, cfDO := range cfDOs { + states = append(states, &cfDO.ChangefeedState) + } + return +} + +// GetChangefeedProgress returns the progress of the changefeed with the given UUID. +func (c *CaptureOb[T]) GetChangefeedProgress( + cfs ...metadata.ChangefeedUUID, +) (progresses map[metadata.ChangefeedUUID]metadata.ChangefeedProgress, err error) { + var prDOs []*ProgressDO + var scDOs []*ScheduleDO + err = c.client.Txn(c.egCtx, func(tx T) error { + prDOs, err = c.client.queryProgresss(tx) + if err != nil { + return err + } + + scDOs, err = c.client.querySchedules(tx) + return err + }) + if err != nil { + return nil, errors.Trace(err) + } + + cfMap := make(map[metadata.ChangefeedUUID]struct{}) + for _, cf := range cfs { + cfMap[cf] = struct{}{} + } + queryAll := len(cfMap) == 0 + + progresses = make(map[metadata.ChangefeedUUID]metadata.ChangefeedProgress) + for _, prDO := range prDOs { + if prDO.Progress != nil { + for cf, pos := range *prDO.Progress { + if _, ok := cfMap[cf]; ok || queryAll { + progresses[cf] = pos + } + } + } + } + + if queryAll || len(progresses) < len(cfMap) { + for _, scDO := range scDOs { + if _, alreadyFound := progresses[scDO.ChangefeedUUID]; alreadyFound { + continue + } + if _, ok := cfMap[scDO.ChangefeedUUID]; ok || queryAll { + progresses[scDO.ChangefeedUUID] = scDO.TaskPosition + } + } + } + + return +} + func (c *CaptureOb[T]) getAllCaptures() []*model.CaptureInfo { infos, _ := c.GetCaptures() return infos @@ -246,9 +328,8 @@ func (c *CaptureOb[T]) getAllCaptures() []*model.CaptureInfo { // ControllerOb is an implement for metadata.ControllerObservation. type ControllerOb[T TxnContext] struct { - selfInfo *model.CaptureInfo - leaderChecker LeaderChecker[T] - client client[T] + selfInfo *model.CaptureInfo + client client[T] // TODO(CharlesCheung): handle ctx properly. // egCtx is the inner ctx of elector. @@ -267,17 +348,16 @@ type ControllerOb[T TxnContext] struct { } func newControllerObservation[T TxnContext]( - leaderChecker LeaderChecker[T], client client[T], + uuidGenerator uuidGenerator, selfInfo *model.CaptureInfo, getAllCaptures func() []*model.CaptureInfo, ) *ControllerOb[T] { return &ControllerOb[T]{ - leaderChecker: leaderChecker, client: client, selfInfo: selfInfo, getAllCaptures: getAllCaptures, - uuidGenerator: NewUUIDGenerator("random-crc64"), + uuidGenerator: uuidGenerator, } } @@ -298,7 +378,7 @@ func (c *ControllerOb[T]) run(ctx context.Context) error { case <-ticker.C: } - if err := c.handleAliveCaptures(ctx); err != nil { + if err := c.handleAliveCaptures(); err != nil { log.Warn("controller handle alive captures fail", zap.String("capture", c.selfInfo.ID), zap.Error(err)) return err } @@ -333,7 +413,7 @@ func (c *ControllerOb[T]) init() error { return c.onCaptureOffline(captureOfflined...) } -func (c *ControllerOb[T]) handleAliveCaptures(_ context.Context) error { +func (c *ControllerOb[T]) handleAliveCaptures() error { alives := c.getAllCaptures() hash := sortAndHashCaptureList(alives) @@ -378,6 +458,10 @@ func (c *ControllerOb[T]) upsertUpstream(tx T, up *model.UpstreamInfo) error { return nil } +func (c *ControllerOb[T]) txnWithLeaderLock(fn func(T) error) error { + return c.client.TxnWithLeaderLock(c.egCtx, c.selfInfo.ID, fn) +} + // CreateChangefeed initializes the changefeed info, schedule info and state info of the given changefeed. It also // updates or creates the upstream info depending on whether the upstream info exists. func (c *ControllerOb[T]) CreateChangefeed(cf *metadata.ChangefeedInfo, up *model.UpstreamInfo) (metadata.ChangefeedIdent, error) { @@ -386,9 +470,13 @@ func (c *ControllerOb[T]) CreateChangefeed(cf *metadata.ChangefeedInfo, up *mode cf.ChangefeedIdent, cf.UpstreamID, up.ID) return cf.ChangefeedIdent, errors.ErrMetaInvalidState.GenWithStackByArgs(errMsg) } - cf.ChangefeedIdent.UUID = c.uuidGenerator.GenChangefeedUUID() + uuid, err := c.uuidGenerator.GenChangefeedUUID(c.egCtx) + if err != nil { + return cf.ChangefeedIdent, errors.Trace(err) + } - err := c.leaderChecker.TxnWithLeaderLock(c.egCtx, c.selfInfo.ID, func(tx T) error { + cf.ChangefeedIdent.UUID = uuid + err = c.txnWithLeaderLock(func(tx T) error { if err := c.upsertUpstream(tx, up); err != nil { return errors.Trace(err) } @@ -431,7 +519,7 @@ func (c *ControllerOb[T]) CreateChangefeed(cf *metadata.ChangefeedInfo, up *mode // RemoveChangefeed removes the changefeed info func (c *ControllerOb[T]) RemoveChangefeed(cf metadata.ChangefeedUUID) error { - return c.leaderChecker.TxnWithLeaderLock(c.egCtx, c.selfInfo.ID, func(tx T) error { + return c.txnWithLeaderLock(func(tx T) error { oldInfo, err := c.client.queryChangefeedInfoByUUID(tx, cf) if err != nil { return errors.Trace(err) @@ -471,7 +559,7 @@ func (c *ControllerOb[T]) RemoveChangefeed(cf metadata.ChangefeedUUID) error { // CleanupChangefeed removes the changefeed info, schedule info and state info of the given changefeed. // Note that this function should only be called when the owner is removed and changefeed is marked as removed. func (c *ControllerOb[T]) CleanupChangefeed(cf metadata.ChangefeedUUID) error { - return c.leaderChecker.TxnWithLeaderLock(c.egCtx, c.selfInfo.ID, func(tx T) error { + return c.txnWithLeaderLock(func(tx T) error { err := c.client.deleteChangefeedInfo(tx, &ChangefeedInfoDO{ ChangefeedInfo: metadata.ChangefeedInfo{ ChangefeedIdent: metadata.ChangefeedIdent{ @@ -523,7 +611,7 @@ func (c *ControllerOb[T]) onCaptureOffline(ids ...model.CaptureID) error { // TODO(CharlesCheung): use multiple statements to reduce the number of round trips. // Note currently we only handle single capture offline, so it is not a big deal. for _, id := range ids { - err := c.leaderChecker.TxnWithLeaderLock(c.egCtx, c.selfInfo.ID, func(tx T) error { + err := c.txnWithLeaderLock(func(tx T) error { prs, err := c.client.queryProgressByCaptureIDsWithLock(tx, []model.CaptureID{id}) if err != nil { return errors.Trace(err) @@ -578,7 +666,7 @@ func (c *ControllerOb[T]) onCaptureOffline(ids ...model.CaptureID) error { // SetOwner Schedule a changefeed owner to a given target. func (c *ControllerOb[T]) SetOwner(target metadata.ScheduledChangefeed) error { - return c.leaderChecker.TxnWithLeaderLock(c.egCtx, c.selfInfo.ID, func(tx T) error { + return c.txnWithLeaderLock(func(tx T) error { old, err := c.client.queryScheduleByUUID(tx, target.ChangefeedUUID) if err != nil { return errors.Trace(err) @@ -633,13 +721,21 @@ func (c *ControllerOb[T]) ScheduleSnapshot() (ss []metadata.ScheduledChangefeed, type OwnerOb[T TxnContext] struct { egCtx context.Context client client[T] - cf *metadata.ChangefeedInfo + cfUUID metadata.ChangefeedUUID +} + +func newOwnerObservation[T TxnContext](c *CaptureOb[T], cf metadata.ChangefeedUUID) *OwnerOb[T] { + return &OwnerOb[T]{ + egCtx: c.egCtx, + client: c.client, + cfUUID: cf, + } } // Self returns the changefeed info of the owner. // nolint:unused -func (o *OwnerOb[T]) Self() *metadata.ChangefeedInfo { - return o.cf +func (o *OwnerOb[T]) Self() metadata.ChangefeedUUID { + return o.cfUUID } func (o *OwnerOb[T]) updateChangefeedState( @@ -647,15 +743,15 @@ func (o *OwnerOb[T]) updateChangefeedState( cfErr *model.RunningError, cfWarn *model.RunningError, ) error { - return o.client.TxnWithOwnerLock(o.egCtx, o.cf.UUID, func(tx T) error { - oldState, err := o.client.queryChangefeedStateByUUID(tx, o.cf.UUID) + return o.client.TxnWithOwnerLock(o.egCtx, o.cfUUID, func(tx T) error { + oldState, err := o.client.queryChangefeedStateByUUID(tx, o.cfUUID) if err != nil { return errors.Trace(err) } newState := &ChangefeedStateDO{ ChangefeedState: metadata.ChangefeedState{ - ChangefeedUUID: o.cf.UUID, + ChangefeedUUID: o.cfUUID, State: state, Error: oldState.Error, Warning: oldState.Warning, @@ -676,8 +772,8 @@ func (o *OwnerOb[T]) updateChangefeedState( // UpdateChangefeed updates changefeed metadata, must be called on a paused one. // nolint:unused func (o *OwnerOb[T]) UpdateChangefeed(info *metadata.ChangefeedInfo) error { - return o.client.TxnWithOwnerLock(o.egCtx, o.cf.UUID, func(tx T) error { - state, err := o.client.queryChangefeedStateByUUIDWithLock(tx, o.cf.UUID) + return o.client.TxnWithOwnerLock(o.egCtx, o.cfUUID, func(tx T) error { + state, err := o.client.queryChangefeedStateByUUIDWithLock(tx, o.cfUUID) if err != nil { return errors.Trace(err) } @@ -687,7 +783,7 @@ func (o *OwnerOb[T]) UpdateChangefeed(info *metadata.ChangefeedInfo) error { ) } - oldInfo, err := o.client.queryChangefeedInfoByUUID(tx, o.cf.UUID) + oldInfo, err := o.client.queryChangefeedInfoByUUID(tx, o.cfUUID) if err != nil { return errors.Trace(err) } diff --git a/cdcv2/metadata/sql/uuid.go b/cdcv2/metadata/sql/uuid.go index d66d4543024..702f1abaf9e 100644 --- a/cdcv2/metadata/sql/uuid.go +++ b/cdcv2/metadata/sql/uuid.go @@ -14,21 +14,27 @@ package sql import ( + "context" "hash" "hash/crc64" "hash/fnv" + "sync" "sync/atomic" + "time" "github.com/pingcap/tiflow/cdcv2/metadata" + "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/uuid" + "gorm.io/gorm" + "gorm.io/gorm/clause" ) type uuidGenerator interface { - GenChangefeedUUID() metadata.ChangefeedUUID + GenChangefeedUUID(ctx context.Context) (metadata.ChangefeedUUID, error) } // NewUUIDGenerator creates a new UUID generator. -func NewUUIDGenerator(config string) uuidGenerator { +func NewUUIDGenerator(config string, db *gorm.DB) uuidGenerator { switch config { case "mock": return newMockUUIDGenerator() @@ -37,8 +43,9 @@ func NewUUIDGenerator(config string) uuidGenerator { return newRandomUUIDGenerator(hasher) case "random-fnv64": return newRandomUUIDGenerator(fnv.New64()) + default: + return newORMUUIDGenerator(taskCDCChangefeedUUID, db) } - return nil } type mockUUIDGenerator struct { @@ -50,8 +57,8 @@ func newMockUUIDGenerator() uuidGenerator { } // GenChangefeedUUID implements uuidGenerator interface. -func (g *mockUUIDGenerator) GenChangefeedUUID() metadata.ChangefeedUUID { - return g.epoch.Add(1) +func (g *mockUUIDGenerator) GenChangefeedUUID(ctx context.Context) (metadata.ChangefeedUUID, error) { + return g.epoch.Add(1), nil } type randomUUIDGenerator struct { @@ -67,11 +74,87 @@ func newRandomUUIDGenerator(hasher hash.Hash64) uuidGenerator { } // GenChangefeedUUID implements uuidGenerator interface. -func (g *randomUUIDGenerator) GenChangefeedUUID() metadata.ChangefeedUUID { - g.hasher = crc64.New(crc64.MakeTable(crc64.ISO)) +func (g *randomUUIDGenerator) GenChangefeedUUID(ctx context.Context) (metadata.ChangefeedUUID, error) { g.hasher.Reset() g.hasher.Write([]byte(g.uuidGen.NewString())) - return g.hasher.Sum64() + return g.hasher.Sum64(), nil } // TODO: implement sql based UUID generator. + +const ( + tableNameLogicEpoch = "logic_epoch" + taskCDCChangefeedUUID = "cdc-changefeed-uuid" +) + +type logicEpochDO struct { + TaskID string `gorm:"column:task_id;type:varchar(128); primaryKey" json:"task_id"` + Epoch uint64 `gorm:"column:epoch;type:bigint(20) unsigned;not null" json:"epoch"` + + CreatedAt time.Time `json:"created-at"` + UpdatedAt time.Time `json:"updated-at"` +} + +func (l *logicEpochDO) TableName() string { + return tableNameLogicEpoch +} + +type ormUUIDGenerator struct { + db *gorm.DB + taskID string + + once sync.Once +} + +func newORMUUIDGenerator(taskID string, db *gorm.DB) uuidGenerator { + return &ormUUIDGenerator{ + db: db, + taskID: taskID, + } +} + +func (g *ormUUIDGenerator) initlize(ctx context.Context) error { + if err := g.db.AutoMigrate(&logicEpochDO{}); err != nil { + return errors.WrapError(errors.ErrMetaOpFailed, err, "ormUUIDGeneratorInitlize") + } + // Do nothing on conflict + if err := g.db.WithContext(ctx).Clauses(clause.OnConflict{DoNothing: true}). + Create(&logicEpochDO{ + TaskID: g.taskID, + Epoch: 0, + }).Error; err != nil { + return errors.WrapError(errors.ErrMetaOpFailed, err, "ormUUIDGeneratorInitlize") + } + return nil +} + +func (g *ormUUIDGenerator) GenChangefeedUUID(ctx context.Context) (metadata.ChangefeedUUID, error) { + var err error + g.once.Do(func() { + err = g.initlize(ctx) + }) + if err != nil { + return 0, errors.Trace(err) + } + + uuidDO := &logicEpochDO{} + // every job owns its logic epoch + err = g.db.WithContext(ctx). + Where("task_id = ?", g.taskID). + Transaction(func(tx *gorm.DB) error { + if err := tx.Model(uuidDO). + Update("epoch", gorm.Expr("epoch + ?", 1)).Error; err != nil { + // return any error will rollback + return errors.WrapError(errors.ErrMetaOpFailed, err, "GenChangefeedUUID") + } + + if err := tx.Select("epoch").Limit(1).Find(uuidDO).Error; err != nil { + return errors.WrapError(errors.ErrMetaOpFailed, err, "GenChangefeedUUID") + } + return nil + }) + if err != nil { + return 0, errors.Trace(err) + } + return uuidDO.Epoch, nil +} diff --git a/cdcv2/metadata/sql/uuid_test.go b/cdcv2/metadata/sql/uuid_test.go new file mode 100644 index 00000000000..95507358fe6 --- /dev/null +++ b/cdcv2/metadata/sql/uuid_test.go @@ -0,0 +1,109 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sql + +import ( + "context" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/go-sql-driver/mysql" + "github.com/pingcap/tiflow/pkg/errors" + "github.com/stretchr/testify/require" +) + +func TestORMUUIDGeneratorInitFail(t *testing.T) { + t.Parallel() + + backendDB, db, mock := newMockDB(t) + defer backendDB.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + // auto migrate + mock.ExpectQuery("SELECT SCHEMA_NAME from Information_schema.SCHEMATA " + + "where SCHEMA_NAME LIKE ? ORDER BY SCHEMA_NAME=? DESC limit 1").WillReturnRows( + sqlmock.NewRows([]string{"SCHEMA_NAME"})) + mock.ExpectExec("CREATE TABLE `logic_epoch` (`task_id` varchar(128),`epoch` bigint(20) unsigned NOT NULL," + + "`created_at` datetime(3) NULL,`updated_at` datetime(3) NULL,PRIMARY KEY (`task_id`))"). + WillReturnResult(sqlmock.NewResult(0, 0)) + + // insert first record fail + mock.ExpectExec("INSERT INTO `logic_epoch` (`task_id`,`epoch`,`created_at`,`updated_at`) VALUES " + + "(?,?,?,?) ON DUPLICATE KEY UPDATE `task_id`=`task_id`"). + WillReturnError(&mysql.MySQLError{Number: 1062, Message: "test error"}) + gen := newORMUUIDGenerator(taskCDCChangefeedUUID, db) + uuid, err := gen.GenChangefeedUUID(ctx) + require.ErrorIs(t, err, errors.ErrMetaOpFailed) + require.ErrorContains(t, err, "test error") + require.Equal(t, uint64(0), uuid) + + mock.ExpectClose() +} + +func TestORMUUIDGenerator(t *testing.T) { + t.Parallel() + + backendDB, db, mock := newMockDB(t) + defer backendDB.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + gen := newORMUUIDGenerator(taskCDCChangefeedUUID, db) + + // auto migrate + mock.ExpectQuery("SELECT SCHEMA_NAME from Information_schema.SCHEMATA " + + "where SCHEMA_NAME LIKE ? ORDER BY SCHEMA_NAME=? DESC limit 1").WillReturnRows( + sqlmock.NewRows([]string{"SCHEMA_NAME"})) + mock.ExpectExec("CREATE TABLE `logic_epoch` (`task_id` varchar(128),`epoch` bigint(20) unsigned NOT NULL," + + "`created_at` datetime(3) NULL,`updated_at` datetime(3) NULL,PRIMARY KEY (`task_id`))"). + WillReturnResult(sqlmock.NewResult(0, 0)) + + // insert first record + mock.ExpectExec("INSERT INTO `logic_epoch` (`task_id`,`epoch`,`created_at`,`updated_at`) VALUES " + + "(?,?,?,?) ON DUPLICATE KEY UPDATE `task_id`=`task_id`"). + WillReturnResult(sqlmock.NewResult(1, 1)) + + // case 1: update success + mock.ExpectBegin() + mock.ExpectExec("UPDATE `logic_epoch` SET `epoch`=epoch + ?,`updated_at`=? WHERE task_id = ?"). + WithArgs(1, sqlmock.AnyArg(), taskCDCChangefeedUUID). + WillReturnResult(sqlmock.NewResult(1, 1)) + + // select + mock.ExpectQuery("SELECT `epoch` FROM `logic_epoch` WHERE task_id = ? LIMIT 1"). + WithArgs(taskCDCChangefeedUUID). + WillReturnRows(sqlmock.NewRows([]string{"epoch"}).AddRow(11)) + mock.ExpectCommit() + + uuid, err := gen.GenChangefeedUUID(ctx) + require.NoError(t, err) + require.Equal(t, uint64(11), uuid) + + // case 2: update fail + failedErr := errors.New("gen epoch error") + mock.ExpectBegin() + mock.ExpectExec("UPDATE `logic_epoch` SET `epoch`=epoch + ?,`updated_at`=? WHERE task_id = ?"). + WithArgs(1, sqlmock.AnyArg(), taskCDCChangefeedUUID). + WillReturnError(failedErr) + mock.ExpectRollback() + uuid, err = gen.GenChangefeedUUID(ctx) + require.ErrorContains(t, err, failedErr.Error()) + require.Equal(t, uint64(0), uuid) + + mock.ExpectClose() +} diff --git a/errors.toml b/errors.toml index 3bd625bde78..1563e43c05e 100755 --- a/errors.toml +++ b/errors.toml @@ -1336,6 +1336,11 @@ error = ''' meta operation fail ''' +["DFLOW:ErrMetaOpFailed"] +error = ''' +meta operation %s is failed +''' + ["DFLOW:ErrMetaOptionConflict"] error = ''' WithRange/WithPrefix/WithFromKey, more than one option are used diff --git a/go.mod b/go.mod index 7e9e2380390..ee2f2b3ac3f 100644 --- a/go.mod +++ b/go.mod @@ -276,7 +276,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/mtibben/percent v0.2.1 // indirect github.com/ncw/directio v1.0.5 // indirect - github.com/ngaut/log v0.0.0-20210830112240-0124ec040aeb + github.com/ngaut/log v0.0.0-20210830112240-0124ec040aeb // indirect github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 // indirect github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index 4af9a91a897..22b60205b79 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -954,7 +954,7 @@ var ( ) ErrMetaOpFailed = errors.Normalize( "meta operation %s is failed", - errors.RFCCodeText("DFLOW:ErrMetaOpFail"), + errors.RFCCodeText("DFLOW:ErrMetaOpFailed"), ) ErrMetaInvalidState = errors.Normalize( "meta state is invalid: %s",