From ec9e3c950339fc268f2792ba3acaf71eb45d0b08 Mon Sep 17 00:00:00 2001 From: Shon Feder Date: Fri, 16 Aug 2024 20:46:18 -0400 Subject: [PATCH] Add retry utility --- lib/current.ml | 1 + lib/current.mli | 4 +- lib/retry.ml | 84 +++++++++++++++++++++++ lib/retry.mli | 114 +++++++++++++++++++++++++++++++ test/dune | 2 +- test/test.ml | 1 + test/test_retry.ml | 163 ++++++++++++++++++++++++++++++++++++++++++++ test/test_retry.mli | 1 + 8 files changed, 368 insertions(+), 2 deletions(-) create mode 100644 lib/retry.ml create mode 100644 lib/retry.mli create mode 100644 test/test_retry.ml create mode 100644 test/test_retry.mli diff --git a/lib/current.ml b/lib/current.ml index eb7b54de..8d9bfabc 100644 --- a/lib/current.ml +++ b/lib/current.ml @@ -333,6 +333,7 @@ module Process = Process module Switch = Switch module Pool = Pool module Log_matcher = Log_matcher +module Retry = Retry module Job = struct include Job diff --git a/lib/current.mli b/lib/current.mli index ca0d8e84..586e5fca 100644 --- a/lib/current.mli +++ b/lib/current.mli @@ -53,7 +53,7 @@ end A primitive is roughly the content of a single box in the diagram. Warning: [Primitive] is the low-level API. You will almost always want to - use {!Current_cache} (for processing or publishing jobs) or {!Monitor} (for + use {!Current_cache} (for processing or publishing jobs) or {!monitor} (for inputs) instead. *) module Primitive : sig type 'a t = ('a Current_term.Output.t * Metadata.t option) Current_incr.t @@ -460,3 +460,5 @@ module Log_matcher : sig val drop_all : unit -> unit val analyse_string : ?job:Job.t -> string -> string option end + +module Retry = Retry diff --git a/lib/retry.ml b/lib/retry.ml new file mode 100644 index 00000000..de7d05a6 --- /dev/null +++ b/lib/retry.ml @@ -0,0 +1,84 @@ +open Lwt.Syntax + + +let default_sleep_duration n' = + let base_sleep_time = 2.0 in + let n = Int.to_float n' in + let backoff = n *. base_sleep_time *. Float.pow 1.5 n in + backoff + +type ('retry, 'fatal) error = + [ `Retry of 'retry + | `Fatal of 'fatal + ] + +let pp_error : ?retry:'retry Fmt.t -> ?fatal:'fatal Fmt.t -> ('retry, 'fatal) error Fmt.t = + fun ?retry ?fatal fmt err -> + let default fmt _ = Fmt.pf fmt "" in + let pp_retry = Option.value retry ~default in + let pp_fatal = Option.value fatal ~default in + match err with + | `Retry r -> Fmt.pf fmt "retryable error '%a'" pp_retry r + | `Fatal f -> Fmt.pf fmt "fatal error '%a'" pp_fatal f + +let equal_error ~retry ~fatal a b = + match a, b with + | `Retry a', `Retry b' -> retry a' b' + | `Fatal a', `Fatal b' -> fatal a' b' + | _ -> false + +type ('ok, 'retry, 'fatal) attempt = ('ok, ('retry, 'fatal) error) result + +let is_retryable = function + | Error (`Retry _) -> true + | _ -> false + +let on_error + (f : unit -> ('ok, 'retry, 'fatal) attempt Lwt.t) + : ('ok, 'retry, 'fatal) attempt Lwt_stream.t + = + let stop = ref false in + let attempt () = + if !stop then + Lwt.return_none + else + let+ result = f () in + stop := not (is_retryable result); + Some result + in + Lwt_stream.from attempt + +let numbered attempts : (int * _) Lwt_stream.t = + let i = ref 0 in + let indexes = Lwt_stream.from_direct (fun () -> let n = !i in incr i; Some n) in + Lwt_stream.combine indexes attempts + +let with_sleep ?(duration=default_sleep_duration) attempts = + attempts + |> numbered + |> Lwt_stream.map_s (fun (attempt_number, attempt_result) -> + let+ () = Lwt_unix.sleep (duration @@ attempt_number) in + attempt_result + ) + +let pp_n_times_error fmt = function + | `Retry _ -> Fmt.pf fmt "(exhausted)" + | `Fatal _ -> Fmt.pf fmt "(fatal)" + +let n_times + : ?pp:('retry, 'fatal) error Fmt.t + -> int + -> ('ok, 'retry, 'fatal) attempt Lwt_stream.t + -> ('a, [`Msg of string]) result Lwt.t + = fun ?pp n strm -> + let pp = match pp with + | Some f -> fun fmt err -> Fmt.pf fmt "%a %a" f err pp_n_times_error err + | None -> fun fmt err -> Fmt.pf fmt "%a %a" (fun e -> pp_error e) err pp_n_times_error err + in + let to_or_error attempt : ('a, [`Msg of string]) result = + Result.map_error (fun err -> `Msg (Fmt.to_to_string pp err)) attempt + in + let+ attempts = strm |> Lwt_stream.map to_or_error |> Lwt_stream.nget n in + match List.rev attempts with + | last :: _ -> last + | _ -> failwith "impossible" diff --git a/lib/retry.mli b/lib/retry.mli new file mode 100644 index 00000000..b099808b --- /dev/null +++ b/lib/retry.mli @@ -0,0 +1,114 @@ +(** Utilities for retrying Lwt computations *) + +type ('retry, 'fatal) error = + [ `Retry of 'retry + | `Fatal of 'fatal + ] +(** The type of errors that a retryable computation can produce. + + - [`Retry r] when `r` represents an error that can be retried. + - [`Fatal f] when `f` represents an error that cannot be retried. *) + +type ('ok, 'retry, 'fatal) attempt = ('ok, ('retry, 'fatal) error) result +(** A [('ok, 'retry, 'fatal) attempt] is an alias for the [result] of a + retryable computation. + + - [Ok v] produces a successful value [v] + - [Error err] produces the {!type:error} [err] *) + +val pp_error : + ?retry:'retry Fmt.t -> + ?fatal:'fatal Fmt.t -> + ('retry, 'fatal) error Fmt.t +(** [pp_error ~retry ~fatal] is a formatter for {!type:error}s that formats + fatal and retryable errors according to the provided formatters. + + If either formatter is not provided, a default formatter will represent the + values as [""]. *) + +val equal_error : + retry:('retry -> 'retry -> bool) -> + fatal:('fatal -> 'fatal -> bool) -> + ('retry, 'fatal) error -> + ('retry, 'fatal) error -> + bool + +val on_error : + (unit -> ('ok, 'retry, 'fatal) attempt Lwt.t) -> + ('ok, 'retry, 'fatal) attempt Lwt_stream.t +(** [on_error f] is a stream of attempts to compute [f]. The stream will continue until + the computation succeeds or produces a fatal error. + + Examples + + {[ + # open Current;; + + # let success () = Lwt.return_ok ();; + val success : unit -> (unit, 'a) result Lwt.t = + # Retry.(success |> on_error) |> Lwt_stream.to_list;; + - : (unit, 'a, 'b) Current.Retry.attempt list = [Ok ()] + + # let fatal_failure () = Lwt.return_error (`Fatal ());; + val fatal_failure : unit -> ('a, [> `Fatal of unit ]) result Lwt.t = + # Retry.(fatal_failure |> on_error) |> Lwt_stream.to_list;; + - : ('a, 'b, unit) Current.Retry.attempt list = [Error (`Fatal ())] + + # let retryable_error () = Lwt.return_error (`Retry ());; + val retryable_error : unit -> ('a, [> `Retry of unit ]) result Lwt.t = + # Retry.(retryable_error |> on_error) |> Lwt_stream.nget 5;; + - : ('a, unit, 'b) Current.Retry.attempt list = + [Error (`Retry ()); Error (`Retry ()); Error (`Retry ()); Error (`Retry ()); + Error (`Retry ())] + ]}*) + +val with_sleep : + ?duration:(int -> float) -> + ('ok, 'retry, 'fatal) attempt Lwt_stream.t -> + ('ok, 'retry, 'fatal) attempt Lwt_stream.t +(** [with_sleep ~duration attempts] is the stream of [attempts] with a sleep + added after computing each [n]th retryable attempt based on [duration n]. + + @param duration the optional sleep duration. This defaults to an exponential + backoff computed as n * 2 * (1.5 ^ n), which gives the approximate sequence 0s -> 3s -> + 9s -> 20.25 -> 40.5s -> 75.9s -> 136.7... + + Examples + {[ + # let retryable_error () = Lwt.return_error (`Retry ());; + # let attempts_with_sleeps = Retry.(retryable_error |> on_error |> with_sleep);; + # Lwt_stream.get attempts_with_sleeps;; + (* computed immediately *) + Some (Error (`Retry ())) + # Lwt_stream.get attempts_with_sleeps;; + (* after 3 seconds *) + Some (Error (`Retry ())) + # Lwt_stream.get attempts_with_sleeps;; + (* after 9 seconds *) + Some (Error (`Retry ())) + + (* a stream a constant 1s sleep between attempts *) + # let attempts_with_constant_sleeps = + Retry.(retryable_error |> on_error |> with_sleep ~duration:(fun _ -> 1.0));; + ]} *) + +val n_times : + ?pp:('retry, 'fatal) error Fmt.t -> + int -> + ('ok, 'retry, 'fatal) attempt Lwt_stream.t -> + ('ok, [`Msg of string]) result Lwt.t + +(** [n_times n attempts] is [Ok v] if one of the [attempts] succeeds within [n] + retries. Otherwise, it is [Error (`Msg msg)] with [msg] derived based on the + error formatter. + + @param pp an optional formatter used to produce the error message, + defaulting to {!pp_error}. + + {[ + # let operation () = + let i = ref 0 in + fun () -> Lwt.return_error (if !i < 3 then (incr i; `Retry !i) else `Fatal "msg");; + # Retry.(operation () |> on_error |> n_times ~pp:(pp_error ~retry:Fmt.int ~fatal:Fmt.string) 5);; + - : ('a, [ `Msg of string ]) result = Error (`Msg "fatal error 'msg' (fatal)") + ]} *) diff --git a/test/dune b/test/dune index f15690fd..de23c9e2 100644 --- a/test/dune +++ b/test/dune @@ -1,6 +1,6 @@ (executables (names test test_monitor test_cache) - (modules test test_monitor test_cache test_job test_log_matcher driver) + (modules test test_monitor test_cache test_job test_log_matcher test_retry driver) (libraries alcotest alcotest-lwt diff --git a/test/test.ml b/test/test.ml index 35417cc6..7e3e3c04 100644 --- a/test/test.ml +++ b/test/test.ml @@ -388,5 +388,6 @@ let () = "monitor", Test_monitor.tests; "job", Test_job.tests; "log_matcher", Test_log_matcher.tests; + "retry", Test_retry.tests; ] end diff --git a/test/test_retry.ml b/test/test_retry.ml new file mode 100644 index 00000000..83d2fb26 --- /dev/null +++ b/test/test_retry.ml @@ -0,0 +1,163 @@ +open Lwt.Infix +open Lwt.Syntax + +module Retry = Current.Retry + +let attempt + (ok : 'ok Alcotest.testable) + (retry : 'retry Alcotest.testable) + (fatal : 'fatal Alcotest.testable) + : ('ok, 'retry, 'fatal) Retry.attempt Alcotest.testable + = + let pp = Retry.pp_error + ~retry:(Alcotest.pp retry) + ~fatal:(Alcotest.pp fatal) + in + let eq = Retry.equal_error + ~retry:(Alcotest.equal retry) + ~fatal:(Alcotest.equal fatal) + in + let error = Alcotest.testable pp eq in + Alcotest.result ok error + +let err_msg + (err : 'err Alcotest.testable) + : [`Msg of 'err] Alcotest.testable + = + let pp fmt (`Msg m) = Fmt.pf fmt "`Msg %a" (Alcotest.pp err) m in + let eq (`Msg a) (`Msg b) = Alcotest.equal err a b in + Alcotest.testable pp eq + +let test_success_without_retry _switch () = + let strm = + Retry.on_error (fun () -> Lwt.return_ok 42) + in + + let msg = "expected success" in + let expected = (Ok 42) in + let* actual = Lwt_stream.next strm in + Alcotest.(check' (attempt int unit unit)) ~msg ~expected ~actual; + + let msg = "expected stream to be empty" in + let expected = true in + let+ actual = Lwt_stream.is_empty strm in + Alcotest.(check' bool) ~msg ~expected ~actual + +let test_retries _switch () = + let strm = + Retry.on_error (fun () -> Lwt.return_error (`Retry ())) + in + + let msg = "expected 3 retry errors" in + let expected = List.init 3 (fun _ -> Error (`Retry ())) in + let+ actual = Lwt_stream.nget 3 strm in + Alcotest.(check' (list (attempt unit unit unit))) ~msg ~expected ~actual + +let test_retries_before_fatal_error _switch () = + let retries_before_fatal = 3 in + let i = ref 0 in + let strm = Retry.on_error + (fun () -> + if !i < retries_before_fatal then ( + incr i; + Lwt.return_error (`Retry ()) + ) else + Lwt.return_error (`Fatal ())) + in + + let msg = "expected 3 retry errors" in + let expected = retries_before_fatal in + let* actual = Lwt_stream.nget retries_before_fatal strm >|= List.length in + Alcotest.(check' int) ~msg ~expected ~actual; + + let msg = "expected fatal error" in + let expected = Error (`Fatal ()) in + let* actual = Lwt_stream.next strm in + Alcotest.(check' (attempt unit unit unit)) ~msg ~expected ~actual; + + let msg = "expected stream to be empty" in + let+ stream_is_empty = Lwt_stream.is_empty strm in + Alcotest.(check' bool) ~msg ~expected:true ~actual:stream_is_empty + +let test_retries_before_success _switch () = + let retries_before_fatal = 3 in + let i = ref 0 in + let strm = Retry.on_error (fun () -> + if !i < retries_before_fatal then ( + incr i; + Lwt.return_error (`Retry ()) + ) else + Lwt.return_ok () + ) + in + + let msg = "expected 3 retry errors" in + let expected = retries_before_fatal in + let* actual = Lwt_stream.nget retries_before_fatal strm >|= List.length in + Alcotest.(check' int) ~msg ~expected ~actual; + + let msg = "expected success error" in + let expected = Ok () in + let* actual = Lwt_stream.next strm in + Alcotest.(check' (attempt unit unit unit)) ~msg ~expected ~actual; + + let msg = "expected stream to be empty" in + let+ stream_is_empty = Lwt_stream.is_empty strm in + Alcotest.(check' bool) ~msg ~expected:true ~actual:stream_is_empty + +let test_n_times_fatal _switch () = + let i = ref 0 in + let operation () = + if !i < 3 then ( + incr i; + Lwt.return_error (`Retry ()) + ) else + Lwt.return_error (`Fatal ()) + in + let msg = "expected fatal error message" in + let expected = `Msg "fatal error '' (fatal)" in + let+ actual = Retry.(operation |> on_error |> n_times 5) >|= Result.get_error in + Alcotest.(check' (err_msg string)) ~msg ~expected ~actual + +let test_n_times_exhaustion _switch () = + let operation () = Lwt.return_error (`Retry ()) in + let msg = "expected exhaustion error message" in + let expected = `Msg "retriable error '' (exhausted)" in + let+ actual = Retry.(operation |> on_error |> n_times 5) >|= Result.get_error in + Alcotest.(check' (err_msg string)) ~msg ~expected ~actual + +let test_n_times_success _switch () = + let i = ref 0 in + let operation () = + if !i < 3 then ( + incr i; + Lwt.return_error (`Retry ()) + ) else + Lwt.return_ok () + in + try Retry.(operation |> on_error |> n_times 5) >|= Result.get_ok + with Invalid_argument _ -> Alcotest.fail "expected Ok result" + +(* test that the sleeps actually do throttle the computations *) +let test_with_sleep _switch () = + let duration _ = 0.1 in + let racing_operation = Lwt_unix.sleep 0.2 >|= Result.ok in + let operation () = Lwt.return_error (`Retry ()) in + let retries = Retry.(operation |> on_error |> with_sleep ~duration |> n_times 5) in + (* If [with_sleep] is removed the test fails, as expected *) + let msg = "expected racing_operation to complete before the retries with sleeps" in + let expected = Ok () in + let+ actual = Lwt.choose [racing_operation; retries] in + Alcotest.(check' (result unit (err_msg string))) ~msg ~expected ~actual + +let tests = + [ + Alcotest_lwt.test_case "test_success_without_retry" `Quick test_success_without_retry; + Alcotest_lwt.test_case "test_retries" `Quick test_retries; + Alcotest_lwt.test_case "test_retries_before_fatal_error" `Quick test_retries_before_fatal_error; + Alcotest_lwt.test_case "test_retries_before_success" `Quick test_retries_before_success; + Alcotest_lwt.test_case "test_n_times_fatal" `Quick test_n_times_fatal; + Alcotest_lwt.test_case "test_n_times_exhaustion" `Quick test_n_times_exhaustion; + Alcotest_lwt.test_case "test_n_times_success" `Quick test_n_times_success; + Alcotest_lwt.test_case "test_with_sleep" `Quick test_with_sleep; + ] diff --git a/test/test_retry.mli b/test/test_retry.mli new file mode 100644 index 00000000..01604e16 --- /dev/null +++ b/test/test_retry.mli @@ -0,0 +1 @@ +val tests : unit Alcotest_lwt.test_case list