Skip to content

Commit

Permalink
address
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch committed Aug 1, 2024
1 parent ee638dc commit 881b24d
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 92 deletions.
146 changes: 55 additions & 91 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ func (gc *groupCostController) updateRunState() {
}
*gc.run.consumption = *gc.mu.consumption
gc.mu.Unlock()
logControllerTrace("[resource group controller] update run state", zap.String("name", gc.name), zap.Any("request-unit-consumption", gc.run.consumption))
logControllerTrace("[resource group controller] update run state", zap.String("name", gc.name), zap.Any("request-unit-consumption", gc.run.consumption), zap.Bool("is-throttled", gc.isThrottled.Load()))
gc.run.now = newTime
}

Expand Down Expand Up @@ -1042,7 +1042,7 @@ func (gc *groupCostController) updateAvgRaWResourcePerSec() {
if !gc.calcAvg(counter, getRawResourceValueFromConsumption(gc.run.consumption, typ)) {
continue
}
logControllerTrace("[resource group controller] update avg raw resource per sec", zap.String("name", gc.name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec))
logControllerTrace("[resource group controller] update avg raw resource per sec", zap.String("name", gc.name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec), zap.Bool("is-throttled", gc.isThrottled.Load()))
}
gc.burstable.Store(isBurstable)
}
Expand All @@ -1056,7 +1056,7 @@ func (gc *groupCostController) updateAvgRUPerSec() {
if !gc.calcAvg(counter, getRUValueFromConsumption(gc.run.consumption, typ)) {
continue
}
logControllerTrace("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec))
logControllerTrace("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec), zap.Bool("is-throttled", gc.isThrottled.Load()))
}
gc.burstable.Store(isBurstable)
}
Expand Down Expand Up @@ -1343,6 +1343,54 @@ func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 {
return value
}

func (gc *groupCostController) acquireTokens(ctx context.Context, delta *rmpb.Consumption, waitDuration *time.Duration, allowDebt bool) (time.Duration, error) {
gc.metrics.runningKVRequestCounter.Inc()
defer gc.metrics.runningKVRequestCounter.Dec()
var (
err error
d time.Duration
)
retryLoop:
for i := 0; i < gc.mainCfg.WaitRetryTimes; i++ {
now := time.Now()
switch gc.mode {
case rmpb.GroupMode_RawMode:
res := make([]*Reservation, 0, len(requestResourceLimitTypeList))
for typ, counter := range gc.run.resourceTokens {
if v := getRawResourceValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
case rmpb.GroupMode_RUMode:
res := make([]*Reservation, 0, len(requestUnitLimitTypeList))
for typ, counter := range gc.run.requestUnitTokens {
if v := getRUValueFromConsumption(delta, typ); v > 0 {
// record the consume token histogram if enable controller debug mode.
if enableControllerTraceLog.Load() {
gc.metrics.consumeTokenHistogram.Observe(v)
}
// allow debt for small request or not in throttled. remove tokens directly.
if allowDebt {
counter.limiter.RemoveTokens(now, v)
break retryLoop
}
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
}
gc.metrics.requestRetryCounter.Inc()
time.Sleep(gc.mainCfg.WaitRetryInterval)
*waitDuration += gc.mainCfg.WaitRetryInterval
}
return d, err
}

func (gc *groupCostController) onRequestWaitImpl(
ctx context.Context, info RequestInfo,
) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) {
Expand All @@ -1357,44 +1405,7 @@ func (gc *groupCostController) onRequestWaitImpl(
var waitDuration time.Duration

if !gc.burstable.Load() {
var err error
var i int
var d time.Duration
gc.metrics.runningKVRequestCounter.Inc()
defer gc.metrics.runningKVRequestCounter.Dec()
retryLoop:
for i = 0; i < gc.mainCfg.WaitRetryTimes; i++ {
now := time.Now()
switch gc.mode {
case rmpb.GroupMode_RawMode:
res := make([]*Reservation, 0, len(requestResourceLimitTypeList))
for typ, counter := range gc.run.resourceTokens {
if v := getRawResourceValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
case rmpb.GroupMode_RUMode:
res := make([]*Reservation, 0, len(requestUnitLimitTypeList))
for typ, counter := range gc.run.requestUnitTokens {
if v := getRUValueFromConsumption(delta, typ); v > 0 {
// record the consume token histogram if enable controller debug mode.
if enableControllerTraceLog.Load() {
gc.metrics.consumeTokenHistogram.Observe(v)
}
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
}
gc.metrics.requestRetryCounter.Inc()
time.Sleep(gc.mainCfg.WaitRetryInterval)
waitDuration += gc.mainCfg.WaitRetryInterval
}
d, err := gc.acquireTokens(ctx, delta, &waitDuration, false)
if err != nil {
if errs.ErrClientResourceGroupThrottled.Equal(err) {
gc.metrics.failedRequestCounterWithThrottled.Inc()
Expand Down Expand Up @@ -1479,57 +1490,10 @@ func (gc *groupCostController) onResponseWaitImpl(
for _, calc := range gc.calculators {
calc.AfterKVRequest(delta, req, resp)
}
var (
waitDuration time.Duration
d time.Duration
err error
)
var waitDuration time.Duration
if !gc.burstable.Load() {
gc.metrics.runningKVRequestCounter.Inc()
defer gc.metrics.runningKVRequestCounter.Dec()
retryLoop:
for i := 0; i < gc.mainCfg.WaitRetryTimes; i++ {
now := time.Now()
switch gc.mode {
case rmpb.GroupMode_RawMode:
res := make([]*Reservation, 0, len(requestResourceLimitTypeList))
for typ, counter := range gc.run.resourceTokens {
if v := getRawResourceValueFromConsumption(delta, typ); v > 0 {
// allow debt for small request or not in throttled.
if delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load() {
counter.limiter.RemoveTokens(time.Now(), v)
break retryLoop
}
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
case rmpb.GroupMode_RUMode:
res := make([]*Reservation, 0, len(requestUnitLimitTypeList))
for typ, counter := range gc.run.requestUnitTokens {
if v := getRUValueFromConsumption(delta, typ); v > 0 {
// record the consume token histogram if enable controller debug mode.
if enableControllerTraceLog.Load() {
gc.metrics.consumeTokenHistogram.Observe(v)
}
// allow debt for small request or not in throttled.
if delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load() {
counter.limiter.RemoveTokens(time.Now(), v)
break retryLoop
}
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(context.Background(), now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
}
gc.metrics.requestRetryCounter.Inc()
time.Sleep(gc.mainCfg.WaitRetryInterval)
waitDuration += gc.mainCfg.WaitRetryInterval
}
allowDebt := delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load()
d, err := gc.acquireTokens(ctx, delta, &waitDuration, allowDebt)
if err != nil {
if errs.ErrClientResourceGroupThrottled.Equal(err) {
gc.metrics.failedRequestCounterWithThrottled.Inc()
Expand Down
2 changes: 1 addition & 1 deletion client/resource_group/controller/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (lim *Limiter) Reconfigure(now time.Time,
) {
lim.mu.Lock()
defer lim.mu.Unlock()
logControllerTrace("[resource group controller] before reconfigure", zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", lim.notifyThreshold), zap.Int64("old-burst", lim.burst))
logControllerTrace("[resource group controller] before reconfigure", zap.String("name", lim.name), zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", lim.notifyThreshold), zap.Int64("old-burst", lim.burst))
if args.NewBurst < 0 {
lim.last = now
lim.tokens = args.NewTokens
Expand Down

0 comments on commit 881b24d

Please sign in to comment.