Skip to content

Commit

Permalink
Add retry utility
Browse files Browse the repository at this point in the history
  • Loading branch information
shonfeder committed Aug 21, 2024
1 parent d57b358 commit ec9e3c9
Show file tree
Hide file tree
Showing 8 changed files with 368 additions and 2 deletions.
1 change: 1 addition & 0 deletions lib/current.ml
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ module Process = Process
module Switch = Switch
module Pool = Pool
module Log_matcher = Log_matcher
module Retry = Retry

module Job = struct
include Job
Expand Down
4 changes: 3 additions & 1 deletion lib/current.mli
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ end
A primitive is roughly the content of a single box in the diagram.
Warning: [Primitive] is the low-level API. You will almost always want to
use {!Current_cache} (for processing or publishing jobs) or {!Monitor} (for
use {!Current_cache} (for processing or publishing jobs) or {!monitor} (for
inputs) instead. *)
module Primitive : sig
type 'a t = ('a Current_term.Output.t * Metadata.t option) Current_incr.t
Expand Down Expand Up @@ -460,3 +460,5 @@ module Log_matcher : sig
val drop_all : unit -> unit
val analyse_string : ?job:Job.t -> string -> string option
end

module Retry = Retry
84 changes: 84 additions & 0 deletions lib/retry.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
open Lwt.Syntax


(** [default_sleep_duration n'] is the default backoff, in seconds, associated
    with attempt number [n']: [n' * 2 * 1.5 ^ n'].  It is [0.] for attempt 0
    and grows roughly exponentially afterwards (3, 9, 20.25, ...). *)
let default_sleep_duration n' =
  let attempt = float_of_int n' in
  let growth = Float.pow 1.5 attempt in
  attempt *. 2.0 *. growth

(** Errors reported by a single attempt of a retryable computation.
    - [`Retry r]: the attempt failed with [r] but may be tried again.
    - [`Fatal f]: the attempt failed with [f] and must not be tried again. *)
type ('retry, 'fatal) error =
[ `Retry of 'retry
| `Fatal of 'fatal
]

(** [pp_error ?retry ?fatal] formats an {!error}, printing the payload with
    [retry] or [fatal] respectively.  A payload whose formatter was not
    supplied is printed as the placeholder [<opaque>]. *)
let pp_error : ?retry:'retry Fmt.t -> ?fatal:'fatal Fmt.t -> ('retry, 'fatal) error Fmt.t =
  fun ?retry ?fatal ppf err ->
    (* Fallback printer used when the caller gave no formatter for a payload. *)
    let opaque ppf _ = Fmt.pf ppf "<opaque>" in
    match err with
    | `Retry payload ->
      let pp_payload = Option.value retry ~default:opaque in
      Fmt.pf ppf "retryable error '%a'" pp_payload payload
    | `Fatal payload ->
      let pp_payload = Option.value fatal ~default:opaque in
      Fmt.pf ppf "fatal error '%a'" pp_payload payload

(** [equal_error ~retry ~fatal a b] holds when [a] and [b] carry the same
    constructor and their payloads are equal according to [retry] (for
    [`Retry]) or [fatal] (for [`Fatal]).  Values with different constructors
    are never equal. *)
let equal_error ~retry ~fatal a b =
  match a with
  | `Retry lhs -> (match b with `Retry rhs -> retry lhs rhs | _ -> false)
  | `Fatal lhs -> (match b with `Fatal rhs -> fatal lhs rhs | _ -> false)
  | _ -> false

(** The outcome of one attempt: [Ok v] on success, or [Error e] where [e] is
    an {!error} saying whether the failure may be retried. *)
type ('ok, 'retry, 'fatal) attempt = ('ok, ('retry, 'fatal) error) result

