From 420781e54f72a5ca7056c5631b590e7a50386c72 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 24 Dec 2024 18:00:55 +0800 Subject: [PATCH] fix Signed-off-by: Ryan Leung --- .../circuitbreaker/circuit_breaker_test.go | 26 +++++++++---------- client/pkg/utils/grpcutil/grpcutil.go | 1 + tests/integrations/client/client_test.go | 6 ++--- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/client/pkg/circuitbreaker/circuit_breaker_test.go b/client/pkg/circuitbreaker/circuit_breaker_test.go index 707335c59b7..e62e55c1ab8 100644 --- a/client/pkg/circuitbreaker/circuit_breaker_test.go +++ b/client/pkg/circuitbreaker/circuit_breaker_test.go @@ -24,7 +24,7 @@ import ( ) // advance emulate the state machine clock moves forward by the given duration -func (cb *CircuitBreaker[T]) advance(duration time.Duration) { +func (cb *CircuitBreaker) advance(duration time.Duration) { cb.state.end = cb.state.end.Add(-duration - 1) } @@ -40,7 +40,7 @@ var minCountToOpen = int(settings.MinQPSForOpen * uint32(settings.ErrorRateWindo func TestCircuitBreakerExecuteWrapperReturnValues(t *testing.T) { re := require.New(t) - cb := NewCircuitBreaker[int]("test_cb", settings) + cb := NewCircuitBreaker("test_cb", settings) originalError := errors.New("circuit breaker is open") err := cb.Execute(func() (Overloading, error) { @@ -57,7 +57,7 @@ func TestCircuitBreakerExecuteWrapperReturnValues(t *testing.T) { func TestCircuitBreakerOpenState(t *testing.T) { re := require.New(t) - cb := NewCircuitBreaker[int]("test_cb", settings) + cb := NewCircuitBreaker("test_cb", settings) driveQPS(cb, minCountToOpen, Yes, re) re.Equal(StateClosed, cb.state.stateType) assertSucceeds(cb, re) // no error till ErrorRateWindow is finished @@ -68,7 +68,7 @@ func TestCircuitBreakerOpenState(t *testing.T) { func TestCircuitBreakerCloseStateNotEnoughQPS(t *testing.T) { re := require.New(t) - cb := NewCircuitBreaker[int]("test_cb", settings) + cb := NewCircuitBreaker("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) driveQPS(cb, minCountToOpen/2, Yes, re) cb.advance(settings.ErrorRateWindow) @@ -78,7 +78,7 @@ func TestCircuitBreakerCloseStateNotEnoughQPS(t *testing.T) { func TestCircuitBreakerCloseStateNotEnoughErrorRate(t *testing.T) { re := require.New(t) - cb := NewCircuitBreaker[int]("test_cb", settings) + cb := NewCircuitBreaker("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) driveQPS(cb, minCountToOpen/4, Yes, re) driveQPS(cb, minCountToOpen, No, re) @@ -89,7 +89,7 @@ func TestCircuitBreakerCloseStateNotEnoughErrorRate(t *testing.T) { func TestCircuitBreakerHalfOpenToClosed(t *testing.T) { re := require.New(t) - cb := NewCircuitBreaker[int]("test_cb", settings) + cb := NewCircuitBreaker("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) driveQPS(cb, minCountToOpen, Yes, re) cb.advance(settings.ErrorRateWindow) @@ -107,7 +107,7 @@ func TestCircuitBreakerHalfOpenToClosed(t *testing.T) { func TestCircuitBreakerHalfOpenToOpen(t *testing.T) { re := require.New(t) - cb := NewCircuitBreaker[int]("test_cb", settings) + cb := NewCircuitBreaker("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) driveQPS(cb, minCountToOpen, Yes, re) cb.advance(settings.ErrorRateWindow) @@ -212,7 +212,7 @@ func TestCircuitBreakerCountOnlyRequestsInSameWindow(t *testing.T) { func TestCircuitBreakerChangeSettings(t *testing.T) { re := require.New(t) - cb := NewCircuitBreaker[int]("test_cb", AlwaysClosedSettings) + cb := NewCircuitBreaker("test_cb", AlwaysClosedSettings) driveQPS(cb, int(AlwaysClosedSettings.MinQPSForOpen*uint32(AlwaysClosedSettings.ErrorRateWindow.Seconds())), Yes, re) cb.advance(AlwaysClosedSettings.ErrorRateWindow) assertSucceeds(cb, re) @@ -229,8 +229,8 @@ func TestCircuitBreakerChangeSettings(t *testing.T) { re.Equal(StateOpen, cb.state.stateType) } -func newCircuitBreakerMovedToHalfOpenState(re *require.Assertions) *CircuitBreaker[int] { - cb := NewCircuitBreaker[int]("test_cb", settings) +func newCircuitBreakerMovedToHalfOpenState(re *require.Assertions) *CircuitBreaker { + cb := NewCircuitBreaker("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) driveQPS(cb, minCountToOpen, Yes, re) cb.advance(settings.ErrorRateWindow) @@ -240,7 +240,7 @@ func newCircuitBreakerMovedToHalfOpenState(re *require.Assertions) *CircuitBreak return cb } -func driveQPS(cb *CircuitBreaker[int], count int, overload Overloading, re *require.Assertions) { +func driveQPS(cb *CircuitBreaker, count int, overload Overloading, re *require.Assertions) { for range count { err := cb.Execute(func() (Overloading, error) { return overload, nil @@ -249,7 +249,7 @@ func driveQPS(cb *CircuitBreaker[int], count int, overload Overloading, re *requ } } -func assertFastFail(cb *CircuitBreaker[int], re *require.Assertions) { +func assertFastFail(cb *CircuitBreaker, re *require.Assertions) { var executed = false err := cb.Execute(func() (Overloading, error) { executed = true @@ -259,7 +259,7 @@ func assertFastFail(cb *CircuitBreaker[int], re *require.Assertions) { re.False(executed) } -func assertSucceeds(cb *CircuitBreaker[int], re *require.Assertions) { +func assertSucceeds(cb *CircuitBreaker, re *require.Assertions) { err := cb.Execute(func() (Overloading, error) { return No, nil }) diff --git a/client/pkg/utils/grpcutil/grpcutil.go b/client/pkg/utils/grpcutil/grpcutil.go index 5819279e774..235e1088747 100644 --- a/client/pkg/utils/grpcutil/grpcutil.go +++ b/client/pkg/utils/grpcutil/grpcutil.go @@ -74,6 +74,7 @@ func UnaryBackofferInterceptor() grpc.UnaryClientInterceptor { } } +// UnaryCircuitBreakerInterceptor is a gRPC interceptor that adds a circuit breaker to the call. func UnaryCircuitBreakerInterceptor() grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { cb := circuitbreaker.FromContext(ctx) diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index cdab9e81991..c29ef40a83b 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -2073,7 +2073,7 @@ func TestCircuitBreaker(t *testing.T) { cli := setupCli(ctx, re, endpoints) defer cli.Close() - circuitBreaker := cb.NewCircuitBreaker[*pdpb.GetRegionResponse]("region_meta", circuitBreakerSettings) + circuitBreaker := cb.NewCircuitBreaker("region_meta", circuitBreakerSettings) ctx = cb.WithCircuitBreaker(ctx, circuitBreaker) for range 10 { region, err := cli.GetRegion(ctx, []byte("a")) @@ -2128,7 +2128,7 @@ func TestCircuitBreakerOpenAndChangeSettings(t *testing.T) { cli := setupCli(ctx, re, endpoints) defer cli.Close() - circuitBreaker := cb.NewCircuitBreaker[*pdpb.GetRegionResponse]("region_meta", circuitBreakerSettings) + circuitBreaker := cb.NewCircuitBreaker("region_meta", circuitBreakerSettings) ctx = cb.WithCircuitBreaker(ctx, circuitBreaker) for range 10 { region, err := cli.GetRegion(ctx, []byte("a")) @@ -2178,7 +2178,7 @@ func TestCircuitBreakerHalfOpenAndChangeSettings(t *testing.T) { cli := setupCli(ctx, re, endpoints) defer cli.Close() - circuitBreaker := cb.NewCircuitBreaker[*pdpb.GetRegionResponse]("region_meta", circuitBreakerSettings) + circuitBreaker := cb.NewCircuitBreaker("region_meta", circuitBreakerSettings) ctx = cb.WithCircuitBreaker(ctx, circuitBreaker) for range 10 { region, err := cli.GetRegion(ctx, []byte("a"))