Skip to content

Commit

Permalink
dr: fix store learner check
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Oct 18, 2023
1 parent a85f29c commit 59453af
Showing 1 changed file with 46 additions and 26 deletions.
72 changes: 46 additions & 26 deletions pkg/replication/replication_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,8 @@ func (m *ModeManager) tickUpdateState() {
if r.Role != placement.Learner {
totalVoter += r.Count
}
minimalUpPrimary := minimalUpVoters(r, stores[primaryUp], stores[primaryDown])
minimalUpDr := minimalUpVoters(r, stores[drUp], stores[drDown])
minimalUpPrimary := minimalUpVoters(r, stores[primaryUpVoter], stores[primaryDownVoter])
minimalUpDr := minimalUpVoters(r, stores[drUpVoter], stores[drDownVoter])
primaryHasVoter = primaryHasVoter || minimalUpPrimary > 0
drHasVoter = drHasVoter || minimalUpDr > 0
upVoters := minimalUpPrimary + minimalUpDr
Expand All @@ -440,10 +440,11 @@ func (m *ModeManager) tickUpdateState() {
hasMajority := totalUpVoter*2 > totalVoter

log.Debug("replication store status",
zap.Uint64s("up-primary", storeIDs(stores[primaryUp])),
zap.Uint64s("up-dr", storeIDs(stores[drUp])),
zap.Uint64s("down-primary", storeIDs(stores[primaryDown])),
zap.Uint64s("down-dr", storeIDs(stores[drDown])),
zap.Uint64s("up-primary-voter", storeIDs(stores[primaryUpVoter])),
zap.Uint64s("up-dr-voter", storeIDs(stores[drUpVoter])),
zap.Uint64s("down-primary-voter", storeIDs(stores[primaryDownVoter])),
zap.Uint64s("down-dr-voter", storeIDs(stores[drDownVoter])),
zap.Uint64s("up-primary-learner", storeIDs(stores[primaryUpLearner])),
zap.Bool("can-sync", canSync),
zap.Bool("has-majority", hasMajority),
)
Expand All @@ -465,36 +466,36 @@ func (m *ModeManager) tickUpdateState() {
+------------+ +------------+
*/

primaryUpStores := storeIDs(append(stores[primaryUpVoter], stores[primaryUpLearner]...))
switch m.drGetState() {
case drStateSync:
// If hasMajority is false, the cluster is always unavailable. Switch to async won't help.
if !canSync && hasMajority {
m.drSwitchToAsyncWait(storeIDs(stores[primaryUp]))
m.drSwitchToAsyncWait(primaryUpStores)
}
case drStateAsyncWait:
if canSync {
m.drSwitchToSync()
break
}
if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, storeIDs(stores[primaryUp])) {
m.drSwitchToAsyncWait(storeIDs(stores[primaryUp]))
if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, primaryUpStores) {
m.drSwitchToAsyncWait(primaryUpStores)
break
}
if m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) {
m.drSwitchToAsync(storeIDs(stores[primaryUp]))
if m.drCheckStoreStateUpdated(primaryUpStores) {
m.drSwitchToAsync(primaryUpStores)
}
case drStateAsync:
if canSync {
m.drSwitchToSyncRecover()
break
}
if !reflect.DeepEqual(m.drGetAvailableStores(), stores[primaryUp]) && m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) {
m.drSwitchToAsync(storeIDs(stores[primaryUp]))
if !reflect.DeepEqual(m.drGetAvailableStores(), primaryUpStores) && m.drCheckStoreStateUpdated(primaryUpStores) {
m.drSwitchToAsync(primaryUpStores)
}
case drStateSyncRecover:
if !canSync && hasMajority {
m.drSwitchToAsync(storeIDs(stores[primaryUp]))
m.drSwitchToAsync(primaryUpStores)
} else {
m.updateProgress()
progress := m.estimateProgress()
Expand Down Expand Up @@ -562,10 +563,11 @@ func (m *ModeManager) tickReplicateStatus() {
}

const (
primaryUp = iota
primaryDown
drUp
drDown
primaryUpVoter = iota
primaryDownVoter
drUpVoter
drDownVoter
primaryUpLearner
storeStatusTypeCount
)

Expand All @@ -577,24 +579,28 @@ func (m *ModeManager) checkStoreStatus() [][]*core.StoreInfo {
if s.IsRemoved() {
continue
}
down := s.DownTime() >= m.config.DRAutoSync.WaitStoreTimeout.Duration
labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey)
// learner peers do not participate in major commit or vote, so it should not count in primary/dr as a normal store.
if s.GetRegionCount() == s.GetLearnerCount() {
if s.GetRegionCount() == s.GetLearnerCount() && m.checkLearnerStore(s) {
if labelValue == m.config.DRAutoSync.Primary && !down {
stores[primaryUpLearner] = append(stores[primaryUpLearner], s)
}
continue
}
down := s.DownTime() >= m.config.DRAutoSync.WaitStoreTimeout.Duration
labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey)

if labelValue == m.config.DRAutoSync.Primary {
if down {
stores[primaryDown] = append(stores[primaryDown], s)
stores[primaryDownVoter] = append(stores[primaryDownVoter], s)
} else {
stores[primaryUp] = append(stores[primaryUp], s)
stores[primaryUpVoter] = append(stores[primaryUpVoter], s)
}
}
if labelValue == m.config.DRAutoSync.DR {
if down {
stores[drDown] = append(stores[drDown], s)
stores[drDownVoter] = append(stores[drDownVoter], s)
} else {
stores[drUp] = append(stores[drUp], s)
stores[drUpVoter] = append(stores[drUpVoter], s)
}
}
}
Expand All @@ -604,6 +610,20 @@ func (m *ModeManager) checkStoreStatus() [][]*core.StoreInfo {
return stores
}

// checkLearnerStore will check whether all peers of the store should be learner by placement rules.
func (m *ModeManager) checkLearnerStore(store *core.StoreInfo) bool {
for _, r := range m.cluster.GetRuleManager().GetAllRules() {
if len(r.StartKey) > 0 || len(r.EndKey) > 0 {
// All rules should be global rules. If not, skip it.
continue
}
if r.Role == placement.Learner && placement.MatchLabelConstraints(store, r.LabelConstraints) {
return true
}
}
return false
}

// UpdateStoreDRStatus saves the dr-autosync status of a store.
func (m *ModeManager) UpdateStoreDRStatus(id uint64, status *pb.StoreDRAutoSyncStatus) {
m.drStoreStatus.Store(id, status)
Expand Down

0 comments on commit 59453af

Please sign in to comment.