diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 22edd400e42a..a06e8e400fc5 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -443,7 +443,7 @@ func (td *tsoDispatcher) handleProcessRequestError(ctx context.Context, bo *retr td.connectionCtxs.Delete(streamURL) streamCancelFunc() if errs.IsServiceModeChange(err) { - svcDiscovery.ScheduleServiceModeChanged() + svcDiscovery.ScheduleCheckServiceModeChanged() return false } // Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP. diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index 56c71c95850c..22df2ddfc6a4 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -641,3 +641,68 @@ func TestTSOServiceSwitch(t *testing.T) { <-ch re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastUpdateServiceMode")) } + +func TestTSOServiceWithOldClient(t *testing.T) { + re := require.New(t) + re.NoError(failpoint.Enable("github.com/tikv/pd/client/fastUpdateServiceMode", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/client/usePDServiceMode", `return(true)`)) + var wg sync.WaitGroup + defer wg.Wait() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestAPICluster(ctx, 1) + re.NoError(err) + defer cluster.Destroy() + + err = cluster.RunInitialServers() + re.NoError(err) + + leaderName := cluster.WaitLeader() + re.NotEmpty(leaderName) + pdLeader := cluster.GetServer(leaderName) + backendEndpoints := pdLeader.GetAddr() + re.NoError(pdLeader.BootstrapCluster()) + pdClient, err := pd.NewClientWithContext(ctx, []string{backendEndpoints}, pd.SecurityOption{}) + re.NoError(err) + defer pdClient.Close() + ch := make(chan struct{}) + ch1 := make(chan struct{}) + wg.Add(1) + go func(ctx context.Context, wg *sync.WaitGroup, ch, ch1 chan struct{}) { + defer wg.Done() + var lastPhysical, lastLogical int64 + for { + select { + case <-ctx.Done(): + return + default: + } + physical, logical, err := pdClient.GetTS(context.Background()) + if err == nil { + re.GreaterOrEqual(physical, lastPhysical) + if physical == lastPhysical { + re.Greater(logical, lastLogical) + } + lastPhysical = physical + lastLogical = logical + select { + case <-ch1: + ch <- struct{}{} + default: + } + } else { + t.Log(err) + } + } + }(ctx, &wg, ch, ch1) + ch1 <- struct{}{} + <-ch + tsoCluster, err := tests.NewTestTSOCluster(ctx, 1, backendEndpoints) + re.NoError(err) + tsoCluster.WaitForDefaultPrimaryServing(re) + ch1 <- struct{}{} + <-ch + tsoCluster.Destroy() + re.NoError(failpoint.Disable("github.com/tikv/pd/client/usePDServiceMode")) + re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastUpdateServiceMode")) +}