From f3a8c1ed44d0e4223b348c3bb06b7a5f8c5752a6 Mon Sep 17 00:00:00 2001 From: zyguan Date: Thu, 29 Feb 2024 18:40:31 +0800 Subject: [PATCH 1/2] region_cache: move store related code to a standalone file Signed-off-by: zyguan --- internal/locate/region_cache.go | 914 +----------------------------- internal/locate/store_cache.go | 963 ++++++++++++++++++++++++++++++++ 2 files changed, 965 insertions(+), 912 deletions(-) create mode 100644 internal/locate/store_cache.go diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 7e071d4c8..f269104ae 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -69,16 +69,9 @@ import ( "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" - atomic2 "go.uber.org/atomic" + uatomic "go.uber.org/atomic" "go.uber.org/zap" - "golang.org/x/sync/singleflight" - "google.golang.org/grpc" - "google.golang.org/grpc/backoff" "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/credentials/insecure" - healthpb "google.golang.org/grpc/health/grpc_health_v1" - "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" ) @@ -221,7 +214,7 @@ type regionStore struct { // accessIndex[tiKVOnly][proxyTiKVIdx] is the index of TiKV that can forward requests to the leader in stores, -1 means not using proxy. proxyTiKVIdx AccessIndex // accessIndex[tiFlashOnly][workTiFlashIdx] is the index of the current working TiFlash in stores. - workTiFlashIdx atomic2.Int32 + workTiFlashIdx uatomic.Int32 // buckets is not accurate and it can change even if the region is not changed. // It can be stale and buckets keys can be out of the region range. buckets *metapb.Buckets @@ -507,8 +500,6 @@ func (mu *regionIndexMu) refresh(r []*Region) { mu.sorted = newMu.sorted } -type livenessFunc func(ctx context.Context, s *Store) livenessState - // repeat wraps a `func()` as a schedulable fuction for `bgRunner`. func repeat(f func()) func(context.Context, time.Time) bool { return func(_ context.Context, _ time.Time) bool { @@ -2596,727 +2587,6 @@ func (r *Region) ContainsByEnd(key []byte) bool { (bytes.Compare(key, r.meta.GetEndKey()) <= 0 || len(r.meta.GetEndKey()) == 0) } -const ( - tikvSlowScoreDecayRate float64 = 20.0 / 60.0 // s^(-1), linear decaying - tikvSlowScoreSlowThreshold int64 = 80 - - tikvSlowScoreUpdateInterval = time.Millisecond * 100 - tikvSlowScoreUpdateFromPDInterval = time.Minute -) - -type StoreHealthStatus struct { - isSlow atomic.Bool - - // A statistic for counting the request latency to this store - clientSideSlowScore SlowScoreStat - - tikvSideSlowScore struct { - sync.Mutex - - // The following atomic fields is designed to be able to be read by atomic options directly, but only written - // while holding the mutex. - - hasTiKVFeedback atomic.Bool - score atomic.Int64 - lastUpdateTime atomic.Pointer[time.Time] - } -} - -type HealthStatusDetail struct { - ClientSideSlowScore int64 - TiKVSideSlowScore int64 -} - -func newStoreHealthStatus() *StoreHealthStatus { - return &StoreHealthStatus{} -} - -// IsSlow returns whether current Store is slow. -func (s *StoreHealthStatus) IsSlow() bool { - return s.isSlow.Load() -} - -// GetHealthStatusDetail gets the current detailed information about the store's health status. 
-func (s *StoreHealthStatus) GetHealthStatusDetail() HealthStatusDetail { - return HealthStatusDetail{ - ClientSideSlowScore: int64(s.clientSideSlowScore.getSlowScore()), - TiKVSideSlowScore: s.tikvSideSlowScore.score.Load(), - } -} - -// tick updates the health status that changes over time, such as slow score's decaying, etc. This function is expected -// to be called periodically. -func (s *StoreHealthStatus) tick(now time.Time) { - s.clientSideSlowScore.updateSlowScore() - s.updateTiKVServerSideSlowScoreOnTick(now) - s.updateSlowFlag() -} - -// recordClientSideSlowScoreStat records timecost of each request to update the client side slow score. -func (s *StoreHealthStatus) recordClientSideSlowScoreStat(timecost time.Duration) { - s.clientSideSlowScore.recordSlowScoreStat(timecost) - s.updateSlowFlag() -} - -// markAlreadySlow marks the related store already slow. -func (s *StoreHealthStatus) markAlreadySlow() { - s.clientSideSlowScore.markAlreadySlow() - s.updateSlowFlag() -} - -// updateTiKVServerSideSlowScoreOnTick updates the slow score actively, which is expected to be a periodic job. -// It skips updating if the last update time didn't elapse long enough, or it's being updated concurrently. -func (s *StoreHealthStatus) updateTiKVServerSideSlowScoreOnTick(now time.Time) { - if !s.tikvSideSlowScore.hasTiKVFeedback.Load() { - // Do nothing if no feedback has been received from this store yet. - return - } - lastUpdateTime := s.tikvSideSlowScore.lastUpdateTime.Load() - if lastUpdateTime == nil || now.Sub(*lastUpdateTime) < tikvSlowScoreUpdateFromPDInterval { - // If the first feedback is - return - } - - if !s.tikvSideSlowScore.TryLock() { - // It must be being updated concurrently. - return - } - defer s.tikvSideSlowScore.Unlock() - - // Reload update time as it might be updated concurrently before acquiring mutex - lastUpdateTime = s.tikvSideSlowScore.lastUpdateTime.Load() - elapsed := now.Sub(*lastUpdateTime) - if elapsed < tikvSlowScoreUpdateFromPDInterval { - return - } - - // TODO: Try to get store status from PD here. But it's not mandatory. - // Don't forget to update tests if getting slow score from PD is implemented here. - - // If updating from PD is not successful: decay the slow score. - score := s.tikvSideSlowScore.score.Load() - if score < 1 { - return - } - // Linear decay by time - score = max(int64(math.Round(float64(score)-tikvSlowScoreDecayRate*elapsed.Seconds())), 1) - s.tikvSideSlowScore.score.Store(score) - - newUpdateTime := new(time.Time) - *newUpdateTime = now - s.tikvSideSlowScore.lastUpdateTime.Store(newUpdateTime) -} - -// updateTiKVServerSideSlowScore updates the tikv side slow score with the given value. -// Ignores if the last update time didn't elapse long enough, or it's being updated concurrently. -func (s *StoreHealthStatus) updateTiKVServerSideSlowScore(score int64, currTime time.Time) { - defer s.updateSlowFlag() - - lastScore := s.tikvSideSlowScore.score.Load() - - if lastScore == score { - return - } - - lastUpdateTime := s.tikvSideSlowScore.lastUpdateTime.Load() - - if lastUpdateTime != nil && currTime.Sub(*lastUpdateTime) < tikvSlowScoreUpdateInterval { - return - } - - if !s.tikvSideSlowScore.TryLock() { - // It must be being updated concurrently. Skip. 
- return - } - defer s.tikvSideSlowScore.Unlock() - - s.tikvSideSlowScore.hasTiKVFeedback.Store(true) - // Reload update time as it might be updated concurrently before acquiring mutex - lastUpdateTime = s.tikvSideSlowScore.lastUpdateTime.Load() - if lastUpdateTime != nil && currTime.Sub(*lastUpdateTime) < tikvSlowScoreUpdateInterval { - return - } - - newScore := score - - newUpdateTime := new(time.Time) - *newUpdateTime = currTime - - s.tikvSideSlowScore.score.Store(newScore) - s.tikvSideSlowScore.lastUpdateTime.Store(newUpdateTime) -} - -func (s *StoreHealthStatus) updateSlowFlag() { - isSlow := s.clientSideSlowScore.isSlow() || s.tikvSideSlowScore.score.Load() >= tikvSlowScoreSlowThreshold - s.isSlow.Store(isSlow) -} - -// setTiKVSlowScoreLastUpdateTimeForTest force sets last update time of TiKV server side slow score to specified value. -// For test purpose only. -func (s *StoreHealthStatus) setTiKVSlowScoreLastUpdateTimeForTest(lastUpdateTime time.Time) { - s.tikvSideSlowScore.Lock() - defer s.tikvSideSlowScore.Unlock() - s.tikvSideSlowScore.lastUpdateTime.Store(&lastUpdateTime) -} - -// Store contains a kv process's address. -type Store struct { - addr string // loaded store address - peerAddr string // TiFlash Proxy use peerAddr - saddr string // loaded store status address - storeID uint64 // store's id - state uint64 // unsafe store storeState - labels []*metapb.StoreLabel // stored store labels - resolveMutex sync.Mutex // protect pd from concurrent init requests - epoch uint32 // store fail epoch, see RegionStore.storeEpochs - storeType tikvrpc.EndpointType // type of the store - tokenCount atomic2.Int64 // used store token count - - loadStats atomic2.Pointer[storeLoadStats] - - // whether the store is unreachable due to some reason, therefore requests to the store needs to be - // forwarded by other stores. this is also the flag that a health check loop is running for this store. - // this mechanism is currently only applicable for TiKV stores. - livenessState uint32 - unreachableSince time.Time - - healthStatus *StoreHealthStatus - // A statistic for counting the flows of different replicas on this store - replicaFlowsStats [numReplicaFlowsType]uint64 -} - -type resolveState uint64 - -type storeLoadStats struct { - estimatedWait time.Duration - waitTimeUpdatedAt time.Time -} - -const ( - // The store is just created and normally is being resolved. - // Store in this state will only be resolved by initResolve(). - unresolved resolveState = iota - // The store is resolved and its address is valid. - resolved - // Request failed on this store and it will be re-resolved by asyncCheckAndResolveLoop(). - needCheck - // The store's address or label is changed and marked deleted. - // There is a new store struct replaced it in the RegionCache and should - // call changeToActiveStore() to get the new struct. - deleted - // The store is a tombstone. Should invalidate the region if tries to access it. - tombstone -) - -// String implements fmt.Stringer interface. 
-func (s resolveState) String() string { - switch s { - case unresolved: - return "unresolved" - case resolved: - return "resolved" - case needCheck: - return "needCheck" - case deleted: - return "deleted" - case tombstone: - return "tombstone" - default: - return fmt.Sprintf("unknown-%v", uint64(s)) - } -} - -func newStore( - id uint64, - addr string, - peerAddr string, - statusAddr string, - storeType tikvrpc.EndpointType, - state resolveState, - labels []*metapb.StoreLabel, -) *Store { - return &Store{ - storeID: id, - storeType: storeType, - state: uint64(state), - labels: labels, - addr: addr, - peerAddr: peerAddr, - saddr: statusAddr, - // Make sure healthStatus field is never null. - healthStatus: newStoreHealthStatus(), - } -} - -// newUninitializedStore creates a `Store` instance with only storeID initialized. -func newUninitializedStore(id uint64) *Store { - return &Store{ - storeID: id, - // Make sure healthStatus field is never null. - healthStatus: newStoreHealthStatus(), - } -} - -// IsTiFlash returns true if the storeType is TiFlash -func (s *Store) IsTiFlash() bool { - return s.storeType == tikvrpc.TiFlash -} - -// StoreID returns storeID. -func (s *Store) StoreID() uint64 { - return s.storeID -} - -// initResolve resolves the address of the store that never resolved and returns an -// empty string if it's a tombstone. -func (s *Store) initResolve(bo *retry.Backoffer, c *RegionCache) (addr string, err error) { - s.resolveMutex.Lock() - state := s.getResolveState() - defer s.resolveMutex.Unlock() - if state != unresolved { - if state != tombstone { - addr = s.addr - } - return - } - var store *metapb.Store - for { - start := time.Now() - store, err = c.fetchStore(bo.GetCtx(), s.storeID) - metrics.LoadRegionCacheHistogramWithGetStore.Observe(time.Since(start).Seconds()) - if err != nil { - metrics.RegionCacheCounterWithGetStoreError.Inc() - } else { - metrics.RegionCacheCounterWithGetStoreOK.Inc() - } - if err := bo.GetCtx().Err(); err != nil && errors.Cause(err) == context.Canceled { - return "", errors.WithStack(err) - } - if err != nil && !isStoreNotFoundError(err) { - // TODO: more refine PD error status handle. - err = errors.Errorf("loadStore from PD failed, id: %d, err: %v", s.storeID, err) - if err = bo.Backoff(retry.BoPDRPC, err); err != nil { - return - } - continue - } - // The store is a tombstone. - if store == nil || (store != nil && store.GetState() == metapb.StoreState_Tombstone) { - s.setResolveState(tombstone) - return "", nil - } - addr = store.GetAddress() - if addr == "" { - return "", errors.Errorf("empty store(%d) address", s.storeID) - } - s.addr = addr - s.peerAddr = store.GetPeerAddress() - s.saddr = store.GetStatusAddress() - s.storeType = tikvrpc.GetStoreTypeByMeta(store) - s.labels = store.GetLabels() - // Shouldn't have other one changing its state concurrently, but we still use changeResolveStateTo for safety. - s.changeResolveStateTo(unresolved, resolved) - return s.addr, nil - } -} - -// A quick and dirty solution to find out whether an err is caused by StoreNotFound. -// todo: A better solution, maybe some err-code based error handling? -func isStoreNotFoundError(err error) bool { - return strings.Contains(err.Error(), "invalid store ID") && strings.Contains(err.Error(), "not found") -} - -// reResolve try to resolve addr for store that need check. Returns false if the region is in tombstone state or is -// deleted. 
-func (s *Store) reResolve(c *RegionCache) (bool, error) { - var addr string - store, err := c.fetchStore(context.Background(), s.storeID) - if err != nil { - metrics.RegionCacheCounterWithGetStoreError.Inc() - } else { - metrics.RegionCacheCounterWithGetStoreOK.Inc() - } - // `err` here can mean either "load Store from PD failed" or "store not found" - // If load Store from PD is successful but PD didn't find the store - // the err should be handled by next `if` instead of here - if err != nil && !isStoreNotFoundError(err) { - logutil.BgLogger().Error("loadStore from PD failed", zap.Uint64("id", s.storeID), zap.Error(err)) - // we cannot do backoff in reResolve loop but try check other store and wait tick. - return false, err - } - if store == nil || (store != nil && store.GetState() == metapb.StoreState_Tombstone) { - // store has be removed in PD, we should invalidate all regions using those store. - logutil.BgLogger().Info("invalidate regions in removed store", - zap.Uint64("store", s.storeID), zap.String("add", s.addr)) - atomic.AddUint32(&s.epoch, 1) - s.setResolveState(tombstone) - metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc() - return false, nil - } - - storeType := tikvrpc.GetStoreTypeByMeta(store) - addr = store.GetAddress() - if s.addr != addr || !s.IsSameLabels(store.GetLabels()) { - newStore := newStore( - s.storeID, - addr, - store.GetPeerAddress(), - store.GetStatusAddress(), - storeType, - resolved, - store.GetLabels(), - ) - newStore.livenessState = atomic.LoadUint32(&s.livenessState) - newStore.unreachableSince = s.unreachableSince - if s.addr == addr { - newStore.healthStatus = s.healthStatus - } - c.putStore(newStore) - s.setResolveState(deleted) - return false, nil - } - s.changeResolveStateTo(needCheck, resolved) - return true, nil -} - -func (s *Store) getResolveState() resolveState { - if s == nil { - var state resolveState - return state - } - return resolveState(atomic.LoadUint64(&s.state)) -} - -func (s *Store) setResolveState(state resolveState) { - atomic.StoreUint64(&s.state, uint64(state)) -} - -// changeResolveStateTo changes the store resolveState from the old state to the new state. -// Returns true if it changes the state successfully, and false if the store's state -// is changed by another one. -func (s *Store) changeResolveStateTo(from, to resolveState) bool { - for { - state := s.getResolveState() - if state == to { - return true - } - if state != from { - return false - } - if atomic.CompareAndSwapUint64(&s.state, uint64(from), uint64(to)) { - logutil.BgLogger().Info("change store resolve state", - zap.Uint64("store", s.storeID), - zap.String("addr", s.addr), - zap.String("from", from.String()), - zap.String("to", to.String()), - zap.String("liveness-state", s.getLivenessState().String())) - return true - } - } -} - -// IsSameLabels returns whether the store have the same labels with target labels -func (s *Store) IsSameLabels(labels []*metapb.StoreLabel) bool { - if len(s.labels) != len(labels) { - return false - } - return s.IsLabelsMatch(labels) -} - -// IsLabelsMatch return whether the store's labels match the target labels -func (s *Store) IsLabelsMatch(labels []*metapb.StoreLabel) bool { - if len(labels) < 1 { - return true - } - for _, targetLabel := range labels { - if !isStoreContainLabel(s.labels, targetLabel.Key, targetLabel.Value) { - return false - } - } - return true -} - -// IsStoreMatch returns whether the store's id match the target ids. 
-func (s *Store) IsStoreMatch(stores []uint64) bool { - if len(stores) < 1 { - return true - } - for _, storeID := range stores { - if s.storeID == storeID { - return true - } - } - return false -} - -func isStoreContainLabel(labels []*metapb.StoreLabel, key string, val string) (res bool) { - for _, label := range labels { - if label.GetKey() == key && label.GetValue() == val { - res = true - break - } - } - return res -} - -// GetLabelValue returns the value of the label -func (s *Store) GetLabelValue(key string) (string, bool) { - for _, label := range s.labels { - if label.Key == key { - return label.Value, true - } - } - return "", false -} - -// getLivenessState gets the cached liveness state of the store. -// When it's not reachable, a goroutine will update the state in background. -// To get the accurate liveness state, use checkLiveness instead. -func (s *Store) getLivenessState() livenessState { - return livenessState(atomic.LoadUint32(&s.livenessState)) -} - -type livenessState uint32 - -func (l livenessState) injectConstantLiveness(tk testingKnobs) { - tk.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState { return l }) -} - -var ( - livenessSf singleflight.Group - // storeLivenessTimeout is the max duration of resolving liveness of a TiKV instance. - storeLivenessTimeout = time.Second -) - -// SetStoreLivenessTimeout sets storeLivenessTimeout to t. -func SetStoreLivenessTimeout(t time.Duration) { - storeLivenessTimeout = t -} - -// GetStoreLivenessTimeout returns storeLivenessTimeout. -func GetStoreLivenessTimeout() time.Duration { - return storeLivenessTimeout -} - -const ( - reachable livenessState = iota - unreachable - unknown -) - -// String implements fmt.Stringer interface. -func (s livenessState) String() string { - switch s { - case unreachable: - return "unreachable" - case reachable: - return "reachable" - case unknown: - return "unknown" - default: - return fmt.Sprintf("unknown-%v", uint32(s)) - } -} - -func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoffer, c *RegionCache) (liveness livenessState) { - liveness = s.requestLiveness(bo.GetCtx(), c) - if liveness == reachable { - return - } - // This mechanism doesn't support non-TiKV stores currently. - if s.storeType != tikvrpc.TiKV { - logutil.BgLogger().Info("[health check] skip running health check loop for non-tikv store", - zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr)) - return - } - - // It may be already started by another thread. 
- if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) { - s.unreachableSince = time.Now() - reResolveInterval := 30 * time.Second - if val, err := util.EvalFailpoint("injectReResolveInterval"); err == nil { - if dur, err := time.ParseDuration(val.(string)); err == nil { - reResolveInterval = dur - } - } - if _, err := util.EvalFailpoint("skipStoreCheckUntilHealth"); err == nil { - return - } - startHealthCheckLoop(c, s, liveness, reResolveInterval) - } - return -} - -func startHealthCheckLoop(c *RegionCache, s *Store, liveness livenessState, reResolveInterval time.Duration) { - lastCheckPDTime := time.Now() - - c.bg.schedule(func(ctx context.Context, t time.Time) bool { - if t.Sub(lastCheckPDTime) > reResolveInterval { - lastCheckPDTime = t - - valid, err := s.reResolve(c) - if err != nil { - logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err)) - } else if !valid { - if s.getResolveState() != deleted { - logutil.BgLogger().Warn("[health check] store was still unhealthy at the end of health check loop", - zap.Uint64("storeID", s.storeID), - zap.String("state", s.getResolveState().String()), - zap.String("liveness", s.getLivenessState().String())) - return true - } - // if the store is deleted, a new store with same id must be inserted (guaranteed by reResolve). - newStore, _ := c.getStore(s.storeID) - logutil.BgLogger().Info("[health check] store meta changed", - zap.Uint64("storeID", s.storeID), - zap.String("oldAddr", s.addr), - zap.String("oldLabels", fmt.Sprintf("%v", s.labels)), - zap.String("newAddr", newStore.addr), - zap.String("newLabels", fmt.Sprintf("%v", newStore.labels))) - s = newStore - } - } - - liveness = s.requestLiveness(ctx, c) - atomic.StoreUint32(&s.livenessState, uint32(liveness)) - if liveness == reachable { - logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID)) - return true - } - return false - }, time.Second) -} - -func (s *Store) requestLiveness(ctx context.Context, tk testingKnobs) (l livenessState) { - // It's not convenient to mock liveness in integration tests. Use failpoint to achieve that instead. - if val, err := util.EvalFailpoint("injectLiveness"); err == nil { - liveness := val.(string) - if strings.Contains(liveness, " ") { - for _, item := range strings.Split(liveness, " ") { - kv := strings.Split(item, ":") - if len(kv) != 2 { - continue - } - if kv[0] == s.addr { - liveness = kv[1] - break - } - } - } - switch liveness { - case "unreachable": - return unreachable - case "reachable": - return reachable - case "unknown": - return unknown - } - } - - if tk != nil { - livenessFunc := tk.getMockRequestLiveness() - if livenessFunc != nil { - return livenessFunc(ctx, s) - } - } - - if storeLivenessTimeout == 0 { - return unreachable - } - - if s.getResolveState() != resolved { - l = unknown - return - } - addr := s.addr - rsCh := livenessSf.DoChan(addr, func() (interface{}, error) { - return invokeKVStatusAPI(addr, storeLivenessTimeout), nil - }) - select { - case rs := <-rsCh: - l = rs.Val.(livenessState) - case <-ctx.Done(): - l = unknown - return - } - return -} - -// GetAddr returns the address of the store -func (s *Store) GetAddr() string { - return s.addr -} - -// GetPeerAddr returns the peer address of the store -func (s *Store) GetPeerAddr() string { - return s.peerAddr -} - -// EstimatedWaitTime returns an optimistic estimation of how long a request will wait in the store. 
-// It's calculated by subtracting the time since the last update from the wait time returned from TiKV. -func (s *Store) EstimatedWaitTime() time.Duration { - loadStats := s.loadStats.Load() - if loadStats == nil { - return 0 - } - timeSinceUpdated := time.Since(loadStats.waitTimeUpdatedAt) - if loadStats.estimatedWait < timeSinceUpdated { - return 0 - } - return loadStats.estimatedWait - timeSinceUpdated -} - -func invokeKVStatusAPI(addr string, timeout time.Duration) (l livenessState) { - start := time.Now() - defer func() { - if l == reachable { - metrics.StatusCountWithOK.Inc() - } else { - metrics.StatusCountWithError.Inc() - } - metrics.TiKVStatusDuration.WithLabelValues(addr).Observe(time.Since(start).Seconds()) - }() - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - conn, cli, err := createKVHealthClient(ctx, addr) - if err != nil { - logutil.BgLogger().Info("[health check] create grpc connection failed", zap.String("store", addr), zap.Error(err)) - l = unreachable - return - } - defer func() { - err := conn.Close() - if err != nil { - logutil.BgLogger().Info("[health check] failed to close the grpc connection for health check", zap.String("store", addr), zap.Error(err)) - } - }() - - req := &healthpb.HealthCheckRequest{} - resp, err := cli.Check(ctx, req) - if err != nil { - logutil.BgLogger().Info("[health check] check health error", zap.String("store", addr), zap.Error(err)) - l = unreachable - return - } - - status := resp.GetStatus() - if status == healthpb.HealthCheckResponse_UNKNOWN || status == healthpb.HealthCheckResponse_SERVICE_UNKNOWN { - logutil.BgLogger().Info("[health check] check health returns unknown", zap.String("store", addr)) - l = unknown - return - } - - if status != healthpb.HealthCheckResponse_SERVING { - logutil.BgLogger().Info("[health check] service not serving", zap.Stringer("status", status)) - l = unreachable - return - } - - l = reachable - return -} - // checkAndUpdateStoreHealthStatus checks and updates health stats on each store. func (c *RegionCache) checkAndUpdateStoreHealthStatus() { defer func() { @@ -3342,28 +2612,6 @@ func (c *RegionCache) checkAndUpdateStoreHealthStatus() { } } -// getReplicaFlowsStats returns the statistics on the related replicaFlowsType. -func (s *Store) getReplicaFlowsStats(destType replicaFlowsType) uint64 { - return atomic.LoadUint64(&s.replicaFlowsStats[destType]) -} - -// resetReplicaFlowsStats resets the statistics on the related replicaFlowsType. -func (s *Store) resetReplicaFlowsStats(destType replicaFlowsType) { - atomic.StoreUint64(&s.replicaFlowsStats[destType], 0) -} - -// recordReplicaFlowsStats records the statistics on the related replicaFlowsType. -func (s *Store) recordReplicaFlowsStats(destType replicaFlowsType) { - atomic.AddUint64(&s.replicaFlowsStats[destType], 1) -} - -func (s *Store) recordHealthFeedback(feedback *tikvpb.HealthFeedback) { - // Note that the `FeedbackSeqNo` field of `HealthFeedback` is not used yet. It's a monotonic value that can help - // to drop out-of-order feedback messages. But it's not checked for now since it's not very necessary to receive - // only a slow score. It's prepared for possible use in the future. - s.healthStatus.updateTiKVServerSideSlowScore(int64(feedback.GetSlowScore()), time.Now()) -} - // reportStoreReplicaFlows reports the statistics on the related replicaFlowsType. 
func (c *RegionCache) reportStoreReplicaFlows() { c.forEachStore(func(store *Store) { @@ -3374,51 +2622,6 @@ func (c *RegionCache) reportStoreReplicaFlows() { }) } -func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, healthpb.HealthClient, error) { - // Temporarily directly load the config from the global config, however it's not a good idea to let RegionCache to - // access it. - // TODO: Pass the config in a better way, or use the connArray inner the client directly rather than creating new - // connection. - - cfg := config.GetGlobalConfig() - - opt := grpc.WithTransportCredentials(insecure.NewCredentials()) - if len(cfg.Security.ClusterSSLCA) != 0 { - tlsConfig, err := cfg.Security.ToTLSConfig() - if err != nil { - return nil, nil, errors.WithStack(err) - } - opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) - } - keepAlive := cfg.TiKVClient.GrpcKeepAliveTime - keepAliveTimeout := cfg.TiKVClient.GrpcKeepAliveTimeout - conn, err := grpc.DialContext( - ctx, - addr, - opt, - grpc.WithInitialWindowSize(client.GrpcInitialWindowSize), - grpc.WithInitialConnWindowSize(client.GrpcInitialConnWindowSize), - grpc.WithConnectParams(grpc.ConnectParams{ - Backoff: backoff.Config{ - BaseDelay: 100 * time.Millisecond, // Default was 1s. - Multiplier: 1.6, // Default - Jitter: 0.2, // Default - MaxDelay: 3 * time.Second, // Default was 120s. - }, - MinConnectTimeout: 5 * time.Second, - }), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: time.Duration(keepAlive) * time.Second, - Timeout: time.Duration(keepAliveTimeout) * time.Second, - }), - ) - if err != nil { - return nil, nil, errors.WithStack(err) - } - cli := healthpb.NewHealthClient(conn) - return conn, cli, nil -} - func isSamePeer(lhs *metapb.Peer, rhs *metapb.Peer) bool { return lhs == rhs || (lhs.GetId() == rhs.GetId() && lhs.GetStoreId() == rhs.GetStoreId()) } @@ -3429,119 +2632,6 @@ func contains(startKey, endKey, key []byte) bool { (bytes.Compare(key, endKey) < 0 || len(endKey) == 0) } -type testingKnobs interface { - getMockRequestLiveness() livenessFunc - setMockRequestLiveness(f livenessFunc) -} - -func (c *RegionCache) getMockRequestLiveness() livenessFunc { - f := c.testingKnobs.mockRequestLiveness.Load() - if f == nil { - return nil - } - return *f -} - -func (c *RegionCache) setMockRequestLiveness(f livenessFunc) { - c.testingKnobs.mockRequestLiveness.Store(&f) -} - -type storeRegistry interface { - fetchStore(ctx context.Context, id uint64) (*metapb.Store, error) - fetchAllStores(ctx context.Context) ([]*metapb.Store, error) -} - -func (c *RegionCache) fetchStore(ctx context.Context, id uint64) (*metapb.Store, error) { - return c.pdClient.GetStore(ctx, id) -} - -func (c *RegionCache) fetchAllStores(ctx context.Context) ([]*metapb.Store, error) { - return c.pdClient.GetAllStores(ctx) -} - -func (c *RegionCache) getStore(id uint64) (store *Store, exists bool) { - c.storeMu.RLock() - store, exists = c.storeMu.stores[id] - c.storeMu.RUnlock() - return -} - -func (c *RegionCache) getStoreOrInsertDefault(id uint64) *Store { - c.storeMu.Lock() - store, exists := c.storeMu.stores[id] - if !exists { - store = newUninitializedStore(id) - c.storeMu.stores[id] = store - } - c.storeMu.Unlock() - return store -} - -func (c *RegionCache) putStore(store *Store) { - c.storeMu.Lock() - c.storeMu.stores[store.storeID] = store - c.storeMu.Unlock() -} - -func (c *RegionCache) clearStores() { - c.storeMu.Lock() - c.storeMu.stores = make(map[uint64]*Store) - c.storeMu.Unlock() -} - -func (c 
*RegionCache) forEachStore(f func(*Store)) { - c.storeMu.RLock() - defer c.storeMu.RUnlock() - for _, s := range c.storeMu.stores { - f(s) - } -} - -func (c *RegionCache) filterStores(dst []*Store, predicate func(*Store) bool) []*Store { - c.storeMu.RLock() - for _, store := range c.storeMu.stores { - if predicate == nil || predicate(store) { - dst = append(dst, store) - } - } - c.storeMu.RUnlock() - return dst -} - -func (c *RegionCache) listTiflashComputeStores() (stores []*Store, needReload bool) { - c.tiflashComputeStoreMu.RLock() - needReload = c.tiflashComputeStoreMu.needReload - stores = c.tiflashComputeStoreMu.stores - c.tiflashComputeStoreMu.RUnlock() - return -} - -func (c *RegionCache) setTiflashComputeStores(stores []*Store) { - c.tiflashComputeStoreMu.Lock() - c.tiflashComputeStoreMu.stores = stores - c.tiflashComputeStoreMu.needReload = false - c.tiflashComputeStoreMu.Unlock() -} - -func (c *RegionCache) markTiflashComputeStoresNeedReload() { - c.tiflashComputeStoreMu.Lock() - c.tiflashComputeStoreMu.needReload = true - c.tiflashComputeStoreMu.Unlock() -} - -func (c *RegionCache) markStoreNeedCheck(store *Store) { - if store.changeResolveStateTo(resolved, needCheck) { - select { - case c.notifyCheckCh <- struct{}{}: - default: - } - } -} - -func (c *RegionCache) getCheckStoreEvents() <-chan struct{} { - return c.notifyCheckCh -} - func (c *RegionCache) onHealthFeedback(feedback *tikvpb.HealthFeedback) { store, ok := c.getStore(feedback.GetStoreId()) if !ok { diff --git a/internal/locate/store_cache.go b/internal/locate/store_cache.go new file mode 100644 index 000000000..0b33e95c4 --- /dev/null +++ b/internal/locate/store_cache.go @@ -0,0 +1,963 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
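+//
+// store_cache.go hosts the Store type and the store-cache plumbing moved out
+// of region_cache.go by this commit; the code is relocated, not rewritten.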
+ +package locate + +import ( + "context" + "fmt" + "math" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/tikvpb" + "github.com/pkg/errors" + "github.com/tikv/client-go/v2/config" + "github.com/tikv/client-go/v2/config/retry" + "github.com/tikv/client-go/v2/internal/client" + "github.com/tikv/client-go/v2/internal/logutil" + "github.com/tikv/client-go/v2/metrics" + "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/util" + uatomic "go.uber.org/atomic" + "go.uber.org/zap" + "golang.org/x/sync/singleflight" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/keepalive" +) + +type testingKnobs interface { + getMockRequestLiveness() livenessFunc + setMockRequestLiveness(f livenessFunc) +} + +func (c *RegionCache) getMockRequestLiveness() livenessFunc { + f := c.testingKnobs.mockRequestLiveness.Load() + if f == nil { + return nil + } + return *f +} + +func (c *RegionCache) setMockRequestLiveness(f livenessFunc) { + c.testingKnobs.mockRequestLiveness.Store(&f) +} + +type storeRegistry interface { + fetchStore(ctx context.Context, id uint64) (*metapb.Store, error) + fetchAllStores(ctx context.Context) ([]*metapb.Store, error) +} + +func (c *RegionCache) fetchStore(ctx context.Context, id uint64) (*metapb.Store, error) { + return c.pdClient.GetStore(ctx, id) +} + +func (c *RegionCache) fetchAllStores(ctx context.Context) ([]*metapb.Store, error) { + return c.pdClient.GetAllStores(ctx) +} + +type storeCache interface { + storeRegistry + getStore(id uint64) (store *Store, exists bool) + getStoreOrInsertDefault(id uint64) *Store + putStore(store *Store) + clearStores() + forEachStore(f func(*Store)) + filterStores(dst []*Store, predicate func(*Store) bool) []*Store + listTiflashComputeStores() (stores []*Store, needReload bool) + setTiflashComputeStores(stores []*Store) + markTiflashComputeStoresNeedReload() + markStoreNeedCheck(store *Store) + getCheckStoreEvents() <-chan struct{} +} + +func (c *RegionCache) getStore(id uint64) (store *Store, exists bool) { + c.storeMu.RLock() + store, exists = c.storeMu.stores[id] + c.storeMu.RUnlock() + return +} + +func (c *RegionCache) getStoreOrInsertDefault(id uint64) *Store { + c.storeMu.Lock() + store, exists := c.storeMu.stores[id] + if !exists { + store = newUninitializedStore(id) + c.storeMu.stores[id] = store + } + c.storeMu.Unlock() + return store +} + +func (c *RegionCache) putStore(store *Store) { + c.storeMu.Lock() + c.storeMu.stores[store.storeID] = store + c.storeMu.Unlock() +} + +func (c *RegionCache) clearStores() { + c.storeMu.Lock() + c.storeMu.stores = make(map[uint64]*Store) + c.storeMu.Unlock() +} + +func (c *RegionCache) forEachStore(f func(*Store)) { + c.storeMu.RLock() + defer c.storeMu.RUnlock() + for _, s := range c.storeMu.stores { + f(s) + } +} + +func (c *RegionCache) filterStores(dst []*Store, predicate func(*Store) bool) []*Store { + c.storeMu.RLock() + for _, store := range c.storeMu.stores { + if predicate == nil || predicate(store) { + dst = append(dst, store) + } + } + c.storeMu.RUnlock() + return dst +} + +func (c *RegionCache) listTiflashComputeStores() (stores []*Store, needReload bool) { + c.tiflashComputeStoreMu.RLock() + needReload = c.tiflashComputeStoreMu.needReload + stores = c.tiflashComputeStoreMu.stores + c.tiflashComputeStoreMu.RUnlock() + 
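+	// Both named results were captured under the read lock above.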
return +} + +func (c *RegionCache) setTiflashComputeStores(stores []*Store) { + c.tiflashComputeStoreMu.Lock() + c.tiflashComputeStoreMu.stores = stores + c.tiflashComputeStoreMu.needReload = false + c.tiflashComputeStoreMu.Unlock() +} + +func (c *RegionCache) markTiflashComputeStoresNeedReload() { + c.tiflashComputeStoreMu.Lock() + c.tiflashComputeStoreMu.needReload = true + c.tiflashComputeStoreMu.Unlock() +} + +func (c *RegionCache) markStoreNeedCheck(store *Store) { + if store.changeResolveStateTo(resolved, needCheck) { + select { + case c.notifyCheckCh <- struct{}{}: + default: + } + } +} + +func (c *RegionCache) getCheckStoreEvents() <-chan struct{} { + return c.notifyCheckCh +} + +// Store contains a kv process's address. +type Store struct { + addr string // loaded store address + peerAddr string // TiFlash Proxy use peerAddr + saddr string // loaded store status address + storeID uint64 // store's id + state uint64 // unsafe store storeState + labels []*metapb.StoreLabel // stored store labels + resolveMutex sync.Mutex // protect pd from concurrent init requests + epoch uint32 // store fail epoch, see RegionStore.storeEpochs + storeType tikvrpc.EndpointType // type of the store + tokenCount uatomic.Int64 // used store token count + + loadStats uatomic.Pointer[storeLoadStats] + + // whether the store is unreachable due to some reason, therefore requests to the store needs to be + // forwarded by other stores. this is also the flag that a health check loop is running for this store. + // this mechanism is currently only applicable for TiKV stores. + livenessState uint32 + unreachableSince time.Time + + healthStatus *StoreHealthStatus + // A statistic for counting the flows of different replicas on this store + replicaFlowsStats [numReplicaFlowsType]uint64 +} + +func newStore( + id uint64, + addr string, + peerAddr string, + statusAddr string, + storeType tikvrpc.EndpointType, + state resolveState, + labels []*metapb.StoreLabel, +) *Store { + return &Store{ + storeID: id, + storeType: storeType, + state: uint64(state), + labels: labels, + addr: addr, + peerAddr: peerAddr, + saddr: statusAddr, + // Make sure healthStatus field is never null. + healthStatus: newStoreHealthStatus(), + } +} + +// newUninitializedStore creates a `Store` instance with only storeID initialized. +func newUninitializedStore(id uint64) *Store { + return &Store{ + storeID: id, + // Make sure healthStatus field is never null. + healthStatus: newStoreHealthStatus(), + } +} + +// IsTiFlash returns true if the storeType is TiFlash +func (s *Store) IsTiFlash() bool { + return s.storeType == tikvrpc.TiFlash +} + +// StoreID returns storeID. +func (s *Store) StoreID() uint64 { + return s.storeID +} + +// GetAddr returns the address of the store +func (s *Store) GetAddr() string { + return s.addr +} + +// GetPeerAddr returns the peer address of the store +func (s *Store) GetPeerAddr() string { + return s.peerAddr +} + +// IsStoreMatch returns whether the store's id match the target ids. 
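+// An empty target list matches every store.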
+func (s *Store) IsStoreMatch(stores []uint64) bool { + if len(stores) < 1 { + return true + } + for _, storeID := range stores { + if s.storeID == storeID { + return true + } + } + return false +} + +// GetLabelValue returns the value of the label +func (s *Store) GetLabelValue(key string) (string, bool) { + for _, label := range s.labels { + if label.Key == key { + return label.Value, true + } + } + return "", false +} + +// IsSameLabels returns whether the store have the same labels with target labels +func (s *Store) IsSameLabels(labels []*metapb.StoreLabel) bool { + if len(s.labels) != len(labels) { + return false + } + return s.IsLabelsMatch(labels) +} + +// IsLabelsMatch return whether the store's labels match the target labels +func (s *Store) IsLabelsMatch(labels []*metapb.StoreLabel) bool { + if len(labels) < 1 { + return true + } + for _, targetLabel := range labels { + if !isStoreContainLabel(s.labels, targetLabel.Key, targetLabel.Value) { + return false + } + } + return true +} + +func isStoreContainLabel(labels []*metapb.StoreLabel, key string, val string) (res bool) { + for _, label := range labels { + if label.GetKey() == key && label.GetValue() == val { + res = true + break + } + } + return res +} + +type resolveState uint64 + +const ( + // The store is just created and normally is being resolved. + // Store in this state will only be resolved by initResolve(). + unresolved resolveState = iota + // The store is resolved and its address is valid. + resolved + // Request failed on this store and it will be re-resolved by asyncCheckAndResolveLoop(). + needCheck + // The store's address or label is changed and marked deleted. + // There is a new store struct replaced it in the RegionCache and should + // call changeToActiveStore() to get the new struct. + deleted + // The store is a tombstone. Should invalidate the region if tries to access it. + tombstone +) + +// String implements fmt.Stringer interface. +func (s resolveState) String() string { + switch s { + case unresolved: + return "unresolved" + case resolved: + return "resolved" + case needCheck: + return "needCheck" + case deleted: + return "deleted" + case tombstone: + return "tombstone" + default: + return fmt.Sprintf("unknown-%v", uint64(s)) + } +} + +// initResolve resolves the address of the store that never resolved and returns an +// empty string if it's a tombstone. +func (s *Store) initResolve(bo *retry.Backoffer, c storeCache) (addr string, err error) { + s.resolveMutex.Lock() + state := s.getResolveState() + defer s.resolveMutex.Unlock() + if state != unresolved { + if state != tombstone { + addr = s.addr + } + return + } + var store *metapb.Store + for { + start := time.Now() + store, err = c.fetchStore(bo.GetCtx(), s.storeID) + metrics.LoadRegionCacheHistogramWithGetStore.Observe(time.Since(start).Seconds()) + if err != nil { + metrics.RegionCacheCounterWithGetStoreError.Inc() + } else { + metrics.RegionCacheCounterWithGetStoreOK.Inc() + } + if err := bo.GetCtx().Err(); err != nil && errors.Cause(err) == context.Canceled { + return "", errors.WithStack(err) + } + if err != nil && !isStoreNotFoundError(err) { + // TODO: more refine PD error status handle. + err = errors.Errorf("loadStore from PD failed, id: %d, err: %v", s.storeID, err) + if err = bo.Backoff(retry.BoPDRPC, err); err != nil { + return + } + continue + } + // The store is a tombstone. 
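+		// A nil store means PD no longer tracks this ID; treat it the same as
+		// an explicit tombstone state.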
+ if store == nil || store.GetState() == metapb.StoreState_Tombstone { + s.setResolveState(tombstone) + return "", nil + } + addr = store.GetAddress() + if addr == "" { + return "", errors.Errorf("empty store(%d) address", s.storeID) + } + s.addr = addr + s.peerAddr = store.GetPeerAddress() + s.saddr = store.GetStatusAddress() + s.storeType = tikvrpc.GetStoreTypeByMeta(store) + s.labels = store.GetLabels() + // Shouldn't have other one changing its state concurrently, but we still use changeResolveStateTo for safety. + s.changeResolveStateTo(unresolved, resolved) + return s.addr, nil + } +} + +// reResolve try to resolve addr for store that need check. Returns false if the region is in tombstone state or is +// deleted. +func (s *Store) reResolve(c storeCache) (bool, error) { + var addr string + store, err := c.fetchStore(context.Background(), s.storeID) + if err != nil { + metrics.RegionCacheCounterWithGetStoreError.Inc() + } else { + metrics.RegionCacheCounterWithGetStoreOK.Inc() + } + // `err` here can mean either "load Store from PD failed" or "store not found" + // If load Store from PD is successful but PD didn't find the store + // the err should be handled by next `if` instead of here + if err != nil && !isStoreNotFoundError(err) { + logutil.BgLogger().Error("loadStore from PD failed", zap.Uint64("id", s.storeID), zap.Error(err)) + // we cannot do backoff in reResolve loop but try check other store and wait tick. + return false, err + } + if store == nil || store.GetState() == metapb.StoreState_Tombstone { + // store has be removed in PD, we should invalidate all regions using those store. + logutil.BgLogger().Info("invalidate regions in removed store", + zap.Uint64("store", s.storeID), zap.String("add", s.addr)) + atomic.AddUint32(&s.epoch, 1) + s.setResolveState(tombstone) + metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc() + return false, nil + } + + storeType := tikvrpc.GetStoreTypeByMeta(store) + addr = store.GetAddress() + if s.addr != addr || !s.IsSameLabels(store.GetLabels()) { + newStore := newStore( + s.storeID, + addr, + store.GetPeerAddress(), + store.GetStatusAddress(), + storeType, + resolved, + store.GetLabels(), + ) + newStore.livenessState = atomic.LoadUint32(&s.livenessState) + newStore.unreachableSince = s.unreachableSince + if s.addr == addr { + newStore.healthStatus = s.healthStatus + } + c.putStore(newStore) + s.setResolveState(deleted) + return false, nil + } + s.changeResolveStateTo(needCheck, resolved) + return true, nil +} + +// A quick and dirty solution to find out whether an err is caused by StoreNotFound. +// todo: A better solution, maybe some err-code based error handling? +func isStoreNotFoundError(err error) bool { + return strings.Contains(err.Error(), "invalid store ID") && strings.Contains(err.Error(), "not found") +} + +func (s *Store) getResolveState() resolveState { + if s == nil { + var state resolveState + return state + } + return resolveState(atomic.LoadUint64(&s.state)) +} + +func (s *Store) setResolveState(state resolveState) { + atomic.StoreUint64(&s.state, uint64(state)) +} + +// changeResolveStateTo changes the store resolveState from the old state to the new state. +// Returns true if it changes the state successfully, and false if the store's state +// is changed by another one. 
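+// The compare-and-swap is retried until the state is observed to be either
+// `from` (and gets swapped) or something else (and the change is given up).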
+func (s *Store) changeResolveStateTo(from, to resolveState) bool { + for { + state := s.getResolveState() + if state == to { + return true + } + if state != from { + return false + } + if atomic.CompareAndSwapUint64(&s.state, uint64(from), uint64(to)) { + logutil.BgLogger().Info("change store resolve state", + zap.Uint64("store", s.storeID), + zap.String("addr", s.addr), + zap.String("from", from.String()), + zap.String("to", to.String()), + zap.String("liveness-state", s.getLivenessState().String())) + return true + } + } +} + +type livenessFunc func(ctx context.Context, s *Store) livenessState + +type livenessState uint32 + +func (l livenessState) injectConstantLiveness(tk testingKnobs) { + tk.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState { return l }) +} + +var ( + livenessSf singleflight.Group + // storeLivenessTimeout is the max duration of resolving liveness of a TiKV instance. + storeLivenessTimeout = time.Second +) + +// SetStoreLivenessTimeout sets storeLivenessTimeout to t. +func SetStoreLivenessTimeout(t time.Duration) { + storeLivenessTimeout = t +} + +// GetStoreLivenessTimeout returns storeLivenessTimeout. +func GetStoreLivenessTimeout() time.Duration { + return storeLivenessTimeout +} + +const ( + reachable livenessState = iota + unreachable + unknown +) + +// String implements fmt.Stringer interface. +func (s livenessState) String() string { + switch s { + case unreachable: + return "unreachable" + case reachable: + return "reachable" + case unknown: + return "unknown" + default: + return fmt.Sprintf("unknown-%v", uint32(s)) + } +} + +// getLivenessState gets the cached liveness state of the store. +// When it's not reachable, a goroutine will update the state in background. +// To get the accurate liveness state, use checkLiveness instead. +func (s *Store) getLivenessState() livenessState { + return livenessState(atomic.LoadUint32(&s.livenessState)) +} + +func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoffer, c *RegionCache) (liveness livenessState) { + liveness = requestLiveness(bo.GetCtx(), s, c) + if liveness == reachable { + return + } + // This mechanism doesn't support non-TiKV stores currently. + if s.storeType != tikvrpc.TiKV { + logutil.BgLogger().Info("[health check] skip running health check loop for non-tikv store", + zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr)) + return + } + + // It may be already started by another thread. 
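+	// Winning this CAS (transitioning away from `reachable`) makes the current
+	// goroutine the only one to start the health check loop for this store.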
+ if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) { + s.unreachableSince = time.Now() + reResolveInterval := 30 * time.Second + if val, err := util.EvalFailpoint("injectReResolveInterval"); err == nil { + if dur, err := time.ParseDuration(val.(string)); err == nil { + reResolveInterval = dur + } + } + if _, err := util.EvalFailpoint("skipStoreCheckUntilHealth"); err == nil { + return + } + startHealthCheckLoop(c, s, liveness, reResolveInterval) + } + return +} + +func startHealthCheckLoop(c *RegionCache, s *Store, liveness livenessState, reResolveInterval time.Duration) { + lastCheckPDTime := time.Now() + + c.bg.schedule(func(ctx context.Context, t time.Time) bool { + if t.Sub(lastCheckPDTime) > reResolveInterval { + lastCheckPDTime = t + + valid, err := s.reResolve(c) + if err != nil { + logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err)) + } else if !valid { + if s.getResolveState() != deleted { + logutil.BgLogger().Warn("[health check] store was still unhealthy at the end of health check loop", + zap.Uint64("storeID", s.storeID), + zap.String("state", s.getResolveState().String()), + zap.String("liveness", s.getLivenessState().String())) + return true + } + // if the store is deleted, a new store with same id must be inserted (guaranteed by reResolve). + newStore, _ := c.getStore(s.storeID) + logutil.BgLogger().Info("[health check] store meta changed", + zap.Uint64("storeID", s.storeID), + zap.String("oldAddr", s.addr), + zap.String("oldLabels", fmt.Sprintf("%v", s.labels)), + zap.String("newAddr", newStore.addr), + zap.String("newLabels", fmt.Sprintf("%v", newStore.labels))) + s = newStore + } + } + + liveness = requestLiveness(ctx, s, c) + atomic.StoreUint32(&s.livenessState, uint32(liveness)) + if liveness == reachable { + logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID)) + return true + } + return false + }, time.Second) +} + +func requestLiveness(ctx context.Context, s *Store, tk testingKnobs) (l livenessState) { + // It's not convenient to mock liveness in integration tests. Use failpoint to achieve that instead. 
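+	// The failpoint value is either a single state name ("reachable",
+	// "unreachable" or "unknown"), or a space-separated list of addr:state
+	// pairs from which this store's address picks its state.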
+ if val, err := util.EvalFailpoint("injectLiveness"); err == nil { + liveness := val.(string) + if strings.Contains(liveness, " ") { + for _, item := range strings.Split(liveness, " ") { + kv := strings.Split(item, ":") + if len(kv) != 2 { + continue + } + if kv[0] == s.addr { + liveness = kv[1] + break + } + } + } + switch liveness { + case "unreachable": + return unreachable + case "reachable": + return reachable + case "unknown": + return unknown + } + } + + if tk != nil { + livenessFunc := tk.getMockRequestLiveness() + if livenessFunc != nil { + return livenessFunc(ctx, s) + } + } + + if storeLivenessTimeout == 0 { + return unreachable + } + + if s.getResolveState() != resolved { + l = unknown + return + } + addr := s.addr + rsCh := livenessSf.DoChan(addr, func() (interface{}, error) { + return invokeKVStatusAPI(addr, storeLivenessTimeout), nil + }) + select { + case rs := <-rsCh: + l = rs.Val.(livenessState) + case <-ctx.Done(): + l = unknown + return + } + return +} + +func invokeKVStatusAPI(addr string, timeout time.Duration) (l livenessState) { + start := time.Now() + defer func() { + if l == reachable { + metrics.StatusCountWithOK.Inc() + } else { + metrics.StatusCountWithError.Inc() + } + metrics.TiKVStatusDuration.WithLabelValues(addr).Observe(time.Since(start).Seconds()) + }() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + conn, cli, err := createKVHealthClient(ctx, addr) + if err != nil { + logutil.BgLogger().Info("[health check] create grpc connection failed", zap.String("store", addr), zap.Error(err)) + l = unreachable + return + } + defer func() { + err := conn.Close() + if err != nil { + logutil.BgLogger().Info("[health check] failed to close the grpc connection for health check", zap.String("store", addr), zap.Error(err)) + } + }() + + req := &healthpb.HealthCheckRequest{} + resp, err := cli.Check(ctx, req) + if err != nil { + logutil.BgLogger().Info("[health check] check health error", zap.String("store", addr), zap.Error(err)) + l = unreachable + return + } + + status := resp.GetStatus() + if status == healthpb.HealthCheckResponse_UNKNOWN || status == healthpb.HealthCheckResponse_SERVICE_UNKNOWN { + logutil.BgLogger().Info("[health check] check health returns unknown", zap.String("store", addr)) + l = unknown + return + } + + if status != healthpb.HealthCheckResponse_SERVING { + logutil.BgLogger().Info("[health check] service not serving", zap.Stringer("status", status)) + l = unreachable + return + } + + l = reachable + return +} + +func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, healthpb.HealthClient, error) { + // Temporarily directly load the config from the global config, however it's not a good idea to let RegionCache to + // access it. + // TODO: Pass the config in a better way, or use the connArray inner the client directly rather than creating new + // connection. 
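+	// The connection is dialed once per health probe and closed by the caller,
+	// so the dial cost stays off the request hot path.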
+ + cfg := config.GetGlobalConfig() + + opt := grpc.WithTransportCredentials(insecure.NewCredentials()) + if len(cfg.Security.ClusterSSLCA) != 0 { + tlsConfig, err := cfg.Security.ToTLSConfig() + if err != nil { + return nil, nil, errors.WithStack(err) + } + opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) + } + keepAlive := cfg.TiKVClient.GrpcKeepAliveTime + keepAliveTimeout := cfg.TiKVClient.GrpcKeepAliveTimeout + conn, err := grpc.DialContext( + ctx, + addr, + opt, + grpc.WithInitialWindowSize(client.GrpcInitialWindowSize), + grpc.WithInitialConnWindowSize(client.GrpcInitialConnWindowSize), + grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: 100 * time.Millisecond, // Default was 1s. + Multiplier: 1.6, // Default + Jitter: 0.2, // Default + MaxDelay: 3 * time.Second, // Default was 120s. + }, + MinConnectTimeout: 5 * time.Second, + }), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: time.Duration(keepAlive) * time.Second, + Timeout: time.Duration(keepAliveTimeout) * time.Second, + }), + ) + if err != nil { + return nil, nil, errors.WithStack(err) + } + cli := healthpb.NewHealthClient(conn) + return conn, cli, nil +} + +type storeLoadStats struct { + estimatedWait time.Duration + waitTimeUpdatedAt time.Time +} + +// EstimatedWaitTime returns an optimistic estimation of how long a request will wait in the store. +// It's calculated by subtracting the time since the last update from the wait time returned from TiKV. +func (s *Store) EstimatedWaitTime() time.Duration { + loadStats := s.loadStats.Load() + if loadStats == nil { + return 0 + } + timeSinceUpdated := time.Since(loadStats.waitTimeUpdatedAt) + if loadStats.estimatedWait < timeSinceUpdated { + return 0 + } + return loadStats.estimatedWait - timeSinceUpdated +} + +const ( + tikvSlowScoreDecayRate float64 = 20.0 / 60.0 // s^(-1), linear decaying + tikvSlowScoreSlowThreshold int64 = 80 + + tikvSlowScoreUpdateInterval = time.Millisecond * 100 + tikvSlowScoreUpdateFromPDInterval = time.Minute +) + +type StoreHealthStatus struct { + isSlow atomic.Bool + + // A statistic for counting the request latency to this store + clientSideSlowScore SlowScoreStat + + tikvSideSlowScore struct { + sync.Mutex + + // The following atomic fields is designed to be able to be read by atomic options directly, but only written + // while holding the mutex. + + hasTiKVFeedback atomic.Bool + score atomic.Int64 + lastUpdateTime atomic.Pointer[time.Time] + } +} + +type HealthStatusDetail struct { + ClientSideSlowScore int64 + TiKVSideSlowScore int64 +} + +func newStoreHealthStatus() *StoreHealthStatus { + return &StoreHealthStatus{} +} + +// IsSlow returns whether current Store is slow. +func (s *StoreHealthStatus) IsSlow() bool { + return s.isSlow.Load() +} + +// GetHealthStatusDetail gets the current detailed information about the store's health status. +func (s *StoreHealthStatus) GetHealthStatusDetail() HealthStatusDetail { + return HealthStatusDetail{ + ClientSideSlowScore: int64(s.clientSideSlowScore.getSlowScore()), + TiKVSideSlowScore: s.tikvSideSlowScore.score.Load(), + } +} + +// tick updates the health status that changes over time, such as slow score's decaying, etc. This function is expected +// to be called periodically. 
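+// Each tick decays the client-side slow score; the TiKV-side score only
+// starts decaying once feedback has been received but has gone stale.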
+func (s *StoreHealthStatus) tick(now time.Time) { + s.clientSideSlowScore.updateSlowScore() + s.updateTiKVServerSideSlowScoreOnTick(now) + s.updateSlowFlag() +} + +// recordClientSideSlowScoreStat records timecost of each request to update the client side slow score. +func (s *StoreHealthStatus) recordClientSideSlowScoreStat(timecost time.Duration) { + s.clientSideSlowScore.recordSlowScoreStat(timecost) + s.updateSlowFlag() +} + +// markAlreadySlow marks the related store already slow. +func (s *StoreHealthStatus) markAlreadySlow() { + s.clientSideSlowScore.markAlreadySlow() + s.updateSlowFlag() +} + +// updateTiKVServerSideSlowScoreOnTick updates the slow score actively, which is expected to be a periodic job. +// It skips updating if the last update time didn't elapse long enough, or it's being updated concurrently. +func (s *StoreHealthStatus) updateTiKVServerSideSlowScoreOnTick(now time.Time) { + if !s.tikvSideSlowScore.hasTiKVFeedback.Load() { + // Do nothing if no feedback has been received from this store yet. + return + } + lastUpdateTime := s.tikvSideSlowScore.lastUpdateTime.Load() + if lastUpdateTime == nil || now.Sub(*lastUpdateTime) < tikvSlowScoreUpdateFromPDInterval { + // If the first feedback is + return + } + + if !s.tikvSideSlowScore.TryLock() { + // It must be being updated concurrently. + return + } + defer s.tikvSideSlowScore.Unlock() + + // Reload update time as it might be updated concurrently before acquiring mutex + lastUpdateTime = s.tikvSideSlowScore.lastUpdateTime.Load() + elapsed := now.Sub(*lastUpdateTime) + if elapsed < tikvSlowScoreUpdateFromPDInterval { + return + } + + // TODO: Try to get store status from PD here. But it's not mandatory. + // Don't forget to update tests if getting slow score from PD is implemented here. + + // If updating from PD is not successful: decay the slow score. + score := s.tikvSideSlowScore.score.Load() + if score < 1 { + return + } + // Linear decay by time + score = max(int64(math.Round(float64(score)-tikvSlowScoreDecayRate*elapsed.Seconds())), 1) + s.tikvSideSlowScore.score.Store(score) + + newUpdateTime := new(time.Time) + *newUpdateTime = now + s.tikvSideSlowScore.lastUpdateTime.Store(newUpdateTime) +} + +// updateTiKVServerSideSlowScore updates the tikv side slow score with the given value. +// Ignores if the last update time didn't elapse long enough, or it's being updated concurrently. +func (s *StoreHealthStatus) updateTiKVServerSideSlowScore(score int64, currTime time.Time) { + defer s.updateSlowFlag() + + lastScore := s.tikvSideSlowScore.score.Load() + + if lastScore == score { + return + } + + lastUpdateTime := s.tikvSideSlowScore.lastUpdateTime.Load() + + if lastUpdateTime != nil && currTime.Sub(*lastUpdateTime) < tikvSlowScoreUpdateInterval { + return + } + + if !s.tikvSideSlowScore.TryLock() { + // It must be being updated concurrently. Skip. 
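+		// Feedback arrives continuously, so dropping one sample is harmless;
+		// the next one will be applied once the mutex is free.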
+ return + } + defer s.tikvSideSlowScore.Unlock() + + s.tikvSideSlowScore.hasTiKVFeedback.Store(true) + // Reload update time as it might be updated concurrently before acquiring mutex + lastUpdateTime = s.tikvSideSlowScore.lastUpdateTime.Load() + if lastUpdateTime != nil && currTime.Sub(*lastUpdateTime) < tikvSlowScoreUpdateInterval { + return + } + + newScore := score + + newUpdateTime := new(time.Time) + *newUpdateTime = currTime + + s.tikvSideSlowScore.score.Store(newScore) + s.tikvSideSlowScore.lastUpdateTime.Store(newUpdateTime) +} + +func (s *StoreHealthStatus) updateSlowFlag() { + isSlow := s.clientSideSlowScore.isSlow() || s.tikvSideSlowScore.score.Load() >= tikvSlowScoreSlowThreshold + s.isSlow.Store(isSlow) +} + +// setTiKVSlowScoreLastUpdateTimeForTest force sets last update time of TiKV server side slow score to specified value. +// For test purpose only. +func (s *StoreHealthStatus) setTiKVSlowScoreLastUpdateTimeForTest(lastUpdateTime time.Time) { + s.tikvSideSlowScore.Lock() + defer s.tikvSideSlowScore.Unlock() + s.tikvSideSlowScore.lastUpdateTime.Store(&lastUpdateTime) +} + +func (s *Store) recordHealthFeedback(feedback *tikvpb.HealthFeedback) { + // Note that the `FeedbackSeqNo` field of `HealthFeedback` is not used yet. It's a monotonic value that can help + // to drop out-of-order feedback messages. But it's not checked for now since it's not very necessary to receive + // only a slow score. It's prepared for possible use in the future. + s.healthStatus.updateTiKVServerSideSlowScore(int64(feedback.GetSlowScore()), time.Now()) +} + +// getReplicaFlowsStats returns the statistics on the related replicaFlowsType. +func (s *Store) getReplicaFlowsStats(destType replicaFlowsType) uint64 { + return atomic.LoadUint64(&s.replicaFlowsStats[destType]) +} + +// resetReplicaFlowsStats resets the statistics on the related replicaFlowsType. +func (s *Store) resetReplicaFlowsStats(destType replicaFlowsType) { + atomic.StoreUint64(&s.replicaFlowsStats[destType], 0) +} + +// recordReplicaFlowsStats records the statistics on the related replicaFlowsType. +func (s *Store) recordReplicaFlowsStats(destType replicaFlowsType) { + atomic.AddUint64(&s.replicaFlowsStats[destType], 1) +} From 8b78d1c71f55c7a1d6d3b745e73f5bd6be84f6be Mon Sep 17 00:00:00 2001 From: zyguan Date: Fri, 1 Mar 2024 17:14:49 +0800 Subject: [PATCH 2/2] use std atomic instead of uber's atomic Signed-off-by: zyguan --- internal/locate/region_cache.go | 3 +-- internal/locate/region_request.go | 2 +- internal/locate/store_cache.go | 5 ++--- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index f269104ae..ca9c95779 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -69,7 +69,6 @@ import ( "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" - uatomic "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -214,7 +213,7 @@ type regionStore struct { // accessIndex[tiKVOnly][proxyTiKVIdx] is the index of TiKV that can forward requests to the leader in stores, -1 means not using proxy. proxyTiKVIdx AccessIndex // accessIndex[tiFlashOnly][workTiFlashIdx] is the index of the current working TiFlash in stores. - workTiFlashIdx uatomic.Int32 + workTiFlashIdx atomic.Int32 // buckets is not accurate and it can change even if the region is not changed. // It can be stale and buckets keys can be out of the region range. 
buckets *metapb.Buckets diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 8cc7b0baa..21e68b4b8 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1844,7 +1844,7 @@ func (s *RegionRequestSender) releaseStoreToken(st *Store) { count := st.tokenCount.Load() // Decreasing tokenCount is no thread safe, preferring this for avoiding check in loop. if count > 0 { - st.tokenCount.Sub(1) + st.tokenCount.Add(-1) return } logutil.BgLogger().Warn("release store token failed, count equals to 0") diff --git a/internal/locate/store_cache.go b/internal/locate/store_cache.go index 0b33e95c4..c0b8cc288 100644 --- a/internal/locate/store_cache.go +++ b/internal/locate/store_cache.go @@ -33,7 +33,6 @@ import ( "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util" - uatomic "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/sync/singleflight" "google.golang.org/grpc" @@ -183,9 +182,9 @@ type Store struct { resolveMutex sync.Mutex // protect pd from concurrent init requests epoch uint32 // store fail epoch, see RegionStore.storeEpochs storeType tikvrpc.EndpointType // type of the store - tokenCount uatomic.Int64 // used store token count + tokenCount atomic.Int64 // used store token count - loadStats uatomic.Pointer[storeLoadStats] + loadStats atomic.Pointer[storeLoadStats] // whether the store is unreachable due to some reason, therefore requests to the store needs to be // forwarded by other stores. this is also the flag that a health check loop is running for this store.
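
For reference, a minimal standalone sketch (assuming Go 1.19+, not part of the patch) of the std-atomic equivalents for the go.uber.org/atomic calls touched by the second commit. The one call-site rewrite it forces is Sub(1) becoming Add(-1), since the standard typed atomics only expose Add; loadStatsSketch below is a hypothetical stand-in for storeLoadStats to keep the sketch self-contained:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// loadStatsSketch stands in for storeLoadStats so this sketch compiles on its own.
type loadStatsSketch struct {
	estimatedWait     time.Duration
	waitTimeUpdatedAt time.Time
}

func main() {
	// tokenCount: go.uber.org/atomic offers Sub/Inc/Dec helpers; sync/atomic.Int64
	// only offers Add, hence the Sub(1) -> Add(-1) change in releaseStoreToken.
	var tokenCount atomic.Int64
	tokenCount.Add(1)              // acquire a token
	tokenCount.Add(-1)             // release it (was tokenCount.Sub(1))
	fmt.Println(tokenCount.Load()) // 0

	// loadStats: uatomic.Pointer[T] maps one-to-one onto std atomic.Pointer[T].
	var loadStats atomic.Pointer[loadStatsSketch]
	loadStats.Store(&loadStatsSketch{estimatedWait: time.Second, waitTimeUpdatedAt: time.Now()})
	fmt.Println(loadStats.Load().estimatedWait) // 1s
}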