*: move tso to independent thread #8720

Merged: 11 commits merged on Oct 22, 2024
Changes from 7 commits
4 changes: 1 addition & 3 deletions client/client.go
@@ -206,9 +206,7 @@ func (k *serviceModeKeeper) close() {
k.tsoSvcDiscovery.Close()
fallthrough
case pdpb.ServiceMode_PD_SVC_MODE:
if k.tsoClient != nil {
k.tsoClient.close()
}
k.tsoClient.close()
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
}
}
3 changes: 3 additions & 0 deletions client/pd_service_discovery.go
@@ -634,6 +634,9 @@

// Close releases all resources.
func (c *pdServiceDiscovery) Close() {
if c == nil {
return
}

c.closeOnce.Do(func() {
log.Info("[pd] close pd service discovery client")
c.clientConns.Range(func(key, cc any) bool {
3 changes: 3 additions & 0 deletions client/tso_service_discovery.go
@@ -226,6 +226,9 @@

// Close releases all resources
func (c *tsoServiceDiscovery) Close() {
if c == nil {
return
}

log.Info("closing tso service discovery")

c.cancel()
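The Close methods in both discovery hunks above gain an early return when the receiver is nil, so callers no longer need their own nil checks before calling Close. A minimal, self-contained sketch of this nil-receiver pattern; the closer type and its names below are illustrative stand-ins, not part of the PD client:

```go
package main

import "fmt"

// closer stands in for a client sub-component whose Close may be called
// whether or not the component was ever created.
type closer struct {
	name string
}

// Close is safe to call on a nil receiver, mirroring the `if c == nil { return }`
// guards added to pdServiceDiscovery.Close and tsoServiceDiscovery.Close above.
func (c *closer) Close() {
	if c == nil {
		return // nothing was initialized, so there is nothing to release
	}
	fmt.Println("closing", c.name)
}

func main() {
	var missing *closer // nil: the component was never set up
	missing.Close()     // no panic, the guard simply returns

	active := &closer{name: "service-discovery"}
	active.Close() // prints "closing service-discovery"
}
```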
106 changes: 91 additions & 15 deletions server/cluster/cluster.go
@@ -17,6 +17,7 @@
import (
"context"
"encoding/json"
errorspkg "errors"
"fmt"
"io"
"math"
@@ -43,6 +44,7 @@
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/memory"
"github.com/tikv/pd/pkg/progress"
"github.com/tikv/pd/pkg/ratelimit"
@@ -56,6 +58,7 @@
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/syncer"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/unsaferecovery"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
@@ -88,12 +91,13 @@
// nodeStateCheckJobInterval is the interval to run node state check job.
nodeStateCheckJobInterval = 10 * time.Second
// metricsCollectionJobInterval is the interval to run metrics collection job.
metricsCollectionJobInterval = 10 * time.Second
updateStoreStatsInterval = 9 * time.Millisecond
clientTimeout = 3 * time.Second
defaultChangedRegionsLimit = 10000
gcTombstoneInterval = 30 * 24 * time.Hour
serviceCheckInterval = 10 * time.Second
metricsCollectionJobInterval = 10 * time.Second
updateStoreStatsInterval = 9 * time.Millisecond
clientTimeout = 3 * time.Second
defaultChangedRegionsLimit = 10000
gcTombstoneInterval = 30 * 24 * time.Hour
schedulingServiceCheckInterval = 10 * time.Second
tsoServiceCheckInterval = 100 * time.Millisecond
// persistLimitRetryTimes is used to reduce the probability of the persistent error
// since the once the store is added or removed, we shouldn't return an error even if the store limit is failed to persist.
persistLimitRetryTimes = 5
@@ -144,6 +148,7 @@
cancel context.CancelFunc

*core.BasicCluster // cached cluster info
member *member.EmbeddedEtcdMember

etcdClient *clientv3.Client
httpClient *http.Client
@@ -174,6 +179,7 @@
keyspaceGroupManager *keyspace.GroupManager
independentServices sync.Map
hbstreams *hbstream.HeartbeatStreams
tsoAllocator *tso.AllocatorManager

// heartbeatRunner is used to process the subtree update task asynchronously.
heartbeatRunner ratelimit.Runner
@@ -194,16 +200,18 @@
}

// NewRaftCluster create a new cluster.
func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client,
httpClient *http.Client) *RaftCluster {
func NewRaftCluster(ctx context.Context, clusterID uint64, member *member.EmbeddedEtcdMember, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client,
httpClient *http.Client, tsoAllocator *tso.AllocatorManager) *RaftCluster {
return &RaftCluster{
serverCtx: ctx,
clusterID: clusterID,
member: member,
regionSyncer: regionSyncer,
httpClient: httpClient,
etcdClient: etcdClient,
BasicCluster: basicCluster,
storage: storage,
tsoAllocator: tsoAllocator,
heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
miscRunner: ratelimit.NewConcurrentRunner(miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
@@ -314,11 +322,13 @@
if err != nil {
return err
}
c.checkTSOService()
Review comment (Member): Can we put this func next to checkSchedulingService?
Reply (@rleungx, Member Author, Oct 21, 2024): prefer to check it ASAP

cluster, err := c.LoadClusterInfo()
if err != nil {
return err
}
if cluster == nil {
log.Warn("cluster is not bootstrapped")
return nil
}

@@ -351,7 +361,7 @@
return err
}
}
c.checkServices()
c.checkSchedulingService()
c.wg.Add(9)
go c.runServiceCheckJob()
go c.runMetricsCollectionJob()
@@ -370,7 +380,7 @@
return nil
}

func (c *RaftCluster) checkServices() {
func (c *RaftCluster) checkSchedulingService() {
if c.isAPIServiceMode {
servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), constant.SchedulingServiceName)
if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) {
@@ -390,27 +400,90 @@
}
}

// checkTSOService checks the TSO service.
func (c *RaftCluster) checkTSOService() {
Review comment (Member): Seems we do not need this wrap function now. We can call startTSOJobs directly.
Reply (Member Author): for the upcoming PR

if c.isAPIServiceMode {
return
}
if c.member.IsLeader() {
if err := c.startTSOJobs(); err != nil {
// If there is an error, need to wait for the next check.
log.Error("failed to start TSO jobs", errs.ZapError(err))
return
}

} else {
// leader exits, reset the allocator group
if err := c.stopTSOJobs(); err != nil {
// If there is an error, need to wait for the next check.
log.Error("failed to stop TSO jobs", errs.ZapError(err))
return
}


failpoint.Inject("updateAfterResetTSO", func() {
allocator, _ := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
if err := allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) {
log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err))
}

if allocator.IsInitialize() {
log.Panic("the allocator should be uninitialized after reset")
}

})
}
}

func (c *RaftCluster) runServiceCheckJob() {
defer logutil.LogPanic()
defer c.wg.Done()

ticker := time.NewTicker(serviceCheckInterval)
schedulingTicker := time.NewTicker(schedulingServiceCheckInterval)
failpoint.Inject("highFrequencyClusterJobs", func() {
ticker.Reset(time.Millisecond)
schedulingTicker.Reset(time.Millisecond)
})
defer ticker.Stop()
defer schedulingTicker.Stop()
tsoTicker := time.NewTicker(tsoServiceCheckInterval)
defer tsoTicker.Stop()

for {
select {
case <-c.ctx.Done():
log.Info("service check job is stopped")
return
case <-ticker.C:
c.checkServices()
case <-schedulingTicker.C:
c.checkSchedulingService()
case <-tsoTicker.C:
c.checkTSOService()
}
}
}
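For context on the loop above: runServiceCheckJob now multiplexes two tickers in a single select, a slower one for the scheduling-service check and a 100ms one for the TSO check, so a leadership change is picked up quickly. A stripped-down, runnable sketch of that two-ticker pattern follows; the intervals and check functions are placeholders, not the PD implementations.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runChecks mirrors the shape of runServiceCheckJob above: one select loop
// driven by two tickers with different cadences plus a cancellation channel.
func runChecks(ctx context.Context, checkScheduling, checkTSO func()) {
	schedulingTicker := time.NewTicker(200 * time.Millisecond) // stand-in for schedulingServiceCheckInterval
	defer schedulingTicker.Stop()
	tsoTicker := time.NewTicker(50 * time.Millisecond) // stand-in for tsoServiceCheckInterval
	defer tsoTicker.Stop()

	for {
		select {
		case <-ctx.Done():
			fmt.Println("service check job is stopped")
			return
		case <-schedulingTicker.C:
			checkScheduling()
		case <-tsoTicker.C:
			checkTSO()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	runChecks(ctx,
		func() { fmt.Println("checking scheduling service") },
		func() { fmt.Println("checking TSO service") },
	)
}
```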

func (c *RaftCluster) startTSOJobs() error {
allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
if err != nil {
log.Error("failed to get global TSO allocator", errs.ZapError(err))
return err
}

if !allocator.IsInitialize() {
log.Info("initializing the global TSO allocator")
if err := allocator.Initialize(0); err != nil {
log.Error("failed to initialize the global TSO allocator", errs.ZapError(err))
return err
}

}
return nil
}

func (c *RaftCluster) stopTSOJobs() error {
allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
if err != nil {
log.Error("failed to get global TSO allocator", errs.ZapError(err))
return err
}

if allocator.IsInitialize() {
c.tsoAllocator.ResetAllocatorGroup(tso.GlobalDCLocation, true)
}
return nil
}
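Since checkTSOService fires on every tick of the 100ms ticker, startTSOJobs and stopTSOJobs above have to be idempotent: the IsInitialize check means only the first call after gaining leadership initializes the global allocator, and only a previously initialized allocator group gets reset on the follower side. A small sketch of that toggle with a stand-in allocator, not the real tso.AllocatorManager:

```go
package main

import (
	"fmt"
	"sync"
)

// fakeAllocator is a stand-in with the same initialize/reset life cycle
// that startTSOJobs and stopTSOJobs rely on.
type fakeAllocator struct {
	mu          sync.Mutex
	initialized bool
}

func (a *fakeAllocator) IsInitialize() bool { a.mu.Lock(); defer a.mu.Unlock(); return a.initialized }
func (a *fakeAllocator) Initialize() error  { a.mu.Lock(); defer a.mu.Unlock(); a.initialized = true; return nil }
func (a *fakeAllocator) Reset()             { a.mu.Lock(); defer a.mu.Unlock(); a.initialized = false }

// start initializes the allocator only once, no matter how many ticks call it.
func start(a *fakeAllocator) error {
	if !a.IsInitialize() {
		fmt.Println("initializing the global TSO allocator")
		return a.Initialize()
	}
	return nil
}

// stop resets only if something was actually started.
func stop(a *fakeAllocator) {
	if a.IsInitialize() {
		fmt.Println("resetting the allocator group")
		a.Reset()
	}
}

func main() {
	a := &fakeAllocator{}
	_ = start(a) // first leader tick: initializes
	_ = start(a) // later ticks: no-op
	stop(a)      // leadership lost: resets
	stop(a)      // later ticks: no-op
}
```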

// startGCTuner
func (c *RaftCluster) startGCTuner() {
defer logutil.LogPanic()
@@ -757,6 +830,9 @@
if !c.IsServiceIndependent(constant.SchedulingServiceName) {
c.stopSchedulingJobs()
}
if err := c.stopTSOJobs(); err != nil {
log.Error("failed to stop tso jobs", errs.ZapError(err))
}

c.heartbeatRunner.Stop()
c.miscRunner.Stop()
c.logRunner.Stop()
26 changes: 1 addition & 25 deletions server/server.go
@@ -17,7 +17,6 @@ package server
import (
"bytes"
"context"
errorspkg "errors"
"fmt"
"math/rand"
"net/http"
@@ -490,7 +489,7 @@ func (s *Server) startServer(ctx context.Context) error {

s.gcSafePointManager = gc.NewSafePointManager(s.storage, s.cfg.PDServerCfg)
s.basicCluster = core.NewBasicCluster()
s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient)
s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.GetMember(), s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient, s.tsoAllocatorManager)
keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{
Client: s.client,
RootPath: s.rootPath,
@@ -1715,29 +1714,6 @@ func (s *Server) campaignLeader() {
s.member.KeepLeader(ctx)
log.Info(fmt.Sprintf("campaign %s leader ok", s.mode), zap.String("campaign-leader-name", s.Name()))

if !s.IsAPIServiceMode() {
Review comment (@okJiang, Member, Oct 17, 2024): Do we need to remove the comments? L1714, L1703-L1706
Reply (Member Author): it still works

allocator, err := s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation)
if err != nil {
log.Error("failed to get the global TSO allocator", errs.ZapError(err))
return
}
log.Info("initializing the global TSO allocator")
if err := allocator.Initialize(0); err != nil {
log.Error("failed to initialize the global TSO allocator", errs.ZapError(err))
return
}
defer func() {
s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation, false)
failpoint.Inject("updateAfterResetTSO", func() {
if err = allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) {
log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err))
}
if allocator.IsInitialize() {
log.Panic("the allocator should be uninitialized after reset")
}
})
}()
}
if err := s.reloadConfigFromKV(); err != nil {
log.Error("failed to reload configuration", errs.ZapError(err))
return
4 changes: 2 additions & 2 deletions tests/integrations/tso/client_test.go
@@ -339,12 +339,12 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() {
return err == nil
})
// Resign leader to trigger the TSO resetting.
re.NoError(failpoint.Enable("github.com/tikv/pd/server/updateAfterResetTSO", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/updateAfterResetTSO", "return(true)"))
oldLeaderName := suite.cluster.WaitLeader()
re.NotEmpty(oldLeaderName)
err := suite.cluster.GetServer(oldLeaderName).ResignLeader()
re.NoError(err)
re.NoError(failpoint.Disable("github.com/tikv/pd/server/updateAfterResetTSO"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/updateAfterResetTSO"))
newLeaderName := suite.cluster.WaitLeader()
re.NotEmpty(newLeaderName)
re.NotEqual(oldLeaderName, newLeaderName)
1 change: 1 addition & 0 deletions tests/integrations/tso/consistency_test.go
@@ -81,6 +81,7 @@ func (suite *tsoConsistencyTestSuite) SetupSuite() {
leaderName := suite.cluster.WaitLeader()
re.NotEmpty(leaderName)
suite.pdLeaderServer = suite.cluster.GetServer(leaderName)
suite.pdLeaderServer.BootstrapCluster()
backendEndpoints := suite.pdLeaderServer.GetAddr()
if suite.legacy {
suite.pdClient = tu.MustNewGrpcClient(re, backendEndpoints)
1 change: 1 addition & 0 deletions tests/integrations/tso/server_test.go
@@ -79,6 +79,7 @@ func (suite *tsoServerTestSuite) SetupSuite() {
leaderName := suite.cluster.WaitLeader()
re.NotEmpty(leaderName)
suite.pdLeaderServer = suite.cluster.GetServer(leaderName)
suite.pdLeaderServer.BootstrapCluster()
backendEndpoints := suite.pdLeaderServer.GetAddr()
if suite.legacy {
suite.pdClient = tu.MustNewGrpcClient(re, backendEndpoints)
6 changes: 3 additions & 3 deletions tests/server/cluster/cluster_test.go
@@ -912,7 +912,7 @@ func TestLoadClusterInfo(t *testing.T) {
tc.WaitLeader()
leaderServer := tc.GetLeaderServer()
svr := leaderServer.GetServer()
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager())

// Cluster is not bootstrapped.
rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager())
@@ -952,7 +952,7 @@
}
re.NoError(testStorage.Flush())

raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), basicCluster, testStorage, syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), basicCluster, testStorage, syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager())
raftCluster.InitCluster(mockid.NewIDAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager())
raftCluster, err = raftCluster.LoadClusterInfo()
re.NoError(err)
@@ -1666,7 +1666,7 @@ func TestTransferLeaderBack(t *testing.T) {
tc.WaitLeader()
leaderServer := tc.GetLeaderServer()
svr := leaderServer.GetServer()
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager())
rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager())
storage := rc.GetStorage()
meta := &metapb.Cluster{Id: 123}
3 changes: 3 additions & 0 deletions tests/server/tso/allocator_test.go
@@ -127,6 +127,9 @@ func TestPriorityAndDifferentLocalTSO(t *testing.T) {
re.NoError(cluster.RunInitialServers())

cluster.WaitAllLeaders(re, dcLocationConfig)
leaderServer := cluster.GetLeaderServer()
re.NotNil(leaderServer)
leaderServer.BootstrapCluster()

// Wait for all nodes becoming healthy.
time.Sleep(time.Second * 5)
3 changes: 3 additions & 0 deletions tests/server/tso/global_tso_test.go
@@ -99,6 +99,7 @@ func TestDelaySyncTimestamp(t *testing.T) {
var leaderServer, nextLeaderServer *tests.TestServer
leaderServer = cluster.GetLeaderServer()
re.NotNil(leaderServer)
leaderServer.BootstrapCluster()
for _, s := range cluster.GetServers() {
if s.GetConfig().Name != cluster.GetLeader() {
nextLeaderServer = s
@@ -146,6 +147,8 @@ func TestLogicalOverflow(t *testing.T) {
re.NotEmpty(cluster.WaitLeader())

leaderServer := cluster.GetLeaderServer()
re.NotNil(leaderServer)
leaderServer.BootstrapCluster()
grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr())
clusterID := leaderServer.GetClusterID()

4 changes: 3 additions & 1 deletion tests/server/tso/tso_test.go
@@ -114,6 +114,8 @@ func TestDisableLocalTSOAfterEnabling(t *testing.T) {
re.NoError(cluster.RunInitialServers())

cluster.WaitAllLeaders(re, dcLocationConfig)
leaderServer := cluster.GetLeaderServer()
leaderServer.BootstrapCluster()
requestLocalTSOs(re, cluster, dcLocationConfig)

// Reboot the cluster.
Expand All @@ -125,7 +127,7 @@ func TestDisableLocalTSOAfterEnabling(t *testing.T) {
re.NotEmpty(cluster.WaitLeader())

// Re-request the global TSOs.
leaderServer := cluster.GetLeaderServer()
leaderServer = cluster.GetLeaderServer()
grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr())
clusterID := leaderServer.GetClusterID()
req := &pdpb.TsoRequest{