Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dr-autosync: add recover timeout (#6295) #8776

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions pkg/replication/replication_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ const (
type drAutoSyncStatus struct {
State string `json:"state,omitempty"`
StateID uint64 `json:"state_id,omitempty"`
AsyncStartTime *time.Time `json:"async_start,omitempty"`
RecoverStartTime *time.Time `json:"recover_start,omitempty"`
TotalRegions int `json:"total_regions,omitempty"`
SyncedRegions int `json:"synced_regions,omitempty"`
Expand Down Expand Up @@ -262,7 +263,8 @@ func (m *ModeManager) drSwitchToAsyncWithLock(availableStores []uint64) error {
log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
}
dr := drAutoSyncStatus{State: drStateAsync, StateID: id, AvailableStores: availableStores}
now := time.Now()
dr := drAutoSyncStatus{State: drStateAsync, StateID: id, AvailableStores: availableStores, AsyncStartTime: &now}
if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil {
log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
Expand All @@ -272,6 +274,15 @@ func (m *ModeManager) drSwitchToAsyncWithLock(availableStores []uint64) error {
return nil
}

func (m *ModeManager) drDurationSinceAsyncStart() time.Duration {
m.RLock()
defer m.RUnlock()
if m.drAutoSync.AsyncStartTime == nil {
return 0
}
return time.Since(*m.drAutoSync.AsyncStartTime)
}

func (m *ModeManager) drSwitchToSyncRecover() error {
m.Lock()
defer m.Unlock()
Expand Down Expand Up @@ -477,7 +488,7 @@ func (m *ModeManager) tickUpdateState() {
m.drSwitchToAsync(storeIDs[primaryUp])
}
case drStateAsync:
if canSync {
if canSync && m.drDurationSinceAsyncStart() > m.config.DRAutoSync.WaitRecoverTimeout.Duration {
m.drSwitchToSyncRecover()
break
}
Expand Down
60 changes: 41 additions & 19 deletions pkg/replication/replication_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package replication

import (
"context"
"encoding/json"
"errors"
"fmt"
"testing"
Expand Down Expand Up @@ -159,6 +160,20 @@ func newMockReplicator(ids []uint64) *mockFileReplicator {
}
}

func assertLastData(t *testing.T, data string, state string, stateID uint64, availableStores []uint64) {
type status struct {
State string `json:"state"`
StateID uint64 `json:"state_id"`
AvailableStores []uint64 `json:"available_stores"`
}
var s status
err := json.Unmarshal([]byte(data), &s)
require.NoError(t, err)
require.Equal(t, state, s.State)
require.Equal(t, stateID, s.StateID)
require.Equal(t, availableStores, s.AvailableStores)
}

func TestStateSwitch(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -190,7 +205,7 @@ func TestStateSwitch(t *testing.T) {
stateID := rep.drAutoSync.StateID
re.NotEqual(uint64(0), stateID)
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "sync", stateID, nil)
assertStateIDUpdate := func() {
re.NotEqual(stateID, rep.drAutoSync.StateID)
stateID = rep.drAutoSync.StateID
Expand All @@ -207,7 +222,7 @@ func TestStateSwitch(t *testing.T) {
re.Equal(drStateAsyncWait, rep.drGetState())
assertStateIDUpdate()
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{1, 2, 3, 4})

re.False(rep.GetReplicationStatus().GetDrAutoSync().GetPauseRegionSplit())
conf.DRAutoSync.PauseRegionSplit = true
Expand All @@ -218,7 +233,7 @@ func TestStateSwitch(t *testing.T) {
rep.tickUpdateState()
assertStateIDUpdate()
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 2, 3, 4})

// add new store in dr zone.
cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2"})
Expand Down Expand Up @@ -268,18 +283,19 @@ func TestStateSwitch(t *testing.T) {
rep.tickUpdateState()
re.Equal(drStateAsyncWait, rep.drGetState())
assertStateIDUpdate()

rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{1, 2, 3, 4})
setStoreState(cluster, "down", "up", "up", "up", "down", "down")
rep.tickUpdateState()
assertStateIDUpdate()
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[2,3,4]}`, stateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{2, 3, 4})
setStoreState(cluster, "up", "down", "up", "up", "down", "down")
rep.tickUpdateState()
assertStateIDUpdate()
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{1, 3, 4})

// async_wait -> async
rep.tickUpdateState()
Expand All @@ -291,26 +307,32 @@ func TestStateSwitch(t *testing.T) {
rep.tickUpdateState()
assertStateIDUpdate()
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 3, 4})

// async -> async
setStoreState(cluster, "up", "up", "up", "up", "down", "down")
rep.tickUpdateState()
// store 2 won't be available before it syncs status.
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 3, 4})
syncStoreStatus(1, 2, 3, 4)
rep.tickUpdateState()
assertStateIDUpdate()
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 2, 3, 4})

// async -> sync_recover
setStoreState(cluster, "up", "up", "up", "up", "up", "up")
rep.tickUpdateState()
re.Equal(drStateSyncRecover, rep.drGetState())
assertStateIDUpdate()

rep.drSwitchToAsync([]uint64{1, 2, 3, 4, 5})
rep.config.DRAutoSync.WaitRecoverTimeout = typeutil.NewDuration(time.Hour)
rep.tickUpdateState()
re.Equal(drStateAsync, rep.drGetState()) // wait recover timeout

rep.config.DRAutoSync.WaitRecoverTimeout = typeutil.NewDuration(0)
setStoreState(cluster, "down", "up", "up", "up", "up", "up")
rep.tickUpdateState()
re.Equal(drStateSyncRecover, rep.drGetState())
Expand Down Expand Up @@ -387,27 +409,27 @@ func TestReplicateState(t *testing.T) {
stateID := rep.drAutoSync.StateID
// replicate after initialized
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "sync", stateID, nil)

// repliate state to new member
replicator.memberIDs = append(replicator.memberIDs, 2, 3)
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[2])
re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[3])
assertLastData(t, replicator.lastData[2], "sync", stateID, nil)
assertLastData(t, replicator.lastData[3], "sync", stateID, nil)

// inject error
replicator.errors[2] = errors.New("failed to persist")
rep.tickUpdateState() // switch async_wait since there is only one zone
newStateID := rep.drAutoSync.StateID
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[1])
re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[2])
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[3])
assertLastData(t, replicator.lastData[1], "async_wait", newStateID, []uint64{1, 2})
assertLastData(t, replicator.lastData[2], "sync", stateID, nil)
assertLastData(t, replicator.lastData[3], "async_wait", newStateID, []uint64{1, 2})

// clear error, replicate to node 2 next time
delete(replicator.errors, 2)
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[2])
assertLastData(t, replicator.lastData[2], "async_wait", newStateID, []uint64{1, 2})
}

func TestAsynctimeout(t *testing.T) {
Expand Down Expand Up @@ -637,7 +659,7 @@ func TestComplexPlacementRules(t *testing.T) {
rep.tickUpdateState()
re.Equal(drStateAsyncWait, rep.drGetState())
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4,5,6]}`, rep.drAutoSync.StateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async_wait", rep.drAutoSync.StateID, []uint64{1, 2, 3, 4, 5, 6})

// reset to sync
setStoreState(cluster, "up", "up", "up", "up", "up", "up", "up", "up", "up", "up")
Expand Down Expand Up @@ -698,7 +720,7 @@ func TestComplexPlacementRules2(t *testing.T) {
rep.tickUpdateState()
re.Equal(drStateAsyncWait, rep.drGetState())
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async_wait", rep.drAutoSync.StateID, []uint64{1, 2, 3, 4})
}

func TestComplexPlacementRules3(t *testing.T) {
Expand Down Expand Up @@ -737,7 +759,7 @@ func TestComplexPlacementRules3(t *testing.T) {
rep.tickUpdateState()
re.Equal(drStateAsyncWait, rep.drGetState())
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async_wait", rep.drAutoSync.StateID, []uint64{1, 2, 3, 4})
}

func genRegions(cluster *mockcluster.Cluster, stateID uint64, n int) []*core.RegionInfo {
Expand Down
15 changes: 8 additions & 7 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,13 +826,14 @@ func NormalizeReplicationMode(m string) string {

// DRAutoSyncReplicationConfig is the configuration for auto sync mode between 2 data centers.
type DRAutoSyncReplicationConfig struct {
LabelKey string `toml:"label-key" json:"label-key"`
Primary string `toml:"primary" json:"primary"`
DR string `toml:"dr" json:"dr"`
PrimaryReplicas int `toml:"primary-replicas" json:"primary-replicas"`
DRReplicas int `toml:"dr-replicas" json:"dr-replicas"`
WaitStoreTimeout typeutil.Duration `toml:"wait-store-timeout" json:"wait-store-timeout"`
PauseRegionSplit bool `toml:"pause-region-split" json:"pause-region-split,string"`
LabelKey string `toml:"label-key" json:"label-key"`
Primary string `toml:"primary" json:"primary"`
DR string `toml:"dr" json:"dr"`
PrimaryReplicas int `toml:"primary-replicas" json:"primary-replicas"`
DRReplicas int `toml:"dr-replicas" json:"dr-replicas"`
WaitStoreTimeout typeutil.Duration `toml:"wait-store-timeout" json:"wait-store-timeout"`
WaitRecoverTimeout typeutil.Duration `toml:"wait-recover-timeout" json:"wait-recover-timeout"`
PauseRegionSplit bool `toml:"pause-region-split" json:"pause-region-split,string"`
}

func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) {
Expand Down
Loading