Skip to content

Commit

Permalink
metadata(cdc): implement sql UUID generator and refactor client (#9864)
Browse files Browse the repository at this point in the history
close #9871
  • Loading branch information
CharlesCheung96 authored Oct 14, 2023
1 parent 19f6bce commit 2bc82bd
Show file tree
Hide file tree
Showing 10 changed files with 457 additions and 68 deletions.
13 changes: 10 additions & 3 deletions cdcv2/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ import (
// Querier is used to query information from metadata storage.
type Querier interface {
// GetChangefeeds queries some or all changefeeds.
GetChangefeeds(...ChangefeedUUID) ([]*ChangefeedInfo, error)
GetChangefeed(...ChangefeedUUID) ([]*ChangefeedInfo, error)
// GetChangefeedState queries some or all changefeed states.
GetChangefeedState(...ChangefeedUUID) ([]*ChangefeedState, error)
// GetChangefeedProgress queries some or all changefeed progresses.
GetChangefeedProgress(...ChangefeedUUID) (map[ChangefeedUUID]ChangefeedProgress, error)
}

// -------------------- About owner schedule -------------------- //
Expand Down Expand Up @@ -78,7 +82,10 @@ type CaptureObservation interface {
// OwnerChanges fetch owner modifications.
OwnerChanges() <-chan ScheduledChangefeed

// PostOwnerRemoved when an owner exits, inform the metadata storage.
// OnOwnerLaunched create an owner observation for a changefeed owner.
OnOwnerLaunched(cf ChangefeedUUID) OwnerObservation

// PostOwnerRemoved inform the metadata storage when an owner exits.
PostOwnerRemoved(cf ChangefeedUUID, taskPosition ChangefeedProgress) error
}

Expand Down Expand Up @@ -116,7 +123,7 @@ type ControllerObservation interface {
// All intrefaces are thread-safe and shares one same Context.
type OwnerObservation interface {
// Self returns the changefeed info of the owner.
Self() *ChangefeedInfo
Self() ChangefeedUUID

// UpdateChangefeed updates changefeed metadata, must be called on a paused one.
UpdateChangefeed(*ChangefeedInfo) error
Expand Down
10 changes: 5 additions & 5 deletions cdcv2/metadata/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,16 @@ func (s SchedState) toString() string {
}

// nolint
func (s SchedState) fromString(str string) error {
func (s *SchedState) fromString(str string) error {
switch str {
case "Launched":
s = SchedLaunched
*s = SchedLaunched
case "Removing":
s = SchedRemoving
*s = SchedRemoving
case "Removed":
s = SchedRemoved
*s = SchedRemoved
default:
s = SchedInvalid
*s = SchedInvalid
return errors.New("unreachable")
}
return nil
Expand Down
95 changes: 87 additions & 8 deletions cdcv2/metadata/sql/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,65 @@ import (
"gorm.io/gorm"
)

const defaultMaxExecTime = 5 * time.Second

type client[T TxnContext] interface {
LeaderChecker[T]
checker[T]
}

type clientImpl[T TxnContext] struct {
LeaderChecker[T]
checker[T]

options *clientOptions
}

func newClient[T TxnContext](
leaderChecker LeaderChecker[T],
checker checker[T],
opts ...ClientOptionFunc,
) client[T] {
options := setClientOptions(opts...)

return &clientImpl[T]{
LeaderChecker: leaderChecker,
checker: checker,
options: options,
}
}

func (c *clientImpl[T]) withTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
if c.options.maxExecTime <= 0 {
return ctx, nil
}
return context.WithTimeout(ctx, c.options.maxExecTime)
}

func (c *clientImpl[T]) Txn(ctx context.Context, fn TxnAction[T]) error {
ctx, cancel := c.withTimeout(ctx)
if cancel != nil {
defer cancel()
}
return c.checker.Txn(ctx, fn)
}

func (c *clientImpl[T]) TxnWithOwnerLock(ctx context.Context, uuid metadata.ChangefeedUUID, fn TxnAction[T]) error {
ctx, cancel := c.withTimeout(ctx)
if cancel != nil {
defer cancel()
}
return c.checker.TxnWithOwnerLock(ctx, uuid, fn)
}

func (c *clientImpl[T]) TxnWithLeaderLock(ctx context.Context, leaderID string, fn func(T) error) error {
ctx, cancel := c.withTimeout(ctx)
if cancel != nil {
defer cancel()
}
return c.LeaderChecker.TxnWithLeaderLock(ctx, leaderID, fn)
}

// TxnContext is a type set that can be used as the transaction context.
type TxnContext interface {
*gorm.DB | *sql.Tx
Expand Down Expand Up @@ -55,6 +114,12 @@ type LeaderChecker[T TxnContext] interface {
type checker[T TxnContext] interface {
Txn(ctx context.Context, fn TxnAction[T]) error
TxnWithOwnerLock(ctx context.Context, uuid metadata.ChangefeedUUID, fn TxnAction[T]) error

upstreamClient[T]
changefeedInfoClient[T]
changefeedStateClient[T]
scheduleClient[T]
progressClient[T]
}

// TODO(CharlesCheung): only update changed fields to reduce the pressure on io and database.
Expand Down Expand Up @@ -85,6 +150,7 @@ type changefeedStateClient[T TxnContext] interface {
queryChangefeedStates(tx T) ([]*ChangefeedStateDO, error)
queryChangefeedStatesByUpdateAt(tx T, lastUpdateAt time.Time) ([]*ChangefeedStateDO, error)
queryChangefeedStateByUUID(tx T, uuid uint64) (*ChangefeedStateDO, error)
queryChangefeedStateByUUIDs(tx T, uuid ...uint64) ([]*ChangefeedStateDO, error)
queryChangefeedStateByUUIDWithLock(tx T, uuid uint64) (*ChangefeedStateDO, error)
}

Expand All @@ -111,16 +177,29 @@ type progressClient[T TxnContext] interface {
queryProgressByCaptureIDsWithLock(tx T, ids []string) ([]*ProgressDO, error)
}

type client[T TxnContext] interface {
checker[T]
upstreamClient[T]
changefeedInfoClient[T]
changefeedStateClient[T]
scheduleClient[T]
progressClient[T]
type clientOptions struct {
maxExecTime time.Duration
}

const defaultMaxExecTime = 5 * time.Second
func setClientOptions(opts ...ClientOptionFunc) *clientOptions {
o := &clientOptions{
maxExecTime: defaultMaxExecTime,
}
for _, opt := range opts {
opt(o)
}
return o
}

// ClientOptionFunc is the option function for the client.
type ClientOptionFunc func(*clientOptions)

// WithMaxExecTime sets the maximum execution time of the client.
func WithMaxExecTime(d time.Duration) ClientOptionFunc {
return func(o *clientOptions) {
o.maxExecTime = d
}
}

// TODO(CharlesCheung): implement a cache layer to reduce the pressure on io and database.
// nolint:unused
Expand Down
28 changes: 19 additions & 9 deletions cdcv2/metadata/sql/client_orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ import (
)

var (
_ client[*gorm.DB] = &ormClient{}
_ checker[*gorm.DB] = &ormClient{}

_ checker[*gorm.DB] = &ormClient{}
_ upstreamClient[*gorm.DB] = &ormClient{}
_ changefeedInfoClient[*gorm.DB] = &ormClient{}
_ changefeedStateClient[*gorm.DB] = &ormClient{}
Expand All @@ -56,16 +55,14 @@ func (c *ormClient) Txn(ctx context.Context, fn ormTxnAction) error {
}

// TxnWithOwnerLock executes the given transaction action in a transaction with owner lock.
func (c *ormClient) TxnWithOwnerLock(ctx context.Context, uuid uint64, fn ormTxnAction) error {
func (c *ormClient) TxnWithOwnerLock(ctx context.Context, uuid metadata.ChangefeedUUID, fn ormTxnAction) error {
return c.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
var pr ProgressDO
ret := tx.Select("owner").
// TODO(charledCheung): use a variable to replace the hard-coded owner state.
Where("changefeed_uuid = ? and owner = ? and owner_state != removed", uuid, c.selfID).
sc := &ScheduleDO{}
ret := tx.Where("changefeed_uuid = ? and owner = ? and owner_state != ?", uuid, c.selfID, metadata.SchedRemoved).
Clauses(clause.Locking{
Strength: "SHARE",
Table: clause.Table{Name: clause.CurrentTable},
}).Limit(1).Find(&pr)
}).Limit(1).Find(&sc)
if err := handleSingleOpErr(ret, 1, "TxnWithOwnerLock"); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -229,7 +226,8 @@ func (c *ormClient) queryChangefeedInfosByUUIDs(tx *gorm.DB, uuids ...uint64) ([
infos := []*ChangefeedInfoDO{}
ret := tx.Where("uuid in (?)", uuids).Find(&infos)
if err := handleSingleOpErr(ret, int64(len(uuids)), "QueryChangefeedInfosByUUIDs"); err != nil {
return nil, errors.Trace(err)
// TODO: optimize the behavior when some uuids are not found.
return infos, errors.Trace(err)
}
return infos, nil
}
Expand Down Expand Up @@ -326,6 +324,18 @@ func (c *ormClient) queryChangefeedStateByUUID(tx *gorm.DB, uuid uint64) (*Chang
return state, nil
}

// queryChangefeedStateByUUIDs implements the changefeedStateClient interface.
// nolint:unused
func (c *ormClient) queryChangefeedStateByUUIDs(tx *gorm.DB, uuids ...uint64) ([]*ChangefeedStateDO, error) {
states := []*ChangefeedStateDO{}
ret := tx.Where("changefeed_uuid in (?)", uuids).Find(&states)
if err := handleSingleOpErr(ret, int64(len(uuids)), "QueryChangefeedInfosByUUIDs"); err != nil {
// TODO: optimize the behavior when some uuids are not found.
return states, errors.Trace(err)
}
return states, nil
}

// queryChangefeedStateByUUIDWithLock implements the changefeedStateClient interface.
// nolint:unused
func (c *ormClient) queryChangefeedStateByUUIDWithLock(tx *gorm.DB, uuid uint64) (*ChangefeedStateDO, error) {
Expand Down
Loading

0 comments on commit 2bc82bd

Please sign in to comment.