From 3003d43deb1ac3a6deea6b3d58b4166a5164ee2e Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 25 Oct 2024 14:32:19 +0800 Subject: [PATCH 1/6] add a config to manage tso switch Signed-off-by: Ryan Leung --- server/cluster/cluster.go | 25 ++++++++++++++++++++++++- server/config/config.go | 14 ++++++++++++-- server/server.go | 3 +-- 3 files changed, 37 insertions(+), 5 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index dcf91f71b59..804a32af36e 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -409,11 +409,30 @@ func (c *RaftCluster) checkSchedulingService() { // checkTSOService checks the TSO service. func (c *RaftCluster) checkTSOService() { if c.isAPIServiceMode { + if c.opt.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() { + servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), constant.TSOServiceName) + if err != nil || len(servers) == 0 { + if err := c.startTSOJobs(); err != nil { + log.Error("failed to start TSO jobs", errs.ZapError(err)) + return + } + log.Info("TSO is provided by PD") + c.UnsetServiceIndependent(constant.TSOServiceName) + } else { + if err := c.stopTSOJobs(); err != nil { + log.Error("failed to stop TSO jobs", errs.ZapError(err)) + return + } + log.Info("TSO is provided by TSO server") + if !c.IsServiceIndependent(constant.TSOServiceName) { + c.SetServiceIndependent(constant.TSOServiceName) + } + } + } return } if err := c.startTSOJobs(); err != nil { - // If there is an error, need to wait for the next check. 
log.Error("failed to start TSO jobs", errs.ZapError(err)) return } @@ -428,6 +447,8 @@ func (c *RaftCluster) runServiceCheckJob() { schedulingTicker.Reset(time.Millisecond) }) defer schedulingTicker.Stop() + tsoTicker := time.NewTicker(tsoServiceCheckInterval) + defer tsoTicker.Stop() for { select { @@ -436,6 +457,8 @@ func (c *RaftCluster) runServiceCheckJob() { return case <-schedulingTicker.C: c.checkSchedulingService() + case <-tsoTicker.C: + c.checkTSOService() } } } diff --git a/server/config/config.go b/server/config/config.go index c64ee3831b0..d4139f4fe10 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -252,7 +252,8 @@ const ( minCheckRegionSplitInterval = 1 * time.Millisecond maxCheckRegionSplitInterval = 100 * time.Millisecond - defaultEnableSchedulingFallback = true + defaultEnableSchedulingFallback = true + defaultEnableTSODynamicSwitching = false ) // Special keys for Labels @@ -854,13 +855,17 @@ func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) { // MicroServiceConfig is the configuration for micro service. type MicroServiceConfig struct { - EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"` + EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"` + EnableTSODynamicSwitching bool `toml:"enable-tso-dynamic-switching" json:"enable-tso-dynamic-switching,string"` } func (c *MicroServiceConfig) adjust(meta *configutil.ConfigMetaData) { if !meta.IsDefined("enable-scheduling-fallback") { c.EnableSchedulingFallback = defaultEnableSchedulingFallback } + if !meta.IsDefined("enable-dynamic-tso") { + c.EnableTSODynamicSwitching = defaultEnableTSODynamicSwitching + } } // Clone returns a copy of micro service config. 
@@ -874,6 +879,11 @@ func (c *MicroServiceConfig) IsSchedulingFallbackEnabled() bool { return c.EnableSchedulingFallback } +// IsTSODynamicSwitchingEnabled returns whether to enable TSO dynamic switching. +func (c *MicroServiceConfig) IsTSODynamicSwitchingEnabled() bool { + return c.EnableTSODynamicSwitching +} + // KeyspaceConfig is the configuration for keyspace management. type KeyspaceConfig struct { // PreAlloc contains the keyspace to be allocated during keyspace manager initialization. diff --git a/server/server.go b/server/server.go index 760b185a6ff..029c85694c3 100644 --- a/server/server.go +++ b/server/server.go @@ -1411,8 +1411,7 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster { // IsServiceIndependent returns whether the service is independent. func (s *Server) IsServiceIndependent(name string) bool { if s.mode == APIServiceMode && !s.IsClosed() { - // TODO: remove it after we support tso discovery - if name == constant.TSOServiceName { + if name == constant.TSOServiceName && !s.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() { return true } return s.cluster.IsServiceIndependent(name) From 83378ec10dcb182403d54bff13b20546ae3f2287 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 28 Oct 2024 10:52:01 +0800 Subject: [PATCH 2/6] use an individual PR to add config Signed-off-by: Ryan Leung --- server/config/config.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/server/config/config.go b/server/config/config.go index d4139f4fe10..36126572099 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -252,8 +252,7 @@ const ( minCheckRegionSplitInterval = 1 * time.Millisecond maxCheckRegionSplitInterval = 100 * time.Millisecond - defaultEnableSchedulingFallback = true - defaultEnableTSODynamicSwitching = false + defaultEnableSchedulingFallback = true ) // Special keys for Labels @@ -855,17 +854,13 @@ func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) { // 
MicroServiceConfig is the configuration for micro service. type MicroServiceConfig struct { - EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"` - EnableTSODynamicSwitching bool `toml:"enable-tso-dynamic-switching" json:"enable-tso-dynamic-switching,string"` + EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"` } func (c *MicroServiceConfig) adjust(meta *configutil.ConfigMetaData) { if !meta.IsDefined("enable-scheduling-fallback") { c.EnableSchedulingFallback = defaultEnableSchedulingFallback } - if !meta.IsDefined("enable-dynamic-tso") { - c.EnableTSODynamicSwitching = defaultEnableTSODynamicSwitching - } } // Clone returns a copy of micro service config. @@ -880,8 +875,9 @@ func (c *MicroServiceConfig) IsSchedulingFallbackEnabled() bool { } // IsTSODynamicSwitchingEnabled returns whether to enable TSO dynamic switching. -func (c *MicroServiceConfig) IsTSODynamicSwitchingEnabled() bool { - return c.EnableTSODynamicSwitching +func (*MicroServiceConfig) IsTSODynamicSwitchingEnabled() bool { + // TODO: make it configurable + return false } // KeyspaceConfig is the configuration for keyspace management. From 6fb708df0e77da21c15efb06c58dd16d1f407751 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 28 Oct 2024 15:03:29 +0800 Subject: [PATCH 3/6] forward Signed-off-by: Ryan Leung --- server/grpc_service.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/server/grpc_service.go b/server/grpc_service.go index 25d5d3ed8e7..d5fd8ae3e32 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -529,10 +529,29 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { return s.forwardTSO(stream) } + tsDeadlineCh := make(chan *tsoutil.TSDeadline, 1) + go tsoutil.WatchTSDeadline(stream.Context(), tsDeadlineCh) + var ( doneCh chan struct{} errCh chan error + // The following are tso forward stream related variables. 
+ forwardStream tsopb.TSO_TsoClient + cancelForward context.CancelFunc + forwardCtx context.Context + tsoStreamErr error + lastForwardedHost string ) + + defer func() { + if cancelForward != nil { + cancelForward() + } + if grpcutil.NeedRebuildConnection(tsoStreamErr) { + s.closeDelegateClient(lastForwardedHost) + } + }() + ctx, cancel := context.WithCancel(stream.Context()) defer cancel() for { @@ -570,6 +589,21 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { continue } + if s.IsServiceIndependent(constant.TSOServiceName) { + if request.GetCount() == 0 { + err = errs.ErrGenerateTimestamp.FastGenByArgs("tso count should be positive") + return status.Error(codes.Unknown, err.Error()) + } + forwardCtx, cancelForward, forwardStream, lastForwardedHost, tsoStreamErr, err = s.handleTSOForwarding(forwardCtx, forwardStream, stream, nil, request, tsDeadlineCh, lastForwardedHost, cancelForward) + if tsoStreamErr != nil { + return tsoStreamErr + } + if err != nil { + return err + } + continue + } + start := time.Now() // TSO uses leader lease to determine validity. No need to check leader here. 
if s.IsClosed() { From 8afbaab9ebeeb61015571e3361048d5b2ef4e56e Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 30 Oct 2024 13:40:00 +0800 Subject: [PATCH 4/6] address the comment Signed-off-by: Ryan Leung --- server/cluster/cluster.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 804a32af36e..d8288bf93af 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -412,14 +412,14 @@ func (c *RaftCluster) checkTSOService() { if c.opt.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() { servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), constant.TSOServiceName) if err != nil || len(servers) == 0 { - if err := c.startTSOJobs(); err != nil { + if err := c.startTSOJobsIfNeeded(); err != nil { log.Error("failed to start TSO jobs", errs.ZapError(err)) return } log.Info("TSO is provided by PD") c.UnsetServiceIndependent(constant.TSOServiceName) } else { - if err := c.stopTSOJobs(); err != nil { + if err := c.stopTSOJobsIfNeeded(); err != nil { log.Error("failed to stop TSO jobs", errs.ZapError(err)) return } @@ -432,7 +432,7 @@ func (c *RaftCluster) checkTSOService() { return } - if err := c.startTSOJobs(); err != nil { + if err := c.startTSOJobsIfNeeded(); err != nil { log.Error("failed to start TSO jobs", errs.ZapError(err)) return } @@ -463,7 +463,7 @@ func (c *RaftCluster) runServiceCheckJob() { } } -func (c *RaftCluster) startTSOJobs() error { +func (c *RaftCluster) startTSOJobsIfNeeded() error { allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation) if err != nil { log.Error("failed 
to get global TSO allocator", errs.ZapError(err)) @@ -847,7 +847,7 @@ func (c *RaftCluster) Stop() { if !c.IsServiceIndependent(constant.SchedulingServiceName) { c.stopSchedulingJobs() } - if err := c.stopTSOJobs(); err != nil { + if err := c.stopTSOJobsIfNeeded(); err != nil { log.Error("failed to stop tso jobs", errs.ZapError(err)) } c.heartbeatRunner.Stop() From 1200ea8cef1f8b6b135d6f5c7bc41c8f78da6903 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 30 Oct 2024 14:17:12 +0800 Subject: [PATCH 5/6] fix Signed-off-by: Ryan Leung --- server/cluster/cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index d8288bf93af..0e14adf3402 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -410,7 +410,7 @@ func (c *RaftCluster) checkSchedulingService() { func (c *RaftCluster) checkTSOService() { if c.isAPIServiceMode { if c.opt.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() { - servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), constant.TSOServiceName) + servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName) if err != nil || len(servers) == 0 { if err := c.startTSOJobsIfNeeded(); err != nil { log.Error("failed to start TSO jobs", errs.ZapError(err)) From 56d1ef73152c1e3f6ed3f56fde103b7e5b706ec2 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 30 Oct 2024 15:59:35 +0800 Subject: [PATCH 6/6] remove config Signed-off-by: Ryan Leung --- server/cluster/cluster.go | 5 ++++- server/config/config.go | 6 ------ server/server.go | 2 +- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 0e14adf3402..46a525a3e09 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -114,6 +114,9 @@ const ( heartbeatTaskRunner = "heartbeat-async" miscTaskRunner = "misc-async" logTaskRunner = "log-async" + + // TODO: make it configurable + 
IsTSODynamicSwitchingEnabled = false ) // Server is the interface for cluster. @@ -409,7 +412,7 @@ func (c *RaftCluster) checkSchedulingService() { // checkTSOService checks the TSO service. func (c *RaftCluster) checkTSOService() { if c.isAPIServiceMode { - if c.opt.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() { + if IsTSODynamicSwitchingEnabled { servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName) if err != nil || len(servers) == 0 { if err := c.startTSOJobsIfNeeded(); err != nil { diff --git a/server/config/config.go b/server/config/config.go index 36126572099..c64ee3831b0 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -874,12 +874,6 @@ func (c *MicroServiceConfig) IsSchedulingFallbackEnabled() bool { return c.EnableSchedulingFallback } -// IsTSODynamicSwitchingEnabled returns whether to enable TSO dynamic switching. -func (*MicroServiceConfig) IsTSODynamicSwitchingEnabled() bool { - // TODO: make it configurable - return false -} - // KeyspaceConfig is the configuration for keyspace management. type KeyspaceConfig struct { // PreAlloc contains the keyspace to be allocated during keyspace manager initialization. diff --git a/server/server.go b/server/server.go index 029c85694c3..c88871658dc 100644 --- a/server/server.go +++ b/server/server.go @@ -1411,7 +1411,7 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster { // IsServiceIndependent returns whether the service is independent. func (s *Server) IsServiceIndependent(name string) bool { if s.mode == APIServiceMode && !s.IsClosed() { - if name == constant.TSOServiceName && !s.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() { + if name == constant.TSOServiceName && !cluster.IsTSODynamicSwitchingEnabled { return true } return s.cluster.IsServiceIndependent(name)