From 1d386f6f189feb4a9b83799efa593474b62e5aa3 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 22 Apr 2024 21:25:39 +0800 Subject: [PATCH 1/5] tests: avoid panic in cluster test (#8114) close tikv/pd#8113 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- server/cluster/cluster_test.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index ecd579d8881..945e354bb6c 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -3655,7 +3655,8 @@ func TestInterval(t *testing.T) { func waitAddLearner(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { var res *pdpb.RegionHeartbeatResponse testutil.Eventually(re, func() bool { - if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil { + if r := stream.Recv(); r != nil { + res = r.(*pdpb.RegionHeartbeatResponse) return res.GetRegionId() == region.GetID() && res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_AddLearnerNode && res.GetChangePeer().GetPeer().GetStoreId() == storeID @@ -3671,7 +3672,8 @@ func waitAddLearner(re *require.Assertions, stream mockhbstream.HeartbeatStream, func waitPromoteLearner(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { var res *pdpb.RegionHeartbeatResponse testutil.Eventually(re, func() bool { - if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil { + if r := stream.Recv(); r != nil { + res = r.(*pdpb.RegionHeartbeatResponse) return res.GetRegionId() == region.GetID() && res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_AddNode && res.GetChangePeer().GetPeer().GetStoreId() == storeID @@ -3688,7 +3690,8 @@ func waitPromoteLearner(re *require.Assertions, stream mockhbstream.HeartbeatStr func waitRemovePeer(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { var res *pdpb.RegionHeartbeatResponse testutil.Eventually(re, func() bool { - if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil { + if r := stream.Recv(); r != nil { + res = r.(*pdpb.RegionHeartbeatResponse) return res.GetRegionId() == region.GetID() && res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_RemoveNode && res.GetChangePeer().GetPeer().GetStoreId() == storeID @@ -3704,7 +3707,8 @@ func waitRemovePeer(re *require.Assertions, stream mockhbstream.HeartbeatStream, func waitTransferLeader(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { var res *pdpb.RegionHeartbeatResponse testutil.Eventually(re, func() bool { - if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil { + if r := stream.Recv(); r != nil { + res = r.(*pdpb.RegionHeartbeatResponse) if res.GetRegionId() == region.GetID() { for _, peer := range append(res.GetTransferLeader().GetPeers(), res.GetTransferLeader().GetPeer()) { if peer.GetStoreId() == storeID { From 1e65f9dda7d9b93e68178139ec3b4c31539a223d Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 24 Apr 2024 14:18:10 +0800 Subject: [PATCH 2/5] *: fix follower cannot handle member request (#8122) ref tikv/pd#7519 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- server/apiv2/handlers/micro_service.go | 1 - 1 file changed, 1 deletion(-) 
diff --git a/server/apiv2/handlers/micro_service.go b/server/apiv2/handlers/micro_service.go index 209a4c95445..fd44665530f 100644 --- a/server/apiv2/handlers/micro_service.go +++ b/server/apiv2/handlers/micro_service.go @@ -26,7 +26,6 @@ import ( // RegisterMicroService registers microservice handler to the router. func RegisterMicroService(r *gin.RouterGroup) { router := r.Group("ms") - router.Use(middlewares.BootstrapChecker()) router.GET("members/:service", GetMembers) router.GET("primary/:service", GetPrimary) } From 141186e958dcd3b88e3dd289c9294f6623c3eba3 Mon Sep 17 00:00:00 2001 From: ShuNing Date: Wed, 24 Apr 2024 14:54:11 +0800 Subject: [PATCH 3/5] *: optimize heartbeat process with concurrent runner - part 2 (#8052) ref tikv/pd#7897 Optimize heartbeat process - Split the statics updates on the subtree Signed-off-by: nolouch Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/core/region.go | 173 +++++++++++++++++++++++++++ pkg/core/region_test.go | 86 ++++++++++++- pkg/mcs/scheduling/server/cluster.go | 28 ++--- pkg/ratelimit/metrics.go | 52 ++++++++ pkg/ratelimit/runner.go | 18 +++ pkg/schedule/config/config.go | 2 +- server/cluster/cluster.go | 36 +++--- 7 files changed, 354 insertions(+), 41 deletions(-) create mode 100644 pkg/ratelimit/metrics.go diff --git a/pkg/core/region.go b/pkg/core/region.go index 41bfb4d31ad..713e82cc36d 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -856,6 +856,24 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { } } +// RegionHeartbeatStageName is the name of the stage of the region heartbeat. +const ( + HandleStatsAsync = "HandleStatsAsync" + ObserveRegionStatsAsync = "ObserveRegionStatsAsync" + UpdateSubTree = "UpdateSubTree" + HandleOverlaps = "HandleOverlaps" + CollectRegionStatsAsync = "CollectRegionStatsAsync" + SaveRegionToKV = "SaveRegionToKV" +) + +// ExtraTaskOpts returns the task options for the task. +func ExtraTaskOpts(ctx *MetaProcessContext, name string) ratelimit.TaskOpts { + return ratelimit.TaskOpts{ + TaskName: name, + Limit: ctx.Limiter, + } +} + // RWLockStats is a read-write lock with statistics. type RWLockStats struct { syncutil.RWMutex @@ -1004,6 +1022,161 @@ func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx *MetaProcessContext, region *R return overlaps, nil } +// CheckAndPutRootTree checks if the region is valid to put to the root, if valid then return error. +// Usually used with CheckAndPutSubTree together. +func (r *RegionsInfo) CheckAndPutRootTree(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) { + tracer := ctx.Tracer + r.t.Lock() + var ols []*regionItem + origin := r.getRegionLocked(region.GetID()) + if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) { + ols = r.tree.overlaps(®ionItem{RegionInfo: region}) + } + tracer.OnCheckOverlapsFinished() + err := check(region, origin, convertItemsToRegions(ols)) + if err != nil { + r.t.Unlock() + tracer.OnValidateRegionFinished() + return nil, err + } + tracer.OnValidateRegionFinished() + _, overlaps, _ := r.setRegionLocked(region, true, ols...) + r.t.Unlock() + tracer.OnSetRegionFinished() + return overlaps, nil +} + +// CheckAndPutSubTree checks if the region is valid to put to the sub tree, if valid then return error. +// Usually used with CheckAndPutRootTree together. 
+func (r *RegionsInfo) CheckAndPutSubTree(region *RegionInfo) { + // new region get from root tree again + var newRegion *RegionInfo + newRegion = r.GetRegion(region.GetID()) + if newRegion == nil { + newRegion = region + } + r.UpdateSubTreeOrderInsensitive(newRegion) +} + +// UpdateSubTreeOrderInsensitive updates the subtree. +// It's can used to update the subtree concurrently. +// because it can use concurrently, check region version to make sure the order. +// 1. if the version is stale, drop this update. +// 2. if the version is same, then only some statistic info need to be updated. +// in this situation, the order of update is not important. +// +// in another hand, the overlap regions need re-check, because the region tree and the subtree update is not atomic. +func (r *RegionsInfo) UpdateSubTreeOrderInsensitive(region *RegionInfo) { + var origin *RegionInfo + r.st.Lock() + defer r.st.Unlock() + originItem, ok := r.subRegions[region.GetID()] + if ok { + origin = originItem.RegionInfo + } + rangeChanged := true + + if origin != nil { + re := region.GetRegionEpoch() + oe := origin.GetRegionEpoch() + isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm() + if (isTermBehind || re.GetVersion() < oe.GetVersion() || re.GetConfVer() < oe.GetConfVer()) && !region.isRegionRecreated() { + // Region meta is stale, skip. + return + } + rangeChanged = !origin.rangeEqualsTo(region) + + if rangeChanged || !origin.peersEqualTo(region) { + // If the range or peers have changed, the sub regionTree needs to be cleaned up. + // TODO: Improve performance by deleting only the different peers. + r.removeRegionFromSubTreeLocked(origin) + } else { + // The region tree and the subtree update is not atomic and the region tree is updated first. + // If there are two thread needs to update region tree, + // t1: thread-A update region tree + // t2: thread-B: update region tree again + // t3: thread-B: update subtree + // t4: thread-A: update region subtree + // to keep region tree consistent with subtree, we need to drop this update. + if tree, ok := r.subRegions[region.GetID()]; ok { + r.updateSubTreeStat(origin, region) + tree.RegionInfo = region + } + return + } + } + + if rangeChanged { + overlaps := r.getOverlapRegionFromSubTreeLocked(region) + for _, re := range overlaps { + r.removeRegionFromSubTreeLocked(re) + } + } + + item := ®ionItem{region} + r.subRegions[region.GetID()] = item + // It has been removed and all information needs to be updated again. + // Set peers then. + setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem) { + store, ok := peersMap[storeID] + if !ok { + store = newRegionTree() + peersMap[storeID] = store + } + store.update(item, false) + } + + // Add to leaders and followers. + for _, peer := range region.GetVoters() { + storeID := peer.GetStoreId() + if peer.GetId() == region.leader.GetId() { + // Add leader peer to leaders. + setPeer(r.leaders, storeID, item) + } else { + // Add follower peer to followers. + setPeer(r.followers, storeID, item) + } + } + + setPeers := func(peersMap map[uint64]*regionTree, peers []*metapb.Peer) { + for _, peer := range peers { + storeID := peer.GetStoreId() + setPeer(peersMap, storeID, item) + } + } + // Add to learners. + setPeers(r.learners, region.GetLearners()) + // Add to witnesses. 
+ setPeers(r.witnesses, region.GetWitnesses()) + // Add to PendingPeers + setPeers(r.pendingPeers, region.GetPendingPeers()) +} + +func (r *RegionsInfo) getOverlapRegionFromSubTreeLocked(region *RegionInfo) []*RegionInfo { + it := ®ionItem{RegionInfo: region} + overlaps := make([]*RegionInfo, 0) + overlapsMap := make(map[uint64]struct{}) + collectFromItemSlice := func(peersMap map[uint64]*regionTree, storeID uint64) { + if tree, ok := peersMap[storeID]; ok { + items := tree.overlaps(it) + for _, item := range items { + if _, ok := overlapsMap[item.GetID()]; !ok { + overlapsMap[item.GetID()] = struct{}{} + overlaps = append(overlaps, item.RegionInfo) + } + } + } + } + for _, peer := range region.GetMeta().GetPeers() { + storeID := peer.GetStoreId() + collectFromItemSlice(r.leaders, storeID) + collectFromItemSlice(r.followers, storeID) + collectFromItemSlice(r.learners, storeID) + collectFromItemSlice(r.witnesses, storeID) + } + return overlaps +} + // GetRelevantRegions returns the relevant regions for a given region. func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*RegionInfo) { r.t.RLock() diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index ae91886369f..88683968f3f 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -789,14 +789,15 @@ func randomBytes(n int) []byte { return bytes } -func newRegionInfoID(idAllocator id.Allocator) *RegionInfo { +func newRegionInfoIDRandom(idAllocator id.Allocator) *RegionInfo { var ( peers []*metapb.Peer leader *metapb.Peer ) + storeNum := 10 for i := 0; i < 3; i++ { id, _ := idAllocator.Alloc() - p := &metapb.Peer{Id: id, StoreId: id} + p := &metapb.Peer{Id: id, StoreId: uint64(i%storeNum + 1)} if i == 0 { leader = p } @@ -811,6 +812,8 @@ func newRegionInfoID(idAllocator id.Allocator) *RegionInfo { Peers: peers, }, leader, + SetApproximateSize(10), + SetApproximateKeys(10), ) } @@ -819,7 +822,7 @@ func BenchmarkAddRegion(b *testing.B) { idAllocator := mockid.NewIDAllocator() var items []*RegionInfo for i := 0; i < 10000000; i++ { - items = append(items, newRegionInfoID(idAllocator)) + items = append(items, newRegionInfoIDRandom(idAllocator)) } b.ResetTimer() for i := 0; i < b.N; i++ { @@ -858,3 +861,80 @@ func BenchmarkRegionFromHeartbeat(b *testing.B) { RegionFromHeartbeat(regionReq) } } + +func TestUpdateRegionEquivalence(t *testing.T) { + re := require.New(t) + regionsOld := NewRegionsInfo() + regionsNew := NewRegionsInfo() + storeNums := 5 + items := generateTestRegions(1000, storeNums) + + updateRegion := func(item *RegionInfo) { + // old way + ctx := ContextTODO() + regionsOld.AtomicCheckAndPutRegion(ctx, item) + // new way + ctx = ContextTODO() + regionsNew.CheckAndPutRootTree(ctx, item) + regionsNew.CheckAndPutSubTree(item) + } + checksEquivalence := func() { + re.Equal(regionsOld.GetRegionCount([]byte(""), []byte("")), regionsNew.GetRegionCount([]byte(""), []byte(""))) + re.Equal(regionsOld.GetRegionSizeByRange([]byte(""), []byte("")), regionsNew.GetRegionSizeByRange([]byte(""), []byte(""))) + checkRegions(re, regionsOld) + checkRegions(re, regionsNew) + + for i := 1; i <= storeNums; i++ { + re.Equal(regionsOld.GetStoreRegionCount(uint64(i)), regionsNew.GetStoreRegionCount(uint64(i))) + re.Equal(regionsOld.GetStoreLeaderCount(uint64(i)), regionsNew.GetStoreLeaderCount(uint64(i))) + re.Equal(regionsOld.GetStorePendingPeerCount(uint64(i)), regionsNew.GetStorePendingPeerCount(uint64(i))) + re.Equal(regionsOld.GetStoreLearnerRegionSize(uint64(i)), 
regionsNew.GetStoreLearnerRegionSize(uint64(i))) + re.Equal(regionsOld.GetStoreRegionSize(uint64(i)), regionsNew.GetStoreRegionSize(uint64(i))) + re.Equal(regionsOld.GetStoreLeaderRegionSize(uint64(i)), regionsNew.GetStoreLeaderRegionSize(uint64(i))) + re.Equal(regionsOld.GetStoreFollowerRegionSize(uint64(i)), regionsNew.GetStoreFollowerRegionSize(uint64(i))) + } + } + + // Add a region. + for _, item := range items { + updateRegion(item) + } + checksEquivalence() + + // Merge regions. + itemA, itemB := items[10], items[11] + itemMergedAB := itemA.Clone(WithEndKey(itemB.GetEndKey()), WithIncVersion()) + updateRegion(itemMergedAB) + checksEquivalence() + + // Split + itemA = itemA.Clone(WithIncVersion(), WithIncVersion()) + itemB = itemB.Clone(WithIncVersion(), WithIncVersion()) + updateRegion(itemA) + updateRegion(itemB) + checksEquivalence() +} + +func generateTestRegions(count int, storeNum int) []*RegionInfo { + var items []*RegionInfo + for i := 0; i < count; i++ { + peer1 := &metapb.Peer{StoreId: uint64(i%storeNum + 1), Id: uint64(i*storeNum + 1)} + peer2 := &metapb.Peer{StoreId: uint64((i+1)%storeNum + 1), Id: uint64(i*storeNum + 2)} + peer3 := &metapb.Peer{StoreId: uint64((i+2)%storeNum + 1), Id: uint64(i*storeNum + 3)} + if i%3 == 0 { + peer2.IsWitness = true + } + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer1, peer2, peer3}, + StartKey: []byte(fmt.Sprintf("%20d", i*10)), + EndKey: []byte(fmt.Sprintf("%20d", (i+1)*10)), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 100, Version: 100}, + }, + peer1, + SetApproximateKeys(10), + SetApproximateSize(10)) + items = append(items, region) + } + return items +} diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 42e8c3a35cb..94b24f4ca16 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -590,10 +590,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c ctx.TaskRunner.RunTask( ctx, - ratelimit.TaskOpts{ - TaskName: "HandleStatsAsync", - Limit: ctx.Limiter, - }, + core.ExtraTaskOpts(ctx, core.HandleStatsAsync), func(_ context.Context) { cluster.HandleStatsAsync(c, region) }, @@ -610,10 +607,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { ctx.TaskRunner.RunTask( ctx, - ratelimit.TaskOpts{ - TaskName: "ObserveRegionStatsAsync", - Limit: ctx.Limiter, - }, + core.ExtraTaskOpts(ctx, core.ObserveRegionStatsAsync), func(_ context.Context) { if c.regionStats.RegionStatsNeedUpdate(region) { cluster.Collect(c, region, hasRegionStats) @@ -632,16 +626,21 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c // However, it can't solve the race condition of concurrent heartbeats from the same region. // Async task in next PR. 
- if overlaps, err = c.AtomicCheckAndPutRegion(ctx, region); err != nil { + if overlaps, err = c.CheckAndPutRootTree(ctx, region); err != nil { tracer.OnSaveCacheFinished() return err } ctx.TaskRunner.RunTask( ctx, - ratelimit.TaskOpts{ - TaskName: "HandleOverlaps", - Limit: ctx.Limiter, + core.ExtraTaskOpts(ctx, core.UpdateSubTree), + func(_ context.Context) { + c.CheckAndPutSubTree(region) }, + ) + tracer.OnUpdateSubTreeFinished() + ctx.TaskRunner.RunTask( + ctx, + core.ExtraTaskOpts(ctx, core.HandleOverlaps), func(_ context.Context) { cluster.HandleOverlaps(c, overlaps) }, @@ -651,10 +650,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c // handle region stats ctx.TaskRunner.RunTask( ctx, - ratelimit.TaskOpts{ - TaskName: "CollectRegionStatsAsync", - Limit: ctx.Limiter, - }, + core.ExtraTaskOpts(ctx, core.CollectRegionStatsAsync), func(_ context.Context) { cluster.Collect(c, region, hasRegionStats) }, diff --git a/pkg/ratelimit/metrics.go b/pkg/ratelimit/metrics.go new file mode 100644 index 00000000000..3c5020554a8 --- /dev/null +++ b/pkg/ratelimit/metrics.go @@ -0,0 +1,52 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ratelimit + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +const nameStr = "runner_name" + +var ( + RunnerTaskMaxWaitingDuration = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "ratelimit", + Name: "runner_task_max_waiting_duration_seconds", + Help: "The duration of tasks waiting in the runner.", + }, []string{nameStr}) + + RunnerTaskPendingTasks = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "ratelimit", + Name: "runner_task_pending_tasks", + Help: "The number of pending tasks in the runner.", + }, []string{nameStr}) + RunnerTaskFailedTasks = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "ratelimit", + Name: "runner_task_failed_tasks_total", + Help: "The number of failed tasks in the runner.", + }, []string{nameStr}) +) + +func init() { + prometheus.MustRegister(RunnerTaskMaxWaitingDuration) + prometheus.MustRegister(RunnerTaskPendingTasks) + prometheus.MustRegister(RunnerTaskFailedTasks) +} diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 7f0ef21f791..c4f2d5bc5ac 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pingcap/log" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -53,6 +54,8 @@ type ConcurrentRunner struct { pendingMu sync.Mutex stopChan chan struct{} wg sync.WaitGroup + failedTaskCount prometheus.Counter + maxWaitingDuration prometheus.Gauge } // NewConcurrentRunner creates a new ConcurrentRunner. 
@@ -62,6 +65,8 @@ func NewConcurrentRunner(name string, maxPendingDuration time.Duration) *Concurr maxPendingDuration: maxPendingDuration, taskChan: make(chan *Task), pendingTasks: make([]*Task, 0, initialCapacity), + failedTaskCount: RunnerTaskFailedTasks.WithLabelValues(name), + maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), } return s } @@ -77,6 +82,7 @@ type TaskOpts struct { func (s *ConcurrentRunner) Start() { s.stopChan = make(chan struct{}) s.wg.Add(1) + ticker := time.NewTicker(5 * time.Second) go func() { defer s.wg.Done() for { @@ -92,8 +98,19 @@ func (s *ConcurrentRunner) Start() { go s.run(task.Ctx, task.f, nil) } case <-s.stopChan: + s.pendingMu.Lock() + s.pendingTasks = make([]*Task, 0, initialCapacity) + s.pendingMu.Unlock() log.Info("stopping async task runner", zap.String("name", s.name)) return + case <-ticker.C: + maxDuration := time.Duration(0) + s.pendingMu.Lock() + if len(s.pendingTasks) > 0 { + maxDuration = time.Since(s.pendingTasks[0].submittedAt) + } + s.pendingMu.Unlock() + s.maxWaitingDuration.Set(maxDuration.Seconds()) } } }() @@ -144,6 +161,7 @@ func (s *ConcurrentRunner) RunTask(ctx context.Context, opt TaskOpts, f func(con if len(s.pendingTasks) > 0 { maxWait := time.Since(s.pendingTasks[0].submittedAt) if maxWait > s.maxPendingDuration { + s.failedTaskCount.Inc() return ErrMaxWaitingTasksExceeded } } diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go index 1f370176383..5a67a547483 100644 --- a/pkg/schedule/config/config.go +++ b/pkg/schedule/config/config.go @@ -52,7 +52,7 @@ const ( defaultEnableJointConsensus = true defaultEnableTiKVSplitRegion = true defaultEnableHeartbeatBreakdownMetrics = true - defaultEnableHeartbeatConcurrentRunner = false + defaultEnableHeartbeatConcurrentRunner = true defaultEnableCrossTableMerge = true defaultEnableDiagnostic = true defaultStrictlyMatchLabel = false diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index dd59b63240f..dbc6a6cadf3 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1015,10 +1015,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { ctx.TaskRunner.RunTask( ctx.Context, - ratelimit.TaskOpts{ - TaskName: "HandleStatsAsync", - Limit: ctx.Limiter, - }, + core.ExtraTaskOpts(ctx, core.HandleStatsAsync), func(_ context.Context) { cluster.HandleStatsAsync(c, region) }, @@ -1039,10 +1036,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { ctx.TaskRunner.RunTask( ctx.Context, - ratelimit.TaskOpts{ - TaskName: "ObserveRegionStatsAsync", - Limit: ctx.Limiter, - }, + core.ExtraTaskOpts(ctx, core.ObserveRegionStatsAsync), func(_ context.Context) { if c.regionStats.RegionStatsNeedUpdate(region) { cluster.Collect(c, region, hasRegionStats) @@ -1065,17 +1059,23 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // check its validation again here. // // However, it can't solve the race condition of concurrent heartbeats from the same region. 
- if overlaps, err = c.core.AtomicCheckAndPutRegion(ctx, region); err != nil { + if overlaps, err = c.core.CheckAndPutRootTree(ctx, region); err != nil { tracer.OnSaveCacheFinished() return err } + ctx.TaskRunner.RunTask( + ctx, + core.ExtraTaskOpts(ctx, core.UpdateSubTree), + func(_ context.Context) { + c.CheckAndPutSubTree(region) + }, + ) + tracer.OnUpdateSubTreeFinished() + if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { ctx.TaskRunner.RunTask( ctx.Context, - ratelimit.TaskOpts{ - TaskName: "HandleOverlaps", - Limit: ctx.Limiter, - }, + core.ExtraTaskOpts(ctx, core.HandleOverlaps), func(_ context.Context) { cluster.HandleOverlaps(c, overlaps) }, @@ -1088,10 +1088,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // handle region stats ctx.TaskRunner.RunTask( ctx.Context, - ratelimit.TaskOpts{ - TaskName: "CollectRegionStatsAsync", - Limit: ctx.Limiter, - }, + core.ExtraTaskOpts(ctx, core.CollectRegionStatsAsync), func(_ context.Context) { // TODO: Due to the accuracy requirements of the API "/regions/check/xxx", // region stats needs to be collected in API mode. @@ -1105,10 +1102,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio if saveKV { ctx.TaskRunner.RunTask( ctx.Context, - ratelimit.TaskOpts{ - TaskName: "SaveRegionToKV", - Limit: ctx.Limiter, - }, + core.ExtraTaskOpts(ctx, core.SaveRegionToKV), func(_ context.Context) { // If there are concurrent heartbeats from the same region, the last write will win even if // writes to storage in the critical area. So don't use mutex to protect it. From fa7841b212b42508467dd9d256fb8f55b745d2c5 Mon Sep 17 00:00:00 2001 From: Wallace Wu Date: Wed, 24 Apr 2024 15:42:11 +0800 Subject: [PATCH 4/5] Fix typo in keyspace config (#8109) ref tikv/pd#4399 Co-authored-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> Co-authored-by: ShuNing --- conf/config.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conf/config.toml b/conf/config.toml index 8b80a5044f1..20f664a4c85 100644 --- a/conf/config.toml +++ b/conf/config.toml @@ -201,7 +201,7 @@ ## When enabled, usage data will be sent to PingCAP for improving user experience. # enable-telemetry = false -[keyspaces] +[keyspace] ## pre-alloc is used to pre-allocate keyspaces during pd bootstrap. ## Its value should be a list of strings, denotting the name of the keyspaces. ## Example: From b4f292d746b2bde1c6a7a1c7b5708a11c9a81536 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 24 Apr 2024 16:18:11 +0800 Subject: [PATCH 5/5] client/tso: organize the methods of TSO dispatcher (#8121) ref tikv/pd#8047 Organize the methods of the TSO dispatcher. 
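As part of the move, processRequests and compareAndSwapTS now live in
tso_client.go; they derive the first logical timestamp of a batch from the
largest one returned by the stream. A minimal standalone sketch of that
arithmetic (hypothetical helper, not the client's API; the real code uses
tsoutil.AddLogical, which also folds in keyspace-group suffix bits):

    package main

    import "fmt"

    // batchLogicalRange: given the largest logical value returned for a
    // batch of `count` TSO requests, the batch covers the contiguous
    // range [first, largest].
    func batchLogicalRange(largestLogical, count int64) (first, last int64) {
        return largestLogical - count + 1, largestLogical
    }

    func main() {
        first, last := batchLogicalRange(10, 5)
        fmt.Println(first, last) // 6 10, i.e. the batch [6, 7, 8, 9, 10]
    }
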
Signed-off-by: JmPotato Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/tso_client.go | 334 +++++++++++++++++++++++++++++++++++++- client/tso_dispatcher.go | 343 ++------------------------------------- 2 files changed, 341 insertions(+), 336 deletions(-) diff --git a/client/tso_client.go b/client/tso_client.go index 8185b99d1d0..347d1f6ec0a 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -18,14 +18,22 @@ import ( "context" "fmt" "math/rand" + "runtime/trace" "sync" "time" + "github.com/opentracing/opentracing-go" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/grpcutil" + "github.com/tikv/pd/client/tsoutil" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/codes" healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" ) // TSOClient is the client used to get timestamps. @@ -127,18 +135,36 @@ func (c *tsoClient) Close() { c.wg.Wait() log.Info("close tso client") - c.tsoDispatcher.Range(func(_, dispatcherInterface any) bool { - if dispatcherInterface != nil { - dispatcher := dispatcherInterface.(*tsoDispatcher) - dispatcher.dispatcherCancel() - dispatcher.tsoBatchController.clear() - } - return true - }) - + c.closeTSODispatcher() log.Info("tso client is closed") } +func (c *tsoClient) scheduleCheckTSDeadline() { + select { + case c.checkTSDeadlineCh <- struct{}{}: + default: + } +} + +func (c *tsoClient) scheduleCheckTSODispatcher() { + select { + case c.checkTSODispatcherCh <- struct{}{}: + default: + } +} + +func (c *tsoClient) scheduleUpdateTSOConnectionCtxs() { + select { + case c.updateTSOConnectionCtxsCh <- struct{}{}: + default: + } +} + +// TSO Follower Proxy only supports the Global TSO proxy now. +func (c *tsoClient) allowTSOFollowerProxy(dc string) bool { + return dc == globalDCLocation && c.option.getEnableTSOFollowerProxy() +} + func (c *tsoClient) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest { req := c.tsoReqPool.Get().(*tsoRequest) // Set needed fields in the request before using it. @@ -279,3 +305,293 @@ func (c *tsoClient) backupClientConn() (*grpc.ClientConn, string) { } return nil, "" } + +type tsoConnectionContext struct { + streamURL string + // Current stream to send gRPC requests, pdpb.PD_TsoClient for a leader/follower in the PD cluster, + // or tsopb.TSO_TsoClient for a primary/secondary in the TSO cluster + stream tsoStream + ctx context.Context + cancel context.CancelFunc +} + +func (c *tsoClient) updateTSOConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) bool { + // Normal connection creating, it will be affected by the `enableForwarding`. + createTSOConnection := c.tryConnectToTSO + if c.allowTSOFollowerProxy(dc) { + createTSOConnection = c.tryConnectToTSOWithProxy + } + if err := createTSOConnection(updaterCtx, dc, connectionCtxs); err != nil { + log.Error("[tso] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err)) + return false + } + return true +} + +// tryConnectToTSO will try to connect to the TSO allocator leader. If the connection becomes unreachable +// and enableForwarding is true, it will create a new connection to a follower to do the forwarding, +// while a new daemon will be created also to switch back to a normal leader connection ASAP the +// connection comes back to normal. 
+func (c *tsoClient) tryConnectToTSO( + dispatcherCtx context.Context, + dc string, + connectionCtxs *sync.Map, +) error { + var ( + networkErrNum uint64 + err error + stream tsoStream + url string + cc *grpc.ClientConn + ) + updateAndClear := func(newURL string, connectionCtx *tsoConnectionContext) { + if cc, loaded := connectionCtxs.LoadOrStore(newURL, connectionCtx); loaded { + // If the previous connection still exists, we should close it first. + cc.(*tsoConnectionContext).cancel() + connectionCtxs.Store(newURL, connectionCtx) + } + connectionCtxs.Range(func(url, cc any) bool { + if url.(string) != newURL { + cc.(*tsoConnectionContext).cancel() + connectionCtxs.Delete(url) + } + return true + }) + } + // retry several times before falling back to the follower when the network problem happens + + ticker := time.NewTicker(retryInterval) + defer ticker.Stop() + for i := 0; i < maxRetryTimes; i++ { + c.svcDiscovery.ScheduleCheckMemberChanged() + cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc) + if cc != nil { + cctx, cancel := context.WithCancel(dispatcherCtx) + stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) + failpoint.Inject("unreachableNetwork", func() { + stream = nil + err = status.New(codes.Unavailable, "unavailable").Err() + }) + if stream != nil && err == nil { + updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel}) + return nil + } + + if err != nil && c.option.enableForwarding { + // The reason we need to judge if the error code is equal to "Canceled" here is that + // when we create a stream we use a goroutine to manually control the timeout of the connection. + // There is no need to wait for the transport layer timeout which can reduce the time of unavailability. + // But it conflicts with the retry mechanism since we use the error code to decide if it is caused by network error. + // And actually the `Canceled` error can be regarded as a kind of network error in some way. 
+ if rpcErr, ok := status.FromError(err); ok && (isNetworkError(rpcErr.Code()) || rpcErr.Code() == codes.Canceled) { + networkErrNum++ + } + } + cancel() + } else { + networkErrNum++ + } + select { + case <-dispatcherCtx.Done(): + return err + case <-ticker.C: + } + } + + if networkErrNum == maxRetryTimes { + // encounter the network error + backupClientConn, backupURL := c.backupClientConn() + if backupClientConn != nil { + log.Info("[tso] fall back to use follower to forward tso stream", zap.String("dc", dc), zap.String("follower-url", backupURL)) + forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc) + if !ok { + return errors.Errorf("cannot find the allocator leader in %s", dc) + } + + // create the follower stream + cctx, cancel := context.WithCancel(dispatcherCtx) + cctx = grpcutil.BuildForwardContext(cctx, forwardedHost) + stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.timeout) + if err == nil { + forwardedHostTrim := trimHTTPPrefix(forwardedHost) + addr := trimHTTPPrefix(backupURL) + // the goroutine is used to check the network and change back to the original stream + go c.checkAllocator(dispatcherCtx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear) + requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1) + updateAndClear(backupURL, &tsoConnectionContext{backupURL, stream, cctx, cancel}) + return nil + } + cancel() + } + } + return err +} + +// tryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as +// a TSO proxy to reduce the pressure of the main serving service endpoint. +func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error { + tsoStreamBuilders := c.getAllTSOStreamBuilders() + leaderAddr := c.svcDiscovery.GetServingURL() + forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc) + if !ok { + return errors.Errorf("cannot find the allocator leader in %s", dc) + } + // GC the stale one. + connectionCtxs.Range(func(addr, cc any) bool { + addrStr := addr.(string) + if _, ok := tsoStreamBuilders[addrStr]; !ok { + log.Info("[tso] remove the stale tso stream", + zap.String("dc", dc), + zap.String("addr", addrStr)) + cc.(*tsoConnectionContext).cancel() + connectionCtxs.Delete(addr) + } + return true + }) + // Update the missing one. + for addr, tsoStreamBuilder := range tsoStreamBuilders { + if _, ok = connectionCtxs.Load(addr); ok { + continue + } + log.Info("[tso] try to create tso stream", + zap.String("dc", dc), zap.String("addr", addr)) + cctx, cancel := context.WithCancel(dispatcherCtx) + // Do not proxy the leader client. + if addr != leaderAddr { + log.Info("[tso] use follower to forward tso stream to do the proxy", + zap.String("dc", dc), zap.String("addr", addr)) + cctx = grpcutil.BuildForwardContext(cctx, forwardedHost) + } + // Create the TSO stream. 
+ stream, err := tsoStreamBuilder.build(cctx, cancel, c.option.timeout) + if err == nil { + if addr != leaderAddr { + forwardedHostTrim := trimHTTPPrefix(forwardedHost) + addrTrim := trimHTTPPrefix(addr) + requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1) + } + connectionCtxs.Store(addr, &tsoConnectionContext{addr, stream, cctx, cancel}) + continue + } + log.Error("[tso] create the tso stream failed", + zap.String("dc", dc), zap.String("addr", addr), errs.ZapError(err)) + cancel() + } + return nil +} + +// getAllTSOStreamBuilders returns a TSO stream builder for every service endpoint of TSO leader/followers +// or of keyspace group primary/secondaries. +func (c *tsoClient) getAllTSOStreamBuilders() map[string]tsoStreamBuilder { + var ( + addrs = c.svcDiscovery.GetServiceURLs() + streamBuilders = make(map[string]tsoStreamBuilder, len(addrs)) + cc *grpc.ClientConn + err error + ) + for _, addr := range addrs { + if len(addrs) == 0 { + continue + } + if cc, err = c.svcDiscovery.GetOrCreateGRPCConn(addr); err != nil { + continue + } + healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout) + resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""}) + healthCancel() + if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING { + streamBuilders[addr] = c.tsoStreamBuilderFactory.makeBuilder(cc) + } + } + return streamBuilders +} + +type tsoInfo struct { + tsoServer string + reqKeyspaceGroupID uint32 + respKeyspaceGroupID uint32 + respReceivedAt time.Time + physical int64 + logical int64 +} + +func (c *tsoClient) processRequests( + stream tsoStream, dcLocation string, tbc *tsoBatchController, +) error { + requests := tbc.getCollectedRequests() + // nolint + for _, req := range requests { + defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqSend").End() + if span := opentracing.SpanFromContext(req.requestCtx); span != nil && span.Tracer() != nil { + span = span.Tracer().StartSpan("pdclient.processRequests", opentracing.ChildOf(span.Context())) + defer span.Finish() + } + } + + count := int64(len(requests)) + reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID() + respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests( + c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID, + dcLocation, count, tbc.batchStartTime) + if err != nil { + tbc.finishCollectedRequests(0, 0, 0, err) + return err + } + // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. 
+ firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits) + curTSOInfo := &tsoInfo{ + tsoServer: stream.getServerURL(), + reqKeyspaceGroupID: reqKeyspaceGroupID, + respKeyspaceGroupID: respKeyspaceGroupID, + respReceivedAt: time.Now(), + physical: physical, + logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits), + } + c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical) + tbc.finishCollectedRequests(physical, firstLogical, suffixBits, nil) + return nil +} + +func (c *tsoClient) compareAndSwapTS( + dcLocation string, + curTSOInfo *tsoInfo, + physical, firstLogical int64, +) { + val, loaded := c.lastTSOInfoMap.LoadOrStore(dcLocation, curTSOInfo) + if !loaded { + return + } + lastTSOInfo := val.(*tsoInfo) + if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID { + log.Info("[tso] keyspace group changed", + zap.String("dc-location", dcLocation), + zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID), + zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID)) + } + + // The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical + // to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then + // all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned + // last time. + if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) { + log.Panic("[tso] timestamp fallback", + zap.String("dc-location", dcLocation), + zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()), + zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)), + zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)), + zap.String("last-tso-server", lastTSOInfo.tsoServer), + zap.String("cur-tso-server", curTSOInfo.tsoServer), + zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID), + zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID), + zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID), + zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID), + zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt), + zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt)) + } + lastTSOInfo.tsoServer = curTSOInfo.tsoServer + lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID + lastTSOInfo.respKeyspaceGroupID = curTSOInfo.respKeyspaceGroupID + lastTSOInfo.respReceivedAt = curTSOInfo.respReceivedAt + lastTSOInfo.physical = curTSOInfo.physical + lastTSOInfo.logical = curTSOInfo.logical +} diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index d02fdd52af8..7528293a733 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -22,36 +22,16 @@ import ( "sync" "time" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" - "github.com/tikv/pd/client/grpcutil" "github.com/tikv/pd/client/retry" "github.com/tikv/pd/client/timerpool" - "github.com/tikv/pd/client/tsoutil" "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" healthpb "google.golang.org/grpc/health/grpc_health_v1" - "google.golang.org/grpc/status" ) -type tsoDispatcher struct { - dispatcherCancel context.CancelFunc - tsoBatchController *tsoBatchController -} - -type tsoInfo struct { - tsoServer string - 
reqKeyspaceGroupID uint32 - respKeyspaceGroupID uint32 - respReceivedAt time.Time - physical int64 - logical int64 -} - const ( tsLoopDCCheckInterval = time.Minute defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst @@ -59,18 +39,9 @@ const ( maxRetryTimes = 6 ) -func (c *tsoClient) scheduleCheckTSODispatcher() { - select { - case c.checkTSODispatcherCh <- struct{}{}: - default: - } -} - -func (c *tsoClient) scheduleUpdateTSOConnectionCtxs() { - select { - case c.updateTSOConnectionCtxsCh <- struct{}{}: - default: - } +type tsoDispatcher struct { + dispatcherCancel context.CancelFunc + tsoBatchController *tsoBatchController } func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) { @@ -115,6 +86,17 @@ func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) { return false, nil } +func (c *tsoClient) closeTSODispatcher() { + c.tsoDispatcher.Range(func(_, dispatcherInterface any) bool { + if dispatcherInterface != nil { + dispatcher := dispatcherInterface.(*tsoDispatcher) + dispatcher.dispatcherCancel() + dispatcher.tsoBatchController.clear() + } + return true + }) +} + func (c *tsoClient) updateTSODispatcher() { // Set up the new TSO dispatcher and batch controller. c.GetTSOAllocators().Range(func(dcLocationKey, _ any) bool { @@ -212,13 +194,6 @@ func (c *tsoClient) watchTSDeadline(ctx context.Context, dcLocation string) { } } -func (c *tsoClient) scheduleCheckTSDeadline() { - select { - case c.checkTSDeadlineCh <- struct{}{}: - default: - } -} - func (c *tsoClient) tsoDispatcherCheckLoop() { defer c.wg.Done() @@ -445,7 +420,7 @@ tsoBatchLoop: // Choose a stream to send the TSO gRPC request. streamChoosingLoop: for { - connectionCtx := c.chooseStream(&connectionCtxs) + connectionCtx := chooseStream(&connectionCtxs) if connectionCtx != nil { streamURL, stream, streamCtx, cancel = connectionCtx.streamURL, connectionCtx.stream, connectionCtx.ctx, connectionCtx.cancel } @@ -541,14 +516,9 @@ tsoBatchLoop: } } -// TSO Follower Proxy only supports the Global TSO proxy now. -func (c *tsoClient) allowTSOFollowerProxy(dc string) bool { - return dc == globalDCLocation && c.option.getEnableTSOFollowerProxy() -} - // chooseStream uses the reservoir sampling algorithm to randomly choose a connection. // connectionCtxs will only have only one stream to choose when the TSO Follower Proxy is off. -func (*tsoClient) chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext) { +func chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext) { idx := 0 connectionCtxs.Range(func(_, cc any) bool { j := rand.Intn(idx + 1) @@ -560,284 +530,3 @@ func (*tsoClient) chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConn }) return connectionCtx } - -type tsoConnectionContext struct { - streamURL string - // Current stream to send gRPC requests, pdpb.PD_TsoClient for a leader/follower in the PD cluster, - // or tsopb.TSO_TsoClient for a primary/secondary in the TSO cluster - stream tsoStream - ctx context.Context - cancel context.CancelFunc -} - -func (c *tsoClient) updateTSOConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) bool { - // Normal connection creating, it will be affected by the `enableForwarding`. 
- createTSOConnection := c.tryConnectToTSO - if c.allowTSOFollowerProxy(dc) { - createTSOConnection = c.tryConnectToTSOWithProxy - } - if err := createTSOConnection(updaterCtx, dc, connectionCtxs); err != nil { - log.Error("[tso] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err)) - return false - } - return true -} - -// tryConnectToTSO will try to connect to the TSO allocator leader. If the connection becomes unreachable -// and enableForwarding is true, it will create a new connection to a follower to do the forwarding, -// while a new daemon will be created also to switch back to a normal leader connection ASAP the -// connection comes back to normal. -func (c *tsoClient) tryConnectToTSO( - dispatcherCtx context.Context, - dc string, - connectionCtxs *sync.Map, -) error { - var ( - networkErrNum uint64 - err error - stream tsoStream - url string - cc *grpc.ClientConn - ) - updateAndClear := func(newURL string, connectionCtx *tsoConnectionContext) { - if cc, loaded := connectionCtxs.LoadOrStore(newURL, connectionCtx); loaded { - // If the previous connection still exists, we should close it first. - cc.(*tsoConnectionContext).cancel() - connectionCtxs.Store(newURL, connectionCtx) - } - connectionCtxs.Range(func(url, cc any) bool { - if url.(string) != newURL { - cc.(*tsoConnectionContext).cancel() - connectionCtxs.Delete(url) - } - return true - }) - } - // retry several times before falling back to the follower when the network problem happens - - ticker := time.NewTicker(retryInterval) - defer ticker.Stop() - for i := 0; i < maxRetryTimes; i++ { - c.svcDiscovery.ScheduleCheckMemberChanged() - cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc) - if cc != nil { - cctx, cancel := context.WithCancel(dispatcherCtx) - stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) - failpoint.Inject("unreachableNetwork", func() { - stream = nil - err = status.New(codes.Unavailable, "unavailable").Err() - }) - if stream != nil && err == nil { - updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel}) - return nil - } - - if err != nil && c.option.enableForwarding { - // The reason we need to judge if the error code is equal to "Canceled" here is that - // when we create a stream we use a goroutine to manually control the timeout of the connection. - // There is no need to wait for the transport layer timeout which can reduce the time of unavailability. - // But it conflicts with the retry mechanism since we use the error code to decide if it is caused by network error. - // And actually the `Canceled` error can be regarded as a kind of network error in some way. 
- if rpcErr, ok := status.FromError(err); ok && (isNetworkError(rpcErr.Code()) || rpcErr.Code() == codes.Canceled) { - networkErrNum++ - } - } - cancel() - } else { - networkErrNum++ - } - select { - case <-dispatcherCtx.Done(): - return err - case <-ticker.C: - } - } - - if networkErrNum == maxRetryTimes { - // encounter the network error - backupClientConn, backupURL := c.backupClientConn() - if backupClientConn != nil { - log.Info("[tso] fall back to use follower to forward tso stream", zap.String("dc", dc), zap.String("follower-url", backupURL)) - forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc) - if !ok { - return errors.Errorf("cannot find the allocator leader in %s", dc) - } - - // create the follower stream - cctx, cancel := context.WithCancel(dispatcherCtx) - cctx = grpcutil.BuildForwardContext(cctx, forwardedHost) - stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.timeout) - if err == nil { - forwardedHostTrim := trimHTTPPrefix(forwardedHost) - addr := trimHTTPPrefix(backupURL) - // the goroutine is used to check the network and change back to the original stream - go c.checkAllocator(dispatcherCtx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear) - requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1) - updateAndClear(backupURL, &tsoConnectionContext{backupURL, stream, cctx, cancel}) - return nil - } - cancel() - } - } - return err -} - -// getAllTSOStreamBuilders returns a TSO stream builder for every service endpoint of TSO leader/followers -// or of keyspace group primary/secondaries. -func (c *tsoClient) getAllTSOStreamBuilders() map[string]tsoStreamBuilder { - var ( - addrs = c.svcDiscovery.GetServiceURLs() - streamBuilders = make(map[string]tsoStreamBuilder, len(addrs)) - cc *grpc.ClientConn - err error - ) - for _, addr := range addrs { - if len(addrs) == 0 { - continue - } - if cc, err = c.svcDiscovery.GetOrCreateGRPCConn(addr); err != nil { - continue - } - healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout) - resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""}) - healthCancel() - if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING { - streamBuilders[addr] = c.tsoStreamBuilderFactory.makeBuilder(cc) - } - } - return streamBuilders -} - -// tryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as -// a TSO proxy to reduce the pressure of the main serving service endpoint. -func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error { - tsoStreamBuilders := c.getAllTSOStreamBuilders() - leaderAddr := c.svcDiscovery.GetServingURL() - forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc) - if !ok { - return errors.Errorf("cannot find the allocator leader in %s", dc) - } - // GC the stale one. - connectionCtxs.Range(func(addr, cc any) bool { - addrStr := addr.(string) - if _, ok := tsoStreamBuilders[addrStr]; !ok { - log.Info("[tso] remove the stale tso stream", - zap.String("dc", dc), - zap.String("addr", addrStr)) - cc.(*tsoConnectionContext).cancel() - connectionCtxs.Delete(addr) - } - return true - }) - // Update the missing one. 
- for addr, tsoStreamBuilder := range tsoStreamBuilders { - if _, ok = connectionCtxs.Load(addr); ok { - continue - } - log.Info("[tso] try to create tso stream", - zap.String("dc", dc), zap.String("addr", addr)) - cctx, cancel := context.WithCancel(dispatcherCtx) - // Do not proxy the leader client. - if addr != leaderAddr { - log.Info("[tso] use follower to forward tso stream to do the proxy", - zap.String("dc", dc), zap.String("addr", addr)) - cctx = grpcutil.BuildForwardContext(cctx, forwardedHost) - } - // Create the TSO stream. - stream, err := tsoStreamBuilder.build(cctx, cancel, c.option.timeout) - if err == nil { - if addr != leaderAddr { - forwardedHostTrim := trimHTTPPrefix(forwardedHost) - addrTrim := trimHTTPPrefix(addr) - requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1) - } - connectionCtxs.Store(addr, &tsoConnectionContext{addr, stream, cctx, cancel}) - continue - } - log.Error("[tso] create the tso stream failed", - zap.String("dc", dc), zap.String("addr", addr), errs.ZapError(err)) - cancel() - } - return nil -} - -func (c *tsoClient) processRequests( - stream tsoStream, dcLocation string, tbc *tsoBatchController, -) error { - requests := tbc.getCollectedRequests() - // nolint - for _, req := range requests { - defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqSend").End() - if span := opentracing.SpanFromContext(req.requestCtx); span != nil && span.Tracer() != nil { - span = span.Tracer().StartSpan("pdclient.processRequests", opentracing.ChildOf(span.Context())) - defer span.Finish() - } - } - - count := int64(len(requests)) - reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID() - respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests( - c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID, - dcLocation, count, tbc.batchStartTime) - if err != nil { - tbc.finishCollectedRequests(0, 0, 0, err) - return err - } - // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. - firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits) - curTSOInfo := &tsoInfo{ - tsoServer: stream.getServerURL(), - reqKeyspaceGroupID: reqKeyspaceGroupID, - respKeyspaceGroupID: respKeyspaceGroupID, - respReceivedAt: time.Now(), - physical: physical, - logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits), - } - c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical) - tbc.finishCollectedRequests(physical, firstLogical, suffixBits, nil) - return nil -} - -func (c *tsoClient) compareAndSwapTS( - dcLocation string, - curTSOInfo *tsoInfo, - physical, firstLogical int64, -) { - val, loaded := c.lastTSOInfoMap.LoadOrStore(dcLocation, curTSOInfo) - if !loaded { - return - } - lastTSOInfo := val.(*tsoInfo) - if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID { - log.Info("[tso] keyspace group changed", - zap.String("dc-location", dcLocation), - zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID), - zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID)) - } - - // The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical - // to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then - // all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned - // last time. 
- if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) { - log.Panic("[tso] timestamp fallback", - zap.String("dc-location", dcLocation), - zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()), - zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)), - zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)), - zap.String("last-tso-server", lastTSOInfo.tsoServer), - zap.String("cur-tso-server", curTSOInfo.tsoServer), - zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID), - zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID), - zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID), - zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID), - zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt), - zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt)) - } - lastTSOInfo.tsoServer = curTSOInfo.tsoServer - lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID - lastTSOInfo.respKeyspaceGroupID = curTSOInfo.respKeyspaceGroupID - lastTSOInfo.respReceivedAt = curTSOInfo.respReceivedAt - lastTSOInfo.physical = curTSOInfo.physical - lastTSOInfo.logical = curTSOInfo.logical -}
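
The chooseStream helper retained in tso_dispatcher.go picks one stream
uniformly at random from connectionCtxs via reservoir sampling. A minimal
standalone sketch of the same technique (illustrative names only, assuming
a plain sync.Map of streams rather than the client's connection contexts):

    package main

    import (
        "fmt"
        "math/rand"
        "sync"
    )

    // pickRandom chooses one value from a sync.Map uniformly at random:
    // the i-th visited element replaces the current choice with
    // probability 1/(i+1), so every element is equally likely overall.
    func pickRandom(m *sync.Map) (chosen any) {
        idx := 0
        m.Range(func(_, value any) bool {
            if rand.Intn(idx+1) == 0 {
                chosen = value
            }
            idx++
            return true
        })
        return chosen
    }

    func main() {
        var m sync.Map
        for i := 0; i < 5; i++ {
            m.Store(fmt.Sprintf("stream-%d", i), i)
        }
        fmt.Println("picked:", pickRandom(&m))
    }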