Skip to content

Commit

Permalink
replication mode: fix wrong available store list
Browse files Browse the repository at this point in the history
fix #7221

Signed-off-by: disksing <[email protected]>
  • Loading branch information
disksing committed Oct 18, 2023
1 parent a85f29c commit 7f4140c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 5 deletions.
6 changes: 1 addition & 5 deletions pkg/replication/replication_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func (m *ModeManager) tickUpdateState() {
m.drSwitchToSyncRecover()
break
}
if !reflect.DeepEqual(m.drGetAvailableStores(), stores[primaryUp]) && m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) {
if !reflect.DeepEqual(m.drGetAvailableStores(), storeIDs(stores[primaryUp])) && m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) {
m.drSwitchToAsync(storeIDs(stores[primaryUp]))
}
case drStateSyncRecover:
Expand Down Expand Up @@ -577,10 +577,6 @@ func (m *ModeManager) checkStoreStatus() [][]*core.StoreInfo {
if s.IsRemoved() {
continue
}
// learner peers do not participate in major commit or vote, so it should not count in primary/dr as a normal store.
if s.GetRegionCount() == s.GetLearnerCount() {
continue
}
down := s.DownTime() >= m.config.DRAutoSync.WaitStoreTimeout.Duration
labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey)
if labelValue == m.config.DRAutoSync.Primary {
Expand Down
43 changes: 43 additions & 0 deletions pkg/replication/replication_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,8 @@ func TestComplexPlacementRules(t *testing.T) {
setStoreState(cluster, "up", "up", "up", "up", "up", "up", "up", "down", "up", "down")
rep.tickUpdateState()
re.Equal(drStateAsyncWait, rep.drGetState())
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4,5,6]}`, rep.drAutoSync.StateID), replicator.lastData[1])

// reset to sync
setStoreState(cluster, "up", "up", "up", "up", "up", "up", "up", "up", "up", "up")
Expand Down Expand Up @@ -695,6 +697,47 @@ func TestComplexPlacementRules2(t *testing.T) {
setStoreState(cluster, "up", "up", "up", "up", "down", "down", "up")
rep.tickUpdateState()
re.Equal(drStateAsyncWait, rep.drGetState())
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1])
}

func TestComplexPlacementRules3(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := storage.NewStorageWithMemoryBackend()
conf := config.ReplicationModeConfig{ReplicationMode: modeDRAutoSync, DRAutoSync: config.DRAutoSyncReplicationConfig{
LabelKey: "zone",
Primary: "zone1",
DR: "zone2",
WaitStoreTimeout: typeutil.Duration{Duration: time.Minute},
}}
cluster := mockcluster.NewCluster(ctx, config.NewTestOptions())

Check failure on line 715 in pkg/replication/replication_mode_test.go

View workflow job for this annotation

GitHub Actions / statics

undefined: config.NewTestOptions (typecheck)

Check failure on line 715 in pkg/replication/replication_mode_test.go

View workflow job for this annotation

GitHub Actions / chunks (7)

undefined: config.NewTestOptions
replicator := newMockReplicator([]uint64{1})
rep, err := NewReplicationModeManager(conf, store, cluster, replicator)
re.NoError(err)
cluster.GetRuleManager().SetAllGroupBundles(
genPlacementRuleConfig([]ruleConfig{
{key: "logic", value: "logic1", role: placement.Voter, count: 2},
{key: "logic", value: "logic2", role: placement.Learner, count: 1},
{key: "logic", value: "logic3", role: placement.Voter, count: 1},
}), true)

cluster.AddLabelsStore(1, 1, map[string]string{"zone": "zone1", "logic": "logic1"})
cluster.AddLabelsStore(2, 1, map[string]string{"zone": "zone1", "logic": "logic1"})
cluster.AddLabelsStore(3, 1, map[string]string{"zone": "zone1", "logic": "logic2"})
cluster.AddLabelsStore(4, 1, map[string]string{"zone": "zone1", "logic": "logic2"})
cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2", "logic": "logic3"})

// initial state is sync
re.Equal(drStateSync, rep.drGetState())

// zone2 down, switch state, available stores should contain logic2 (learner)
setStoreState(cluster, "up", "up", "up", "up", "down")
rep.tickUpdateState()
re.Equal(drStateAsyncWait, rep.drGetState())
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1])
}

func genRegions(cluster *mockcluster.Cluster, stateID uint64, n int) []*core.RegionInfo {
Expand Down

0 comments on commit 7f4140c

Please sign in to comment.