Skip to content

Commit

Permalink
--wip-- [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx committed Dec 19, 2024
1 parent 579f67b commit 0f13444
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 23 deletions.
36 changes: 18 additions & 18 deletions client/pkg/circuitbreaker/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ var AlwaysClosedSettings = Settings{
}

// CircuitBreaker is a state machine to prevent sending requests that are likely to fail.
type CircuitBreaker[T any] struct {
type CircuitBreaker struct {
config *Settings
name string

mutex sync.Mutex
state *State[T]
state *State

successCounter prometheus.Counter
errorCounter prometheus.Counter
Expand Down Expand Up @@ -103,8 +103,8 @@ func (s StateType) String() string {
var replacer = strings.NewReplacer(" ", "_", "-", "_")

// NewCircuitBreaker returns a new CircuitBreaker configured with the given Settings.
func NewCircuitBreaker[T any](name string, st Settings) *CircuitBreaker[T] {
cb := new(CircuitBreaker[T])
func NewCircuitBreaker(name string, st Settings) *CircuitBreaker {
cb := new(CircuitBreaker)
cb.name = name
cb.config = &st
cb.state = cb.newState(time.Now(), StateClosed)
Expand All @@ -119,7 +119,7 @@ func NewCircuitBreaker[T any](name string, st Settings) *CircuitBreaker[T] {

// ChangeSettings changes the CircuitBreaker settings.
// The changes will be reflected only in the next evaluation window.
func (cb *CircuitBreaker[T]) ChangeSettings(apply func(config *Settings)) {
func (cb *CircuitBreaker) ChangeSettings(apply func(config *Settings)) {
cb.mutex.Lock()
defer cb.mutex.Unlock()

Expand All @@ -130,7 +130,7 @@ func (cb *CircuitBreaker[T]) ChangeSettings(apply func(config *Settings)) {
// Execute calls the given function if the CircuitBreaker is closed and returns the result of execution.
// Execute returns an error instantly if the CircuitBreaker is open.
// https://github.com/tikv/rfcs/blob/master/text/0115-circuit-breaker.md
func (cb *CircuitBreaker[T]) Execute(call func() (Overloading, error)) error {
func (cb *CircuitBreaker) Execute(call func() (Overloading, error)) error {
state, err := cb.onRequest()
if err != nil {
cb.fastFailCounter.Inc()
Expand All @@ -152,7 +152,7 @@ func (cb *CircuitBreaker[T]) Execute(call func() (Overloading, error)) error {
return err
}

func (cb *CircuitBreaker[T]) onRequest() (*State[T], error) {
func (cb *CircuitBreaker) onRequest() (*State, error) {
cb.mutex.Lock()
defer cb.mutex.Unlock()

Expand All @@ -161,7 +161,7 @@ func (cb *CircuitBreaker[T]) onRequest() (*State[T], error) {
return state, err
}

func (cb *CircuitBreaker[T]) onResult(state *State[T], overloaded Overloading) {
func (cb *CircuitBreaker) onResult(state *State, overloaded Overloading) {
cb.mutex.Lock()
defer cb.mutex.Unlock()

Expand All @@ -170,7 +170,7 @@ func (cb *CircuitBreaker[T]) onResult(state *State[T], overloaded Overloading) {
state.onResult(overloaded)
}

func (cb *CircuitBreaker[T]) emitMetric(overloaded Overloading, err error) {
func (cb *CircuitBreaker) emitMetric(overloaded Overloading, err error) {
switch overloaded {
case No:
cb.successCounter.Inc()
Expand All @@ -185,9 +185,9 @@ func (cb *CircuitBreaker[T]) emitMetric(overloaded Overloading, err error) {
}

// State represents the state of CircuitBreaker.
type State[T any] struct {
type State struct {
stateType StateType
cb *CircuitBreaker[T]
cb *CircuitBreaker
end time.Time

pendingCount uint32
Expand All @@ -196,7 +196,7 @@ type State[T any] struct {
}

// newState creates a new State with the given configuration and reset all success/failure counters.
func (cb *CircuitBreaker[T]) newState(now time.Time, stateType StateType) *State[T] {
func (cb *CircuitBreaker) newState(now time.Time, stateType StateType) *State {
var end time.Time
var pendingCount uint32
switch stateType {
Expand All @@ -211,7 +211,7 @@ func (cb *CircuitBreaker[T]) newState(now time.Time, stateType StateType) *State
default:
panic("unknown state")
}
return &State[T]{
return &State{
cb: cb,
stateType: stateType,
pendingCount: pendingCount,
Expand All @@ -227,7 +227,7 @@ func (cb *CircuitBreaker[T]) newState(now time.Time, stateType StateType) *State
// Open state fails all request, it has a fixed duration of `Settings.CoolDownInterval` and always moves to HalfOpen state at the end of the interval.
// HalfOpen state does not have a fixed duration and lasts till `Settings.HalfOpenSuccessCount` are evaluated.
// If any of `Settings.HalfOpenSuccessCount` fails then it moves back to Open state, otherwise it moves to Closed state.
func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) {
func (s *State) onRequest(cb *CircuitBreaker) (*State, error) {
var now = time.Now()
switch s.stateType {
case StateClosed:
Expand Down Expand Up @@ -299,7 +299,7 @@ func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) {
}
}

func (s *State[T]) onResult(overloaded Overloading) {
func (s *State) onResult(overloaded Overloading) {
switch overloaded {
case No:
s.successCount++
Expand All @@ -317,17 +317,17 @@ type cbCtxKey struct{}
var CircuitBreakerKey = cbCtxKey{}

// FromContext retrieves the circuit breaker from the context
func FromContext[T any](ctx context.Context) *CircuitBreaker[T] {
func FromContext(ctx context.Context) *CircuitBreaker {
if ctx == nil {
return nil
}
if cb, ok := ctx.Value(CircuitBreakerKey).(*CircuitBreaker[T]); ok {
if cb, ok := ctx.Value(CircuitBreakerKey).(*CircuitBreaker); ok {
return cb
}
return nil
}

// WithCircuitBreaker stores the circuit breaker into a new context
func WithCircuitBreaker[T any](ctx context.Context, cb *CircuitBreaker[T]) context.Context {
func WithCircuitBreaker(ctx context.Context, cb *CircuitBreaker) context.Context {
return context.WithValue(ctx, CircuitBreakerKey, cb)
}
2 changes: 1 addition & 1 deletion client/pkg/circuitbreaker/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestCircuitBreakerHalfOpenFailOverPendingCount(t *testing.T) {

func TestCircuitBreakerCountOnlyRequestsInSameWindow(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
cb := NewCircuitBreaker("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)

start := make(chan bool)
Expand Down
7 changes: 3 additions & 4 deletions client/pkg/utils/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"

"github.com/tikv/pd/client/errs"
Expand Down Expand Up @@ -75,9 +74,9 @@ func UnaryBackofferInterceptor() grpc.UnaryClientInterceptor {
}
}

func UnaryCircuitBreakerInterceptor[T any]() grpc.UnaryClientInterceptor {
func UnaryCircuitBreakerInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
cb := circuitbreaker.FromContext[*pdpb.GetRegionResponse](ctx)
cb := circuitbreaker.FromContext(ctx)
if cb == nil {
return invoker(ctx, method, req, reply, cc, opts...)
}
Expand Down Expand Up @@ -132,7 +131,7 @@ func GetClientConn(ctx context.Context, addr string, tlsCfg *tls.Config, do ...g
retryOpt := grpc.WithChainUnaryInterceptor(UnaryBackofferInterceptor())

// Add circuit breaker interceptor
cbOpt := grpc.WithChainUnaryInterceptor(UnaryCircuitBreakerInterceptor[any]())
cbOpt := grpc.WithChainUnaryInterceptor(UnaryCircuitBreakerInterceptor())

// Add retry related connection parameters
backoffOpts := grpc.WithConnectParams(grpc.ConnectParams{
Expand Down

0 comments on commit 0f13444

Please sign in to comment.