Skip to content

Commit

Permalink
pkg: delete config when QPS and concurrency are both deleted (tikv#8653)
Browse files Browse the repository at this point in the history
ref tikv#4373

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
rleungx and ti-chi-bot[bot] authored Sep 30, 2024
1 parent 26ced22 commit 76bc34a
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 165 deletions.
32 changes: 16 additions & 16 deletions pkg/ratelimit/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestControllerWithConcurrencyLimiter(t *testing.T) {
opt: UpdateConcurrencyLimiter(10),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & ConcurrencyChanged)
re.NotZero(status & LimiterUpdated)
},
totalRequest: 15,
fail: 5,
Expand All @@ -106,15 +106,15 @@ func TestControllerWithConcurrencyLimiter(t *testing.T) {
opt: UpdateConcurrencyLimiter(10),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & ConcurrencyNoChange)
re.NotZero(status & LimiterNotChanged)
},
checkStatusFunc: func(_ string) {},
},
{
opt: UpdateConcurrencyLimiter(5),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & ConcurrencyChanged)
re.NotZero(status & LimiterUpdated)
},
totalRequest: 15,
fail: 10,
Expand All @@ -131,7 +131,7 @@ func TestControllerWithConcurrencyLimiter(t *testing.T) {
opt: UpdateConcurrencyLimiter(0),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & ConcurrencyDeleted)
re.NotZero(status & LimiterDeleted)
},
totalRequest: 15,
fail: 0,
Expand All @@ -153,7 +153,7 @@ func TestControllerWithConcurrencyLimiter(t *testing.T) {
opt: UpdateConcurrencyLimiter(15),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & ConcurrencyChanged)
re.NotZero(status & LimiterUpdated)
},
totalRequest: 10,
fail: 0,
Expand All @@ -170,7 +170,7 @@ func TestControllerWithConcurrencyLimiter(t *testing.T) {
opt: UpdateConcurrencyLimiter(10),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & ConcurrencyChanged)
re.NotZero(status & LimiterUpdated)
},
totalRequest: 10,
fail: 10,
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestControllerWithQPSLimiter(t *testing.T) {
opt: UpdateQPSLimiter(float64(rate.Every(time.Second)), 1),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & QPSChanged)
re.NotZero(status & LimiterUpdated)
},
totalRequest: 3,
fail: 2,
Expand All @@ -238,15 +238,15 @@ func TestControllerWithQPSLimiter(t *testing.T) {
opt: UpdateQPSLimiter(float64(rate.Every(time.Second)), 1),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & QPSNoChange)
re.NotZero(status & LimiterNotChanged)
},
checkStatusFunc: func(_ string) {},
},
{
opt: UpdateQPSLimiter(5, 5),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & QPSChanged)
re.NotZero(status & LimiterUpdated)
},
totalRequest: 10,
fail: 5,
Expand All @@ -262,7 +262,7 @@ func TestControllerWithQPSLimiter(t *testing.T) {
opt: UpdateQPSLimiter(0, 0),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & QPSDeleted)
re.NotZero(status & LimiterDeleted)
},
totalRequest: 10,
fail: 0,
Expand All @@ -284,7 +284,7 @@ func TestControllerWithQPSLimiter(t *testing.T) {
opt: UpdateQPSLimiter(50, 5),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & QPSChanged)
re.NotZero(status & LimiterUpdated)
},
totalRequest: 10,
fail: 5,
Expand All @@ -300,7 +300,7 @@ func TestControllerWithQPSLimiter(t *testing.T) {
opt: UpdateQPSLimiter(0, 0),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & QPSDeleted)
re.NotZero(status & LimiterDeleted)
},
totalRequest: 10,
fail: 0,
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestControllerWithTwoLimiters(t *testing.T) {
}),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & QPSChanged)
re.NotZero(status & LimiterUpdated)
},
totalRequest: 200,
fail: 100,
Expand All @@ -355,7 +355,7 @@ func TestControllerWithTwoLimiters(t *testing.T) {
opt: UpdateQPSLimiter(float64(rate.Every(time.Second)), 1),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & QPSChanged)
re.NotZero(status & LimiterUpdated)
},
totalRequest: 200,
fail: 199,
Expand All @@ -377,7 +377,7 @@ func TestControllerWithTwoLimiters(t *testing.T) {
opt: UpdateQPSLimiter(50, 5),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & QPSChanged)
re.NotZero(status & LimiterUpdated)
},
totalRequest: 10,
fail: 5,
Expand All @@ -393,7 +393,7 @@ func TestControllerWithTwoLimiters(t *testing.T) {
opt: UpdateQPSLimiter(0, 0),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & QPSDeleted)
re.NotZero(status & LimiterDeleted)
},
totalRequest: 10,
fail: 0,
Expand Down
39 changes: 18 additions & 21 deletions pkg/ratelimit/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,8 @@ func (l *limiter) getRateLimiter() *RateLimiter {
return l.rate
}

