Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Dec 26, 2024
1 parent 0f13444 commit 420781e
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 16 deletions.
26 changes: 13 additions & 13 deletions client/pkg/circuitbreaker/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

// advance emulate the state machine clock moves forward by the given duration
func (cb *CircuitBreaker[T]) advance(duration time.Duration) {
func (cb *CircuitBreaker) advance(duration time.Duration) {
cb.state.end = cb.state.end.Add(-duration - 1)
}

Expand All @@ -40,7 +40,7 @@ var minCountToOpen = int(settings.MinQPSForOpen * uint32(settings.ErrorRateWindo

func TestCircuitBreakerExecuteWrapperReturnValues(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
cb := NewCircuitBreaker("test_cb", settings)
originalError := errors.New("circuit breaker is open")

err := cb.Execute(func() (Overloading, error) {
Expand All @@ -57,7 +57,7 @@ func TestCircuitBreakerExecuteWrapperReturnValues(t *testing.T) {

func TestCircuitBreakerOpenState(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
cb := NewCircuitBreaker("test_cb", settings)
driveQPS(cb, minCountToOpen, Yes, re)
re.Equal(StateClosed, cb.state.stateType)
assertSucceeds(cb, re) // no error till ErrorRateWindow is finished
Expand All @@ -68,7 +68,7 @@ func TestCircuitBreakerOpenState(t *testing.T) {

func TestCircuitBreakerCloseStateNotEnoughQPS(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
cb := NewCircuitBreaker("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)
driveQPS(cb, minCountToOpen/2, Yes, re)
cb.advance(settings.ErrorRateWindow)
Expand All @@ -78,7 +78,7 @@ func TestCircuitBreakerCloseStateNotEnoughQPS(t *testing.T) {

func TestCircuitBreakerCloseStateNotEnoughErrorRate(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
cb := NewCircuitBreaker("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)
driveQPS(cb, minCountToOpen/4, Yes, re)
driveQPS(cb, minCountToOpen, No, re)
Expand All @@ -89,7 +89,7 @@ func TestCircuitBreakerCloseStateNotEnoughErrorRate(t *testing.T) {

func TestCircuitBreakerHalfOpenToClosed(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
cb := NewCircuitBreaker("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)
driveQPS(cb, minCountToOpen, Yes, re)
cb.advance(settings.ErrorRateWindow)
Expand All @@ -107,7 +107,7 @@ func TestCircuitBreakerHalfOpenToClosed(t *testing.T) {

func TestCircuitBreakerHalfOpenToOpen(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
cb := NewCircuitBreaker("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)
driveQPS(cb, minCountToOpen, Yes, re)
cb.advance(settings.ErrorRateWindow)
Expand Down Expand Up @@ -212,7 +212,7 @@ func TestCircuitBreakerCountOnlyRequestsInSameWindow(t *testing.T) {
func TestCircuitBreakerChangeSettings(t *testing.T) {
re := require.New(t)

cb := NewCircuitBreaker[int]("test_cb", AlwaysClosedSettings)
cb := NewCircuitBreaker("test_cb", AlwaysClosedSettings)
driveQPS(cb, int(AlwaysClosedSettings.MinQPSForOpen*uint32(AlwaysClosedSettings.ErrorRateWindow.Seconds())), Yes, re)
cb.advance(AlwaysClosedSettings.ErrorRateWindow)
assertSucceeds(cb, re)
Expand All @@ -229,8 +229,8 @@ func TestCircuitBreakerChangeSettings(t *testing.T) {
re.Equal(StateOpen, cb.state.stateType)
}

func newCircuitBreakerMovedToHalfOpenState(re *require.Assertions) *CircuitBreaker[int] {
cb := NewCircuitBreaker[int]("test_cb", settings)
func newCircuitBreakerMovedToHalfOpenState(re *require.Assertions) *CircuitBreaker {
cb := NewCircuitBreaker("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)
driveQPS(cb, minCountToOpen, Yes, re)
cb.advance(settings.ErrorRateWindow)
Expand All @@ -240,7 +240,7 @@ func newCircuitBreakerMovedToHalfOpenState(re *require.Assertions) *CircuitBreak
return cb
}

func driveQPS(cb *CircuitBreaker[int], count int, overload Overloading, re *require.Assertions) {
func driveQPS(cb *CircuitBreaker, count int, overload Overloading, re *require.Assertions) {
for range count {
err := cb.Execute(func() (Overloading, error) {
return overload, nil
Expand All @@ -249,7 +249,7 @@ func driveQPS(cb *CircuitBreaker[int], count int, overload Overloading, re *requ
}
}

func assertFastFail(cb *CircuitBreaker[int], re *require.Assertions) {
func assertFastFail(cb *CircuitBreaker, re *require.Assertions) {
var executed = false
err := cb.Execute(func() (Overloading, error) {
executed = true
Expand All @@ -259,7 +259,7 @@ func assertFastFail(cb *CircuitBreaker[int], re *require.Assertions) {
re.False(executed)
}

func assertSucceeds(cb *CircuitBreaker[int], re *require.Assertions) {
func assertSucceeds(cb *CircuitBreaker, re *require.Assertions) {
err := cb.Execute(func() (Overloading, error) {
return No, nil
})
Expand Down
1 change: 1 addition & 0 deletions client/pkg/utils/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func UnaryBackofferInterceptor() grpc.UnaryClientInterceptor {
}
}

// UnaryCircuitBreakerInterceptor is a gRPC interceptor that adds a circuit breaker to the call.
func UnaryCircuitBreakerInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
cb := circuitbreaker.FromContext(ctx)
Expand Down
6 changes: 3 additions & 3 deletions tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2073,7 +2073,7 @@ func TestCircuitBreaker(t *testing.T) {
cli := setupCli(ctx, re, endpoints)
defer cli.Close()

circuitBreaker := cb.NewCircuitBreaker[*pdpb.GetRegionResponse]("region_meta", circuitBreakerSettings)
circuitBreaker := cb.NewCircuitBreaker("region_meta", circuitBreakerSettings)
ctx = cb.WithCircuitBreaker(ctx, circuitBreaker)
for range 10 {
region, err := cli.GetRegion(ctx, []byte("a"))
Expand Down Expand Up @@ -2128,7 +2128,7 @@ func TestCircuitBreakerOpenAndChangeSettings(t *testing.T) {
cli := setupCli(ctx, re, endpoints)
defer cli.Close()

circuitBreaker := cb.NewCircuitBreaker[*pdpb.GetRegionResponse]("region_meta", circuitBreakerSettings)
circuitBreaker := cb.NewCircuitBreaker("region_meta", circuitBreakerSettings)
ctx = cb.WithCircuitBreaker(ctx, circuitBreaker)
for range 10 {
region, err := cli.GetRegion(ctx, []byte("a"))
Expand Down Expand Up @@ -2178,7 +2178,7 @@ func TestCircuitBreakerHalfOpenAndChangeSettings(t *testing.T) {
cli := setupCli(ctx, re, endpoints)
defer cli.Close()

circuitBreaker := cb.NewCircuitBreaker[*pdpb.GetRegionResponse]("region_meta", circuitBreakerSettings)
circuitBreaker := cb.NewCircuitBreaker("region_meta", circuitBreakerSettings)
ctx = cb.WithCircuitBreaker(ctx, circuitBreaker)
for range 10 {
region, err := cli.GetRegion(ctx, []byte("a"))
Expand Down

0 comments on commit 420781e

Please sign in to comment.