Add retry utility
shonfeder committed Aug 21, 2024
1 parent d57b358 commit 9792514
Expand Up @@ -333,6 +333,7 @@ module Process = Process
module Switch = Switch
module Pool = Pool
module Log_matcher = Log_matcher
module Retry = Retry

module Job = struct
include Job
2 changes: 2 additions & 0 deletions lib/current.mli
Original file line number Diff line number Diff line change
Expand Up @@ -460,3 +460,5 @@ module Log_matcher : sig
val drop_all : unit -> unit
val analyse_string : ?job:Job.t -> string -> string option

module Retry = Retry
84 changes: 84 additions & 0 deletions lib/
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
open Lwt.Syntax

let default_sleep_duration n' =
let base_sleep_time = 2.0 in
let n = Int.to_float n' in
let backoff = n *. base_sleep_time *. Float.pow 1.5 n in

type ('retry, 'fatal) error =
[ `Retry of 'retry
| `Fatal of 'fatal

let pp_error : ?retry:'retry Fmt.t -> ?fatal:'fatal Fmt.t -> ('retry, 'fatal) error Fmt.t =
fun ?retry ?fatal fmt err ->
let default fmt _ = fmt "<opaque>" in
let pp_retry = Option.value retry ~default in
let pp_fatal = Option.value fatal ~default in
match err with
| `Retry r -> fmt "retryable error '%a'" pp_retry r
| `Fatal f -> fmt "fatal error '%a'" pp_fatal f

let equal_error ~retry ~fatal a b =
match a, b with
| `Retry a', `Retry b' -> retry a' b'
| `Fatal a', `Fatal b' -> fatal a' b'
| _ -> false

type ('ok, 'retry, 'fatal) attempt = ('ok, ('retry, 'fatal) error) result

let is_retryable = function
| Error (`Retry _) -> true
| _ -> false

let on_error
(f : unit -> ('ok, 'retry, 'fatal) attempt Lwt.t)
: ('ok, 'retry, 'fatal) attempt Lwt_stream.t
let stop = ref false in
let attempt () =
if !stop then
let+ result = f () in
stop := not (is_retryable result);
Some result
Lwt_stream.from attempt

let numbered attempts : (int * _) Lwt_stream.t =
let i = ref 0 in
let indexes = Lwt_stream.from_direct (fun () -> let n = !i in incr i; Some n) in
Lwt_stream.combine indexes attempts

let with_sleep ?(duration=default_sleep_duration) attempts =
|> numbered
|> Lwt_stream.map_s (fun (attempt_number, attempt_result) ->
let+ () = Lwt_unix.sleep (duration @@ attempt_number) in

let pp_n_times_error fmt = function
| `Retry _ -> fmt "(exhausted)"
| `Fatal _ -> fmt "(fatal)"

let n_times
: ?pp:('retry, 'fatal) error Fmt.t
-> int
-> ('ok, 'retry, 'fatal) attempt Lwt_stream.t
-> ('a, [`Msg of string]) result Lwt.t
= fun ?pp n strm ->
let pp = match pp with
| Some f -> fun fmt err -> fmt "%a %a" f err pp_n_times_error err
| None -> fun fmt err -> fmt "%a %a" (fun e -> pp_error e) err pp_n_times_error err
let to_or_error attempt : ('a, [`Msg of string]) result =
Result.map_error (fun err -> `Msg (Fmt.to_to_string pp err)) attempt
let+ attempts = strm |> to_or_error |> Lwt_stream.nget n in
match List.rev attempts with
| last :: _ -> last
| _ -> failwith "impossible"
114 changes: 114 additions & 0 deletions lib/retry.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
(** Utilities for retrying Lwt computations *)

type ('retry, 'fatal) error =
[ `Retry of 'retry
| `Fatal of 'fatal
(** The type of errors that a retryable computation can produce.
- [`Retry r] when `r` represents an error that can be retried.
- [`Fatal f] when `f` represents an error that cannot be retried. *)

type ('ok, 'retry, 'fatal) attempt = ('ok, ('retry, 'fatal) error) result
(** A [('ok, 'retry, 'fatal) attempt] is an alias for the [result] of a
retryable computation.
- [Ok v] produces a successful value [v]
- [Error err] produces the {!type:error} [err] *)

val pp_error :
?retry:'retry Fmt.t ->
?fatal:'fatal Fmt.t ->
('retry, 'fatal) error Fmt.t
(** [pp_error ~retry ~fatal] is a formatter for {!type:error}s that formats
fatal and retryable errors according to the provided formatters.
If either formatter is not provided, a default formatter will represent the
values as ["<opaque>"]. *)

val equal_error :
retry:('retry -> 'retry -> bool) ->
fatal:('fatal -> 'fatal -> bool) ->
('retry, 'fatal) error ->
('retry, 'fatal) error ->

val on_error :
(unit -> ('ok, 'retry, 'fatal) attempt Lwt.t) ->
('ok, 'retry, 'fatal) attempt Lwt_stream.t
(** [on_error f] is a stream of attempts to compute [f]. The stream will continue until
the computation succeeds or produces a fatal error.
# open Current;;
# let success () = Lwt.return_ok ();;
val success : unit -> (unit, 'a) result Lwt.t = <fun>
# Retry.(success |> on_error) |> Lwt_stream.to_list;;
- : (unit, 'a, 'b) Current.Retry.attempt list = [Ok ()]
# let fatal_failure () = Lwt.return_error (`Fatal ());;
val fatal_failure : unit -> ('a, [> `Fatal of unit ]) result Lwt.t = <fun>
# Retry.(fatal_failure |> on_error) |> Lwt_stream.to_list;;
- : ('a, 'b, unit) Current.Retry.attempt list = [Error (`Fatal ())]
# let retryable_error () = Lwt.return_error (`Retry ());;
val retryable_error : unit -> ('a, [> `Retry of unit ]) result Lwt.t = <fun>
# Retry.(retryable_error |> on_error) |> Lwt_stream.nget 5;;
- : ('a, unit, 'b) Current.Retry.attempt list =
[Error (`Retry ()); Error (`Retry ()); Error (`Retry ()); Error (`Retry ());
Error (`Retry ())]

val with_sleep :
?duration:(int -> float) ->
('ok, 'retry, 'fatal) attempt Lwt_stream.t ->
('ok, 'retry, 'fatal) attempt Lwt_stream.t
(** [with_sleep ~duration attempts] is the stream of [attempts] with a sleep
added after computing each [n]th retryable attempt based on [duration n].
@param duration the optional sleep duration. This defaults to an exponential
backoff computed as n * 2 * (1.5 ^ n), which gives the approximate sequence 0s -> 3s ->
9s -> 20.25 -> 40.5s -> 75.9s -> 136.7...
# let retryable_error () = Lwt.return_error (`Retry ());;
# let attempts_with_sleeps = Retry.(retryable_error |> on_error |> with_sleep);;
# Lwt_stream.get attempts_with_sleeps;;
(* computed immediately *)
Some (Error (`Retry ()))
# Lwt_stream.get attempts_with_sleeps;;
(* after 3 seconds *)
Some (Error (`Retry ()))
# Lwt_stream.get attempts_with_sleeps;;
(* after 9 seconds *)
Some (Error (`Retry ()))
(* a stream a constant 1s sleep between attempts *)
# let attempts_with_constant_sleeps =
Retry.(retryable_error |> on_error |> with_sleep ~duration:(fun _ -> 1.0));;
]} *)

val n_times :
?pp:('retry, 'fatal) error Fmt.t ->
int ->
('ok, 'retry, 'fatal) attempt Lwt_stream.t ->
('ok, [`Msg of string]) result Lwt.t

(** [n_times n attempts] is [Ok v] if one of the [attempts] succeeds within [n]
retries. Otherwise, it is [Error (`Msg msg)] with [msg] derived based on the
error formatter.
@param pp an optional formatter used to produce the error message,
defaulting to {!pp_error}.
# let operation () =
let i = ref 0 in
fun () -> Lwt.return_error (if !i < 3 then (incr i; `Retry !i) else `Fatal "msg");;
# Retry.(operation () |> on_error |> n_times ~pp:(pp_error ~fatal:Fmt.string) 5);;
- : ('a, [ `Msg of string ]) result = Error (`Msg "fatal error 'msg' (fatal)")
]} *)
2 changes: 1 addition & 1 deletion test/dune
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(names test test_monitor test_cache)
(modules test test_monitor test_cache test_job test_log_matcher driver)
(modules test test_monitor test_cache test_job test_log_matcher test_retry driver)
1 change: 1 addition & 0 deletions test/
Original file line number Diff line number Diff line change
Expand Up @@ -388,5 +388,6 @@ let () =
"monitor", Test_monitor.tests;
"job", Test_job.tests;
"log_matcher", Test_log_matcher.tests;
"retry", Test_retry.tests;
163 changes: 163 additions & 0 deletions test/
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
open Lwt.Infix
open Lwt.Syntax

module Retry = Current.Retry

let attempt
(ok : 'ok Alcotest.testable)
(retry : 'retry Alcotest.testable)
(fatal : 'fatal Alcotest.testable)
: ('ok, 'retry, 'fatal) Retry.attempt Alcotest.testable
let pp = Retry.pp_error
~retry:(Alcotest.pp retry)
~fatal:(Alcotest.pp fatal)
let eq = Retry.equal_error
~retry:(Alcotest.equal retry)
~fatal:(Alcotest.equal fatal)
let error = Alcotest.testable pp eq in
Alcotest.result ok error

let err_msg
(err : 'err Alcotest.testable)
: [`Msg of 'err] Alcotest.testable
let pp fmt (`Msg m) = fmt "`Msg %a" (Alcotest.pp err) m in
let eq (`Msg a) (`Msg b) = Alcotest.equal err a b in
Alcotest.testable pp eq

let test_success_without_retry _switch () =
let strm =
Retry.on_error (fun () -> Lwt.return_ok 42)

let msg = "expected success" in
let expected = (Ok 42) in
let* actual = strm in
Alcotest.(check' (attempt int unit unit)) ~msg ~expected ~actual;

let msg = "expected stream to be empty" in
let expected = true in
let+ actual = Lwt_stream.is_empty strm in
Alcotest.(check' bool) ~msg ~expected ~actual

let test_retries _switch () =
let strm =
Retry.on_error (fun () -> Lwt.return_error (`Retry ()))

let msg = "expected 3 retry errors" in
let expected = List.init 3 (fun _ -> Error (`Retry ())) in
let+ actual = Lwt_stream.nget 3 strm in
Alcotest.(check' (list (attempt unit unit unit))) ~msg ~expected ~actual

let test_retries_before_fatal_error _switch () =
let retries_before_fatal = 3 in
let i = ref 0 in
let strm = Retry.on_error
(fun () ->
if !i < retries_before_fatal then (
incr i;
Lwt.return_error (`Retry ())
) else
Lwt.return_error (`Fatal ()))

let msg = "expected 3 retry errors" in
let expected = retries_before_fatal in
let* actual = Lwt_stream.nget retries_before_fatal strm >|= List.length in
Alcotest.(check' int) ~msg ~expected ~actual;

let msg = "expected fatal error" in
let expected = Error (`Fatal ()) in
let* actual = strm in
Alcotest.(check' (attempt unit unit unit)) ~msg ~expected ~actual;

let msg = "expected stream to be empty" in
let+ stream_is_empty = Lwt_stream.is_empty strm in
Alcotest.(check' bool) ~msg ~expected:true ~actual:stream_is_empty

let test_retries_before_success _switch () =
let retries_before_fatal = 3 in
let i = ref 0 in
let strm = Retry.on_error (fun () ->
if !i < retries_before_fatal then (
incr i;
Lwt.return_error (`Retry ())
) else
Lwt.return_ok ()

let msg = "expected 3 retry errors" in
let expected = retries_before_fatal in
let* actual = Lwt_stream.nget retries_before_fatal strm >|= List.length in
Alcotest.(check' int) ~msg ~expected ~actual;

let msg = "expected success error" in
let expected = Ok () in
let* actual = strm in
Alcotest.(check' (attempt unit unit unit)) ~msg ~expected ~actual;

let msg = "expected stream to be empty" in
let+ stream_is_empty = Lwt_stream.is_empty strm in
Alcotest.(check' bool) ~msg ~expected:true ~actual:stream_is_empty

let test_n_times_fatal _switch () =
let i = ref 0 in
let operation () =
if !i < 3 then (
incr i;
Lwt.return_error (`Retry ())
) else
Lwt.return_error (`Fatal ())
let msg = "expected fatal error message" in
let expected = `Msg "fatal error '<opaque>' (fatal)" in
let+ actual = Retry.(operation |> on_error |> n_times 5) >|= Result.get_error in
Alcotest.(check' (err_msg string)) ~msg ~expected ~actual

let test_n_times_exhaustion _switch () =
let operation () = Lwt.return_error (`Retry ()) in
let msg = "expected exhaustion error message" in
let expected = `Msg "retriable error '<opaque>' (exhausted)" in
let+ actual = Retry.(operation |> on_error |> n_times 5) >|= Result.get_error in
Alcotest.(check' (err_msg string)) ~msg ~expected ~actual

let test_n_times_success _switch () =
let i = ref 0 in
let operation () =
if !i < 3 then (
incr i;
Lwt.return_error (`Retry ())
) else
Lwt.return_ok ()
try Retry.(operation |> on_error |> n_times 5) >|= Result.get_ok
with Invalid_argument _ -> "expected Ok result"

(* test that the sleeps actually do throttle the computations *)
let test_with_sleep _switch () =
let duration _ = 0.1 in
let racing_operation = Lwt_unix.sleep 0.2 >|= Result.ok in
let operation () = Lwt.return_error (`Retry ()) in
let retries = Retry.(operation |> on_error |> with_sleep ~duration |> n_times 5) in
(* If [with_sleep] is removed the test fails, as expected *)
let msg = "expected racing_operation to complete before the retries with sleeps" in
let expected = Ok () in
let+ actual = Lwt.choose [racing_operation; retries] in
Alcotest.(check' (result unit (err_msg string))) ~msg ~expected ~actual

let tests =
Alcotest_lwt.test_case "test_success_without_retry" `Quick test_success_without_retry;
Alcotest_lwt.test_case "test_retries" `Quick test_retries;
Alcotest_lwt.test_case "test_retries_before_fatal_error" `Quick test_retries_before_fatal_error;
Alcotest_lwt.test_case "test_retries_before_success" `Quick test_retries_before_success;
Alcotest_lwt.test_case "test_n_times_fatal" `Quick test_n_times_fatal;
Alcotest_lwt.test_case "test_n_times_exhaustion" `Quick test_n_times_exhaustion;
Alcotest_lwt.test_case "test_n_times_success" `Quick test_n_times_success;
Alcotest_lwt.test_case "test_with_sleep" `Quick test_with_sleep;
1 change: 1 addition & 0 deletions test/test_retry.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
val tests : unit Alcotest_lwt.test_case list

