diff --git a/lib/current.ml b/lib/current.ml index eb7b54de..8d9bfabc 100644 --- a/lib/current.ml +++ b/lib/current.ml @@ -333,6 +333,7 @@ module Process = Process module Switch = Switch module Pool = Pool module Log_matcher = Log_matcher +module Retry = Retry module Job = struct include Job diff --git a/lib/current.mli b/lib/current.mli index ca0d8e84..47ece6bc 100644 --- a/lib/current.mli +++ b/lib/current.mli @@ -460,3 +460,5 @@ module Log_matcher : sig val drop_all : unit -> unit val analyse_string : ?job:Job.t -> string -> string option end + +module Retry = Retry diff --git a/lib/retry.ml b/lib/retry.ml new file mode 100644 index 00000000..3f920950 --- /dev/null +++ b/lib/retry.ml @@ -0,0 +1,72 @@ +open Lwt.Syntax + +let default_sleep_duration n' = + let base_sleep_time = 2.0 in + let n = Int.to_float n' in + n *. base_sleep_time *. Float.pow 1.5 n + +type ('retry, 'fatal) error = + [ `Retry of 'retry + | `Fatal of 'fatal + ] + +let pp_error ?retry ?fatal fmt err = + let default fmt _ = Fmt.pf fmt "" in + let pp_retry = Option.value retry ~default in + let pp_fatal = Option.value fatal ~default in + match err with + | `Retry r -> Fmt.pf fmt "retryable error '%a'" pp_retry r + | `Fatal f -> Fmt.pf fmt "fatal error '%a'" pp_fatal f + +let equal_error ~retry ~fatal a b = + match a, b with + | `Retry a', `Retry b' -> retry a' b' + | `Fatal a', `Fatal b' -> fatal a' b' + | _ -> false + +type ('ok, 'retry, 'fatal) attempt = ('ok, ('retry, 'fatal) error) result + +let is_retryable = function + | Error (`Retry _) -> true + | _ -> false + +let on_error + (f : unit -> ('ok, 'retry, 'fatal) attempt Lwt.t) + : ('ok, 'retry, 'fatal) attempt Lwt_stream.t + = + let stop = ref false in + let attempt () = + if !stop then + Lwt.return_none + else + let+ result = f () in + stop := not (is_retryable result); + Some result + in + Lwt_stream.from attempt + +let numbered attempts : (int * _) Lwt_stream.t = + let i = ref 0 in + let indexes = Lwt_stream.from_direct (fun () -> let n = !i in incr i; Some n) in + Lwt_stream.combine indexes attempts + +let with_sleep ?(duration=default_sleep_duration) attempts = + attempts + |> numbered + |> Lwt_stream.map_s (fun (attempt_number, attempt_result) -> + let+ () = attempt_number |> duration |> Lwt_unix.sleep in + attempt_result) + +let pp_n_times_error fmt = function + | `Retry _ -> Fmt.pf fmt "(exhausted)" + | `Fatal _ -> Fmt.pf fmt "(fatal)" + +let n_times ?(pp = (fun e -> pp_error e)) n attempts + : (_, [`Msg of string]) result Lwt.t + = + let to_msg err : [`Msg of string] = + Fmt.kstr (fun m -> `Msg m) "%a %a" pp err pp_n_times_error err in + let+ attempts = Lwt_stream.nget n attempts in + match List.rev attempts with + | last :: _ -> last |> Result.map_error to_msg (* a [_ Current.or_error] *) + | _ -> failwith "impossible" diff --git a/lib/retry.mli b/lib/retry.mli new file mode 100644 index 00000000..12ab02f4 --- /dev/null +++ b/lib/retry.mli @@ -0,0 +1,117 @@ +(** Utilities for retrying Lwt computations *) + +type ('retry, 'fatal) error = + [ `Retry of 'retry + | `Fatal of 'fatal + ] +(** The type of errors that a retryable computation can produce. + + - [`Retry r] when [r] represents an error that can be retried. + - [`Fatal f] when [f] represents an error that cannot be retried. *) + +type ('ok, 'retry, 'fatal) attempt = ('ok, ('retry, 'fatal) error) result +(** A [('ok, 'retry, 'fatal) attempt] is an alias for the [result] of a + retryable computation. + + - [Ok v] produces a successful value [v] + - [Error err] produces the {!type:error} [err] *) + +val pp_error : + ?retry:'retry Fmt.t -> + ?fatal:'fatal Fmt.t -> + ('retry, 'fatal) error Fmt.t +(** [pp_error ~retry ~fatal] is a formatter for {!type:error}s that formats + fatal and retryable errors according to the provided formatters. + + If either formatter is not provided, a default formatter will represent the + values as [""]. *) + +val equal_error : + retry:('retry -> 'retry -> bool) -> + fatal:('fatal -> 'fatal -> bool) -> + ('retry, 'fatal) error -> + ('retry, 'fatal) error -> + bool + +val on_error : + (unit -> ('ok, 'retry, 'fatal) attempt Lwt.t) -> + ('ok, 'retry, 'fatal) attempt Lwt_stream.t +(** [on_error f] is a stream of attempts to compute [f]. The stream will continue until + the computation succeeds or produces a fatal error. + + Examples + + {[ + # open Current;; + + # let success () = Lwt.return_ok ();; + val success : unit -> (unit, 'a) result Lwt.t = + # Retry.(success |> on_error) |> Lwt_stream.to_list;; + - : (unit, 'a, 'b) Current.Retry.attempt list = [Ok ()] + + # let fatal_failure () = Lwt.return_error (`Fatal ());; + val fatal_failure : unit -> ('a, [> `Fatal of unit ]) result Lwt.t = + # Retry.(fatal_failure |> on_error) |> Lwt_stream.to_list;; + - : ('a, 'b, unit) Current.Retry.attempt list = [Error (`Fatal ())] + + # let retryable_error () = Lwt.return_error (`Retry ());; + val retryable_error : unit -> ('a, [> `Retry of unit ]) result Lwt.t = + # Retry.(retryable_error |> on_error) |> Lwt_stream.nget 3;; + - : ('a, unit, 'b) Current.Retry.attempt list = + [Error (`Retry ()); Error (`Retry ()); Error (`Retry ())] + ]}*) + +val with_sleep : + ?duration:(int -> float) -> + ('ok, 'retry, 'fatal) attempt Lwt_stream.t -> + ('ok, 'retry, 'fatal) attempt Lwt_stream.t +(** [with_sleep ~duration attempts] is the stream of [attempts] with a sleep + added after computing each [n]th retryable attempt based on [duration n]. + + @param duration the optional sleep duration. This defaults to an exponential + backoff computed as n * 2 * (1.5 ^ n), which gives the approximate sequence 0s -> 3s -> + 9s -> 20.25 -> 40.5s -> 75.9s -> 136.7 -> ... + + Examples + {[ + # let retryable_error () = Lwt.return_error (`Retry ());; + # let attempts_with_sleeps = Retry.(retryable_error |> on_error |> with_sleep);; + # Lwt_stream.get attempts_with_sleeps;; + (* computed immediately *) + Some (Error (`Retry ())) + + # Lwt_stream.get attempts_with_sleeps;; + (* after 3 seconds *) + Some (Error (`Retry ())) + + # Lwt_stream.get attempts_with_sleeps;; + (* after 9 seconds *) + Some (Error (`Retry ())) + + (* a stream with a constant 1s sleep between attempts *) + # let attempts_with_constant_sleeps = + Retry.(retryable_error |> on_error |> with_sleep ~duration:(fun _ -> 1.0));; + ]} *) + +val n_times : + ?pp:('retry, 'fatal) error Fmt.t -> + int -> + ('ok, 'retry, 'fatal) attempt Lwt_stream.t -> + ('ok, [`Msg of string]) result Lwt.t + +(** [n_times n attempts] is [Ok v] if one of the [attempts] succeeds within [n] + retries. Otherwise, it is [Error (`Msg msg)] with [msg] derived based on the + error formatter. + + @param pp an optional formatter used to produce the error message, + defaulting to {!pp_error}. + + Examples + + {[ + # let operation () = + let i = ref 0 in + fun () -> Lwt.return_error (if !i < 3 then (incr i; `Retry !i) else `Fatal "msg");; + # Retry.(operation () |> on_error |> n_times ~pp:(pp_error ~retry:Fmt.int ~fatal:Fmt.string) 5);; + - : ('a, [ `Msg of string ]) result = Error (`Msg "fatal error 'msg' (fatal)") + ]} *) diff --git a/test/dune b/test/dune index f15690fd..de23c9e2 100644 --- a/test/dune +++ b/test/dune @@ -1,6 +1,6 @@ (executables (names test test_monitor test_cache) - (modules test test_monitor test_cache test_job test_log_matcher driver) + (modules test test_monitor test_cache test_job test_log_matcher test_retry driver) (libraries alcotest alcotest-lwt diff --git a/test/test.ml b/test/test.ml index 35417cc6..7e3e3c04 100644 --- a/test/test.ml +++ b/test/test.ml @@ -388,5 +388,6 @@ let () = "monitor", Test_monitor.tests; "job", Test_job.tests; "log_matcher", Test_log_matcher.tests; + "retry", Test_retry.tests; ] end diff --git a/test/test_retry.ml b/test/test_retry.ml new file mode 100644 index 00000000..79084467 --- /dev/null +++ b/test/test_retry.ml @@ -0,0 +1,160 @@ +open Lwt.Infix +open Lwt.Syntax + +module Retry = Current.Retry + +let attempt + (ok : 'ok Alcotest.testable) + (retry : 'retry Alcotest.testable) + (fatal : 'fatal Alcotest.testable) + : ('ok, 'retry, 'fatal) Retry.attempt Alcotest.testable + = + let pp = Retry.pp_error + ~retry:(Alcotest.pp retry) + ~fatal:(Alcotest.pp fatal) + in + let eq = Retry.equal_error + ~retry:(Alcotest.equal retry) + ~fatal:(Alcotest.equal fatal) + in + let error = Alcotest.testable pp eq in + Alcotest.result ok error + +let err_msg : [`Msg of string] Alcotest.testable = + let pp fmt (`Msg m) = Fmt.pf fmt "`Msg %a" Fmt.string m in + let eq (`Msg a) (`Msg b) = String.equal a b in + Alcotest.testable pp eq + +let test_success_without_retry _switch () = + let strm = + Retry.on_error (fun () -> Lwt.return_ok 42) + in + + let msg = "expected success" in + let expected = (Ok 42) in + let* actual = Lwt_stream.next strm in + Alcotest.(check' (attempt int unit unit)) ~msg ~expected ~actual; + + let msg = "expected stream to be empty" in + let expected = true in + let+ actual = Lwt_stream.is_empty strm in + Alcotest.(check' bool) ~msg ~expected ~actual + +let test_retries _switch () = + let strm = + Retry.on_error (fun () -> Lwt.return_error (`Retry ())) + in + + let msg = "expected 3 retry errors" in + let expected = List.init 3 (fun _ -> Error (`Retry ())) in + let+ actual = Lwt_stream.nget 3 strm in + Alcotest.(check' (list (attempt unit unit unit))) ~msg ~expected ~actual + +let test_retries_before_fatal_error _switch () = + let retries_before_fatal = 3 in + let i = ref 0 in + let strm = Retry.on_error + (fun () -> + if !i < retries_before_fatal then ( + incr i; + Lwt.return_error (`Retry ()) + ) else + Lwt.return_error (`Fatal ())) + in + + let msg = "expected 3 retry errors" in + let expected = retries_before_fatal in + let* actual = Lwt_stream.nget retries_before_fatal strm >|= List.length in + Alcotest.(check' int) ~msg ~expected ~actual; + + let msg = "expected fatal error" in + let expected = Error (`Fatal ()) in + let* actual = Lwt_stream.next strm in + Alcotest.(check' (attempt unit unit unit)) ~msg ~expected ~actual; + + let msg = "expected stream to be empty" in + let+ stream_is_empty = Lwt_stream.is_empty strm in + Alcotest.(check' bool) ~msg ~expected:true ~actual:stream_is_empty + +let test_retries_before_success _switch () = + let retries_before_fatal = 3 in + let i = ref 0 in + let strm = Retry.on_error (fun () -> + if !i < retries_before_fatal then ( + incr i; + Lwt.return_error (`Retry ()) + ) else + Lwt.return_ok () + ) + in + + let msg = "expected 3 retry errors" in + let expected = retries_before_fatal in + let* actual = Lwt_stream.nget retries_before_fatal strm >|= List.length in + Alcotest.(check' int) ~msg ~expected ~actual; + + let msg = "expected success error" in + let expected = Ok () in + let* actual = Lwt_stream.next strm in + Alcotest.(check' (attempt unit unit unit)) ~msg ~expected ~actual; + + let msg = "expected stream to be empty" in + let+ stream_is_empty = Lwt_stream.is_empty strm in + Alcotest.(check' bool) ~msg ~expected:true ~actual:stream_is_empty + +let test_n_times_fatal _switch () = + let i = ref 0 in + let operation () = + if !i < 3 then ( + incr i; + Lwt.return_error (`Retry ()) + ) else + Lwt.return_error (`Fatal ()) + in + let msg = "expected fatal error message" in + let expected = `Msg "fatal error '' (fatal)" in + let+ actual = Retry.(operation |> on_error |> n_times 5) >|= Result.get_error in + Alcotest.(check' err_msg) ~msg ~expected ~actual + +let test_n_times_exhaustion _switch () = + let operation () = Lwt.return_error (`Retry ()) in + let msg = "expected exhaustion error message" in + let expected = `Msg "retryable error '' (exhausted)" in + let+ actual = Retry.(operation |> on_error |> n_times 5) >|= Result.get_error in + Alcotest.(check' err_msg) ~msg ~expected ~actual + +let test_n_times_success _switch () = + let i = ref 0 in + let operation () = + if !i < 3 then ( + incr i; + Lwt.return_error (`Retry ()) + ) else + Lwt.return_ok () + in + try Retry.(operation |> on_error |> n_times 5) >|= Result.get_ok + with Invalid_argument _ -> Alcotest.fail "expected Ok result" + +(* test that the sleeps actually do throttle the computations *) +let test_with_sleep _switch () = + let duration _ = 0.1 in + let racing_operation = Lwt_unix.sleep (duration ()) >|= Result.ok in + let operation () = Lwt.return_error (`Retry ()) in + let retries = Retry.(operation |> on_error |> with_sleep ~duration |> n_times 5) in + (* If [with_sleep] is removed the test fails, as expected *) + let msg = "expected racing_operation to complete before the retries with sleeps" in + let expected = Ok () in + let+ actual = Lwt.choose [racing_operation; retries] in + Alcotest.(check' (result unit err_msg)) ~msg ~expected ~actual + +let tests = + [ + Alcotest_lwt.test_case "test_success_without_retry" `Quick test_success_without_retry; + Alcotest_lwt.test_case "test_retries" `Quick test_retries; + Alcotest_lwt.test_case "test_retries_before_fatal_error" `Quick test_retries_before_fatal_error; + Alcotest_lwt.test_case "test_retries_before_success" `Quick test_retries_before_success; + Alcotest_lwt.test_case "test_n_times_fatal" `Quick test_n_times_fatal; + Alcotest_lwt.test_case "test_n_times_exhaustion" `Quick test_n_times_exhaustion; + Alcotest_lwt.test_case "test_n_times_success" `Quick test_n_times_success; + Alcotest_lwt.test_case "test_with_sleep" `Quick test_with_sleep; + ] diff --git a/test/test_retry.mli b/test/test_retry.mli new file mode 100644 index 00000000..01604e16 --- /dev/null +++ b/test/test_retry.mli @@ -0,0 +1 @@ +val tests : unit Alcotest_lwt.test_case list