Skip to content

Commit

Permalink
Circuit Breaker metrics (#403)
Browse files Browse the repository at this point in the history
* Expose CircuitBreaker state changes via a Dequeue

* Add metrics to CircuitBreaker

* More metrics + doc

* Use MetricLabels and add 'rezilience_' prefix

* First test. Remove description to get the right metric in the test

* Another test + fix

* Tweak

* Add initial value test

* Fix

* Adjust example

* Fix

* Fix

* Fix tests
  • Loading branch information
svroonland authored Mar 3, 2024
1 parent 10659df commit 35e43ad
Show file tree
Hide file tree
Showing 7 changed files with 281 additions and 100 deletions.
26 changes: 18 additions & 8 deletions docs/docs/docs/circuitbreaker.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,21 @@ ZIO.scoped {
You may want to monitor circuit breaker failures and trigger alerts when the circuit breaker trips. For this purpose, CircuitBreaker publishes state changes via a callback provided to `make`. Usage:

```scala mdoc:silent
CircuitBreaker.make(
trippingStrategy = TrippingStrategy.failureCount(maxFailures = 10),
onStateChange = (s: State) => ZIO.debug(s"State changed to ${s}").ignore
).flatMap { circuitBreaker =>
// Make calls to an external system
circuitBreaker(ZIO.unit) // etc
}
```
import zio.stream._

CircuitBreaker
.make(trippingStrategy = TrippingStrategy.failureCount(maxFailures = 10))
.tap(cb =>
cb.stateChanges.flatMap(
ZStream
.fromQueue(_)
.mapZIO(stateChange => ZIO.debug(s"State changed from ${stateChange.from} to ${stateChange.to}"))
.runDrain
.forkScoped
)
)
.flatMap { circuitBreaker =>
// Make calls to an external system
circuitBreaker(ZIO.unit) // etc
}
```
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
package nl.vroste.rezilience.config

import nl.vroste.rezilience
import nl.vroste.rezilience.CircuitBreaker.{ isFailureAny, State }
import nl.vroste.rezilience.CircuitBreaker.isFailureAny
import nl.vroste.rezilience.config.CircuitBreakerConfig.{ ResetSchedule, TrippingStrategy }
import nl.vroste.rezilience.{ CircuitBreaker, Retry }
import zio.config._
import zio.{ Scope, UIO, ZIO }
import zio.{ Scope, ZIO }

trait CircuitBreakerFromConfigSyntax {
implicit class CircuitBreakerExtensions(self: CircuitBreaker.type) {
def fromConfig[E](
source: ConfigSource,
isFailure: PartialFunction[E, Boolean] = isFailureAny[E],
onStateChange: State => UIO[Unit] = _ => ZIO.unit
isFailure: PartialFunction[E, Boolean] = isFailureAny[E]
): ZIO[Scope, ReadError[String], CircuitBreaker[E]] =
for {
config <- read(CircuitBreakerConfig.descriptor from source)
Expand All @@ -36,7 +35,7 @@ trait CircuitBreakerFromConfigSyntax {
case ResetSchedule.ExponentialBackoff(min, max, factor) =>
Retry.Schedules.exponentialBackoff(min, max, factor)
}
cb <- self.make[E](trippingStrategy, resetSchedule, isFailure, onStateChange)
cb <- self.make[E](trippingStrategy, resetSchedule, isFailure)
} yield cb
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package nl.vroste.rezilience

import nl.vroste.rezilience.CircuitBreaker.CircuitBreakerCallError
import nl.vroste.rezilience.CircuitBreaker.{ CircuitBreakerCallError, State, StateChange }
import nl.vroste.rezilience.Policy.PolicyError
import zio._
import zio.metrics.{ Metric, MetricLabel }
import zio.stream.ZStream

import java.time.Instant

/**
* CircuitBreaker protects external resources against overload under failure
*
Expand Down Expand Up @@ -68,12 +71,23 @@ trait CircuitBreaker[-E] {
* A new CircuitBreaker defined for failures of type E2
*/
def widen[E2](pf: PartialFunction[E2, E]): CircuitBreaker[E2]

/**
* Stream of Circuit Breaker state changes
*
* Is backed by a zio.Hub, so each use of the Dequeue will receive all state changes
*/
val stateChanges: ZIO[Scope, Nothing, Dequeue[StateChange]]

def currentState: UIO[State]
}

object CircuitBreaker {

import State._

case class StateChange(from: State, to: State, at: Instant)

sealed trait CircuitBreakerCallError[+E] { self =>
def toException: Exception = CircuitBreakerException(self)

Expand Down Expand Up @@ -106,18 +120,15 @@ object CircuitBreaker {
* @param isFailure
* Only failures that match according to `isFailure` are treated as failures by the circuit breaker. Other failures
* are passed on, circumventing the circuit breaker's failure counter.
* @param onStateChange
* Observer for circuit breaker state changes
* @return
* The CircuitBreaker as a managed resource
*/
def withMaxFailures[E](
maxFailures: Int,
resetPolicy: Schedule[Any, Any, Any] = Retry.Schedules.exponentialBackoff(1.second, 1.minute),
isFailure: PartialFunction[E, Boolean] = isFailureAny[E],
onStateChange: State => UIO[Unit] = _ => ZIO.unit
isFailure: PartialFunction[E, Boolean] = isFailureAny[E]
): ZIO[Scope, Nothing, CircuitBreaker[E]] =
make(TrippingStrategy.failureCount(maxFailures), resetPolicy, isFailure, onStateChange)
make(TrippingStrategy.failureCount(maxFailures), resetPolicy, isFailure)

/**
* Create a CircuitBreaker with the given tripping strategy
Expand All @@ -129,40 +140,39 @@ object CircuitBreaker {
* @param isFailure
* Only failures that match according to `isFailure` are treated as failures by the circuit breaker. Other failures
* are passed on, circumventing the circuit breaker's failure counter.
* @param onStateChange
* Observer for circuit breaker state changes
* @return
*/
def make[E](
trippingStrategy: ZIO[Scope, Nothing, TrippingStrategy],
resetPolicy: Schedule[Any, Any, Any] =
Retry.Schedules.exponentialBackoff(1.second, 1.minute), // TODO should move to its own namespace
isFailure: PartialFunction[E, Boolean] = isFailureAny[E],
onStateChange: State => UIO[Unit] = _ => ZIO.unit
isFailure: PartialFunction[E, Boolean] = isFailureAny[E]
): ZIO[Scope, Nothing, CircuitBreaker[E]] =
for {
strategy <- trippingStrategy
state <- Ref.make[State](Closed)
halfOpenSwitch <- Ref.make[Boolean](true)
schedule <- resetPolicy.driver
resetRequests <- Queue.bounded[Unit](1)
_ <- ZStream
.fromQueue(resetRequests)
.mapZIO { _ =>
for {
_ <- schedule.next(()) // TODO handle schedule completion?
_ <- halfOpenSwitch.set(true)
_ <- state.set(HalfOpen)
_ <- onStateChange(HalfOpen).fork // Do not wait for user code
} yield ()
}
.runDrain
.forkScoped
strategy <- trippingStrategy
state <- Ref.make[State](Closed)
halfOpenSwitch <- Ref.make[Boolean](true)
schedule <- resetPolicy.driver
resetRequests <- Queue.bounded[Unit](1)
stateChangesHub <- Hub.sliding[StateChange](32).withFinalizer(_.shutdown)
_ <- ZStream
.fromQueue(resetRequests)
.mapZIO { _ =>
for {
_ <- schedule.next(()) // TODO handle schedule completion?
_ <- halfOpenSwitch.set(true)
_ <- state.set(HalfOpen)
now <- ZIO.clockWith(_.instant)
_ <- stateChangesHub.publish(StateChange(Open, HalfOpen, now))
} yield ()
}
.runDrain
.forkScoped
} yield new CircuitBreakerImpl[resetPolicy.State, E](
state,
resetRequests,
strategy,
onStateChange,
stateChangesHub,
schedule,
isFailure,
halfOpenSwitch
Expand All @@ -172,20 +182,27 @@ object CircuitBreaker {
state: Ref[State],
resetRequests: Queue[Unit],
strategy: TrippingStrategy,
onStateChange: State => UIO[Unit],
stateChangesHub: Hub[StateChange],
schedule: Schedule.Driver[ScheduleState, Any, Any, Any],
isFailure: PartialFunction[E, Boolean],
halfOpenSwitch: Ref[Boolean]
) extends CircuitBreaker[E] {

val changeToOpen = state.set(Open) *>
resetRequests.offer(()) <*
onStateChange(Open).fork // Do not wait for user code
val changeToOpen: ZIO[Any, Nothing, Unit] =
for {
oldState <- state.getAndSet(Open)
_ <- resetRequests.offer(())
now <- ZIO.clockWith(_.instant)
_ <- stateChangesHub.publish(StateChange(oldState, Open, now))
} yield ()

val changeToClosed = strategy.onReset *>
schedule.reset *>
state.set(Closed) <*
onStateChange(Closed).fork // Do not wait for user code
val changeToClosed: ZIO[Any, Nothing, Unit] = for {
_ <- strategy.onReset
_ <- schedule.reset
now <- ZIO.clockWith(_.instant)
oldState <- state.getAndSet(Closed)
_ <- stateChangesHub.publish(StateChange(oldState, Closed, now))
} yield ()

override def apply[R, E1 <: E, A](f: ZIO[R, E1, A]): ZIO[R, CircuitBreakerCallError[E1], A] =
for {
Expand Down Expand Up @@ -240,13 +257,106 @@ object CircuitBreaker {
state,
resetRequests,
strategy,
onStateChange,
stateChangesHub,
schedule,
pf andThen isFailure,
halfOpenSwitch
)

override def currentState: UIO[State] = state.get

override val stateChanges: ZIO[Scope, Nothing, Dequeue[StateChange]] = stateChangesHub.subscribe
}

private[rezilience] def isFailureAny[E]: PartialFunction[E, Boolean] = { case _ => true }

private[rezilience] case class CircuitBreakerMetrics(
state: Metric.Gauge[Double],
nrStateChanges: Metric.Counter[Long],
callsSuccess: Metric.Counter[Long],
callsFailure: Metric.Counter[Long],
callsRejected: Metric.Counter[Long]
)

/**
* Takes an existing CircuitBreaker and returns a new one that records metrics
*
* Metrics are
* - rezilience_circuit_breaker_state: current state (0 = closed, 1 = half-open, 2 = open)
* - rezilience_circuit_breaker_state_changes: number of state changes
* - rezilience_circuit_breaker_calls_success: number of successful calls
* - rezilience_circuit_breaker_calls_failure: number of failed calls
* - rezilience_circuit_breaker_calls_rejected: number of calls rejected in the open state
*
* Be sure to use only the returned CircuitBreaker and not the one given as parameter, otherwise no metrics will be
* recorded. Recommended usage is to create it in go, eg `cb <-
* CircuitBreaker.withMaxFailures(10).flatMap(CircuitBreaker.withMetrics(_, labels))`
*
* @param circuitBreaker
* Existing CircuitBreaker
* @param labels
* Set of labels to annotate metrics with, to distinguish this circuit breaker from others in the same application.
*
* @return
* CircuitBreaker that records metrics
*/
def withMetrics[E](
circuitBreaker: CircuitBreaker[E],
labels: Set[MetricLabel]
): ZIO[Scope, Nothing, CircuitBreakerWithMetrics[E]] = {

val metrics = CircuitBreakerMetrics(
state = Metric
.gauge("rezilience_circuit_breaker_state")
.tagged(labels),
nrStateChanges = Metric.counter("rezilience_circuit_breaker_state_changes").tagged(labels),
callsSuccess = Metric.counter("rezilience_circuit_breaker_calls_success").tagged(labels),
callsFailure = Metric.counter("rezilience_circuit_breaker_calls_failure").tagged(labels),
callsRejected = Metric
.counter("rezilience_circuit_breaker_calls_rejected")
.tagged(labels)
)

for {
stateChanges <- circuitBreaker.stateChanges
_ <- ZStream
.fromQueue(stateChanges)
.tap { stateChange =>
val stateAsInt = stateChange.to match {
case State.Closed => 0
case State.HalfOpen => 1
case State.Open => 2
}

metrics.nrStateChanges.increment *> metrics.state.set(stateAsInt.doubleValue)
}
.runDrain
.forkScoped
} yield new CircuitBreakerWithMetrics[E](circuitBreaker, metrics)
}

private[rezilience] case class CircuitBreakerWithMetrics[E](
circuitBreaker: CircuitBreaker[E],
metrics: CircuitBreakerMetrics
) extends CircuitBreaker[E] {
override def apply[R, E1 <: E, A](
f: ZIO[R, E1, A]
): ZIO[R, CircuitBreakerCallError[E1], A] = circuitBreaker
.apply(f)
.tapBoth(
{
case CircuitBreaker.CircuitBreakerOpen => metrics.callsRejected.increment
case CircuitBreaker.WrappedError(_) => metrics.callsFailure.increment
},
_ => metrics.callsSuccess.increment
)

override def widen[E2](pf: PartialFunction[E2, E]): CircuitBreaker[E2] =
CircuitBreakerWithMetrics(circuitBreaker.widen[E2](pf), metrics)

override def currentState: UIO[State] = circuitBreaker.currentState

override val stateChanges: ZIO[Scope, Nothing, Dequeue[StateChange]] = circuitBreaker.stateChanges
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package nl.vroste.rezilience
import nl.vroste.rezilience.Bulkhead.{ BulkheadError, Metrics }
import nl.vroste.rezilience.CircuitBreaker.CircuitBreakerCallError
import nl.vroste.rezilience.Policy.{ flattenWrappedError, PolicyError }
import zio.{ UIO, ZIO }
import zio.{ Queue, UIO, ZIO }

/**
* Represents a composition of one or more rezilience policies
Expand Down Expand Up @@ -92,6 +92,10 @@ object Policy {
override def apply[R, E1 <: E, A](f: ZIO[R, E1, A]): ZIO[R, CircuitBreakerCallError[E1], A] =
f.mapError(CircuitBreaker.WrappedError(_))
override def widen[E2](pf: PartialFunction[E2, E]): CircuitBreaker[E2] = new NoopCircuitBreaker[E2]

override val stateChanges = Queue.bounded(1)

override def currentState: UIO[CircuitBreaker.State] = ZIO.succeed(CircuitBreaker.State.Closed)
}

def noopCircuitBreaker[E]: CircuitBreaker[E] = new NoopCircuitBreaker[E]
Expand Down
Loading

0 comments on commit 35e43ad

Please sign in to comment.