diff --git a/client/circuitbreaker/circuit_breaker.go b/client/circuitbreaker/circuit_breaker.go index f2f23e5f977..b5a4c53ebb5 100644 --- a/client/circuitbreaker/circuit_breaker.go +++ b/client/circuitbreaker/circuit_breaker.go @@ -52,8 +52,8 @@ type Settings struct { HalfOpenSuccessCount uint32 } -// AlwaysOpenSettings is a configuration that never trips the circuit breaker. -var AlwaysOpenSettings = Settings{ +// AlwaysClosedSettings is a configuration that never trips the circuit breaker. +var AlwaysClosedSettings = Settings{ ErrorRateThresholdPct: 0, // never trips ErrorRateWindow: 10 * time.Second, // effectively results in testing for new settings every 10 seconds MinQPSForOpen: 10, diff --git a/client/circuitbreaker/circuit_breaker_test.go b/client/circuitbreaker/circuit_breaker_test.go index 412c7fc43ab..ca77b7f9f99 100644 --- a/client/circuitbreaker/circuit_breaker_test.go +++ b/client/circuitbreaker/circuit_breaker_test.go @@ -68,7 +68,7 @@ func TestCircuitBreaker_OpenState(t *testing.T) { re.Equal(StateOpen, cb.state.stateType) } -func TestCircuitBreaker_OpenState_Not_Enough_QPS(t *testing.T) { +func TestCircuitBreaker_CloseState_Not_Enough_QPS(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) @@ -78,7 +78,7 @@ func TestCircuitBreaker_OpenState_Not_Enough_QPS(t *testing.T) { re.Equal(StateClosed, cb.state.stateType) } -func TestCircuitBreaker_OpenState_Not_Enough_Error_Rate(t *testing.T) { +func TestCircuitBreaker_CloseState_Not_Enough_Error_Rate(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) @@ -176,12 +176,47 @@ func TestCircuitBreaker_Half_Open_Fail_Over_Pending_Count(t *testing.T) { re.Equal(uint32(1), cb.state.successCount) } +func TestCircuitBreaker_Count_Only_Requests_In_Same_Window(t *testing.T) { + re := require.New(t) + cb := NewCircuitBreaker[int]("test_cb", settings) + re.Equal(StateClosed, cb.state.stateType) + + start := make(chan bool) + wait := make(chan bool) + end := make(chan bool) + go func() { + defer func() { + end <- true + }() + _, err := cb.Execute(func() (int, Overloading, error) { + start <- true + <-wait + return 42, No, nil + }) + re.NoError(err) + }() + <-start // make sure the request is started + // assert running request is not counted + re.Equal(uint32(0), cb.state.successCount) + + // advance request to the next window + cb.advance(settings.ErrorRateWindow) + assertSucceeds(cb, re) + re.Equal(uint32(1), cb.state.successCount) + + // complete the request from the previous window + wait <- true // resume + <-end // wait for the request to complete + // assert request from last window is not counted + re.Equal(uint32(1), cb.state.successCount) +} + func TestCircuitBreaker_ChangeSettings(t *testing.T) { re := require.New(t) - cb := NewCircuitBreaker[int]("test_cb", AlwaysOpenSettings) - driveQPS(cb, int(AlwaysOpenSettings.MinQPSForOpen*uint32(AlwaysOpenSettings.ErrorRateWindow.Seconds())), Yes, re) - cb.advance(AlwaysOpenSettings.ErrorRateWindow) + cb := NewCircuitBreaker[int]("test_cb", AlwaysClosedSettings) + driveQPS(cb, int(AlwaysClosedSettings.MinQPSForOpen*uint32(AlwaysClosedSettings.ErrorRateWindow.Seconds())), Yes, re) + cb.advance(AlwaysClosedSettings.ErrorRateWindow) assertSucceeds(cb, re) re.Equal(StateClosed, cb.state.stateType) diff --git a/client/opt/option.go b/client/opt/option.go index ff79f104154..9a80a895cc0 100644 --- a/client/opt/option.go +++ b/client/opt/option.go @@ -82,7 +82,7 @@ func NewOption() *Option { MaxRetryTimes: maxInitClusterRetries, EnableTSOFollowerProxyCh: make(chan struct{}, 1), InitMetrics: true, - RegionMetaCircuitBreakerSettings: cb.AlwaysOpenSettings, + RegionMetaCircuitBreakerSettings: cb.AlwaysClosedSettings, } co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval)