Skip to content

Commit

Permalink
resolve conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Sep 29, 2024
1 parent 0d4b723 commit e3f29e6
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
2 changes: 1 addition & 1 deletion client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func (td *tsoDispatcher) handleProcessRequestError(ctx context.Context, bo *retr
td.connectionCtxs.Delete(streamURL)
streamCancelFunc()
if errs.IsServiceModeChange(err) {
svcDiscovery.ScheduleServiceModeChanged()
svcDiscovery.ScheduleCheckServiceModeChanged()
return false
}
// Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP.
Expand Down
65 changes: 65 additions & 0 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,3 +641,68 @@ func TestTSOServiceSwitch(t *testing.T) {
<-ch
re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastUpdateServiceMode"))
}

func TestTSOServiceWithOldClient(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/client/fastUpdateServiceMode", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/usePDServiceMode", `return(true)`))
var wg sync.WaitGroup
defer wg.Wait()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestAPICluster(ctx, 1)
re.NoError(err)
defer cluster.Destroy()

err = cluster.RunInitialServers()
re.NoError(err)

leaderName := cluster.WaitLeader()
re.NotEmpty(leaderName)
pdLeader := cluster.GetServer(leaderName)
backendEndpoints := pdLeader.GetAddr()
re.NoError(pdLeader.BootstrapCluster())
pdClient, err := pd.NewClientWithContext(ctx, []string{backendEndpoints}, pd.SecurityOption{})
re.NoError(err)
defer pdClient.Close()
ch := make(chan struct{})
ch1 := make(chan struct{})
wg.Add(1)
go func(ctx context.Context, wg *sync.WaitGroup, ch, ch1 chan struct{}) {
defer wg.Done()
var lastPhysical, lastLogical int64
for {
select {
case <-ctx.Done():
return
default:
}
physical, logical, err := pdClient.GetTS(context.Background())
if err == nil {
re.GreaterOrEqual(physical, lastPhysical)
if physical == lastPhysical {
re.Greater(logical, lastLogical)
}
lastPhysical = physical
lastLogical = logical
select {
case <-ch1:
ch <- struct{}{}
default:
}
} else {
t.Log(err)
}
}
}(ctx, &wg, ch, ch1)
ch1 <- struct{}{}
<-ch
tsoCluster, err := tests.NewTestTSOCluster(ctx, 1, backendEndpoints)
re.NoError(err)
tsoCluster.WaitForDefaultPrimaryServing(re)
ch1 <- struct{}{}
<-ch
tsoCluster.Destroy()
re.NoError(failpoint.Disable("github.com/tikv/pd/client/usePDServiceMode"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastUpdateServiceMode"))
}

0 comments on commit e3f29e6

Please sign in to comment.