Skip to content

Commit

Permalink
*: add API concurrency metrics (#7541)
Browse files Browse the repository at this point in the history
ref #7167

Signed-off-by: Cabinfever_B <[email protected]>
  • Loading branch information
CabinfeverB authored Dec 28, 2023
1 parent cee6e63 commit b7b4537
Show file tree
Hide file tree
Showing 13 changed files with 693 additions and 63 deletions.
202 changes: 199 additions & 3 deletions metrics/grafana/pd.json
Original file line number Diff line number Diff line change
Expand Up @@ -8781,7 +8781,7 @@
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{grpc_method}}",
"refId": "A",
"refId": "B",
"step": 4
}
],
Expand Down Expand Up @@ -8826,6 +8826,104 @@
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The concurrency number of each kind of gRPC commands",
"editable": true,
"error": false,
"fill": 0,
"grid": {},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 127
},
"id": 903,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"hideEmpty": true,
"hideZero": true,
"max": true,
"min": false,
"rightSide": true,
"show": true,
"sideWidth": 300,
"sortDesc": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null as zero",
"paceLength": 10,
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "pd_server_api_concurrency{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", kind=\"grpc\"}",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{api}}",
"refId": "E",
"step": 20
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "gRPC commands concurrency number",
"tooltip": {
"msResolution": false,
"shared": true,
"sort": 0,
"value_type": "cumulative"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 10,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"repeat": null,
Expand Down Expand Up @@ -8858,7 +8956,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 119
"y": 135
},
"id": 1001,
"legend": {
Expand Down Expand Up @@ -8954,7 +9052,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 119
"y": 135
},
"id": 1002,
"legend": {
Expand Down Expand Up @@ -9036,6 +9134,104 @@
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The concurrency number of each kind of HTTP commands",
"editable": true,
"error": false,
"fill": 0,
"grid": {},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 143
},
"id": 1003,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"hideEmpty": true,
"hideZero": true,
"max": true,
"min": false,
"rightSide": true,
"show": true,
"sideWidth": 300,
"sortDesc": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null as zero",
"paceLength": 10,
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "pd_server_api_concurrency{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", kind=\"http\"}",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{api}}",
"refId": "E",
"step": 20
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "HTTP commands concurrency number",
"tooltip": {
"msResolution": false,
"shared": true,
"sort": 0,
"value_type": "cumulative"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 10,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"repeat": null,
Expand Down
20 changes: 19 additions & 1 deletion pkg/ratelimit/concurrency_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,26 @@ type concurrencyLimiter struct {
mu syncutil.RWMutex
current uint64
limit uint64

// statistic
maxLimit uint64
}

func newConcurrencyLimiter(limit uint64) *concurrencyLimiter {
return &concurrencyLimiter{limit: limit}
}

const unlimit = uint64(0)

func (l *concurrencyLimiter) allow() bool {
l.mu.Lock()
defer l.mu.Unlock()

if l.current+1 <= l.limit {
if l.limit == unlimit || l.current+1 <= l.limit {
l.current++
if l.current > l.maxLimit {
l.maxLimit = l.current
}
return true
}
return false
Expand Down Expand Up @@ -66,3 +74,13 @@ func (l *concurrencyLimiter) getCurrent() uint64 {

return l.current
}

func (l *concurrencyLimiter) getMaxConcurrency() uint64 {
l.mu.Lock()
defer func() {
l.maxLimit = l.current
l.mu.Unlock()
}()

return l.maxLimit
}
15 changes: 15 additions & 0 deletions pkg/ratelimit/concurrency_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,24 @@ func TestConcurrencyLimiter(t *testing.T) {
cl.release()
re.True(cl.allow())
re.Equal(uint64(10), cl.getLimit())
re.Equal(uint64(10), cl.getMaxConcurrency())
re.Equal(uint64(10), cl.getMaxConcurrency())
cl.setLimit(5)
re.Equal(uint64(5), cl.getLimit())
re.Equal(uint64(10), cl.getCurrent())
cl.release()
re.Equal(uint64(9), cl.getCurrent())
for i := 0; i < 9; i++ {
cl.release()
}
re.Equal(uint64(10), cl.getMaxConcurrency())
for i := 0; i < 5; i++ {
re.True(cl.allow())
}
re.Equal(uint64(5), cl.getCurrent())
for i := 0; i < 5; i++ {
cl.release()
}
re.Equal(uint64(5), cl.getMaxConcurrency())
re.Equal(uint64(0), cl.getMaxConcurrency())
}
51 changes: 48 additions & 3 deletions pkg/ratelimit/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,69 @@
package ratelimit

import (
"context"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
)

const limiterMetricsInterval = time.Second * 15

var emptyFunc = func() {}

// Controller is a controller which holds multiple limiters to manage the request rate of different objects.
type Controller struct {
limiters sync.Map
// the label which is in labelAllowList won't be limited, and only inited by hard code.
labelAllowList map[string]struct{}

ctx context.Context
cancel context.CancelFunc
apiType string
concurrencyGauge *prometheus.GaugeVec
}

// NewController returns a global limiter which can be updated in the later.
func NewController() *Controller {
return &Controller{
labelAllowList: make(map[string]struct{}),
func NewController(ctx context.Context, typ string, concurrencyGauge *prometheus.GaugeVec) *Controller {
ctx, cancel := context.WithCancel(ctx)
l := &Controller{
ctx: ctx,
cancel: cancel,
labelAllowList: make(map[string]struct{}),
apiType: typ,
concurrencyGauge: concurrencyGauge,
}
if concurrencyGauge != nil {
go l.collectMetrics()
}
return l
}

// Close closes the Controller.
func (l *Controller) Close() {
l.cancel()
}

func (l *Controller) collectMetrics() {
tricker := time.NewTicker(limiterMetricsInterval)
defer tricker.Stop()
for {
select {
case <-l.ctx.Done():
return
case <-tricker.C:
l.limiters.Range(func(key, value any) bool {
limiter := value.(*limiter)
label := key.(string)
// Due to not in hot path, no need to save sub Gauge.
if con := limiter.getConcurrencyLimiter(); con != nil {
l.concurrencyGauge.WithLabelValues(l.apiType, label).Set(float64(con.getMaxConcurrency()))
}
return true
})
}
}
}

Expand Down
Loading

0 comments on commit b7b4537

Please sign in to comment.