diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 0c8eaeac828..1834e509696 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -491,7 +491,7 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex request := gc.collectRequestAndConsumption(typ) if request != nil { c.run.currentRequests = append(c.run.currentRequests, request) - gc.tokenRequestCounter.Inc() + gc.metrics.tokenRequestCounter.Inc() } return true }) @@ -576,13 +576,9 @@ type groupCostController struct { calculators []ResourceCalculator handleRespFunc func(*rmpb.TokenBucketResponse) - successfulRequestDuration prometheus.Observer - failedLimitReserveDuration prometheus.Observer - requestRetryCounter prometheus.Counter - failedRequestCounter prometheus.Counter - tokenRequestCounter prometheus.Counter - - mu struct { + // metrics + metrics *groupMetricsCollection + mu struct { sync.Mutex consumption *rmpb.Consumption storeCounter map[uint64]*rmpb.Consumption @@ -629,6 +625,30 @@ type groupCostController struct { tombstone bool } +type groupMetricsCollection struct { + successfulRequestDuration prometheus.Observer + failedLimitReserveDuration prometheus.Observer + requestRetryCounter prometheus.Counter + failedRequestCounterWithOthers prometheus.Counter + failedRequestCounterWithThrottled prometheus.Counter + tokenRequestCounter prometheus.Counter +} + +func initMetrics(name string) *groupMetricsCollection { + const ( + otherType = "others" + throttledType = "throttled" + ) + return &groupMetricsCollection{ + successfulRequestDuration: successfulRequestDuration.WithLabelValues(name), + failedLimitReserveDuration: failedLimitReserveDuration.WithLabelValues(name), + failedRequestCounterWithOthers: failedRequestCounter.WithLabelValues(name, otherType), + failedRequestCounterWithThrottled: failedRequestCounter.WithLabelValues(name, throttledType), + requestRetryCounter: requestRetryCounter.WithLabelValues(name), + tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(name), + } +} + type tokenCounter struct { getTokenBucketFunc func() *rmpb.TokenBucket @@ -669,16 +689,13 @@ func newGroupCostController( default: return nil, errs.ErrClientResourceGroupConfigUnavailable.FastGenByArgs("not supports the resource type") } + ms := initMetrics(group.Name) gc := &groupCostController{ - meta: group, - name: group.Name, - mainCfg: mainCfg, - mode: group.GetMode(), - successfulRequestDuration: successfulRequestDuration.WithLabelValues(group.Name), - failedLimitReserveDuration: failedLimitReserveDuration.WithLabelValues(group.Name), - failedRequestCounter: failedRequestCounter.WithLabelValues(group.Name), - requestRetryCounter: requestRetryCounter.WithLabelValues(group.Name), - tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(group.Name), + meta: group, + name: group.Name, + mainCfg: mainCfg, + mode: group.GetMode(), + metrics: ms, calculators: []ResourceCalculator{ newKVCalculator(mainCfg), newSQLCalculator(mainCfg), @@ -733,7 +750,7 @@ func (gc *groupCostController) initRunState() { case rmpb.GroupMode_RUMode: gc.run.requestUnitTokens = make(map[rmpb.RequestUnitType]*tokenCounter) for typ := range requestUnitLimitTypeList { - limiter := NewLimiterWithCfg(now, cfgFunc(getRUTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan) + limiter := NewLimiterWithCfg(gc.name, now, cfgFunc(getRUTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan) counter := &tokenCounter{ limiter: limiter, avgRUPerSec: 0, @@ -747,7 +764,7 @@ func (gc *groupCostController) initRunState() { case rmpb.GroupMode_RawMode: gc.run.resourceTokens = make(map[rmpb.RawResourceType]*tokenCounter) for typ := range requestResourceLimitTypeList { - limiter := NewLimiterWithCfg(now, cfgFunc(getRawResourceTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan) + limiter := NewLimiterWithCfg(gc.name, now, cfgFunc(getRawResourceTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan) counter := &tokenCounter{ limiter: limiter, avgRUPerSec: 0, @@ -1176,7 +1193,7 @@ func (gc *groupCostController) onRequestWait( res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) } } - if d, err = WaitReservations(ctx, now, res); err == nil { + if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) { break retryLoop } case rmpb.GroupMode_RUMode: @@ -1186,17 +1203,19 @@ func (gc *groupCostController) onRequestWait( res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) } } - if d, err = WaitReservations(ctx, now, res); err == nil { + if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) { break retryLoop } } - gc.requestRetryCounter.Inc() + gc.metrics.requestRetryCounter.Inc() time.Sleep(retryInterval) } if err != nil { - gc.failedRequestCounter.Inc() - if d.Seconds() > 0 { - gc.failedLimitReserveDuration.Observe(d.Seconds()) + if errs.ErrClientResourceGroupThrottled.Equal(err) { + gc.metrics.failedRequestCounterWithThrottled.Inc() + gc.metrics.failedLimitReserveDuration.Observe(d.Seconds()) + } else { + gc.metrics.failedRequestCounterWithOthers.Inc() } gc.mu.Lock() sub(gc.mu.consumption, delta) @@ -1206,7 +1225,7 @@ func (gc *groupCostController) onRequestWait( }) return nil, nil, err } - gc.successfulRequestDuration.Observe(d.Seconds()) + gc.metrics.successfulRequestDuration.Observe(d.Seconds()) } gc.mu.Lock() diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index 165d501ddb1..3696dcba845 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -26,6 +26,7 @@ import ( rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/stretchr/testify/require" + "github.com/tikv/pd/client/errs" ) func createTestGroupCostController(re *require.Assertions) *groupCostController { @@ -112,3 +113,17 @@ func TestRequestAndResponseConsumption(t *testing.T) { re.Equal(expectedConsumption.TotalCpuTimeMs, consumption.TotalCpuTimeMs, caseNum) } } + +func TestResourceGroupThrottledError(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + gc.initRunState() + req := &TestRequestInfo{ + isWrite: true, + writeBytes: 10000000, + } + // The group is throttled + _, _, err := gc.onRequestWait(context.TODO(), req) + re.Error(err) + re.True(errs.ErrClientResourceGroupThrottled.Equal(err)) +} diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index f509acbc991..bb1bc18dbfc 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -26,6 +26,7 @@ import ( "time" "github.com/pingcap/log" + "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/client/errs" "go.uber.org/zap" ) @@ -81,6 +82,15 @@ type Limiter struct { isLowProcess bool // remainingNotifyTimes is used to limit notify when the speed limit is already set. remainingNotifyTimes int + name string + + // metrics + metrics *limiterMetricsCollection +} + +// limiterMetricsCollection is a collection of metrics for a limiter. +type limiterMetricsCollection struct { + lowTokenNotifyCounter prometheus.Counter } // Limit returns the maximum overall event rate. @@ -106,8 +116,9 @@ func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotify // NewLimiterWithCfg returns a new Limiter that allows events up to rate r and permits // bursts of at most b tokens. -func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- struct{}) *Limiter { +func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- struct{}) *Limiter { lim := &Limiter{ + name: name, limit: Limit(cfg.NewRate), last: now, tokens: cfg.NewTokens, @@ -115,6 +126,9 @@ func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensN notifyThreshold: cfg.NotifyThreshold, lowTokensNotifyChan: lowTokensNotifyChan, } + lim.metrics = &limiterMetricsCollection{ + lowTokenNotifyCounter: lowTokenRequestNotifyCounter.WithLabelValues(lim.name), + } log.Debug("new limiter", zap.String("limiter", fmt.Sprintf("%+v", lim))) return lim } @@ -223,6 +237,14 @@ func (lim *Limiter) SetupNotificationThreshold(now time.Time, threshold float64) lim.notifyThreshold = threshold } +// SetName sets the name of the limiter. +func (lim *Limiter) SetName(name string) *Limiter { + lim.mu.Lock() + defer lim.mu.Unlock() + lim.name = name + return lim +} + // notify tries to send a non-blocking notification on notifyCh and disables // further notifications (until the next Reconfigure or StartNotification). func (lim *Limiter) notify() { @@ -233,6 +255,9 @@ func (lim *Limiter) notify() { lim.isLowProcess = true select { case lim.lowTokensNotifyChan <- struct{}{}: + if lim.metrics != nil { + lim.metrics.lowTokenNotifyCounter.Inc() + } default: } } diff --git a/client/resource_group/controller/metrics.go b/client/resource_group/controller/metrics.go index 7e6a559265b..a340cb93a0d 100644 --- a/client/resource_group/controller/metrics.go +++ b/client/resource_group/controller/metrics.go @@ -21,7 +21,9 @@ const ( requestSubsystem = "request" tokenRequestSubsystem = "token_request" - resourceGroupNameLabel = "name" + resourceGroupNameLabel = "name" + newResourceGroupNameLabel = "resource_group" + errType = "type" ) var ( @@ -38,7 +40,7 @@ var ( Namespace: namespace, Subsystem: requestSubsystem, Name: "success", - Buckets: []float64{.005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30}, // 0.005 ~ 30 + Buckets: []float64{0.0005, .005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30, 60, 600, 1800, 3600}, // 0.0005 ~ 1h Help: "Bucketed histogram of wait duration of successful request.", }, []string{resourceGroupNameLabel}) @@ -47,7 +49,7 @@ var ( Namespace: namespace, Subsystem: requestSubsystem, Name: "limit_reserve_time_failed", - Buckets: []float64{.005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30}, // 0.005 ~ 30 + Buckets: []float64{0.0005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30, 60, 600, 1800, 3600, 86400}, // 0.0005 ~ 24h Help: "Bucketed histogram of wait duration of failed request.", }, []string{resourceGroupNameLabel}) @@ -57,7 +59,7 @@ var ( Subsystem: requestSubsystem, Name: "fail", Help: "Counter of failed request.", - }, []string{resourceGroupNameLabel}) + }, []string{resourceGroupNameLabel, errType}) requestRetryCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -71,6 +73,7 @@ var ( prometheus.HistogramOpts{ Namespace: namespace, Subsystem: tokenRequestSubsystem, + Buckets: prometheus.ExponentialBuckets(0.001, 2, 13), // 1ms ~ 8s Name: "duration", Help: "Bucketed histogram of latency(s) of token request.", }, []string{"type"}) @@ -82,6 +85,14 @@ var ( Name: "resource_group", Help: "Counter of token request by every resource group.", }, []string{resourceGroupNameLabel}) + + lowTokenRequestNotifyCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: tokenRequestSubsystem, + Name: "low_token_notified", + Help: "Counter of low token request.", + }, []string{newResourceGroupNameLabel}) ) var ( @@ -98,4 +109,5 @@ func init() { prometheus.MustRegister(requestRetryCounter) prometheus.MustRegister(tokenRequestDuration) prometheus.MustRegister(resourceGroupTokenRequestCounter) + prometheus.MustRegister(lowTokenRequestNotifyCounter) }