Skip to content

Commit

Permalink
*: closed immediately if error rate is changed to 0 (#8887)
Browse files Browse the repository at this point in the history
ref #8678

Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx authored Dec 18, 2024
1 parent 88a761e commit 2d970a6
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 19 deletions.
27 changes: 18 additions & 9 deletions client/circuitbreaker/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (cb *CircuitBreaker[T]) ChangeSettings(apply func(config *Settings)) {
defer cb.mutex.Unlock()

apply(cb.config)
log.Info("circuit breaker settings changed", zap.Any("config", cb.config))
}

// Execute calls the given function if the CircuitBreaker is closed and returns the result of execution.
Expand Down Expand Up @@ -238,10 +239,10 @@ func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) {
observedErrorRatePct := s.failureCount * 100 / total
if total >= uint32(s.cb.config.ErrorRateWindow.Seconds())*s.cb.config.MinQPSForOpen && observedErrorRatePct >= s.cb.config.ErrorRateThresholdPct {
// the error threshold is breached, let's move to open state and start failing all requests
log.Error("Circuit breaker tripped. Starting to fail all requests",
log.Error("circuit breaker tripped and starting to fail all requests",
zap.String("name", cb.name),
zap.Uint32("observedErrorRatePct", observedErrorRatePct),
zap.String("config", fmt.Sprintf("%+v", cb.config)))
zap.Uint32("observed-err-rate-pct", observedErrorRatePct),
zap.Any("config", cb.config))
return cb.newState(now, StateOpen), errs.ErrCircuitBreakerOpen
}
}
Expand All @@ -253,29 +254,37 @@ func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) {
// continue in closed state till ErrorRateWindow is over
return s, nil
case StateOpen:
if s.cb.config.ErrorRateThresholdPct == 0 {
return cb.newState(now, StateClosed), nil
}

if now.After(s.end) {
// CoolDownInterval is over, it is time to transition to half-open state
log.Info("Circuit breaker cooldown period is over. Transitioning to half-open state to test the service",
log.Info("circuit breaker cooldown period is over. Transitioning to half-open state to test the service",
zap.String("name", cb.name),
zap.String("config", fmt.Sprintf("%+v", cb.config)))
zap.Any("config", cb.config))
return cb.newState(now, StateHalfOpen), nil
} else {
// continue in the open state till CoolDownInterval is over
return s, errs.ErrCircuitBreakerOpen
}
case StateHalfOpen:
if s.cb.config.ErrorRateThresholdPct == 0 {
return cb.newState(now, StateClosed), nil
}

// do we need some expire time here in case of one of pending requests is stuck forever?
if s.failureCount > 0 {
// there were some failures during half-open state, let's go back to open state to wait a bit longer
log.Error("Circuit breaker goes from half-open to open again as errors persist and continue to fail all requests",
log.Error("circuit breaker goes from half-open to open again as errors persist and continue to fail all requests",
zap.String("name", cb.name),
zap.String("config", fmt.Sprintf("%+v", cb.config)))
zap.Any("config", cb.config))
return cb.newState(now, StateOpen), errs.ErrCircuitBreakerOpen
} else if s.successCount == s.cb.config.HalfOpenSuccessCount {
// all probe requests are succeeded, we can move to closed state and allow all requests
log.Info("Circuit breaker is closed. Start allowing all requests",
log.Info("circuit breaker is closed and start allowing all requests",
zap.String("name", cb.name),
zap.String("config", fmt.Sprintf("%+v", cb.config)))
zap.Any("config", cb.config))
return cb.newState(now, StateClosed), nil
} else if s.pendingCount < s.cb.config.HalfOpenSuccessCount {
// allow more probe requests and continue in half-open state
Expand Down
18 changes: 9 additions & 9 deletions client/circuitbreaker/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var settings = Settings{

var minCountToOpen = int(settings.MinQPSForOpen * uint32(settings.ErrorRateWindow.Seconds()))

func TestCircuitBreaker_Execute_Wrapper_Return_Values(t *testing.T) {
func TestCircuitBreakerExecuteWrapperReturnValues(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
originalError := errors.New("circuit breaker is open")
Expand All @@ -57,7 +57,7 @@ func TestCircuitBreaker_Execute_Wrapper_Return_Values(t *testing.T) {
re.Equal(42, result)
}

func TestCircuitBreaker_OpenState(t *testing.T) {
func TestCircuitBreakerOpenState(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
driveQPS(cb, minCountToOpen, Yes, re)
Expand All @@ -68,7 +68,7 @@ func TestCircuitBreaker_OpenState(t *testing.T) {
re.Equal(StateOpen, cb.state.stateType)
}

func TestCircuitBreaker_CloseState_Not_Enough_QPS(t *testing.T) {
func TestCircuitBreakerCloseStateNotEnoughQPS(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)
Expand All @@ -78,7 +78,7 @@ func TestCircuitBreaker_CloseState_Not_Enough_QPS(t *testing.T) {
re.Equal(StateClosed, cb.state.stateType)
}

func TestCircuitBreaker_CloseState_Not_Enough_Error_Rate(t *testing.T) {
func TestCircuitBreakerCloseStateNotEnoughErrorRate(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)
Expand All @@ -89,7 +89,7 @@ func TestCircuitBreaker_CloseState_Not_Enough_Error_Rate(t *testing.T) {
re.Equal(StateClosed, cb.state.stateType)
}

func TestCircuitBreaker_Half_Open_To_Closed(t *testing.T) {
func TestCircuitBreakerHalfOpenToClosed(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)
Expand All @@ -107,7 +107,7 @@ func TestCircuitBreaker_Half_Open_To_Closed(t *testing.T) {
re.Equal(StateClosed, cb.state.stateType)
}

func TestCircuitBreaker_Half_Open_To_Open(t *testing.T) {
func TestCircuitBreakerHalfOpenToOpen(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)
Expand All @@ -130,7 +130,7 @@ func TestCircuitBreaker_Half_Open_To_Open(t *testing.T) {

// in half open state, circuit breaker will allow only HalfOpenSuccessCount pending and should fast fail all other request till HalfOpenSuccessCount requests is completed
// this test moves circuit breaker to the half open state and verifies that requests above HalfOpenSuccessCount are failing
func TestCircuitBreaker_Half_Open_Fail_Over_Pending_Count(t *testing.T) {
func TestCircuitBreakerHalfOpenFailOverPendingCount(t *testing.T) {
re := require.New(t)
cb := newCircuitBreakerMovedToHalfOpenState(re)

Expand Down Expand Up @@ -176,7 +176,7 @@ func TestCircuitBreaker_Half_Open_Fail_Over_Pending_Count(t *testing.T) {
re.Equal(uint32(1), cb.state.successCount)
}

func TestCircuitBreaker_Count_Only_Requests_In_Same_Window(t *testing.T) {
func TestCircuitBreakerCountOnlyRequestsInSameWindow(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestCircuitBreaker_Count_Only_Requests_In_Same_Window(t *testing.T) {
re.Equal(uint32(1), cb.state.successCount)
}

func TestCircuitBreaker_ChangeSettings(t *testing.T) {
func TestCircuitBreakerChangeSettings(t *testing.T) {
re := require.New(t)

cb := NewCircuitBreaker[int]("test_cb", AlwaysClosedSettings)
Expand Down
5 changes: 5 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -661,6 +663,9 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio
}
resp, err := c.inner.regionMetaCircuitBreaker.Execute(func() (*pdpb.GetRegionResponse, cb.Overloading, error) {
region, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req)
failpoint.Inject("triggerCircuitBreaker", func() {
err = status.Error(codes.ResourceExhausted, "resource exhausted")
})
return region, isOverloaded(err), err
})
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
Expand Down
2 changes: 1 addition & 1 deletion client/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func initMetrics(constLabels prometheus.Labels) {
Namespace: "pd_client",
Subsystem: "request",
Name: "circuit_breaker_count",
Help: "Circuit Breaker counters",
Help: "Circuit breaker counters",
ConstLabels: constLabels,
}, []string{"name", "success"})
}
Expand Down
172 changes: 172 additions & 0 deletions tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"math"
"os"
"path"
"reflect"
"sort"
Expand All @@ -44,6 +45,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"

pd "github.com/tikv/pd/client"
cb "github.com/tikv/pd/client/circuitbreaker"
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/caller"
Expand Down Expand Up @@ -2049,3 +2051,173 @@ func needRetry(err error) bool {
}
return st.Code() == codes.ResourceExhausted
}

func TestCircuitBreaker(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cluster, err := tests.NewTestCluster(ctx, 1)
re.NoError(err)
defer cluster.Destroy()

circuitBreakerSettings := cb.Settings{
ErrorRateThresholdPct: 60,
MinQPSForOpen: 10,
ErrorRateWindow: time.Millisecond,
CoolDownInterval: time.Second,
HalfOpenSuccessCount: 1,
}

endpoints := runServer(re, cluster)
cli := setupCli(ctx, re, endpoints, opt.WithRegionMetaCircuitBreaker(circuitBreakerSettings))
defer cli.Close()

for range 10 {
region, err := cli.GetRegion(context.TODO(), []byte("a"))
re.NoError(err)
re.NotNil(region)
}

re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)"))

for range 100 {
_, err := cli.GetRegion(context.TODO(), []byte("a"))
re.Error(err)
}

_, err = cli.GetRegion(context.TODO(), []byte("a"))
re.Error(err)
re.Contains(err.Error(), "circuit breaker is open")
re.NoError(failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker"))

_, err = cli.GetRegion(context.TODO(), []byte("a"))
re.Error(err)
re.Contains(err.Error(), "circuit breaker is open")

// wait cooldown
time.Sleep(time.Second)

for range 10 {
region, err := cli.GetRegion(context.TODO(), []byte("a"))
re.NoError(err)
re.NotNil(region)
}
}

func TestCircuitBreakerOpenAndChangeSettings(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cluster, err := tests.NewTestCluster(ctx, 1)
re.NoError(err)
defer cluster.Destroy()

circuitBreakerSettings := cb.Settings{
ErrorRateThresholdPct: 60,
MinQPSForOpen: 10,
ErrorRateWindow: time.Millisecond,
CoolDownInterval: time.Second,
HalfOpenSuccessCount: 1,
}

endpoints := runServer(re, cluster)
cli := setupCli(ctx, re, endpoints, opt.WithRegionMetaCircuitBreaker(circuitBreakerSettings))
defer cli.Close()

for range 10 {
region, err := cli.GetRegion(context.TODO(), []byte("a"))
re.NoError(err)
re.NotNil(region)
}

re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)"))

for range 100 {
_, err := cli.GetRegion(context.TODO(), []byte("a"))
re.Error(err)
}

_, err = cli.GetRegion(context.TODO(), []byte("a"))
re.Error(err)
re.Contains(err.Error(), "circuit breaker is open")

cli.UpdateOption(opt.RegionMetadataCircuitBreakerSettings, func(config *cb.Settings) {
*config = cb.AlwaysClosedSettings
})

_, err = cli.GetRegion(context.TODO(), []byte("a"))
re.Error(err)
re.Contains(err.Error(), "ResourceExhausted")
re.NoError(failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker"))
}

func TestCircuitBreakerHalfOpenAndChangeSettings(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cluster, err := tests.NewTestCluster(ctx, 1)
re.NoError(err)
defer cluster.Destroy()

circuitBreakerSettings := cb.Settings{
ErrorRateThresholdPct: 60,
MinQPSForOpen: 10,
ErrorRateWindow: time.Millisecond,
CoolDownInterval: time.Second,
HalfOpenSuccessCount: 20,
}

endpoints := runServer(re, cluster)
cli := setupCli(ctx, re, endpoints, opt.WithRegionMetaCircuitBreaker(circuitBreakerSettings))
defer cli.Close()

for range 10 {
region, err := cli.GetRegion(context.TODO(), []byte("a"))
re.NoError(err)
re.NotNil(region)
}

re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)"))

for range 100 {
_, err := cli.GetRegion(context.TODO(), []byte("a"))
re.Error(err)
}

_, err = cli.GetRegion(context.TODO(), []byte("a"))
re.Error(err)
re.Contains(err.Error(), "circuit breaker is open")

fname := testutil.InitTempFileLogger("info")
defer os.RemoveAll(fname)
// wait for cooldown
time.Sleep(time.Second)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker"))
// trigger circuit breaker state to be half open
_, err = cli.GetRegion(context.TODO(), []byte("a"))
re.NoError(err)
testutil.Eventually(re, func() bool {
b, _ := os.ReadFile(fname)
l := string(b)
// We need to check the log to see if the circuit breaker is half open
return strings.Contains(l, "Transitioning to half-open state to test the service")
})

// The state is half open
re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)"))
// change settings to always closed
cli.UpdateOption(opt.RegionMetadataCircuitBreakerSettings, func(config *cb.Settings) {
*config = cb.AlwaysClosedSettings
})

// It won't be changed to open state.
for range 100 {
_, err := cli.GetRegion(context.TODO(), []byte("a"))
re.Error(err)
re.NotContains(err.Error(), "circuit breaker is open")
}
re.NoError(failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker"))
}

0 comments on commit 2d970a6

Please sign in to comment.