func (l *limiter) deleteRateLimiter() bool {
l.mu.Lock()
defer l.mu.Unlock()
l.rate = nil
return l.isEmpty()
}

func (l *limiter) isEmpty() bool {
return l.concurrency == nil && l.rate == nil
return (l.concurrency == nil || l.concurrency.limit == 0) && l.rate == nil
}

func (l *limiter) getQPSLimiterStatus() (limit rate.Limit, burst int) {
Expand All @@ -89,47 +82,51 @@ func (l *limiter) getConcurrencyLimiterStatus() (limit uint64, current uint64) {
func (l *limiter) updateConcurrencyConfig(limit uint64) UpdateStatus {
oldConcurrencyLimit, _ := l.getConcurrencyLimiterStatus()
if oldConcurrencyLimit == limit {
return ConcurrencyNoChange
return LimiterNotChanged
}

l.mu.Lock()
defer l.mu.Unlock()
if l.concurrency != nil {
if limit < 1 {
l.concurrency.setLimit(0)
return ConcurrencyDeleted
l.concurrency = NewConcurrencyLimiter(0)
if l.isEmpty() {
return LimiterDeleted
}
return LimiterUpdated
}
l.concurrency.setLimit(limit)
} else {
l.concurrency = NewConcurrencyLimiter(limit)
}
return ConcurrencyChanged
return LimiterUpdated
}

func (l *limiter) updateQPSConfig(limit float64, burst int) UpdateStatus {
oldQPSLimit, oldBurst := l.getQPSLimiterStatus()
if math.Abs(float64(oldQPSLimit)-limit) < eps && oldBurst == burst {
return QPSNoChange
}
if limit <= eps || burst < 1 {
l.deleteRateLimiter()
return QPSDeleted
return LimiterNotChanged
}
l.mu.Lock()
defer l.mu.Unlock()
if l.rate != nil {
if limit <= eps || burst < 1 {
l.rate = nil
if l.isEmpty() {
return LimiterDeleted
}
return LimiterUpdated
}
l.rate.SetLimit(rate.Limit(limit))
l.rate.SetBurst(burst)
} else {
l.rate = NewRateLimiter(limit, burst)
}
return QPSChanged
return LimiterUpdated
}

func (l *limiter) updateDimensionConfig(cfg *DimensionConfig) UpdateStatus {
status := l.updateQPSConfig(cfg.QPS, cfg.QPSBurst)
status |= l.updateConcurrencyConfig(cfg.ConcurrencyLimit)
return status
return l.updateQPSConfig(cfg.QPS, cfg.QPSBurst) | l.updateConcurrencyConfig(cfg.ConcurrencyLimit)
}

func (l *limiter) allow() (DoneFunc, error) {
Expand Down
28 changes: 14 additions & 14 deletions pkg/ratelimit/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestWithConcurrencyLimiter(t *testing.T) {

limiter := newLimiter()
status := limiter.updateConcurrencyConfig(10)
re.NotZero(status & ConcurrencyChanged)
re.NotZero(status & LimiterUpdated)
var lock syncutil.Mutex
successCount, failedCount := 0, 0
var wg sync.WaitGroup
Expand All @@ -67,10 +67,10 @@ func TestWithConcurrencyLimiter(t *testing.T) {
re.Equal(uint64(0), current)

status = limiter.updateConcurrencyConfig(10)
re.NotZero(status & ConcurrencyNoChange)
re.NotZero(status & LimiterNotChanged)

status = limiter.updateConcurrencyConfig(5)
re.NotZero(status & ConcurrencyChanged)
re.NotZero(status & LimiterUpdated)
failedCount = 0
successCount = 0
for i := 0; i < 15; i++ {
Expand All @@ -85,7 +85,7 @@ func TestWithConcurrencyLimiter(t *testing.T) {
}

status = limiter.updateConcurrencyConfig(0)
re.NotZero(status & ConcurrencyDeleted)
re.NotZero(status & LimiterDeleted)
failedCount = 0
successCount = 0
for i := 0; i < 15; i++ {
Expand All @@ -105,7 +105,7 @@ func TestWithQPSLimiter(t *testing.T) {
re := require.New(t)
limiter := newLimiter()
status := limiter.updateQPSConfig(float64(rate.Every(time.Second)), 1)
re.NotZero(status & QPSChanged)
re.NotZero(status & LimiterUpdated)

var lock syncutil.Mutex
successCount, failedCount := 0, 0
Expand All @@ -124,10 +124,10 @@ func TestWithQPSLimiter(t *testing.T) {
re.Equal(1, burst)

status = limiter.updateQPSConfig(float64(rate.Every(time.Second)), 1)
re.NotZero(status & QPSNoChange)
re.NotZero(status & LimiterNotChanged)

status = limiter.updateQPSConfig(5, 5)
re.NotZero(status & QPSChanged)
re.NotZero(status & LimiterUpdated)
limit, burst = limiter.getQPSLimiterStatus()
re.Equal(rate.Limit(5), limit)
re.Equal(5, burst)
Expand All @@ -145,7 +145,7 @@ func TestWithQPSLimiter(t *testing.T) {
time.Sleep(time.Second)

status = limiter.updateQPSConfig(0, 0)
re.NotZero(status & QPSDeleted)
re.NotZero(status & LimiterDeleted)
for i := 0; i < 10; i++ {
_, err := limiter.allow()
re.NoError(err)
Expand All @@ -157,7 +157,7 @@ func TestWithQPSLimiter(t *testing.T) {
successCount = 0
failedCount = 0
status = limiter.updateQPSConfig(float64(rate.Every(3*time.Second)), 100)
re.NotZero(status & QPSChanged)
re.NotZero(status & LimiterUpdated)
wg.Add(200)
for i := 0; i < 200; i++ {
go countSingleLimiterHandleResult(limiter, &successCount, &failedCount, &lock, &wg, r)
Expand All @@ -183,8 +183,8 @@ func TestWithTwoLimiters(t *testing.T) {
}
limiter := newLimiter()
status := limiter.updateDimensionConfig(cfg)
re.NotZero(status & QPSChanged)
re.NotZero(status & ConcurrencyChanged)
re.NotZero(status & LimiterUpdated)
re.NotZero(status & LimiterUpdated)

var lock syncutil.Mutex
successCount, failedCount := 0, 0
Expand All @@ -211,7 +211,7 @@ func TestWithTwoLimiters(t *testing.T) {
r.release()
}
status = limiter.updateQPSConfig(float64(rate.Every(10*time.Second)), 1)
re.NotZero(status & QPSChanged)
re.NotZero(status & LimiterUpdated)
wg.Add(100)
for i := 0; i < 100; i++ {
go countSingleLimiterHandleResult(limiter, &successCount, &failedCount, &lock, &wg, r)
Expand All @@ -225,8 +225,8 @@ func TestWithTwoLimiters(t *testing.T) {

cfg = &DimensionConfig{}
status = limiter.updateDimensionConfig(cfg)
re.NotZero(status & ConcurrencyDeleted)
re.NotZero(status & QPSDeleted)
re.NotZero(status & LimiterDeleted)
re.NotZero(status & LimiterDeleted)
}

func countSingleLimiterHandleResult(limiter *limiter, successCount *int,
Expand Down
28 changes: 11 additions & 17 deletions pkg/ratelimit/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,12 @@ type UpdateStatus uint32
// Flags for limiter.
const (
eps float64 = 1e-8
// QPSNoChange shows that limiter's config isn't changed.
QPSNoChange UpdateStatus = 1 << iota
// QPSChanged shows that limiter's config is changed and not deleted.
QPSChanged
// QPSDeleted shows that limiter's config is deleted.
QPSDeleted
// ConcurrencyNoChange shows that limiter's config isn't changed.
ConcurrencyNoChange
// ConcurrencyChanged shows that limiter's config is changed and not deleted.
ConcurrencyChanged
// ConcurrencyDeleted shows that limiter's config is deleted.
ConcurrencyDeleted

LimiterNotChanged UpdateStatus = 1 << iota
// LimiterUpdated shows that limiter's config is updated.
LimiterUpdated
// LimiterDeleted shows that limiter's config is deleted.
LimiterDeleted
// InAllowList shows that limiter's config isn't changed because it is in in allow list.
InAllowList
)
Expand All @@ -45,7 +39,7 @@ type Option func(string, *Controller) UpdateStatus
func AddLabelAllowList() Option {
return func(label string, l *Controller) UpdateStatus {
l.labelAllowList[label] = struct{}{}
return 0
return InAllowList
}
}

Expand Down Expand Up @@ -73,11 +67,11 @@ func UpdateQPSLimiter(limit float64, burst int) Option {

// UpdateDimensionConfig creates QPS limiter and concurrency limiter for a given label by config if it doesn't exist.
func UpdateDimensionConfig(cfg *DimensionConfig) Option {
return func(label string, l *Controller) UpdateStatus {
if _, allow := l.labelAllowList[label]; allow {
return func(label string, c *Controller) UpdateStatus {
if _, allow := c.labelAllowList[label]; allow {
return InAllowList
}
lim, _ := l.limiters.LoadOrStore(label, newLimiter())
lim, _ := c.limiters.LoadOrStore(label, newLimiter())
return lim.(*limiter).updateDimensionConfig(cfg)
}
}
Expand All @@ -89,6 +83,6 @@ func InitLimiter() Option {
return InAllowList
}
l.limiters.LoadOrStore(label, newLimiter())
return ConcurrencyChanged
return LimiterNotChanged
}
}
Loading

0 comments on commit 76bc34a

Please sign in to comment.