diff --git a/errors.toml b/errors.toml index efb9785e784..5b4d9b06d04 100755 --- a/errors.toml +++ b/errors.toml @@ -576,6 +576,11 @@ error = ''' received event regionID %v, requestID %v from %v, but neither pending region nor running region was found ''' +["CDC:ErrNotLeader"] +error = ''' +%s is not leader +''' + ["CDC:ErrNotOwner"] error = ''' this capture is not a owner diff --git a/pkg/election/elector.go b/pkg/election/elector.go index 2ce59bb20ad..c87b803e50c 100644 --- a/pkg/election/elector.go +++ b/pkg/election/elector.go @@ -141,12 +141,14 @@ func (e *electorImpl) renew(ctx context.Context) (err error) { ctx, cancel := context.WithTimeout(ctx, e.config.RenewDeadline) defer cancel() - return e.updateRecord(ctx, func(record *Record) error { + return e.updateRecord(ctx, func(record *Record) (error, bool) { var activeMembers []*Member + var isLeaderChanged bool for _, m := range record.Members { if e.isLeaseExpired(m.ID) { if m.ID == record.LeaderID { record.LeaderID = "" + isLeaderChanged = true log.Info( "leader lease expired", zap.String("leaderID", m.ID), @@ -183,11 +185,13 @@ func (e *electorImpl) renew(ctx context.Context) (err error) { if time.Now().Before(e.resignUntil) { if record.LeaderID == e.config.ID { record.LeaderID = "" + isLeaderChanged = true log.Info("try to resign leadership") } } else if record.LeaderID == "" { // Elect a new leader if no leader exists. record.LeaderID = e.config.ID + isLeaderChanged = true log.Info( "try to elect self as leader", zap.String("id", e.config.ID), @@ -195,7 +199,7 @@ func (e *electorImpl) renew(ctx context.Context) (err error) { zap.String("address", e.config.Address), ) } - return nil + return nil, isLeaderChanged }) } @@ -241,9 +245,11 @@ func (e *electorImpl) release(ctx context.Context, removeSelf bool) error { ctx, cancel := context.WithTimeout(ctx, defaultReleaseTimeout) defer cancel() - return e.updateRecord(ctx, func(record *Record) error { + return e.updateRecord(ctx, func(record *Record) (error, bool) { + var isLeaderChanged bool if record.LeaderID == e.config.ID { record.LeaderID = "" + isLeaderChanged = true } if removeSelf { for i, m := range record.Members { @@ -253,11 +259,14 @@ func (e *electorImpl) release(ctx context.Context, removeSelf bool) error { } } } - return nil + return nil, isLeaderChanged }) } -func (e *electorImpl) updateRecord(ctx context.Context, f func(*Record) error) error { +func (e *electorImpl) updateRecord( + ctx context.Context, + f func(*Record) (err error, isLeaderChanged bool), +) error { // Divide 2 is for more retries. backoffBaseDelayInMs := int64(e.config.RenewInterval/time.Millisecond) / 2 // Make sure the retry delay is less than the deadline, otherwise the retry has no chance to execute. @@ -277,11 +286,12 @@ func (e *electorImpl) updateRecord(ctx context.Context, f func(*Record) error) e } e.setObservedRecord(record) - if err := f(record); err != nil { + var isLeaderChanged bool + if err, isLeaderChanged = f(record); err != nil { return errors.Trace(err) } - if err := s.Update(ctx, record); err != nil { + if err := s.Update(ctx, record, isLeaderChanged); err != nil { return errors.Trace(err) } e.setObservedRecord(record) diff --git a/pkg/election/elector_test.go b/pkg/election/elector_test.go index edfee8b2208..ddca2a05565 100644 --- a/pkg/election/elector_test.go +++ b/pkg/election/elector_test.go @@ -47,8 +47,8 @@ func TestElectorBasic(t *testing.T) { return record.Clone(), nil }) - s.EXPECT().Update(gomock.Any(), gomock.Any()).AnyTimes(). - DoAndReturn(func(ctx context.Context, r *election.Record) error { + s.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes(). + DoAndReturn(func(ctx context.Context, r *election.Record, _ bool) error { recordLock.Lock() defer recordLock.Unlock() @@ -183,7 +183,7 @@ func TestElectorRenewFailure(t *testing.T) { return record.Clone(), nil } - updateRecord := func(_ context.Context, r *election.Record) error { + updateRecord := func(_ context.Context, r *election.Record, _ bool) error { recordLock.Lock() defer recordLock.Unlock() @@ -207,12 +207,12 @@ func TestElectorRenewFailure(t *testing.T) { } return getRecord(ctx) }) - s1.EXPECT().Update(gomock.Any(), gomock.Any()).AnyTimes(). - DoAndReturn(func(ctx context.Context, r *election.Record) error { + s1.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes(). + DoAndReturn(func(ctx context.Context, r *election.Record, isLeaderChanged bool) error { if err := s1Err.Load(); err != nil { return err } - if err := updateRecord(ctx, r); err != nil { + if err := updateRecord(ctx, r, isLeaderChanged); err != nil { return err } s1LastRenew = time.Now() @@ -221,7 +221,7 @@ func TestElectorRenewFailure(t *testing.T) { s2 := mock.NewMockStorage(gomock.NewController(t)) s2.EXPECT().Get(gomock.Any()).AnyTimes().DoAndReturn(getRecord) - s2.EXPECT().Update(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(updateRecord) + s2.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(updateRecord) const ( leaseDuration = time.Second * 1 @@ -321,8 +321,8 @@ func TestLeaderCallbackUnexpectedExit(t *testing.T) { return record.Clone(), nil }) - s.EXPECT().Update(gomock.Any(), gomock.Any()).AnyTimes(). - DoAndReturn(func(ctx context.Context, r *election.Record) error { + s.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes(). + DoAndReturn(func(ctx context.Context, r *election.Record, _ bool) error { recordLock.Lock() defer recordLock.Unlock() diff --git a/pkg/election/mock/storage_mock.go b/pkg/election/mock/storage_mock.go index 6c4bc20b7b1..3f0e4dce4f4 100644 --- a/pkg/election/mock/storage_mock.go +++ b/pkg/election/mock/storage_mock.go @@ -51,15 +51,15 @@ func (mr *MockStorageMockRecorder) Get(arg0 interface{}) *gomock.Call { } // Update mocks base method. -func (m *MockStorage) Update(arg0 context.Context, arg1 *election.Record) error { +func (m *MockStorage) Update(arg0 context.Context, arg1 *election.Record, arg2 bool) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Update", arg0, arg1) + ret := m.ctrl.Call(m, "Update", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // Update indicates an expected call of Update. -func (mr *MockStorageMockRecorder) Update(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockStorageMockRecorder) Update(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockStorage)(nil).Update), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockStorage)(nil).Update), arg0, arg1, arg2) } diff --git a/pkg/election/storage.go b/pkg/election/storage.go index 971c12f9672..0a98f671a90 100644 --- a/pkg/election/storage.go +++ b/pkg/election/storage.go @@ -27,7 +27,7 @@ type Storage interface { // Update updates the record in the storage if the stored record version matches the given // record version. It returns ErrRecordConflict if the stored record version does not match // or any other error encountered. - Update(ctx context.Context, record *Record) error + Update(ctx context.Context, record *Record, isLeaderChanged bool) error } // Record holds the information of a leader election. diff --git a/pkg/election/storage_orm.go b/pkg/election/storage_orm.go new file mode 100644 index 00000000000..52533ccf1dc --- /dev/null +++ b/pkg/election/storage_orm.go @@ -0,0 +1,209 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package election + +import ( + "context" + "database/sql" + "database/sql/driver" + "encoding/json" + + "github.com/pingcap/log" + ormUtil "github.com/pingcap/tiflow/engine/pkg/orm" + "github.com/pingcap/tiflow/pkg/errors" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +const ( + recordRowID = 1 + // leaderRowID is not used for node probing, it is only used to lock the leader row. + leaderRowID = 2 +) + +// Value implements the driver.Valuer interface +func (r Record) Value() (driver.Value, error) { + return json.Marshal(r) +} + +// Scan implements the sql.Scanner interface +func (r *Record) Scan(value interface{}) error { + b, ok := value.([]byte) + if !ok { + return errors.New("type assertion to []byte failed") + } + + return json.Unmarshal(b, r) +} + +// TableNameElection is the table name of election. +var TableNameElection = "election" + +// DO mapped from table +type DO struct { + ID uint32 `gorm:"column:id;type:int(10) unsigned;primaryKey" json:"id"` + LeaderID string `gorm:"column:leader_id;type:text;not null" json:"leader_id"` + Record *Record `gorm:"column:record;type:text" json:"record"` + Version uint64 `gorm:"column:version;type:bigint(20) unsigned;not null" json:"version"` +} + +// TableName Election's table name +func (*DO) TableName() string { + return TableNameElection +} + +// ORMStorage is a storage implementation based on SQL database. +type ORMStorage struct { + db *gorm.DB + tableName string +} + +// NewORMStorageFromSQLDB creates a new ORMStorage from sql.DB. +func NewORMStorageFromSQLDB(backendDB *sql.DB, tableName string) (*ORMStorage, error) { + db, err := ormUtil.NewGormDB(backendDB, "mysql") + if err != nil { + return nil, err + } + return NewORMStorage(db, tableName) +} + +// NewORMStorage creates a new ORMStorage. +func NewORMStorage(db *gorm.DB, tableName string) (*ORMStorage, error) { + TableNameElection = tableName + if err := db.AutoMigrate(&DO{}); err != nil { + return nil, errors.Trace(err) + } + + return &ORMStorage{ + db: db, + tableName: tableName, + }, nil +} + +// Get implements Storage.Get. +func (s *ORMStorage) Get(ctx context.Context) (*Record, error) { + var do DO + + ret := s.db.WithContext(ctx).Where("id = ?", recordRowID).Limit(1).Find(&do) + if ret.Error != nil { + if ret.Error == gorm.ErrRecordNotFound { + return &Record{}, nil + } + return nil, errors.Trace(ret.Error) + } + if ret.RowsAffected == 0 { + return &Record{}, nil + } + + do.Record.Version = int64(do.Version) + return do.Record, nil +} + +// Update implements Storage.Update. +func (s *ORMStorage) Update(ctx context.Context, record *Record, isLeaderChanged bool) error { + if record.Version == 0 { + if !isLeaderChanged { + log.Panic("invalid operation") + } + return s.create(ctx, record) + } + return s.update(ctx, record, isLeaderChanged) +} + +func (s *ORMStorage) update(ctx context.Context, record *Record, isLeaderChanged bool) error { + handleRet := func(ret *gorm.DB) error { + if ret.Error != nil { + return errors.Trace(ret.Error) + } + if ret.RowsAffected != 1 { + return errors.ErrElectionRecordConflict.GenWithStackByArgs() + } + return nil + } + return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + // TODO: find more efficient way + ret := tx.Where("id = ? AND version = ?", recordRowID, record.Version). + Updates(DO{ + LeaderID: record.LeaderID, + Record: record, + Version: uint64(record.Version) + 1, + }) + if err := handleRet(ret); err != nil { + return errors.Trace(err) + } + + if isLeaderChanged { + ret := tx.Where("id = ?", leaderRowID). + Updates(DO{ + LeaderID: record.LeaderID, + Record: nil, + Version: uint64(record.Version) + 1, + }) + return handleRet(ret) + } + return nil + }) +} + +func (s *ORMStorage) create(ctx context.Context, record *Record) error { + rows := []*DO{ + { + ID: recordRowID, + LeaderID: record.LeaderID, + Record: record, + Version: uint64(record.Version) + 1, + }, + { + ID: leaderRowID, + LeaderID: record.LeaderID, + Record: nil, /* record is not saved in leader row */ + Version: uint64(record.Version) + 1, /* equals to recordRow */ + }, + } + return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + ret := tx.WithContext(ctx).Create(rows) + if ret.Error != nil { + return errors.Trace(ret.Error) + } + + if ret.RowsAffected == 0 { + return errors.ErrElectionRecordConflict.GenWithStackByArgs() + } else if ret.RowsAffected != int64(len(rows)) { + log.Panic("Transaction atomicity is broken when updating election record") + } + return nil + }) +} + +// TxnWithLeaderLock execute a transaction with leader row locked. +func (s *ORMStorage) TxnWithLeaderLock(ctx context.Context, leaderID string, fc func(tx *gorm.DB) error) error { + return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + var do DO + ret := tx.Select("leader_id").Where("id = ? and leader_id = ?", leaderRowID, leaderID). + Clauses(clause.Locking{ + Strength: "SHARE", + Table: clause.Table{Name: clause.CurrentTable}, + }).Limit(1).Find(&do) + if ret.Error != nil { + if ret.Error == gorm.ErrRecordNotFound { + return errors.ErrElectorNotLeader.GenWithStackByArgs(leaderID) + } + return errors.Trace(ret.Error) + } + if ret.RowsAffected != 1 { + return errors.ErrElectorNotLeader.GenWithStackByArgs(leaderID) + } + return fc(tx) + }) +} diff --git a/pkg/election/storage_orm_test.go b/pkg/election/storage_orm_test.go new file mode 100644 index 00000000000..e8010d5b7c7 --- /dev/null +++ b/pkg/election/storage_orm_test.go @@ -0,0 +1,216 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package election + +import ( + "context" + "encoding/json" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + ormUtil "github.com/pingcap/tiflow/engine/pkg/orm" + "github.com/pingcap/tiflow/pkg/errors" + "github.com/stretchr/testify/require" + "gorm.io/gorm" +) + +func newORMStorageAndMock(t *testing.T) (*ORMStorage, sqlmock.Sqlmock) { + backendDB, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + + mock.ExpectQuery("SELECT VERSION()"). + WillReturnRows(sqlmock.NewRows([]string{"VERSION()"}).AddRow("5.7.35-log")) + db, err := ormUtil.NewGormDB(backendDB, "mysql") + require.NoError(t, err) + + mock.ExpectQuery("SELECT SCHEMA_NAME from Information_schema.SCHEMATA " + + "where SCHEMA_NAME LIKE ? ORDER BY SCHEMA_NAME=? DESC limit 1").WillReturnRows( + sqlmock.NewRows([]string{"SCHEMA_NAME"})) + mock.ExpectExec("CREATE TABLE `test` (`id` int(10) unsigned,`leader_id` text NOT NULL," + + "`record` text,`version` bigint(20) unsigned NOT NULL,PRIMARY KEY (`id`))"). + WillReturnResult(sqlmock.NewResult(0, 0)) + + s, err := NewORMStorage(db, "test") + require.NoError(t, err) + + return s, mock +} + +func TestORMStorageGetEmptyRecord(t *testing.T) { + s, mock := newORMStorageAndMock(t) + + mock.ExpectQuery("SELECT * FROM `test` WHERE id = ? LIMIT 1"). + WithArgs(1).WillReturnRows(sqlmock.NewRows([]string{"id", "leader_id", "record", "version"})) + record, err := s.Get(context.Background()) + require.NoError(t, err) + require.Equal(t, &Record{}, record) +} + +func TestORMStorageGetExistingRecord(t *testing.T) { + s, mock := newORMStorageAndMock(t) + + expectedRecord := &Record{ + LeaderID: "id1", + Members: []*Member{ + { + ID: "id1", + Name: "name1", + }, + { + ID: "id2", + Name: "name2", + }, + }, + Version: 1, + } + recordBytes, err := json.Marshal(expectedRecord) + require.NoError(t, err) + + mock.ExpectQuery("SELECT * FROM `test` WHERE id = ? LIMIT 1"). + WithArgs(1).WillReturnRows(sqlmock.NewRows([]string{"id", "leader_id", "record", "version"}). + AddRow(1, "id1", recordBytes, 1)) + record, err := s.Get(context.Background()) + require.NoError(t, err) + require.Equal(t, expectedRecord, record) +} + +func TestORMStorageInsertRecord(t *testing.T) { + s, mock := newORMStorageAndMock(t) + + record := &Record{ + LeaderID: "id1", + Members: []*Member{ + { + ID: "id1", + Name: "name1", + }, + }, + Version: 0, // 0 means record not created before. + } + recordBytes, err := json.Marshal(record) + require.NoError(t, err) + + mock.ExpectBegin() + mock.ExpectExec("INSERT INTO `test` (`leader_id`,`record`,`version`,`id`) VALUES (?,?,?,?),(?,?,?,?)"). + WithArgs("id1", recordBytes, 1, recordRowID, "id1", nil, 1, leaderRowID). + WillReturnResult(sqlmock.NewResult(1, 2)) + mock.ExpectCommit() + + err = s.Update(context.Background(), record, true) + require.NoError(t, err) +} + +func TestORMStorageUpdateMember(t *testing.T) { + leaderNotChanged := false + s, mock := newORMStorageAndMock(t) + + record := &Record{ + LeaderID: "id1", + Members: []*Member{ + { + ID: "id1", + Name: "name1", + }, + }, + Version: 1, + } + recordBytes, err := json.Marshal(record) + require.NoError(t, err) + + mock.ExpectBegin() + mock.ExpectExec("UPDATE `test` SET `leader_id`=?,`record`=?,`version`=? WHERE id = ? AND version = ?"). + WithArgs("id1", recordBytes, 2, recordRowID, 1).WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectRollback() + err = s.Update(context.Background(), record, leaderNotChanged) + require.ErrorIs(t, err, errors.ErrElectionRecordConflict) + + mock.ExpectBegin() + mock.ExpectExec("UPDATE `test` SET `leader_id`=?,`record`=?,`version`=? WHERE id = ? AND version = ?"). + WithArgs("id1", recordBytes, 2, recordRowID, 1).WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectCommit() + err = s.Update(context.Background(), record, leaderNotChanged) + require.NoError(t, err) +} + +func TestORMStorageUpdateLeader(t *testing.T) { + leaderChanged := true + s, mock := newORMStorageAndMock(t) + + record := &Record{ + LeaderID: "id1", + Members: []*Member{ + { + ID: "id1", + Name: "name1", + }, + }, + Version: 1, + } + recordBytes, err := json.Marshal(record) + require.NoError(t, err) + + mock.ExpectBegin() + mock.ExpectExec("UPDATE `test` SET `leader_id`=?,`record`=?,`version`=? WHERE id = ? AND version = ?"). + WithArgs("id1", recordBytes, 2, recordRowID, 1).WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectRollback() + err = s.Update(context.Background(), record, leaderChanged) + require.ErrorIs(t, err, errors.ErrElectionRecordConflict) + + mock.ExpectBegin() + mock.ExpectExec("UPDATE `test` SET `leader_id`=?,`record`=?,`version`=? WHERE id = ? AND version = ?"). + WithArgs("id1", recordBytes, 2, recordRowID, 1).WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("UPDATE `test` SET `leader_id`=?,`version`=? WHERE id = ?"). + WithArgs("id1", 2, leaderRowID).WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectRollback() + err = s.Update(context.Background(), record, leaderChanged) + require.ErrorIs(t, err, errors.ErrElectionRecordConflict) + + mock.ExpectBegin() + mock.ExpectExec("UPDATE `test` SET `leader_id`=?,`record`=?,`version`=? WHERE id = ? AND version = ?"). + WithArgs("id1", recordBytes, 2, recordRowID, 1).WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("UPDATE `test` SET `leader_id`=?,`version`=? WHERE id = ?"). + WithArgs("id1", 2, leaderRowID).WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectCommit() + err = s.Update(context.Background(), record, leaderChanged) + require.NoError(t, err) +} + +func TestORMStorageTxnWithLeaderCheck(t *testing.T) { + s, mock := newORMStorageAndMock(t) + + mock.ExpectBegin() + mock.ExpectQuery("SELECT `leader_id` FROM `test` WHERE id = ? and leader_id = ? LIMIT 1 LOCK IN SHARE MODE"). + WithArgs(leaderRowID, "leader1").WillReturnRows(sqlmock.NewRows([]string{"leader_id"})) + mock.ExpectRollback() + doNothing := func(*gorm.DB) error { + return nil + } + err := s.TxnWithLeaderLock(context.Background(), "leader1", doNothing) + require.ErrorIs(t, err, errors.ErrElectorNotLeader) + + mock.ExpectBegin() + mock.ExpectQuery("SELECT `leader_id` FROM `test` WHERE id = ? and leader_id = ? LIMIT 1 LOCK IN SHARE MODE"). + WithArgs(leaderRowID, "leader1"). + WillReturnRows(sqlmock.NewRows([]string{"leader_id"}).AddRow("leader1")) + mock.ExpectQuery("SELECT * FROM `test` WHERE id = ? LIMIT 1"). + WithArgs(1).WillReturnRows(sqlmock.NewRows([]string{"id", "leader_id", "record", "version"})) + mock.ExpectCommit() + doTxn := func(tx *gorm.DB) error { + _, err := s.Get(context.Background()) + require.NoError(t, err) + return nil + } + err = s.TxnWithLeaderLock(context.Background(), "leader1", doTxn) + require.NoError(t, err) +} diff --git a/pkg/election/storage_sql.go b/pkg/election/storage_sql.go index af610482f38..a4e7e66b336 100644 --- a/pkg/election/storage_sql.go +++ b/pkg/election/storage_sql.go @@ -23,6 +23,8 @@ import ( "github.com/pingcap/tiflow/pkg/errors" ) +var _ Storage = &SQLStorage{} + const ( sqlRecordID = 1 sqlCreateTable = "CREATE TABLE IF NOT EXISTS %s (id int NOT NULL, version bigint NOT NULL, record text NOT NULL, PRIMARY KEY (id))" @@ -83,7 +85,7 @@ func (s *SQLStorage) Get(ctx context.Context) (*Record, error) { } // Update implements Storage.Update. -func (s *SQLStorage) Update(ctx context.Context, record *Record) error { +func (s *SQLStorage) Update(ctx context.Context, record *Record, _ bool) error { recordBytes, err := json.Marshal(&record) if err != nil { return errors.Trace(err) diff --git a/pkg/election/storage_sql_test.go b/pkg/election/storage_sql_test.go index 9cc2a7d2e13..6b4ee9d8cdf 100644 --- a/pkg/election/storage_sql_test.go +++ b/pkg/election/storage_sql_test.go @@ -100,7 +100,7 @@ func TestSQLStorageInsertRecord(t *testing.T) { mock.ExpectExec(regexp.QuoteMeta("INSERT INTO leader_election (id, version, record) VALUES (?, ?, ?)")). WithArgs(1, int64(1), recordBytes).WillReturnResult(sqlmock.NewResult(1, 1)) - err = s.Update(context.Background(), record) + err = s.Update(context.Background(), record, true) require.NoError(t, err) } @@ -124,12 +124,12 @@ func TestSQLStorageUpdateRecord(t *testing.T) { mock.ExpectExec(regexp.QuoteMeta("UPDATE leader_election SET version = ?, record = ? WHERE id = ? AND version = ?")). WithArgs(int64(2), recordBytes, 1, int64(1)).WillReturnResult(sqlmock.NewResult(0, 0)) - err = s.Update(context.Background(), record) + err = s.Update(context.Background(), record, true) require.True(t, errors.Is(err, errors.ErrElectionRecordConflict)) mock.ExpectExec(regexp.QuoteMeta("UPDATE leader_election SET version = ?, record = ? WHERE id = ? AND version = ?")). WithArgs(int64(2), recordBytes, 1, int64(1)).WillReturnResult(sqlmock.NewResult(0, 1)) - err = s.Update(context.Background(), record) + err = s.Update(context.Background(), record, true) require.NoError(t, err) } @@ -153,7 +153,7 @@ func TestInMemorySQLStorage(t *testing.T) { }, Version: 0, // 0 means record not created before. } - err = s.Update(ctx, record) + err = s.Update(ctx, record, true) require.NoError(t, err) recordRead, err := s.Get(ctx) diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index 5ffbada8093..baec9558c9d 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -952,4 +952,9 @@ var ( "invalid glue schema registry config, %s", errors.RFCCodeText("CDC:ErrInvalidGlueSchemaRegistryConfig"), ) + + ErrElectorNotLeader = errors.Normalize( + "%s is not leader", + errors.RFCCodeText("CDC:ErrNotLeader"), + ) )