From 76bc34aee22554353bd7ea9785d3772dcb7ab31b Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 30 Sep 2024 11:46:04 +0800 Subject: [PATCH] pkg: delete config when QPS and concurrency are both deleted (#8653) ref tikv/pd#4373 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/ratelimit/controller_test.go | 32 +++---- pkg/ratelimit/limiter.go | 39 ++++----- pkg/ratelimit/limiter_test.go | 28 +++--- pkg/ratelimit/option.go | 28 +++--- server/api/service_middleware.go | 106 ++++++++++------------- server/api/service_middleware_test.go | 59 +++++-------- tests/server/server_test.go | 6 +- tools/pd-ctl/tests/config/config_test.go | 8 ++ 8 files changed, 141 insertions(+), 165 deletions(-) diff --git a/pkg/ratelimit/controller_test.go b/pkg/ratelimit/controller_test.go index d4093555ba7..a25b152c48e 100644 --- a/pkg/ratelimit/controller_test.go +++ b/pkg/ratelimit/controller_test.go @@ -89,7 +89,7 @@ func TestControllerWithConcurrencyLimiter(t *testing.T) { opt: UpdateConcurrencyLimiter(10), checkOptionStatus: func(label string, o Option) { status := limiter.Update(label, o) - re.NotZero(status & ConcurrencyChanged) + re.NotZero(status & LimiterUpdated) }, totalRequest: 15, fail: 5, @@ -106,7 +106,7 @@ func TestControllerWithConcurrencyLimiter(t *testing.T) { opt: UpdateConcurrencyLimiter(10), checkOptionStatus: func(label string, o Option) { status := limiter.Update(label, o) - re.NotZero(status & ConcurrencyNoChange) + re.NotZero(status & LimiterNotChanged) }, checkStatusFunc: func(_ string) {}, }, @@ -114,7 +114,7 @@ func TestControllerWithConcurrencyLimiter(t *testing.T) { opt: UpdateConcurrencyLimiter(5), checkOptionStatus: func(label string, o Option) { status := limiter.Update(label, o) - re.NotZero(status & ConcurrencyChanged) + re.NotZero(status & LimiterUpdated) }, totalRequest: 15, fail: 10, @@ -131,7 +131,7 @@ func TestControllerWithConcurrencyLimiter(t *testing.T) { opt: UpdateConcurrencyLimiter(0), checkOptionStatus: func(label string, o Option) { status := limiter.Update(label, o) - re.NotZero(status & ConcurrencyDeleted) + re.NotZero(status & LimiterDeleted) }, totalRequest: 15, fail: 0, @@ -153,7 +153,7 @@ func TestControllerWithConcurrencyLimiter(t *testing.T) { opt: UpdateConcurrencyLimiter(15), checkOptionStatus: func(label string, o Option) { status := limiter.Update(label, o) - re.NotZero(status & ConcurrencyChanged) + re.NotZero(status & LimiterUpdated) }, totalRequest: 10, fail: 0, @@ -170,7 +170,7 @@ func TestControllerWithConcurrencyLimiter(t *testing.T) { opt: UpdateConcurrencyLimiter(10), checkOptionStatus: func(label string, o Option) { status := limiter.Update(label, o) - re.NotZero(status & ConcurrencyChanged) + re.NotZero(status & LimiterUpdated) }, totalRequest: 10, fail: 10, @@ -222,7 +222,7 @@ func TestControllerWithQPSLimiter(t *testing.T) { opt: UpdateQPSLimiter(float64(rate.Every(time.Second)), 1), checkOptionStatus: func(label string, o Option) { status := limiter.Update(label, o) - re.NotZero(status & QPSChanged) + re.NotZero(status & LimiterUpdated) }, totalRequest: 3, fail: 2, @@ -238,7 +238,7 @@ func TestControllerWithQPSLimiter(t *testing.T) { opt: UpdateQPSLimiter(float64(rate.Every(time.Second)), 1), checkOptionStatus: func(label string, o Option) { status := limiter.Update(label, o) - re.NotZero(status & QPSNoChange) + re.NotZero(status & LimiterNotChanged) }, checkStatusFunc: func(_ string) {}, }, @@ -246,7 +246,7 @@ func TestControllerWithQPSLimiter(t *testing.T) { opt: UpdateQPSLimiter(5, 5), checkOptionStatus: func(label string, o Option) { status := limiter.Update(label, o) - re.NotZero(status & QPSChanged) + re.NotZero(status & LimiterUpdated) }, totalRequest: 10, fail: 5, @@ -262,7 +262,7 @@ func TestControllerWithQPSLimiter(t *testing.T) { opt: UpdateQPSLimiter(0, 0), checkOptionStatus: func(label string, o Option) { status := limiter.Update(label, o) - re.NotZero(status & QPSDeleted) + re.NotZero(status & LimiterDeleted) }, totalRequest: 10, fail: 0, @@ -284,7 +284,7 @@ func TestControllerWithQPSLimiter(t *testing.T) { opt: UpdateQPSLimiter(50, 5), checkOptionStatus: func(label string, o Option) { status := limiter.Update(label, o) - re.NotZero(status & QPSChanged) + re.NotZero(status & LimiterUpdated) }, totalRequest: 10, fail: 5, @@ -300,7 +300,7 @@ func TestControllerWithQPSLimiter(t *testing.T) { opt: UpdateQPSLimiter(0, 0), checkOptionStatus: func(label string, o Option) { status := limiter.Update(label, o) - re.NotZero(status & QPSDeleted) + re.NotZero(status & LimiterDeleted) }, totalRequest: 10, fail: 0, @@ -335,7 +335,7 @@ func TestControllerWithTwoLimiters(t *testing.T) { }), checkOptionStatus: func(label string, o Option) { status := limiter.Update(label, o) - re.NotZero(status & QPSChanged) + re.NotZero(status & LimiterUpdated) }, totalRequest: 200, fail: 100, @@ -355,7 +355,7 @@ func TestControllerWithTwoLimiters(t *testing.T) { opt: UpdateQPSLimiter(float64(rate.Every(time.Second)), 1), checkOptionStatus: func(label string, o Option) { status := limiter.Update(label, o) - re.NotZero(status & QPSChanged) + re.NotZero(status & LimiterUpdated) }, totalRequest: 200, fail: 199, @@ -377,7 +377,7 @@ func TestControllerWithTwoLimiters(t *testing.T) { opt: UpdateQPSLimiter(50, 5), checkOptionStatus: func(label string, o Option) { status := limiter.Update(label, o) - re.NotZero(status & QPSChanged) + re.NotZero(status & LimiterUpdated) }, totalRequest: 10, fail: 5, @@ -393,7 +393,7 @@ func TestControllerWithTwoLimiters(t *testing.T) { opt: UpdateQPSLimiter(0, 0), checkOptionStatus: func(label string, o Option) { status := limiter.Update(label, o) - re.NotZero(status & QPSDeleted) + re.NotZero(status & LimiterDeleted) }, totalRequest: 10, fail: 0, diff --git a/pkg/ratelimit/limiter.go b/pkg/ratelimit/limiter.go index eaf6acf7c17..e312066dc56 100644 --- a/pkg/ratelimit/limiter.go +++ b/pkg/ratelimit/limiter.go @@ -59,15 +59,8 @@ func (l *limiter) getRateLimiter() *RateLimiter { return l.rate } -func (l *limiter) deleteRateLimiter() bool { - l.mu.Lock() - defer l.mu.Unlock() - l.rate = nil - return l.isEmpty() -} - func (l *limiter) isEmpty() bool { - return l.concurrency == nil && l.rate == nil + return (l.concurrency == nil || l.concurrency.limit == 0) && l.rate == nil } func (l *limiter) getQPSLimiterStatus() (limit rate.Limit, burst int) { @@ -89,47 +82,51 @@ func (l *limiter) getConcurrencyLimiterStatus() (limit uint64, current uint64) { func (l *limiter) updateConcurrencyConfig(limit uint64) UpdateStatus { oldConcurrencyLimit, _ := l.getConcurrencyLimiterStatus() if oldConcurrencyLimit == limit { - return ConcurrencyNoChange + return LimiterNotChanged } l.mu.Lock() defer l.mu.Unlock() if l.concurrency != nil { if limit < 1 { - l.concurrency.setLimit(0) - return ConcurrencyDeleted + l.concurrency = NewConcurrencyLimiter(0) + if l.isEmpty() { + return LimiterDeleted + } + return LimiterUpdated } l.concurrency.setLimit(limit) } else { l.concurrency = NewConcurrencyLimiter(limit) } - return ConcurrencyChanged + return LimiterUpdated } func (l *limiter) updateQPSConfig(limit float64, burst int) UpdateStatus { oldQPSLimit, oldBurst := l.getQPSLimiterStatus() if math.Abs(float64(oldQPSLimit)-limit) < eps && oldBurst == burst { - return QPSNoChange - } - if limit <= eps || burst < 1 { - l.deleteRateLimiter() - return QPSDeleted + return LimiterNotChanged } l.mu.Lock() defer l.mu.Unlock() if l.rate != nil { + if limit <= eps || burst < 1 { + l.rate = nil + if l.isEmpty() { + return LimiterDeleted + } + return LimiterUpdated + } l.rate.SetLimit(rate.Limit(limit)) l.rate.SetBurst(burst) } else { l.rate = NewRateLimiter(limit, burst) } - return QPSChanged + return LimiterUpdated } func (l *limiter) updateDimensionConfig(cfg *DimensionConfig) UpdateStatus { - status := l.updateQPSConfig(cfg.QPS, cfg.QPSBurst) - status |= l.updateConcurrencyConfig(cfg.ConcurrencyLimit) - return status + return l.updateQPSConfig(cfg.QPS, cfg.QPSBurst) | l.updateConcurrencyConfig(cfg.ConcurrencyLimit) } func (l *limiter) allow() (DoneFunc, error) { diff --git a/pkg/ratelimit/limiter_test.go b/pkg/ratelimit/limiter_test.go index 36f339b47ac..520ad3d13d1 100644 --- a/pkg/ratelimit/limiter_test.go +++ b/pkg/ratelimit/limiter_test.go @@ -44,7 +44,7 @@ func TestWithConcurrencyLimiter(t *testing.T) { limiter := newLimiter() status := limiter.updateConcurrencyConfig(10) - re.NotZero(status & ConcurrencyChanged) + re.NotZero(status & LimiterUpdated) var lock syncutil.Mutex successCount, failedCount := 0, 0 var wg sync.WaitGroup @@ -67,10 +67,10 @@ func TestWithConcurrencyLimiter(t *testing.T) { re.Equal(uint64(0), current) status = limiter.updateConcurrencyConfig(10) - re.NotZero(status & ConcurrencyNoChange) + re.NotZero(status & LimiterNotChanged) status = limiter.updateConcurrencyConfig(5) - re.NotZero(status & ConcurrencyChanged) + re.NotZero(status & LimiterUpdated) failedCount = 0 successCount = 0 for i := 0; i < 15; i++ { @@ -85,7 +85,7 @@ func TestWithConcurrencyLimiter(t *testing.T) { } status = limiter.updateConcurrencyConfig(0) - re.NotZero(status & ConcurrencyDeleted) + re.NotZero(status & LimiterDeleted) failedCount = 0 successCount = 0 for i := 0; i < 15; i++ { @@ -105,7 +105,7 @@ func TestWithQPSLimiter(t *testing.T) { re := require.New(t) limiter := newLimiter() status := limiter.updateQPSConfig(float64(rate.Every(time.Second)), 1) - re.NotZero(status & QPSChanged) + re.NotZero(status & LimiterUpdated) var lock syncutil.Mutex successCount, failedCount := 0, 0 @@ -124,10 +124,10 @@ func TestWithQPSLimiter(t *testing.T) { re.Equal(1, burst) status = limiter.updateQPSConfig(float64(rate.Every(time.Second)), 1) - re.NotZero(status & QPSNoChange) + re.NotZero(status & LimiterNotChanged) status = limiter.updateQPSConfig(5, 5) - re.NotZero(status & QPSChanged) + re.NotZero(status & LimiterUpdated) limit, burst = limiter.getQPSLimiterStatus() re.Equal(rate.Limit(5), limit) re.Equal(5, burst) @@ -145,7 +145,7 @@ func TestWithQPSLimiter(t *testing.T) { time.Sleep(time.Second) status = limiter.updateQPSConfig(0, 0) - re.NotZero(status & QPSDeleted) + re.NotZero(status & LimiterDeleted) for i := 0; i < 10; i++ { _, err := limiter.allow() re.NoError(err) @@ -157,7 +157,7 @@ func TestWithQPSLimiter(t *testing.T) { successCount = 0 failedCount = 0 status = limiter.updateQPSConfig(float64(rate.Every(3*time.Second)), 100) - re.NotZero(status & QPSChanged) + re.NotZero(status & LimiterUpdated) wg.Add(200) for i := 0; i < 200; i++ { go countSingleLimiterHandleResult(limiter, &successCount, &failedCount, &lock, &wg, r) @@ -183,8 +183,8 @@ func TestWithTwoLimiters(t *testing.T) { } limiter := newLimiter() status := limiter.updateDimensionConfig(cfg) - re.NotZero(status & QPSChanged) - re.NotZero(status & ConcurrencyChanged) + re.NotZero(status & LimiterUpdated) + re.NotZero(status & LimiterUpdated) var lock syncutil.Mutex successCount, failedCount := 0, 0 @@ -211,7 +211,7 @@ func TestWithTwoLimiters(t *testing.T) { r.release() } status = limiter.updateQPSConfig(float64(rate.Every(10*time.Second)), 1) - re.NotZero(status & QPSChanged) + re.NotZero(status & LimiterUpdated) wg.Add(100) for i := 0; i < 100; i++ { go countSingleLimiterHandleResult(limiter, &successCount, &failedCount, &lock, &wg, r) @@ -225,8 +225,8 @@ func TestWithTwoLimiters(t *testing.T) { cfg = &DimensionConfig{} status = limiter.updateDimensionConfig(cfg) - re.NotZero(status & ConcurrencyDeleted) - re.NotZero(status & QPSDeleted) + re.NotZero(status & LimiterDeleted) + re.NotZero(status & LimiterDeleted) } func countSingleLimiterHandleResult(limiter *limiter, successCount *int, diff --git a/pkg/ratelimit/option.go b/pkg/ratelimit/option.go index f1faac5b550..e6d4a4f8ff0 100644 --- a/pkg/ratelimit/option.go +++ b/pkg/ratelimit/option.go @@ -20,18 +20,12 @@ type UpdateStatus uint32 // Flags for limiter. const ( eps float64 = 1e-8 - // QPSNoChange shows that limiter's config isn't changed. - QPSNoChange UpdateStatus = 1 << iota - // QPSChanged shows that limiter's config is changed and not deleted. - QPSChanged - // QPSDeleted shows that limiter's config is deleted. - QPSDeleted - // ConcurrencyNoChange shows that limiter's config isn't changed. - ConcurrencyNoChange - // ConcurrencyChanged shows that limiter's config is changed and not deleted. - ConcurrencyChanged - // ConcurrencyDeleted shows that limiter's config is deleted. - ConcurrencyDeleted + + LimiterNotChanged UpdateStatus = 1 << iota + // LimiterUpdated shows that limiter's config is updated. + LimiterUpdated + // LimiterDeleted shows that limiter's config is deleted. + LimiterDeleted // InAllowList shows that limiter's config isn't changed because it is in in allow list. InAllowList ) @@ -45,7 +39,7 @@ type Option func(string, *Controller) UpdateStatus func AddLabelAllowList() Option { return func(label string, l *Controller) UpdateStatus { l.labelAllowList[label] = struct{}{} - return 0 + return InAllowList } } @@ -73,11 +67,11 @@ func UpdateQPSLimiter(limit float64, burst int) Option { // UpdateDimensionConfig creates QPS limiter and concurrency limiter for a given label by config if it doesn't exist. func UpdateDimensionConfig(cfg *DimensionConfig) Option { - return func(label string, l *Controller) UpdateStatus { - if _, allow := l.labelAllowList[label]; allow { + return func(label string, c *Controller) UpdateStatus { + if _, allow := c.labelAllowList[label]; allow { return InAllowList } - lim, _ := l.limiters.LoadOrStore(label, newLimiter()) + lim, _ := c.limiters.LoadOrStore(label, newLimiter()) return lim.(*limiter).updateDimensionConfig(cfg) } } @@ -89,6 +83,6 @@ func InitLimiter() Option { return InAllowList } l.limiters.LoadOrStore(label, newLimiter()) - return ConcurrencyChanged + return LimiterNotChanged } } diff --git a/server/api/service_middleware.go b/server/api/service_middleware.go index ecd41eede08..da40a79cbea 100644 --- a/server/api/service_middleware.go +++ b/server/api/service_middleware.go @@ -68,40 +68,40 @@ func (h *serviceMiddlewareHandler) SetServiceMiddlewareConfig(w http.ResponseWri data, err := io.ReadAll(r.Body) r.Body.Close() if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + h.rd.Text(w, http.StatusInternalServerError, err.Error()) return } conf := make(map[string]any) if err := json.Unmarshal(data, &conf); err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) + h.rd.Text(w, http.StatusBadRequest, err.Error()) return } if len(conf) == 0 { - h.rd.JSON(w, http.StatusOK, "The input is empty.") + h.rd.Text(w, http.StatusOK, "The input is empty.") } for k, v := range conf { if s := strings.Split(k, "."); len(s) > 1 { if err := h.updateServiceMiddlewareConfig(cfg, k, v); err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) + h.rd.Text(w, http.StatusBadRequest, err.Error()) return } continue } key := reflectutil.FindJSONFullTagByChildTag(reflect.TypeOf(config.ServiceMiddlewareConfig{}), k) if key == "" { - h.rd.JSON(w, http.StatusBadRequest, fmt.Sprintf("config item %s not found", k)) + h.rd.Text(w, http.StatusBadRequest, fmt.Sprintf("config item %s not found", k)) return } if err := h.updateServiceMiddlewareConfig(cfg, key, v); err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) + h.rd.Text(w, http.StatusBadRequest, err.Error()) return } } - h.rd.JSON(w, http.StatusOK, "The service-middleware config is updated.") + h.rd.Text(w, http.StatusOK, "The service-middleware config is updated.") } func (h *serviceMiddlewareHandler) updateServiceMiddlewareConfig(cfg *config.ServiceMiddlewareConfig, key string, value any) error { @@ -148,7 +148,7 @@ func (h *serviceMiddlewareHandler) SetRateLimitConfig(w http.ResponseWriter, r * } typeStr, ok := input["type"].(string) if !ok { - h.rd.JSON(w, http.StatusBadRequest, "The type is empty.") + h.rd.Text(w, http.StatusBadRequest, "The type is empty.") return } var serviceLabel string @@ -156,42 +156,41 @@ func (h *serviceMiddlewareHandler) SetRateLimitConfig(w http.ResponseWriter, r * case "label": serviceLabel, ok = input["label"].(string) if !ok || len(serviceLabel) == 0 { - h.rd.JSON(w, http.StatusBadRequest, "The label is empty.") + h.rd.Text(w, http.StatusBadRequest, "The label is empty.") return } if len(h.svr.GetServiceLabels(serviceLabel)) == 0 { - h.rd.JSON(w, http.StatusBadRequest, "There is no label matched.") + h.rd.Text(w, http.StatusBadRequest, "There is no label matched.") return } case "path": method, _ := input["method"].(string) path, ok := input["path"].(string) if !ok || len(path) == 0 { - h.rd.JSON(w, http.StatusBadRequest, "The path is empty.") + h.rd.Text(w, http.StatusBadRequest, "The path is empty.") return } serviceLabel = h.svr.GetAPIAccessServiceLabel(apiutil.NewAccessPath(path, method)) if len(serviceLabel) == 0 { - h.rd.JSON(w, http.StatusBadRequest, "There is no label matched.") + h.rd.Text(w, http.StatusBadRequest, "There is no label matched.") return } default: - h.rd.JSON(w, http.StatusBadRequest, "The type is invalid.") + h.rd.Text(w, http.StatusBadRequest, "The type is invalid.") return } if h.svr.IsInRateLimitAllowList(serviceLabel) { - h.rd.JSON(w, http.StatusBadRequest, "This service is in allow list whose config can not be changed.") + h.rd.Text(w, http.StatusBadRequest, "This service is in allow list whose config can not be changed.") return } - cfg := h.svr.GetRateLimitConfig().LimiterConfig[serviceLabel] + oldCfg := h.svr.GetRateLimitConfig().Clone() + cfg := oldCfg.LimiterConfig[serviceLabel] // update concurrency limiter - concurrencyUpdatedFlag := "Concurrency limiter is not changed." concurrencyFloat, okc := input["concurrency"].(float64) if okc { cfg.ConcurrencyLimit = uint64(concurrencyFloat) } // update qps rate limiter - qpsRateUpdatedFlag := "QPS rate limiter is not changed." qps, okq := input["qps"].(float64) if okq { burst := 0 @@ -204,27 +203,26 @@ func (h *serviceMiddlewareHandler) SetRateLimitConfig(w http.ResponseWriter, r * cfg.QPSBurst = burst } if !okc && !okq { - h.rd.JSON(w, http.StatusOK, "No changed.") + h.rd.Text(w, http.StatusOK, "Rate limiter is not changed.") } else { status := h.svr.UpdateServiceRateLimiter(serviceLabel, ratelimit.UpdateDimensionConfig(&cfg)) - switch { - case status&ratelimit.QPSChanged != 0: - qpsRateUpdatedFlag = "QPS rate limiter is changed." - case status&ratelimit.QPSDeleted != 0: - qpsRateUpdatedFlag = "QPS rate limiter is deleted." - } - switch { - case status&ratelimit.ConcurrencyChanged != 0: - concurrencyUpdatedFlag = "Concurrency limiter is changed." - case status&ratelimit.ConcurrencyDeleted != 0: - concurrencyUpdatedFlag = "Concurrency limiter is deleted." + if status&ratelimit.LimiterDeleted != 0 { + cfg := h.svr.GetServiceMiddlewareConfig() + delete(cfg.RateLimitConfig.LimiterConfig, serviceLabel) + if err := h.svr.SetRateLimitConfig(cfg.RateLimitConfig); err != nil { + old := oldCfg.LimiterConfig[serviceLabel] + h.svr.UpdateServiceRateLimiter(serviceLabel, ratelimit.UpdateDimensionConfig(&old)) + h.rd.Text(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.Text(w, http.StatusOK, "Rate limiter is deleted.") + return } err := h.svr.UpdateRateLimitConfig("limiter-config", serviceLabel, cfg) if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + h.rd.Text(w, http.StatusInternalServerError, err.Error()) } else { - result := rateLimitResult{concurrencyUpdatedFlag, qpsRateUpdatedFlag, h.svr.GetServiceMiddlewareConfig().RateLimitConfig.LimiterConfig} - h.rd.JSON(w, http.StatusOK, result) + h.rd.Text(w, http.StatusOK, "Rate limiter is updated.") } } } @@ -245,23 +243,22 @@ func (h *serviceMiddlewareHandler) SetGRPCRateLimitConfig(w http.ResponseWriter, serviceLabel, ok := input["label"].(string) if !ok || len(serviceLabel) == 0 { - h.rd.JSON(w, http.StatusBadRequest, "The label is empty.") + h.rd.Text(w, http.StatusBadRequest, "The label is empty.") return } if !h.svr.IsGRPCServiceLabelExist(serviceLabel) { - h.rd.JSON(w, http.StatusBadRequest, "There is no label matched.") + h.rd.Text(w, http.StatusBadRequest, "There is no label matched.") return } - cfg := h.svr.GetGRPCRateLimitConfig().LimiterConfig[serviceLabel] + oldCfg := h.svr.GetGRPCRateLimitConfig().Clone() + cfg := oldCfg.LimiterConfig[serviceLabel] // update concurrency limiter - concurrencyUpdatedFlag := "Concurrency limiter is not changed." concurrencyFloat, okc := input["concurrency"].(float64) if okc { cfg.ConcurrencyLimit = uint64(concurrencyFloat) } // update qps rate limiter - qpsRateUpdatedFlag := "QPS rate limiter is not changed." qps, okq := input["qps"].(float64) if okq { burst := 0 @@ -274,33 +271,26 @@ func (h *serviceMiddlewareHandler) SetGRPCRateLimitConfig(w http.ResponseWriter, cfg.QPSBurst = burst } if !okc && !okq { - h.rd.JSON(w, http.StatusOK, "No changed.") + h.rd.Text(w, http.StatusOK, "gRPC limiter is not changed.") } else { status := h.svr.UpdateGRPCServiceRateLimiter(serviceLabel, ratelimit.UpdateDimensionConfig(&cfg)) - switch { - case status&ratelimit.QPSChanged != 0: - qpsRateUpdatedFlag = "QPS rate limiter is changed." - case status&ratelimit.QPSDeleted != 0: - qpsRateUpdatedFlag = "QPS rate limiter is deleted." - } - switch { - case status&ratelimit.ConcurrencyChanged != 0: - concurrencyUpdatedFlag = "Concurrency limiter is changed." - case status&ratelimit.ConcurrencyDeleted != 0: - concurrencyUpdatedFlag = "Concurrency limiter is deleted." + if status&ratelimit.LimiterDeleted != 0 { + cfg := h.svr.GetServiceMiddlewareConfig() + delete(cfg.GRPCRateLimitConfig.LimiterConfig, serviceLabel) + if err := h.svr.SetGRPCRateLimitConfig(cfg.GRPCRateLimitConfig); err != nil { + old := oldCfg.LimiterConfig[serviceLabel] + h.svr.UpdateGRPCServiceRateLimiter(serviceLabel, ratelimit.UpdateDimensionConfig(&old)) + h.rd.Text(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.Text(w, http.StatusOK, "gRPC limiter is deleted.") + return } err := h.svr.UpdateGRPCRateLimitConfig("grpc-limiter-config", serviceLabel, cfg) if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + h.rd.Text(w, http.StatusInternalServerError, err.Error()) } else { - result := rateLimitResult{concurrencyUpdatedFlag, qpsRateUpdatedFlag, h.svr.GetServiceMiddlewareConfig().GRPCRateLimitConfig.LimiterConfig} - h.rd.JSON(w, http.StatusOK, result) + h.rd.Text(w, http.StatusOK, "gRPC limiter is updated.") } } } - -type rateLimitResult struct { - ConcurrencyUpdatedFlag string `json:"concurrency"` - QPSRateUpdatedFlag string `json:"qps"` - LimiterConfig map[string]ratelimit.DimensionConfig `json:"limiter-config"` -} diff --git a/server/api/service_middleware_test.go b/server/api/service_middleware_test.go index 7d5e0db98be..2599ccbfb54 100644 --- a/server/api/service_middleware_test.go +++ b/server/api/service_middleware_test.go @@ -140,7 +140,7 @@ func (suite *rateLimitConfigTestSuite) TestUpdateRateLimitConfig() { jsonBody, err := json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "\"The type is empty.\"\n")) + tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "The type is empty.")) re.NoError(err) // test invalid type input = make(map[string]any) @@ -148,7 +148,7 @@ func (suite *rateLimitConfigTestSuite) TestUpdateRateLimitConfig() { jsonBody, err = json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "\"The type is invalid.\"\n")) + tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "The type is invalid.")) re.NoError(err) // test empty label @@ -158,7 +158,7 @@ func (suite *rateLimitConfigTestSuite) TestUpdateRateLimitConfig() { jsonBody, err = json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "\"The label is empty.\"\n")) + tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "The label is empty.")) re.NoError(err) // test no label matched input = make(map[string]any) @@ -167,7 +167,7 @@ func (suite *rateLimitConfigTestSuite) TestUpdateRateLimitConfig() { jsonBody, err = json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "\"There is no label matched.\"\n")) + tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "There is no label matched.")) re.NoError(err) // test empty path @@ -177,7 +177,7 @@ func (suite *rateLimitConfigTestSuite) TestUpdateRateLimitConfig() { jsonBody, err = json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "\"The path is empty.\"\n")) + tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "The path is empty.")) re.NoError(err) // test path but no label matched @@ -187,7 +187,7 @@ func (suite *rateLimitConfigTestSuite) TestUpdateRateLimitConfig() { jsonBody, err = json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "\"There is no label matched.\"\n")) + tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "There is no label matched.")) re.NoError(err) // no change @@ -197,7 +197,7 @@ func (suite *rateLimitConfigTestSuite) TestUpdateRateLimitConfig() { jsonBody, err = json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.StatusOK(re), tu.StringEqual(re, "\"No changed.\"\n")) + tu.StatusOK(re), tu.StringEqual(re, "Rate limiter is not changed.")) re.NoError(err) // change concurrency @@ -209,13 +209,13 @@ func (suite *rateLimitConfigTestSuite) TestUpdateRateLimitConfig() { jsonBody, err = json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.StatusOK(re), tu.StringContain(re, "Concurrency limiter is changed.")) + tu.StatusOK(re), tu.StringContain(re, "Rate limiter is updated")) re.NoError(err) input["concurrency"] = 0 jsonBody, err = json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.StatusOK(re), tu.StringContain(re, "Concurrency limiter is deleted.")) + tu.StatusOK(re), tu.StringContain(re, "Rate limiter is deleted")) re.NoError(err) // change qps @@ -227,7 +227,7 @@ func (suite *rateLimitConfigTestSuite) TestUpdateRateLimitConfig() { jsonBody, err = json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.StatusOK(re), tu.StringContain(re, "QPS rate limiter is changed.")) + tu.StatusOK(re), tu.StringContain(re, "Rate limiter is updated.")) re.NoError(err) input = make(map[string]any) @@ -238,7 +238,7 @@ func (suite *rateLimitConfigTestSuite) TestUpdateRateLimitConfig() { jsonBody, err = json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.StatusOK(re), tu.StringContain(re, "QPS rate limiter is changed.")) + tu.StatusOK(re), tu.StringContain(re, "Rate limiter is updated.")) re.NoError(err) re.Equal(1, suite.svr.GetRateLimitConfig().LimiterConfig["GetHealthStatus"].QPSBurst) @@ -246,7 +246,7 @@ func (suite *rateLimitConfigTestSuite) TestUpdateRateLimitConfig() { jsonBody, err = json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.StatusOK(re), tu.StringContain(re, "QPS rate limiter is deleted.")) + tu.StatusOK(re), tu.StringContain(re, "Rate limiter is deleted.")) re.NoError(err) // change both @@ -257,15 +257,8 @@ func (suite *rateLimitConfigTestSuite) TestUpdateRateLimitConfig() { input["concurrency"] = 100 jsonBody, err = json.Marshal(input) re.NoError(err) - result := rateLimitResult{} err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.StatusOK(re), tu.StringContain(re, "Concurrency limiter is changed."), - tu.StringContain(re, "QPS rate limiter is changed."), - tu.ExtractJSON(re, &result), - ) - re.Equal(100., result.LimiterConfig["Profile"].QPS) - re.Equal(100, result.LimiterConfig["Profile"].QPSBurst) - re.Equal(uint64(100), result.LimiterConfig["Profile"].ConcurrencyLimit) + tu.StatusOK(re), tu.StringContain(re, "Rate limiter is updated.")) re.NoError(err) limiter := suite.svr.GetServiceRateLimiter() @@ -280,7 +273,7 @@ func (suite *rateLimitConfigTestSuite) TestUpdateRateLimitConfig() { jsonBody, err = json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.StatusNotOK(re), tu.StringEqual(re, "\"This service is in allow list whose config can not be changed.\"\n")) + tu.StatusNotOK(re), tu.StringEqual(re, "This service is in allow list whose config can not be changed.")) re.NoError(err) } @@ -294,7 +287,7 @@ func (suite *rateLimitConfigTestSuite) TestUpdateGRPCRateLimitConfig() { jsonBody, err := json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "\"The label is empty.\"\n")) + tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "The label is empty.")) re.NoError(err) // test no label matched input = make(map[string]any) @@ -302,7 +295,7 @@ func (suite *rateLimitConfigTestSuite) TestUpdateGRPCRateLimitConfig() { jsonBody, err = json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "\"There is no label matched.\"\n")) + tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "There is no label matched.")) re.NoError(err) // no change @@ -311,7 +304,7 @@ func (suite *rateLimitConfigTestSuite) TestUpdateGRPCRateLimitConfig() { jsonBody, err = json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.StatusOK(re), tu.StringEqual(re, "\"No changed.\"\n")) + tu.StatusOK(re), tu.StringEqual(re, "gRPC limiter is not changed.")) re.NoError(err) // change concurrency @@ -321,13 +314,13 @@ func (suite *rateLimitConfigTestSuite) TestUpdateGRPCRateLimitConfig() { jsonBody, err = json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.StatusOK(re), tu.StringContain(re, "Concurrency limiter is changed.")) + tu.StatusOK(re), tu.StringContain(re, "gRPC limiter is updated.")) re.NoError(err) input["concurrency"] = 0 jsonBody, err = json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.StatusOK(re), tu.StringContain(re, "Concurrency limiter is deleted.")) + tu.StatusOK(re), tu.StringContain(re, "gRPC limiter is deleted.")) re.NoError(err) // change qps @@ -337,7 +330,7 @@ func (suite *rateLimitConfigTestSuite) TestUpdateGRPCRateLimitConfig() { jsonBody, err = json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.StatusOK(re), tu.StringContain(re, "QPS rate limiter is changed.")) + tu.StatusOK(re), tu.StringContain(re, "gRPC limiter is updated.")) re.NoError(err) input = make(map[string]any) @@ -346,7 +339,7 @@ func (suite *rateLimitConfigTestSuite) TestUpdateGRPCRateLimitConfig() { jsonBody, err = json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.StatusOK(re), tu.StringContain(re, "QPS rate limiter is changed.")) + tu.StatusOK(re), tu.StringContain(re, "gRPC limiter is updated.")) re.NoError(err) re.Equal(1, suite.svr.GetGRPCRateLimitConfig().LimiterConfig["StoreHeartbeat"].QPSBurst) @@ -354,7 +347,7 @@ func (suite *rateLimitConfigTestSuite) TestUpdateGRPCRateLimitConfig() { jsonBody, err = json.Marshal(input) re.NoError(err) err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.StatusOK(re), tu.StringContain(re, "QPS rate limiter is deleted.")) + tu.StatusOK(re), tu.StringContain(re, "gRPC limiter is deleted.")) re.NoError(err) // change both @@ -364,15 +357,9 @@ func (suite *rateLimitConfigTestSuite) TestUpdateGRPCRateLimitConfig() { input["concurrency"] = 100 jsonBody, err = json.Marshal(input) re.NoError(err) - result := rateLimitResult{} err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody, - tu.StatusOK(re), tu.StringContain(re, "Concurrency limiter is changed."), - tu.StringContain(re, "QPS rate limiter is changed."), - tu.ExtractJSON(re, &result), + tu.StatusOK(re), ) - re.Equal(100., result.LimiterConfig["GetStore"].QPS) - re.Equal(100, result.LimiterConfig["GetStore"].QPSBurst) - re.Equal(uint64(100), result.LimiterConfig["GetStore"].ConcurrencyLimit) re.NoError(err) } diff --git a/tests/server/server_test.go b/tests/server/server_test.go index c41ed0f96a6..68c4d5da65e 100644 --- a/tests/server/server_test.go +++ b/tests/server/server_test.go @@ -168,7 +168,7 @@ func TestGRPCRateLimit(t *testing.T) { jsonBody, err := json.Marshal(input) re.NoError(err) err = testutil.CheckPostJSON(tests.TestDialClient, urlPrefix, jsonBody, - testutil.StatusOK(re), testutil.StringContain(re, "QPS rate limiter is changed")) + testutil.StatusOK(re), testutil.StringContain(re, "gRPC limiter is updated")) re.NoError(err) for i := 0; i < 2; i++ { resp, err := grpcPDClient.GetRegion(context.Background(), &pdpb.GetRegionRequest{ @@ -188,7 +188,7 @@ func TestGRPCRateLimit(t *testing.T) { jsonBody, err = json.Marshal(input) re.NoError(err) err = testutil.CheckPostJSON(tests.TestDialClient, urlPrefix, jsonBody, - testutil.StatusOK(re), testutil.StringContain(re, "QPS rate limiter is deleted")) + testutil.StatusOK(re), testutil.StringContain(re, "gRPC limiter is deleted")) re.NoError(err) for i := 0; i < 100; i++ { resp, err := grpcPDClient.GetRegion(context.Background(), &pdpb.GetRegionRequest{ @@ -208,7 +208,7 @@ func TestGRPCRateLimit(t *testing.T) { errCh = make(chan string) ) err = testutil.CheckPostJSON(tests.TestDialClient, urlPrefix, jsonBody, - testutil.StatusOK(re), testutil.StringContain(re, "Concurrency limiter is changed")) + testutil.StatusOK(re), testutil.StringContain(re, "gRPC limiter is updated")) re.NoError(err) re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayProcess", `pause`)) var wg sync.WaitGroup diff --git a/tools/pd-ctl/tests/config/config_test.go b/tools/pd-ctl/tests/config/config_test.go index aa9c624a303..5a9e077d7e4 100644 --- a/tools/pd-ctl/tests/config/config_test.go +++ b/tools/pd-ctl/tests/config/config_test.go @@ -1020,6 +1020,14 @@ func TestServiceMiddlewareConfig(t *testing.T) { re.NoError(err) conf.GRPCRateLimitConfig.EnableRateLimit = false check() + _, err = tests.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "service-middleware", "rate-limit", "GetRegion", "concurrency", "0") + re.NoError(err) + conf.RateLimitConfig.LimiterConfig["GetRegion"] = ratelimit.DimensionConfig{QPS: 100, QPSBurst: 100} + check() + _, err = tests.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "service-middleware", "rate-limit", "GetRegion", "qps", "0") + re.NoError(err) + delete(conf.RateLimitConfig.LimiterConfig, "GetRegion") + check() } func (suite *configTestSuite) TestUpdateDefaultReplicaConfig() {