Skip to content

Commit

Permalink
pkg/election(ticdc): implement gorm based election (#9752)
Browse files Browse the repository at this point in the history
close #9768
  • Loading branch information
CharlesCheung96 authored Sep 22, 2023
1 parent ef7a972 commit 7ad8876
Show file tree
Hide file tree
Showing 10 changed files with 473 additions and 26 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,11 @@ error = '''
received event regionID %v, requestID %v from %v, but neither pending region nor running region was found
'''

["CDC:ErrNotLeader"]
error = '''
%s is not leader
'''

["CDC:ErrNotOwner"]
error = '''
this capture is not a owner
Expand Down
24 changes: 17 additions & 7 deletions pkg/election/elector.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,14 @@ func (e *electorImpl) renew(ctx context.Context) (err error) {
ctx, cancel := context.WithTimeout(ctx, e.config.RenewDeadline)
defer cancel()

return e.updateRecord(ctx, func(record *Record) error {
return e.updateRecord(ctx, func(record *Record) (error, bool) {
var activeMembers []*Member
var isLeaderChanged bool
for _, m := range record.Members {
if e.isLeaseExpired(m.ID) {
if m.ID == record.LeaderID {
record.LeaderID = ""
isLeaderChanged = true
log.Info(
"leader lease expired",
zap.String("leaderID", m.ID),
Expand Down Expand Up @@ -183,19 +185,21 @@ func (e *electorImpl) renew(ctx context.Context) (err error) {
if time.Now().Before(e.resignUntil) {
if record.LeaderID == e.config.ID {
record.LeaderID = ""
isLeaderChanged = true
log.Info("try to resign leadership")
}
} else if record.LeaderID == "" {
// Elect a new leader if no leader exists.
record.LeaderID = e.config.ID
isLeaderChanged = true
log.Info(
"try to elect self as leader",
zap.String("id", e.config.ID),
zap.String("name", e.config.Name),
zap.String("address", e.config.Address),
)
}
return nil
return nil, isLeaderChanged
})
}

Expand Down Expand Up @@ -241,9 +245,11 @@ func (e *electorImpl) release(ctx context.Context, removeSelf bool) error {
ctx, cancel := context.WithTimeout(ctx, defaultReleaseTimeout)
defer cancel()

return e.updateRecord(ctx, func(record *Record) error {
return e.updateRecord(ctx, func(record *Record) (error, bool) {
var isLeaderChanged bool
if record.LeaderID == e.config.ID {
record.LeaderID = ""
isLeaderChanged = true
}
if removeSelf {
for i, m := range record.Members {
Expand All @@ -253,11 +259,14 @@ func (e *electorImpl) release(ctx context.Context, removeSelf bool) error {
}
}
}
return nil
return nil, isLeaderChanged
})
}

func (e *electorImpl) updateRecord(ctx context.Context, f func(*Record) error) error {
func (e *electorImpl) updateRecord(
ctx context.Context,
f func(*Record) (err error, isLeaderChanged bool),
) error {
// Divide 2 is for more retries.
backoffBaseDelayInMs := int64(e.config.RenewInterval/time.Millisecond) / 2
// Make sure the retry delay is less than the deadline, otherwise the retry has no chance to execute.
Expand All @@ -277,11 +286,12 @@ func (e *electorImpl) updateRecord(ctx context.Context, f func(*Record) error) e
}
e.setObservedRecord(record)

if err := f(record); err != nil {
var isLeaderChanged bool
if err, isLeaderChanged = f(record); err != nil {
return errors.Trace(err)
}

if err := s.Update(ctx, record); err != nil {
if err := s.Update(ctx, record, isLeaderChanged); err != nil {
return errors.Trace(err)
}
e.setObservedRecord(record)
Expand Down
18 changes: 9 additions & 9 deletions pkg/election/elector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func TestElectorBasic(t *testing.T) {
return record.Clone(), nil
})

s.EXPECT().Update(gomock.Any(), gomock.Any()).AnyTimes().
DoAndReturn(func(ctx context.Context, r *election.Record) error {
s.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().
DoAndReturn(func(ctx context.Context, r *election.Record, _ bool) error {
recordLock.Lock()
defer recordLock.Unlock()

Expand Down Expand Up @@ -183,7 +183,7 @@ func TestElectorRenewFailure(t *testing.T) {
return record.Clone(), nil
}

updateRecord := func(_ context.Context, r *election.Record) error {
updateRecord := func(_ context.Context, r *election.Record, _ bool) error {
recordLock.Lock()
defer recordLock.Unlock()

Expand All @@ -207,12 +207,12 @@ func TestElectorRenewFailure(t *testing.T) {
}
return getRecord(ctx)
})
s1.EXPECT().Update(gomock.Any(), gomock.Any()).AnyTimes().
DoAndReturn(func(ctx context.Context, r *election.Record) error {
s1.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().
DoAndReturn(func(ctx context.Context, r *election.Record, isLeaderChanged bool) error {
if err := s1Err.Load(); err != nil {
return err
}
if err := updateRecord(ctx, r); err != nil {
if err := updateRecord(ctx, r, isLeaderChanged); err != nil {
return err
}
s1LastRenew = time.Now()
Expand All @@ -221,7 +221,7 @@ func TestElectorRenewFailure(t *testing.T) {

s2 := mock.NewMockStorage(gomock.NewController(t))
s2.EXPECT().Get(gomock.Any()).AnyTimes().DoAndReturn(getRecord)
s2.EXPECT().Update(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(updateRecord)
s2.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(updateRecord)

const (
leaseDuration = time.Second * 1
Expand Down Expand Up @@ -321,8 +321,8 @@ func TestLeaderCallbackUnexpectedExit(t *testing.T) {
return record.Clone(), nil
})

s.EXPECT().Update(gomock.Any(), gomock.Any()).AnyTimes().
DoAndReturn(func(ctx context.Context, r *election.Record) error {
s.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().
DoAndReturn(func(ctx context.Context, r *election.Record, _ bool) error {
recordLock.Lock()
defer recordLock.Unlock()

Expand Down
8 changes: 4 additions & 4 deletions pkg/election/mock/storage_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/election/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Storage interface {
// Update updates the record in the storage if the stored record version matches the given
// record version. It returns ErrRecordConflict if the stored record version does not match
// or any other error encountered.
Update(ctx context.Context, record *Record) error
Update(ctx context.Context, record *Record, isLeaderChanged bool) error
}

// Record holds the information of a leader election.
Expand Down
209 changes: 209 additions & 0 deletions pkg/election/storage_orm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package election

import (
"context"
"database/sql"
"database/sql/driver"
"encoding/json"

"github.com/pingcap/log"
ormUtil "github.com/pingcap/tiflow/engine/pkg/orm"
"github.com/pingcap/tiflow/pkg/errors"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

const (
recordRowID = 1
// leaderRowID is not used for node probing, it is only used to lock the leader row.
leaderRowID = 2
)

// Value implements the driver.Valuer interface
func (r Record) Value() (driver.Value, error) {
return json.Marshal(r)
}

// Scan implements the sql.Scanner interface
func (r *Record) Scan(value interface{}) error {
b, ok := value.([]byte)
if !ok {
return errors.New("type assertion to []byte failed")
}

return json.Unmarshal(b, r)
}

// TableNameElection is the table name of election.
var TableNameElection = "election"

// DO mapped from table <election>
type DO struct {
ID uint32 `gorm:"column:id;type:int(10) unsigned;primaryKey" json:"id"`
LeaderID string `gorm:"column:leader_id;type:text;not null" json:"leader_id"`
Record *Record `gorm:"column:record;type:text" json:"record"`
Version uint64 `gorm:"column:version;type:bigint(20) unsigned;not null" json:"version"`
}

// TableName Election's table name
func (*DO) TableName() string {
return TableNameElection
}

// ORMStorage is a storage implementation based on SQL database.
type ORMStorage struct {
db *gorm.DB
tableName string
}

// NewORMStorageFromSQLDB creates a new ORMStorage from sql.DB.
func NewORMStorageFromSQLDB(backendDB *sql.DB, tableName string) (*ORMStorage, error) {
db, err := ormUtil.NewGormDB(backendDB, "mysql")
if err != nil {
return nil, err
}
return NewORMStorage(db, tableName)
}

// NewORMStorage creates a new ORMStorage.
func NewORMStorage(db *gorm.DB, tableName string) (*ORMStorage, error) {
TableNameElection = tableName
if err := db.AutoMigrate(&DO{}); err != nil {
return nil, errors.Trace(err)
}

return &ORMStorage{
db: db,
tableName: tableName,
}, nil
}

// Get implements Storage.Get.
func (s *ORMStorage) Get(ctx context.Context) (*Record, error) {
var do DO

ret := s.db.WithContext(ctx).Where("id = ?", recordRowID).Limit(1).Find(&do)
if ret.Error != nil {
if ret.Error == gorm.ErrRecordNotFound {
return &Record{}, nil
}
return nil, errors.Trace(ret.Error)
}
if ret.RowsAffected == 0 {
return &Record{}, nil
}

do.Record.Version = int64(do.Version)
return do.Record, nil
}

// Update implements Storage.Update.
func (s *ORMStorage) Update(ctx context.Context, record *Record, isLeaderChanged bool) error {
if record.Version == 0 {
if !isLeaderChanged {
log.Panic("invalid operation")
}
return s.create(ctx, record)
}
return s.update(ctx, record, isLeaderChanged)
}

func (s *ORMStorage) update(ctx context.Context, record *Record, isLeaderChanged bool) error {
handleRet := func(ret *gorm.DB) error {
if ret.Error != nil {
return errors.Trace(ret.Error)
}
if ret.RowsAffected != 1 {
return errors.ErrElectionRecordConflict.GenWithStackByArgs()
}
return nil
}
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// TODO: find more efficient way
ret := tx.Where("id = ? AND version = ?", recordRowID, record.Version).
Updates(DO{
LeaderID: record.LeaderID,
Record: record,
Version: uint64(record.Version) + 1,
})
if err := handleRet(ret); err != nil {
return errors.Trace(err)
}

if isLeaderChanged {
ret := tx.Where("id = ?", leaderRowID).
Updates(DO{
LeaderID: record.LeaderID,
Record: nil,
Version: uint64(record.Version) + 1,
})
return handleRet(ret)
}
return nil
})
}

func (s *ORMStorage) create(ctx context.Context, record *Record) error {
rows := []*DO{
{
ID: recordRowID,
LeaderID: record.LeaderID,
Record: record,
Version: uint64(record.Version) + 1,
},
{
ID: leaderRowID,
LeaderID: record.LeaderID,
Record: nil, /* record is not saved in leader row */
Version: uint64(record.Version) + 1, /* equals to recordRow */
},
}
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
ret := tx.WithContext(ctx).Create(rows)
if ret.Error != nil {
return errors.Trace(ret.Error)
}

if ret.RowsAffected == 0 {
return errors.ErrElectionRecordConflict.GenWithStackByArgs()
} else if ret.RowsAffected != int64(len(rows)) {
log.Panic("Transaction atomicity is broken when updating election record")
}
return nil
})
}

// TxnWithLeaderLock execute a transaction with leader row locked.
func (s *ORMStorage) TxnWithLeaderLock(ctx context.Context, leaderID string, fc func(tx *gorm.DB) error) error {
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
var do DO
ret := tx.Select("leader_id").Where("id = ? and leader_id = ?", leaderRowID, leaderID).
Clauses(clause.Locking{
Strength: "SHARE",
Table: clause.Table{Name: clause.CurrentTable},
}).Limit(1).Find(&do)
if ret.Error != nil {
if ret.Error == gorm.ErrRecordNotFound {
return errors.ErrElectorNotLeader.GenWithStackByArgs(leaderID)
}
return errors.Trace(ret.Error)
}
if ret.RowsAffected != 1 {
return errors.ErrElectorNotLeader.GenWithStackByArgs(leaderID)
}
return fc(tx)
})
}
Loading

0 comments on commit 7ad8876

Please sign in to comment.