From 12d952957295859ba01cd724e4ca7099f8d3db2a Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Mon, 23 Oct 2023 19:29:53 +0800 Subject: [PATCH] address comment Signed-off-by: Cabinfever_B --- pkg/ratelimit/controller.go | 79 ++++++ pkg/ratelimit/controller_test.go | 426 +++++++++++++++++++++++++++++++ 2 files changed, 505 insertions(+) create mode 100644 pkg/ratelimit/controller.go create mode 100644 pkg/ratelimit/controller_test.go diff --git a/pkg/ratelimit/controller.go b/pkg/ratelimit/controller.go new file mode 100644 index 00000000000..a9cc6bdd757 --- /dev/null +++ b/pkg/ratelimit/controller.go @@ -0,0 +1,79 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ratelimit + +import ( + "sync" + + "golang.org/x/time/rate" +) + +var emptyFunc = func() {} + +// Controller is a controller which holds multiple limiters to manage the request rate of different objects. +type Controller struct { + limiters sync.Map + // the label which is in labelAllowList won't be limited, and only inited by hard code. + labelAllowList map[string]struct{} +} + +// NewController returns a global limiter which can be updated in the later. +func NewController() *Controller { + return &Controller{ + labelAllowList: make(map[string]struct{}), + } +} + +// Allow is used to check whether it has enough token. +func (l *Controller) Allow(label string) (DoneFunc, error) { + var ok bool + lim, ok := l.limiters.Load(label) + if ok { + return lim.(*limiter).allow() + } + return emptyFunc, nil +} + +// Update is used to update Ratelimiter with Options +func (l *Controller) Update(label string, opts ...Option) UpdateStatus { + var status UpdateStatus + for _, opt := range opts { + status |= opt(label, l) + } + return status +} + +// GetQPSLimiterStatus returns the status of a given label's QPS limiter. +func (l *Controller) GetQPSLimiterStatus(label string) (limit rate.Limit, burst int) { + if limit, exist := l.limiters.Load(label); exist { + return limit.(*limiter).getQPSLimiterStatus() + } + return 0, 0 +} + +// GetConcurrencyLimiterStatus returns the status of a given label's concurrency limiter. +func (l *Controller) GetConcurrencyLimiterStatus(label string) (limit uint64, current uint64) { + if limit, exist := l.limiters.Load(label); exist { + return limit.(*limiter).getConcurrencyLimiterStatus() + } + return 0, 0 +} + +// IsInAllowList returns whether this label is in allow list. +// If returns true, the given label won't be limited +func (l *Controller) IsInAllowList(label string) bool { + _, allow := l.labelAllowList[label] + return allow +} diff --git a/pkg/ratelimit/controller_test.go b/pkg/ratelimit/controller_test.go new file mode 100644 index 00000000000..217e3416f89 --- /dev/null +++ b/pkg/ratelimit/controller_test.go @@ -0,0 +1,426 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ratelimit + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/utils/syncutil" + "golang.org/x/time/rate" +) + +type changeAndResult struct { + opt Option + checkOptionStatus func(string, Option) + totalRequest int + success int + fail int + release int + waitDuration time.Duration + checkStatusFunc func(string) +} + +type labelCase struct { + label string + round []changeAndResult +} + +func runMulitLabelLimiter(t *testing.T, limiter *Controller, testCase []labelCase) { + re := require.New(t) + var caseWG sync.WaitGroup + for _, tempCas := range testCase { + caseWG.Add(1) + cas := tempCas + go func() { + var lock syncutil.Mutex + successCount, failedCount := 0, 0 + var wg sync.WaitGroup + r := &releaseUtil{} + for _, rd := range cas.round { + rd.checkOptionStatus(cas.label, rd.opt) + time.Sleep(rd.waitDuration) + for i := 0; i < rd.totalRequest; i++ { + wg.Add(1) + go func() { + countRateLimiterHandleResult(limiter, cas.label, &successCount, &failedCount, &lock, &wg, r) + }() + } + wg.Wait() + re.Equal(rd.fail, failedCount) + re.Equal(rd.success, successCount) + for i := 0; i < rd.release; i++ { + r.release() + } + rd.checkStatusFunc(cas.label) + failedCount -= rd.fail + successCount -= rd.success + } + caseWG.Done() + }() + } + caseWG.Wait() +} + +func TestControllerWithConcurrencyLimiter(t *testing.T) { + t.Parallel() + re := require.New(t) + limiter := NewController() + testCase := []labelCase{ + { + label: "test1", + round: []changeAndResult{ + { + opt: UpdateConcurrencyLimiter(10), + checkOptionStatus: func(label string, o Option) { + status := limiter.Update(label, o) + re.True(status&ConcurrencyChanged != 0) + }, + totalRequest: 15, + fail: 5, + success: 10, + release: 10, + waitDuration: 0, + checkStatusFunc: func(label string) { + limit, current := limiter.GetConcurrencyLimiterStatus(label) + re.Equal(uint64(10), limit) + re.Equal(uint64(0), current) + }, + }, + { + opt: UpdateConcurrencyLimiter(10), + checkOptionStatus: func(label string, o Option) { + status := limiter.Update(label, o) + re.True(status&ConcurrencyNoChange != 0) + }, + checkStatusFunc: func(label string) {}, + }, + { + opt: UpdateConcurrencyLimiter(5), + checkOptionStatus: func(label string, o Option) { + status := limiter.Update(label, o) + re.True(status&ConcurrencyChanged != 0) + }, + totalRequest: 15, + fail: 10, + success: 5, + release: 5, + waitDuration: 0, + checkStatusFunc: func(label string) { + limit, current := limiter.GetConcurrencyLimiterStatus(label) + re.Equal(uint64(5), limit) + re.Equal(uint64(0), current) + }, + }, + { + opt: UpdateConcurrencyLimiter(0), + checkOptionStatus: func(label string, o Option) { + status := limiter.Update(label, o) + re.True(status&ConcurrencyDeleted != 0) + }, + totalRequest: 15, + fail: 0, + success: 15, + release: 5, + waitDuration: 0, + checkStatusFunc: func(label string) { + limit, current := limiter.GetConcurrencyLimiterStatus(label) + re.Equal(uint64(0), limit) + re.Equal(uint64(0), current) + }, + }, + }, + }, + { + label: "test2", + round: []changeAndResult{ + { + opt: UpdateConcurrencyLimiter(15), + checkOptionStatus: func(label string, o Option) { + status := limiter.Update(label, o) + re.True(status&ConcurrencyChanged != 0) + }, + totalRequest: 10, + fail: 0, + success: 10, + release: 0, + waitDuration: 0, + checkStatusFunc: func(label string) { + limit, current := limiter.GetConcurrencyLimiterStatus(label) + re.Equal(uint64(15), limit) + re.Equal(uint64(10), current) + }, + }, + { + opt: UpdateConcurrencyLimiter(10), + checkOptionStatus: func(label string, o Option) { + status := limiter.Update(label, o) + re.True(status&ConcurrencyChanged != 0) + }, + totalRequest: 10, + fail: 10, + success: 0, + release: 10, + waitDuration: 0, + checkStatusFunc: func(label string) { + limit, current := limiter.GetConcurrencyLimiterStatus(label) + re.Equal(uint64(10), limit) + re.Equal(uint64(0), current) + }, + }, + }, + }, + } + runMulitLabelLimiter(t, limiter, testCase) +} + +func TestBlockList(t *testing.T) { + t.Parallel() + re := require.New(t) + opts := []Option{AddLabelAllowList()} + limiter := NewController() + label := "test" + + re.False(limiter.IsInAllowList(label)) + for _, opt := range opts { + opt(label, limiter) + } + re.True(limiter.IsInAllowList(label)) + + status := UpdateQPSLimiter(float64(rate.Every(time.Second)), 1)(label, limiter) + re.True(status&InAllowList != 0) + for i := 0; i < 10; i++ { + _, err := limiter.Allow(label) + re.NoError(err) + } +} + +func TestControllerWithQPSLimiter(t *testing.T) { + t.Parallel() + re := require.New(t) + limiter := NewController() + testCase := []labelCase{ + { + label: "test1", + round: []changeAndResult{ + { + opt: UpdateQPSLimiter(float64(rate.Every(time.Second)), 1), + checkOptionStatus: func(label string, o Option) { + status := limiter.Update(label, o) + re.True(status&QPSChanged != 0) + }, + totalRequest: 3, + fail: 2, + success: 1, + waitDuration: 0, + checkStatusFunc: func(label string) { + limit, burst := limiter.GetQPSLimiterStatus(label) + re.Equal(rate.Limit(1), limit) + re.Equal(1, burst) + }, + }, + { + opt: UpdateQPSLimiter(float64(rate.Every(time.Second)), 1), + checkOptionStatus: func(label string, o Option) { + status := limiter.Update(label, o) + re.True(status&QPSNoChange != 0) + }, + checkStatusFunc: func(label string) {}, + }, + { + opt: UpdateQPSLimiter(5, 5), + checkOptionStatus: func(label string, o Option) { + status := limiter.Update(label, o) + re.True(status&QPSChanged != 0) + }, + totalRequest: 10, + fail: 5, + success: 5, + waitDuration: time.Second, + checkStatusFunc: func(label string) { + limit, burst := limiter.GetQPSLimiterStatus(label) + re.Equal(rate.Limit(5), limit) + re.Equal(5, burst) + }, + }, + { + opt: UpdateQPSLimiter(0, 0), + checkOptionStatus: func(label string, o Option) { + status := limiter.Update(label, o) + re.True(status&QPSDeleted != 0) + }, + totalRequest: 10, + fail: 0, + success: 10, + release: 0, + waitDuration: 0, + checkStatusFunc: func(label string) { + limit, burst := limiter.GetQPSLimiterStatus(label) + re.Equal(rate.Limit(0), limit) + re.Equal(0, burst) + }, + }, + }, + }, + { + label: "test2", + round: []changeAndResult{ + { + opt: UpdateQPSLimiter(50, 5), + checkOptionStatus: func(label string, o Option) { + status := limiter.Update(label, o) + re.True(status&QPSChanged != 0) + }, + totalRequest: 10, + fail: 5, + success: 5, + waitDuration: time.Second, + checkStatusFunc: func(label string) { + limit, burst := limiter.GetQPSLimiterStatus(label) + re.Equal(rate.Limit(50), limit) + re.Equal(5, burst) + }, + }, + { + opt: UpdateQPSLimiter(0, 0), + checkOptionStatus: func(label string, o Option) { + status := limiter.Update(label, o) + re.True(status&QPSDeleted != 0) + }, + totalRequest: 10, + fail: 0, + success: 10, + release: 0, + waitDuration: 0, + checkStatusFunc: func(label string) { + limit, burst := limiter.GetQPSLimiterStatus(label) + re.Equal(rate.Limit(0), limit) + re.Equal(0, burst) + }, + }, + }, + }, + } + runMulitLabelLimiter(t, limiter, testCase) +} + +func TestControllerWithTwoLimiters(t *testing.T) { + t.Parallel() + re := require.New(t) + limiter := NewController() + testCase := []labelCase{ + { + label: "test1", + round: []changeAndResult{ + { + opt: UpdateDimensionConfig(&DimensionConfig{ + QPS: 100, + QPSBurst: 100, + ConcurrencyLimit: 100, + }), + checkOptionStatus: func(label string, o Option) { + status := limiter.Update(label, o) + re.True(status&QPSChanged != 0) + }, + totalRequest: 200, + fail: 100, + success: 100, + release: 100, + waitDuration: time.Second, + checkStatusFunc: func(label string) { + limit, burst := limiter.GetQPSLimiterStatus(label) + re.Equal(rate.Limit(100), limit) + re.Equal(100, burst) + climit, current := limiter.GetConcurrencyLimiterStatus(label) + re.Equal(uint64(100), climit) + re.Equal(uint64(0), current) + }, + }, + { + opt: UpdateQPSLimiter(float64(rate.Every(time.Second)), 1), + checkOptionStatus: func(label string, o Option) { + status := limiter.Update(label, o) + re.True(status&QPSChanged != 0) + }, + totalRequest: 200, + fail: 199, + success: 1, + release: 0, + waitDuration: time.Second, + checkStatusFunc: func(label string) { + limit, current := limiter.GetConcurrencyLimiterStatus(label) + re.Equal(uint64(100), limit) + re.Equal(uint64(1), current) + }, + }, + }, + }, + { + label: "test2", + round: []changeAndResult{ + { + opt: UpdateQPSLimiter(50, 5), + checkOptionStatus: func(label string, o Option) { + status := limiter.Update(label, o) + re.True(status&QPSChanged != 0) + }, + totalRequest: 10, + fail: 5, + success: 5, + waitDuration: time.Second, + checkStatusFunc: func(label string) { + limit, burst := limiter.GetQPSLimiterStatus(label) + re.Equal(rate.Limit(50), limit) + re.Equal(5, burst) + }, + }, + { + opt: UpdateQPSLimiter(0, 0), + checkOptionStatus: func(label string, o Option) { + status := limiter.Update(label, o) + re.True(status&QPSDeleted != 0) + }, + totalRequest: 10, + fail: 0, + success: 10, + release: 0, + waitDuration: 0, + checkStatusFunc: func(label string) { + limit, burst := limiter.GetQPSLimiterStatus(label) + re.Equal(rate.Limit(0), limit) + re.Equal(0, burst) + }, + }, + }, + }, + } + runMulitLabelLimiter(t, limiter, testCase) +} + +func countRateLimiterHandleResult(limiter *Controller, label string, successCount *int, + failedCount *int, lock *syncutil.Mutex, wg *sync.WaitGroup, r *releaseUtil) { + doneFucn, err := limiter.Allow(label) + lock.Lock() + defer lock.Unlock() + if err == nil { + *successCount++ + r.append(doneFucn) + } else { + *failedCount++ + } + wg.Done() +}