From 0035391778b1a4d95de062475e7783f34513984b Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 5 Dec 2024 16:00:53 +0800 Subject: [PATCH 1/2] dr-autosync: add recover timeout (#6295) (#8776) close tikv/pd#4939 Signed-off-by: husharp Co-authored-by: disksing --- pkg/replication/replication_mode.go | 15 +++++- pkg/replication/replication_mode_test.go | 60 ++++++++++++++++-------- server/config/config.go | 15 +++--- 3 files changed, 62 insertions(+), 28 deletions(-) diff --git a/pkg/replication/replication_mode.go b/pkg/replication/replication_mode.go index 30b34e4596a..9093f911901 100644 --- a/pkg/replication/replication_mode.go +++ b/pkg/replication/replication_mode.go @@ -212,6 +212,7 @@ const ( type drAutoSyncStatus struct { State string `json:"state,omitempty"` StateID uint64 `json:"state_id,omitempty"` + AsyncStartTime *time.Time `json:"async_start,omitempty"` RecoverStartTime *time.Time `json:"recover_start,omitempty"` TotalRegions int `json:"total_regions,omitempty"` SyncedRegions int `json:"synced_regions,omitempty"` @@ -262,7 +263,8 @@ func (m *ModeManager) drSwitchToAsyncWithLock(availableStores []uint64) error { log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err } - dr := drAutoSyncStatus{State: drStateAsync, StateID: id, AvailableStores: availableStores} + now := time.Now() + dr := drAutoSyncStatus{State: drStateAsync, StateID: id, AvailableStores: availableStores, AsyncStartTime: &now} if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil { log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err @@ -272,6 +274,15 @@ func (m *ModeManager) drSwitchToAsyncWithLock(availableStores []uint64) error { return nil } +func (m *ModeManager) drDurationSinceAsyncStart() time.Duration { + m.RLock() + defer m.RUnlock() + if m.drAutoSync.AsyncStartTime == nil { + return 0 + } + return time.Since(*m.drAutoSync.AsyncStartTime) +} + func (m *ModeManager) drSwitchToSyncRecover() error { m.Lock() defer m.Unlock() @@ -477,7 +488,7 @@ func (m *ModeManager) tickUpdateState() { m.drSwitchToAsync(storeIDs[primaryUp]) } case drStateAsync: - if canSync { + if canSync && m.drDurationSinceAsyncStart() > m.config.DRAutoSync.WaitRecoverTimeout.Duration { m.drSwitchToSyncRecover() break } diff --git a/pkg/replication/replication_mode_test.go b/pkg/replication/replication_mode_test.go index e01fb7a0b9a..5cf9f1a1450 100644 --- a/pkg/replication/replication_mode_test.go +++ b/pkg/replication/replication_mode_test.go @@ -16,6 +16,7 @@ package replication import ( "context" + "encoding/json" "errors" "fmt" "testing" @@ -159,6 +160,20 @@ func newMockReplicator(ids []uint64) *mockFileReplicator { } } +func assertLastData(t *testing.T, data string, state string, stateID uint64, availableStores []uint64) { + type status struct { + State string `json:"state"` + StateID uint64 `json:"state_id"` + AvailableStores []uint64 `json:"available_stores"` + } + var s status + err := json.Unmarshal([]byte(data), &s) + require.NoError(t, err) + require.Equal(t, state, s.State) + require.Equal(t, stateID, s.StateID) + require.Equal(t, availableStores, s.AvailableStores) +} + func TestStateSwitch(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) @@ -190,7 +205,7 @@ func TestStateSwitch(t *testing.T) { stateID := rep.drAutoSync.StateID re.NotEqual(uint64(0), stateID) rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "sync", stateID, nil) assertStateIDUpdate := func() { re.NotEqual(stateID, rep.drAutoSync.StateID) stateID = rep.drAutoSync.StateID @@ -207,7 +222,7 @@ func TestStateSwitch(t *testing.T) { re.Equal(drStateAsyncWait, rep.drGetState()) assertStateIDUpdate() rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{1, 2, 3, 4}) re.False(rep.GetReplicationStatus().GetDrAutoSync().GetPauseRegionSplit()) conf.DRAutoSync.PauseRegionSplit = true @@ -218,7 +233,7 @@ func TestStateSwitch(t *testing.T) { rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 2, 3, 4}) // add new store in dr zone. cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2"}) @@ -268,18 +283,19 @@ func TestStateSwitch(t *testing.T) { rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) assertStateIDUpdate() + rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{1, 2, 3, 4}) setStoreState(cluster, "down", "up", "up", "up", "down", "down") rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[2,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{2, 3, 4}) setStoreState(cluster, "up", "down", "up", "up", "down", "down") rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{1, 3, 4}) // async_wait -> async rep.tickUpdateState() @@ -291,26 +307,32 @@ func TestStateSwitch(t *testing.T) { rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 3, 4}) // async -> async setStoreState(cluster, "up", "up", "up", "up", "down", "down") rep.tickUpdateState() // store 2 won't be available before it syncs status. rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 3, 4}) syncStoreStatus(1, 2, 3, 4) rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 2, 3, 4}) // async -> sync_recover setStoreState(cluster, "up", "up", "up", "up", "up", "up") rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) assertStateIDUpdate() + rep.drSwitchToAsync([]uint64{1, 2, 3, 4, 5}) + rep.config.DRAutoSync.WaitRecoverTimeout = typeutil.NewDuration(time.Hour) + rep.tickUpdateState() + re.Equal(drStateAsync, rep.drGetState()) // wait recover timeout + + rep.config.DRAutoSync.WaitRecoverTimeout = typeutil.NewDuration(0) setStoreState(cluster, "down", "up", "up", "up", "up", "up") rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) @@ -387,27 +409,27 @@ func TestReplicateState(t *testing.T) { stateID := rep.drAutoSync.StateID // replicate after initialized rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "sync", stateID, nil) // repliate state to new member replicator.memberIDs = append(replicator.memberIDs, 2, 3) rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[2]) - re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[3]) + assertLastData(t, replicator.lastData[2], "sync", stateID, nil) + assertLastData(t, replicator.lastData[3], "sync", stateID, nil) // inject error replicator.errors[2] = errors.New("failed to persist") rep.tickUpdateState() // switch async_wait since there is only one zone newStateID := rep.drAutoSync.StateID rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[1]) - re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[2]) - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[3]) + assertLastData(t, replicator.lastData[1], "async_wait", newStateID, []uint64{1, 2}) + assertLastData(t, replicator.lastData[2], "sync", stateID, nil) + assertLastData(t, replicator.lastData[3], "async_wait", newStateID, []uint64{1, 2}) // clear error, replicate to node 2 next time delete(replicator.errors, 2) rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[2]) + assertLastData(t, replicator.lastData[2], "async_wait", newStateID, []uint64{1, 2}) } func TestAsynctimeout(t *testing.T) { @@ -637,7 +659,7 @@ func TestComplexPlacementRules(t *testing.T) { rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4,5,6]}`, rep.drAutoSync.StateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", rep.drAutoSync.StateID, []uint64{1, 2, 3, 4, 5, 6}) // reset to sync setStoreState(cluster, "up", "up", "up", "up", "up", "up", "up", "up", "up", "up") @@ -698,7 +720,7 @@ func TestComplexPlacementRules2(t *testing.T) { rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", rep.drAutoSync.StateID, []uint64{1, 2, 3, 4}) } func TestComplexPlacementRules3(t *testing.T) { @@ -737,7 +759,7 @@ func TestComplexPlacementRules3(t *testing.T) { rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", rep.drAutoSync.StateID, []uint64{1, 2, 3, 4}) } func genRegions(cluster *mockcluster.Cluster, stateID uint64, n int) []*core.RegionInfo { diff --git a/server/config/config.go b/server/config/config.go index e835567ac5b..c6e147c8f25 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -826,13 +826,14 @@ func NormalizeReplicationMode(m string) string { // DRAutoSyncReplicationConfig is the configuration for auto sync mode between 2 data centers. type DRAutoSyncReplicationConfig struct { - LabelKey string `toml:"label-key" json:"label-key"` - Primary string `toml:"primary" json:"primary"` - DR string `toml:"dr" json:"dr"` - PrimaryReplicas int `toml:"primary-replicas" json:"primary-replicas"` - DRReplicas int `toml:"dr-replicas" json:"dr-replicas"` - WaitStoreTimeout typeutil.Duration `toml:"wait-store-timeout" json:"wait-store-timeout"` - PauseRegionSplit bool `toml:"pause-region-split" json:"pause-region-split,string"` + LabelKey string `toml:"label-key" json:"label-key"` + Primary string `toml:"primary" json:"primary"` + DR string `toml:"dr" json:"dr"` + PrimaryReplicas int `toml:"primary-replicas" json:"primary-replicas"` + DRReplicas int `toml:"dr-replicas" json:"dr-replicas"` + WaitStoreTimeout typeutil.Duration `toml:"wait-store-timeout" json:"wait-store-timeout"` + WaitRecoverTimeout typeutil.Duration `toml:"wait-recover-timeout" json:"wait-recover-timeout"` + PauseRegionSplit bool `toml:"pause-region-split" json:"pause-region-split,string"` } func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) { From 81a0619dcf482031f178b38783c19b7669686bea Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 6 Dec 2024 14:03:17 +0800 Subject: [PATCH 2/2] *: add test for syncer (#8873) (#8875) ref tikv/pd#8823 Signed-off-by: Ryan Leung --- pkg/mock/mockserver/mockserver.go | 88 ++++++++++++++++++++++++++++ pkg/syncer/client.go | 20 ++++++- pkg/syncer/client_test.go | 69 +++++----------------- tests/server/cluster/cluster_test.go | 44 ++++++++++++++ 4 files changed, 165 insertions(+), 56 deletions(-) create mode 100644 pkg/mock/mockserver/mockserver.go diff --git a/pkg/mock/mockserver/mockserver.go b/pkg/mock/mockserver/mockserver.go new file mode 100644 index 00000000000..d79d79ffa03 --- /dev/null +++ b/pkg/mock/mockserver/mockserver.go @@ -0,0 +1,88 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mockserver + +import ( + "context" + + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/storage" + "github.com/tikv/pd/pkg/utils/grpcutil" +) + +// MockServer is used to mock Server for test use. +type MockServer struct { + ctx context.Context + member, leader *pdpb.Member + storage storage.Storage + bc *core.BasicCluster +} + +// NewMockServer creates a new MockServer. +func NewMockServer(ctx context.Context, member, leader *pdpb.Member, storage storage.Storage, bc *core.BasicCluster) *MockServer { + return &MockServer{ + ctx: ctx, + member: member, + leader: leader, + storage: storage, + bc: bc, + } +} + +// LoopContext returns the context of the server. +func (s *MockServer) LoopContext() context.Context { + return s.ctx +} + +// ClusterID returns the cluster ID of the server. +func (*MockServer) ClusterID() uint64 { + return 1 +} + +// GetMemberInfo returns the member info of the server. +func (s *MockServer) GetMemberInfo() *pdpb.Member { + return s.member +} + +// GetLeader returns the leader of the server. +func (s *MockServer) GetLeader() *pdpb.Member { + return s.leader +} + +// GetStorage returns the storage of the server. +func (s *MockServer) GetStorage() storage.Storage { + return s.storage +} + +// Name returns the name of the server. +func (*MockServer) Name() string { + return "mock-server" +} + +// GetRegions returns the regions of the server. +func (s *MockServer) GetRegions() []*core.RegionInfo { + return s.bc.GetRegions() +} + +// GetTLSConfig returns the TLS config of the server. +func (*MockServer) GetTLSConfig() *grpcutil.TLSConfig { + return &grpcutil.TLSConfig{} +} + +// GetBasicCluster returns the basic cluster of the server. +func (s *MockServer) GetBasicCluster() *core.BasicCluster { + return s.bc +} diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index af21a514c83..68ce35899fc 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -90,6 +90,8 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { go func() { defer logutil.LogPanic() defer s.wg.Done() + timer := time.NewTimer(retryInterval) + defer timer.Stop() // used to load region from kv storage to cache storage. bc := s.server.GetBasicCluster() regionStorage := s.server.GetStorage() @@ -140,11 +142,18 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { } } log.Error("server failed to establish sync stream with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), errs.ZapError(err)) + if !timer.Stop() { + select { + case <-timer.C: // try to drain from the channel + default: + } + } + timer.Reset(retryInterval) select { case <-ctx.Done(): log.Info("stop synchronizing with leader due to context canceled") return - case <-time.After(retryInterval): + case <-timer.C: } continue } @@ -157,11 +166,18 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { if err = stream.CloseSend(); err != nil { log.Error("failed to terminate client stream", errs.ZapError(errs.ErrGRPCCloseSend, err)) } + if !timer.Stop() { + select { + case <-timer.C: // try to drain from the channel + default: + } + } + timer.Reset(retryInterval) select { case <-ctx.Done(): log.Info("stop synchronizing with leader due to context canceled") return - case <-time.After(retryInterval): + case <-timer.C: } break } diff --git a/pkg/syncer/client_test.go b/pkg/syncer/client_test.go index 34c2b383ab3..8301c1c7567 100644 --- a/pkg/syncer/client_test.go +++ b/pkg/syncer/client_test.go @@ -21,9 +21,9 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/mock/mockserver" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/grpcutil" "google.golang.org/grpc/codes" @@ -37,11 +37,13 @@ func TestLoadRegion(t *testing.T) { rs, err := storage.NewStorageWithLevelDBBackend(context.Background(), tempDir, nil) re.NoError(err) - server := &mockServer{ - ctx: context.Background(), - storage: storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs), - bc: core.NewBasicCluster(), - } + server := mockserver.NewMockServer( + context.Background(), + nil, + nil, + storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs), + core.NewBasicCluster(), + ) for i := 0; i < 30; i++ { rs.SaveRegion(&metapb.Region{Id: uint64(i) + 1}) } @@ -64,11 +66,13 @@ func TestErrorCode(t *testing.T) { tempDir := t.TempDir() rs, err := storage.NewStorageWithLevelDBBackend(context.Background(), tempDir, nil) re.NoError(err) - server := &mockServer{ - ctx: context.Background(), - storage: storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs), - bc: core.NewBasicCluster(), - } + server := mockserver.NewMockServer( + context.Background(), + nil, + nil, + storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs), + core.NewBasicCluster(), + ) ctx, cancel := context.WithCancel(context.TODO()) rc := NewRegionSyncer(server) conn, err := grpcutil.GetClientConn(ctx, "http://127.0.0.1", nil) @@ -79,46 +83,3 @@ func TestErrorCode(t *testing.T) { re.True(ok) re.Equal(codes.Canceled, ev.Code()) } - -type mockServer struct { - ctx context.Context - member, leader *pdpb.Member - storage storage.Storage - bc *core.BasicCluster -} - -func (s *mockServer) LoopContext() context.Context { - return s.ctx -} - -func (s *mockServer) ClusterID() uint64 { - return 1 -} - -func (s *mockServer) GetMemberInfo() *pdpb.Member { - return s.member -} - -func (s *mockServer) GetLeader() *pdpb.Member { - return s.leader -} - -func (s *mockServer) GetStorage() storage.Storage { - return s.storage -} - -func (s *mockServer) Name() string { - return "mock-server" -} - -func (s *mockServer) GetRegions() []*core.RegionInfo { - return s.bc.GetRegions() -} - -func (s *mockServer) GetTLSConfig() *grpcutil.TLSConfig { - return &grpcutil.TLSConfig{} -} - -func (s *mockServer) GetBasicCluster() *core.BasicCluster { - return s.bc -} diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index fc6991e4179..3b70e837ac5 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -35,6 +35,7 @@ import ( "github.com/tikv/pd/pkg/dashboard" "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/mock/mockid" + "github.com/tikv/pd/pkg/mock/mockserver" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/schedulers" @@ -42,6 +43,7 @@ import ( "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/syncer" "github.com/tikv/pd/pkg/tso" + "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -1812,3 +1814,45 @@ func TestExternalTimestamp(t *testing.T) { re.Equal(ts, resp4.GetTimestamp()) } } + +func TestFollowerExitSyncTime(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + tc, err := tests.NewTestCluster(ctx, 1) + defer tc.Destroy() + re.NoError(err) + err = tc.RunInitialServers() + re.NoError(err) + tc.WaitLeader() + leaderServer := tc.GetLeaderServer() + re.NoError(leaderServer.BootstrapCluster()) + + tempDir := t.TempDir() + rs, err := storage.NewStorageWithLevelDBBackend(context.Background(), tempDir, nil) + re.NoError(err) + + server := mockserver.NewMockServer( + context.Background(), + &pdpb.Member{MemberId: 1, Name: "test", ClientUrls: []string{tempurl.Alloc()}}, + nil, + storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs), + core.NewBasicCluster(), + ) + s := syncer.NewRegionSyncer(server) + s.StartSyncWithLeader(leaderServer.GetAddr()) + time.Sleep(time.Second) + + // Record the time when exiting sync + startTime := time.Now() + + // Simulate leader change scenario + // Directly call StopSyncWithLeader to simulate exit + s.StopSyncWithLeader() + + // Calculate time difference + elapsedTime := time.Since(startTime) + + // Assert that the sync exit time is within expected range + re.Less(elapsedTime, time.Second) +}