(** [is_retryable attempt] is [true] only when [attempt] is [Error (`Retry _)];
    successes and every other error are not retryable. *)
let is_retryable attempt =
  match attempt with
  | Error (`Retry _) -> true
  | Ok _ | Error _ -> false

(** [on_error f] is the stream whose elements are the results of successive
    calls to [f].  [f] is called once per element requested from the stream.
    The first result that is not retryable (a success or a fatal error) is
    still emitted, and the stream ends right after it. *)
let on_error
    (f : unit -> ('ok, 'retry, 'fatal) attempt Lwt.t)
  : ('ok, 'retry, 'fatal) attempt Lwt_stream.t
  =
  (* Becomes [true] once a non-retryable result has been emitted. *)
  let finished = ref false in
  let next () =
    match !finished with
    | true -> Lwt.return_none
    | false ->
      let+ outcome = f () in
      finished := not (is_retryable outcome);
      Some outcome
  in
  Lwt_stream.from next

(** [numbered attempts] pairs every element of [attempts] with its position
    in the stream, counting from 0. *)
let numbered attempts : (int * _) Lwt_stream.t =
  let counter = ref 0 in
  (* Never returns [None]: the index stream is unbounded, so the combined
     stream is as long as [attempts]. *)
  let next_index () =
    let current = !counter in
    counter := current + 1;
    Some current
  in
  Lwt_stream.combine (Lwt_stream.from_direct next_index) attempts

(** [with_sleep ?duration attempts] is the stream [attempts] in which the
    [n]th attempt (counting from 0), when it is retryable, is followed by a
    sleep of [duration n] seconds before it is yielded.  Since the underlying
    stream computes the next attempt only after the current one has been
    yielded, this spaces out consecutive attempts.

    Successful and fatal results are yielded immediately: nothing will be
    retried after them, so delaying them would only postpone the final answer.

    @param duration maps an attempt number to a sleep time in seconds;
    defaults to [default_sleep_duration]. *)
let with_sleep ?(duration=default_sleep_duration) attempts =
  attempts
  |> numbered
  |> Lwt_stream.map_s (fun (attempt_number, attempt_result) ->
      if is_retryable attempt_result then
        (* Back off before letting the consumer trigger the next attempt. *)
        let+ () = Lwt_unix.sleep (duration attempt_number) in
        attempt_result
      else
        (* Terminal result: hand it over without any delay. *)
        Lwt.return attempt_result
    )

(** [pp_n_times_error] prints a short tag saying why retrying stopped:
    [(exhausted)] when the last error was still retryable, [(fatal)] when it
    was fatal. *)
let pp_n_times_error fmt err =
  let tag =
    match err with
    | `Retry _ -> "(exhausted)"
    | `Fatal _ -> "(fatal)"
  in
  Fmt.string fmt tag

(** [n_times ?pp n strm] takes at most [n] attempts from [strm] and is the
    outcome of the last one taken:
    - [Ok v] if that attempt succeeded;
    - [Error (`Msg msg)] otherwise, where [msg] is the error formatted with
      [pp] followed by [(exhausted)] for a retryable error or [(fatal)] for a
      fatal one.

    If no attempt could be taken at all ([n <= 0], or [strm] is already
    empty), the result is an [Error (`Msg _)] saying so, rather than an
    exception.

    @param pp formatter for the final error; defaults to [pp_error] without
    payload formatters. *)
let n_times
  : ?pp:('retry, 'fatal) error Fmt.t
    -> int
    -> ('ok, 'retry, 'fatal) attempt Lwt_stream.t
    -> ('ok, [`Msg of string]) result Lwt.t
  = fun ?pp n strm ->
    let pp_base = match pp with
      | Some f -> f
      | None -> fun fmt err -> pp_error fmt err
    in
    let pp fmt err = Fmt.pf fmt "%a %a" pp_base err pp_n_times_error err in
    let+ attempts = Lwt_stream.nget n strm in
    (* [nget] returns the attempts in stream order, so the head of the
       reversed list is the most recent one.  Only that one is formatted. *)
    match List.rev attempts with
    | Ok v :: _ -> Ok v
    | Error err :: _ -> Error (`Msg (Fmt.to_to_string pp err))
    | [] ->
      Error (`Msg (Printf.sprintf "no attempts were made (requested %d)" n))
114 changes: 114 additions & 0 deletions lib/retry.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
(** Utilities for retrying Lwt computations *)

type ('retry, 'fatal) error =
[ `Retry of 'retry
| `Fatal of 'fatal
]
(** The type of errors that a retryable computation can produce.
    - [`Retry r] when [r] represents an error that can be retried.
    - [`Fatal f] when [f] represents an error that cannot be retried. *)

type ('ok, 'retry, 'fatal) attempt = ('ok, ('retry, 'fatal) error) result
(** A [('ok, 'retry, 'fatal) attempt] is an alias for the [result] of one
    attempt of a retryable computation.
    - [Ok v] is a success producing the value [v].
    - [Error err] is a failure described by the {!type:error} [err]. *)

val pp_error :
?retry:'retry Fmt.t ->
?fatal:'fatal Fmt.t ->
('retry, 'fatal) error Fmt.t
(** [pp_error ~retry ~fatal] is a formatter for {!type:error}s that formats
    the payload of retryable and fatal errors with the provided formatters.
    [`Retry r] is printed as ["retryable error 'r'"] and [`Fatal f] as
    ["fatal error 'f'"].

    If either formatter is not provided, a default formatter will represent
    the corresponding payload as ["<opaque>"]. *)

val equal_error :
retry:('retry -> 'retry -> bool) ->
fatal:('fatal -> 'fatal -> bool) ->
('retry, 'fatal) error ->
('retry, 'fatal) error ->
bool
(** [equal_error ~retry ~fatal a b] is [true] if [a] and [b] are both
    [`Retry] errors whose payloads are equal according to [retry], or both
    [`Fatal] errors whose payloads are equal according to [fatal].  It is
    [false] when the two errors have different constructors. *)

val on_error :
(unit -> ('ok, 'retry, 'fatal) attempt Lwt.t) ->
('ok, 'retry, 'fatal) attempt Lwt_stream.t
(** [on_error f] is a stream of attempts to compute [f]. Each element of the
    stream is the result of one call to [f]. The stream continues for as long
    as [f] produces [`Retry] errors; the first success or fatal error is the
    last element of the stream.

    Examples:
{[
# open Current;;
# let success () = Lwt.return_ok ();;
val success : unit -> (unit, 'a) result Lwt.t = <fun>
# Retry.(success |> on_error) |> Lwt_stream.to_list;;
- : (unit, 'a, 'b) Current.Retry.attempt list = [Ok ()]
# let fatal_failure () = Lwt.return_error (`Fatal ());;
val fatal_failure : unit -> ('a, [> `Fatal of unit ]) result Lwt.t = <fun>
# Retry.(fatal_failure |> on_error) |> Lwt_stream.to_list;;
- : ('a, 'b, unit) Current.Retry.attempt list = [Error (`Fatal ())]
# let retryable_error () = Lwt.return_error (`Retry ());;
val retryable_error : unit -> ('a, [> `Retry of unit ]) result Lwt.t = <fun>
# Retry.(retryable_error |> on_error) |> Lwt_stream.nget 5;;
- : ('a, unit, 'b) Current.Retry.attempt list =
[Error (`Retry ()); Error (`Retry ()); Error (`Retry ()); Error (`Retry ());
Error (`Retry ())]
]}*)

val with_sleep :
?duration:(int -> float) ->
('ok, 'retry, 'fatal) attempt Lwt_stream.t ->
('ok, 'retry, 'fatal) attempt Lwt_stream.t
(** [with_sleep ~duration attempts] is the stream of [attempts] with a sleep
    added after computing each [n]th retryable attempt based on [duration n],
    where [n] counts attempts from 0 and the duration is in seconds.

    @param duration the optional sleep duration. This defaults to an
    exponential backoff computed as [n * 2 * (1.5 ^ n)], which gives the
    approximate sequence 0s -> 3s -> 9s -> 20.25s -> 40.5s -> 75.9s ->
    136.7s -> ...

    Examples:
{[
# let retryable_error () = Lwt.return_error (`Retry ());;
# let attempts_with_sleeps = Retry.(retryable_error |> on_error |> with_sleep);;
# Lwt_stream.get attempts_with_sleeps;;
(* computed immediately *)
Some (Error (`Retry ()))
# Lwt_stream.get attempts_with_sleeps;;
(* after 3 seconds *)
Some (Error (`Retry ()))
# Lwt_stream.get attempts_with_sleeps;;
(* after 9 seconds *)
Some (Error (`Retry ()))
(* a stream with a constant 1s sleep between attempts *)
# let attempts_with_constant_sleeps =
Retry.(retryable_error |> on_error |> with_sleep ~duration:(fun _ -> 1.0));;
]} *)

val n_times :
?pp:('retry, 'fatal) error Fmt.t ->
int ->
('ok, 'retry, 'fatal) attempt Lwt_stream.t ->
('ok, [`Msg of string]) result Lwt.t
(** [n_times n attempts] takes at most [n] attempts from the stream and is
    [Ok v] if the last attempt taken succeeds with [v]. Otherwise, it is
    [Error (`Msg msg)], where [msg] is the last error formatted with the error
    formatter, followed by ["(exhausted)"] if that error was retryable or
    ["(fatal)"] if it was fatal.

    @param pp an optional formatter used to produce the error message,
    defaulting to {!pp_error}.

{[
# let operation () =
let i = ref 0 in
fun () -> Lwt.return_error (if !i < 3 then (incr i; `Retry !i) else `Fatal "msg");;
# Retry.(operation () |> on_error |> n_times ~pp:(pp_error ~retry:Fmt.int ~fatal:Fmt.string) 5);;
- : ('a, [ `Msg of string ]) result = Error (`Msg "fatal error 'msg' (fatal)")
]} *)
2 changes: 1 addition & 1 deletion test/dune
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(executables
(names test test_monitor test_cache)
(modules test test_monitor test_cache test_job test_log_matcher driver)
(modules test test_monitor test_cache test_job test_log_matcher test_retry driver)
(libraries
alcotest
alcotest-lwt
Expand Down
1 change: 1 addition & 0 deletions test/test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -388,5 +388,6 @@ let () =
"monitor", Test_monitor.tests;
"job", Test_job.tests;
"log_matcher", Test_log_matcher.tests;
"retry", Test_retry.tests;
]
end
Loading

0 comments on commit ec9e3c9

Please sign in to comment.