forked from ocurrent/ocurrent
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
355 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
open Lwt.Syntax | ||
|
||
let default_sleep_duration n' = | ||
let base_sleep_time = 2.0 in | ||
let n = Int.to_float n' in | ||
n *. base_sleep_time *. Float.pow 1.5 n | ||
|
||
type ('retry, 'fatal) error = | ||
[ `Retry of 'retry | ||
| `Fatal of 'fatal | ||
] | ||
|
||
let pp_error ?retry ?fatal fmt err = | ||
let default fmt _ = Fmt.pf fmt "<opaque>" in | ||
let pp_retry = Option.value retry ~default in | ||
let pp_fatal = Option.value fatal ~default in | ||
match err with | ||
| `Retry r -> Fmt.pf fmt "retryable error '%a'" pp_retry r | ||
| `Fatal f -> Fmt.pf fmt "fatal error '%a'" pp_fatal f | ||
|
||
let equal_error ~retry ~fatal a b = | ||
match a, b with | ||
| `Retry a', `Retry b' -> retry a' b' | ||
| `Fatal a', `Fatal b' -> fatal a' b' | ||
| _ -> false | ||
|
||
type ('ok, 'retry, 'fatal) attempt = ('ok, ('retry, 'fatal) error) result | ||
|
||
let is_retryable = function | ||
| Error (`Retry _) -> true | ||
| _ -> false | ||
|
||
let on_error | ||
(f : unit -> ('ok, 'retry, 'fatal) attempt Lwt.t) | ||
: ('ok, 'retry, 'fatal) attempt Lwt_stream.t | ||
= | ||
let stop = ref false in | ||
let attempt () = | ||
if !stop then | ||
Lwt.return_none | ||
else | ||
let+ result = f () in | ||
stop := not (is_retryable result); | ||
Some result | ||
in | ||
Lwt_stream.from attempt | ||
|
||
let numbered attempts : (int * _) Lwt_stream.t = | ||
let i = ref 0 in | ||
let indexes = Lwt_stream.from_direct (fun () -> let n = !i in incr i; Some n) in | ||
Lwt_stream.combine indexes attempts | ||
|
||
let with_sleep ?(duration=default_sleep_duration) attempts = | ||
attempts | ||
|> numbered | ||
|> Lwt_stream.map_s (fun (attempt_number, attempt_result) -> | ||
let+ () = attempt_number |> duration |> Lwt_unix.sleep in | ||
attempt_result) | ||
|
||
let pp_n_times_error fmt = function | ||
| `Retry _ -> Fmt.pf fmt "(exhausted)" | ||
| `Fatal _ -> Fmt.pf fmt "(fatal)" | ||
|
||
let n_times ?(pp = (fun e -> pp_error e)) n attempts | ||
: (_, [`Msg of string]) result Lwt.t | ||
= | ||
let to_msg err : [`Msg of string] = | ||
Fmt.kstr (fun m -> `Msg m) "%a %a" pp err pp_n_times_error err in | ||
let+ attempts = Lwt_stream.nget n attempts in | ||
match List.rev attempts with | ||
| last :: _ -> last |> Result.map_error to_msg (* a [_ Current.or_error] *) | ||
| _ -> failwith "impossible" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
(** Utilities for retrying Lwt computations *) | ||
|
||
type ('retry, 'fatal) error = | ||
[ `Retry of 'retry | ||
| `Fatal of 'fatal | ||
] | ||
(** The type of errors that a retryable computation can produce. | ||
- [`Retry r] when [r] represents an error that can be retried. | ||
- [`Fatal f] when [f] represents an error that cannot be retried. *) | ||
|
||
type ('ok, 'retry, 'fatal) attempt = ('ok, ('retry, 'fatal) error) result | ||
(** A [('ok, 'retry, 'fatal) attempt] is an alias for the [result] of a | ||
retryable computation. | ||
- [Ok v] produces a successful value [v] | ||
- [Error err] produces the {!type:error} [err] *) | ||
|
||
val pp_error : | ||
?retry:'retry Fmt.t -> | ||
?fatal:'fatal Fmt.t -> | ||
('retry, 'fatal) error Fmt.t | ||
(** [pp_error ~retry ~fatal] is a formatter for {!type:error}s that formats | ||
fatal and retryable errors according to the provided formatters. | ||
If either formatter is not provided, a default formatter will represent the | ||
values as ["<opaque>"]. *) | ||
|
||
val equal_error : | ||
retry:('retry -> 'retry -> bool) -> | ||
fatal:('fatal -> 'fatal -> bool) -> | ||
('retry, 'fatal) error -> | ||
('retry, 'fatal) error -> | ||
bool | ||
|
||
val on_error : | ||
(unit -> ('ok, 'retry, 'fatal) attempt Lwt.t) -> | ||
('ok, 'retry, 'fatal) attempt Lwt_stream.t | ||
(** [on_error f] is a stream of attempts to compute [f]. The stream will continue until | ||
the computation succeeds or produces a fatal error. | ||
Examples | ||
{[ | ||
# open Current;; | ||
# let success () = Lwt.return_ok ();; | ||
val success : unit -> (unit, 'a) result Lwt.t = <fun> | ||
# Retry.(success |> on_error) |> Lwt_stream.to_list;; | ||
- : (unit, 'a, 'b) Current.Retry.attempt list = [Ok ()] | ||
# let fatal_failure () = Lwt.return_error (`Fatal ());; | ||
val fatal_failure : unit -> ('a, [> `Fatal of unit ]) result Lwt.t = <fun> | ||
# Retry.(fatal_failure |> on_error) |> Lwt_stream.to_list;; | ||
- : ('a, 'b, unit) Current.Retry.attempt list = [Error (`Fatal ())] | ||
# let retryable_error () = Lwt.return_error (`Retry ());; | ||
val retryable_error : unit -> ('a, [> `Retry of unit ]) result Lwt.t = <fun> | ||
# Retry.(retryable_error |> on_error) |> Lwt_stream.nget 3;; | ||
- : ('a, unit, 'b) Current.Retry.attempt list = | ||
[Error (`Retry ()); Error (`Retry ()); Error (`Retry ())] | ||
]}*) | ||
|
||
val with_sleep : | ||
?duration:(int -> float) -> | ||
('ok, 'retry, 'fatal) attempt Lwt_stream.t -> | ||
('ok, 'retry, 'fatal) attempt Lwt_stream.t | ||
(** [with_sleep ~duration attempts] is the stream of [attempts] with a sleep | ||
added after computing each [n]th retryable attempt based on [duration n]. | ||
@param duration the optional sleep duration. This defaults to an exponential | ||
backoff computed as n * 2 * (1.5 ^ n), which gives the approximate sequence 0s -> 3s -> | ||
9s -> 20.25 -> 40.5s -> 75.9s -> 136.7 -> ... | ||
Examples | ||
{[ | ||
# let retryable_error () = Lwt.return_error (`Retry ());; | ||
# let attempts_with_sleeps = Retry.(retryable_error |> on_error |> with_sleep);; | ||
# Lwt_stream.get attempts_with_sleeps;; | ||
(* computed immediately *) | ||
Some (Error (`Retry ())) | ||
# Lwt_stream.get attempts_with_sleeps;; | ||
(* after 3 seconds *) | ||
Some (Error (`Retry ())) | ||
# Lwt_stream.get attempts_with_sleeps;; | ||
(* after 9 seconds *) | ||
Some (Error (`Retry ())) | ||
(* a stream with a constant 1s sleep between attempts *) | ||
# let attempts_with_constant_sleeps = | ||
Retry.(retryable_error |> on_error |> with_sleep ~duration:(fun _ -> 1.0));; | ||
]} *) | ||
|
||
val n_times : | ||
?pp:('retry, 'fatal) error Fmt.t -> | ||
int -> | ||
('ok, 'retry, 'fatal) attempt Lwt_stream.t -> | ||
('ok, [`Msg of string]) result Lwt.t | ||
|
||
(** [n_times n attempts] is [Ok v] if one of the [attempts] succeeds within [n] | ||
retries. Otherwise, it is [Error (`Msg msg)] with [msg] derived based on the | ||
error formatter. | ||
@param pp an optional formatter used to produce the error message, | ||
defaulting to {!pp_error}. | ||
Examples | ||
{[ | ||
# let operation () = | ||
let i = ref 0 in | ||
fun () -> Lwt.return_error (if !i < 3 then (incr i; `Retry !i) else `Fatal "msg");; | ||
# Retry.(operation () |> on_error |> n_times ~pp:(pp_error ~retry:Fmt.int ~fatal:Fmt.string) 5);; | ||
- : ('a, [ `Msg of string ]) result = Error (`Msg "fatal error 'msg' (fatal)") | ||
]} *) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
open Lwt.Infix | ||
open Lwt.Syntax | ||
|
||
module Retry = Current.Retry | ||
|
||
let attempt | ||
(ok : 'ok Alcotest.testable) | ||
(retry : 'retry Alcotest.testable) | ||
(fatal : 'fatal Alcotest.testable) | ||
: ('ok, 'retry, 'fatal) Retry.attempt Alcotest.testable | ||
= | ||
let pp = Retry.pp_error | ||
~retry:(Alcotest.pp retry) | ||
~fatal:(Alcotest.pp fatal) | ||
in | ||
let eq = Retry.equal_error | ||
~retry:(Alcotest.equal retry) | ||
~fatal:(Alcotest.equal fatal) | ||
in | ||
let error = Alcotest.testable pp eq in | ||
Alcotest.result ok error | ||
|
||
let err_msg : [`Msg of string] Alcotest.testable = | ||
let pp fmt (`Msg m) = Fmt.pf fmt "`Msg %a" Fmt.string m in | ||
let eq (`Msg a) (`Msg b) = String.equal a b in | ||
Alcotest.testable pp eq | ||
|
||
let test_success_without_retry _switch () = | ||
let strm = | ||
Retry.on_error (fun () -> Lwt.return_ok 42) | ||
in | ||
|
||
let msg = "expected success" in | ||
let expected = (Ok 42) in | ||
let* actual = Lwt_stream.next strm in | ||
Alcotest.(check' (attempt int unit unit)) ~msg ~expected ~actual; | ||
|
||
let msg = "expected stream to be empty" in | ||
let expected = true in | ||
let+ actual = Lwt_stream.is_empty strm in | ||
Alcotest.(check' bool) ~msg ~expected ~actual | ||
|
||
let test_retries _switch () = | ||
let strm = | ||
Retry.on_error (fun () -> Lwt.return_error (`Retry ())) | ||
in | ||
|
||
let msg = "expected 3 retry errors" in | ||
let expected = List.init 3 (fun _ -> Error (`Retry ())) in | ||
let+ actual = Lwt_stream.nget 3 strm in | ||
Alcotest.(check' (list (attempt unit unit unit))) ~msg ~expected ~actual | ||
|
||
let test_retries_before_fatal_error _switch () = | ||
let retries_before_fatal = 3 in | ||
let i = ref 0 in | ||
let strm = Retry.on_error | ||
(fun () -> | ||
if !i < retries_before_fatal then ( | ||
incr i; | ||
Lwt.return_error (`Retry ()) | ||
) else | ||
Lwt.return_error (`Fatal ())) | ||
in | ||
|
||
let msg = "expected 3 retry errors" in | ||
let expected = retries_before_fatal in | ||
let* actual = Lwt_stream.nget retries_before_fatal strm >|= List.length in | ||
Alcotest.(check' int) ~msg ~expected ~actual; | ||
|
||
let msg = "expected fatal error" in | ||
let expected = Error (`Fatal ()) in | ||
let* actual = Lwt_stream.next strm in | ||
Alcotest.(check' (attempt unit unit unit)) ~msg ~expected ~actual; | ||
|
||
let msg = "expected stream to be empty" in | ||
let+ stream_is_empty = Lwt_stream.is_empty strm in | ||
Alcotest.(check' bool) ~msg ~expected:true ~actual:stream_is_empty | ||
|
||
let test_retries_before_success _switch () = | ||
let retries_before_fatal = 3 in | ||
let i = ref 0 in | ||
let strm = Retry.on_error (fun () -> | ||
if !i < retries_before_fatal then ( | ||
incr i; | ||
Lwt.return_error (`Retry ()) | ||
) else | ||
Lwt.return_ok () | ||
) | ||
in | ||
|
||
let msg = "expected 3 retry errors" in | ||
let expected = retries_before_fatal in | ||
let* actual = Lwt_stream.nget retries_before_fatal strm >|= List.length in | ||
Alcotest.(check' int) ~msg ~expected ~actual; | ||
|
||
let msg = "expected success error" in | ||
let expected = Ok () in | ||
let* actual = Lwt_stream.next strm in | ||
Alcotest.(check' (attempt unit unit unit)) ~msg ~expected ~actual; | ||
|
||
let msg = "expected stream to be empty" in | ||
let+ stream_is_empty = Lwt_stream.is_empty strm in | ||
Alcotest.(check' bool) ~msg ~expected:true ~actual:stream_is_empty | ||
|
||
let test_n_times_fatal _switch () = | ||
let i = ref 0 in | ||
let operation () = | ||
if !i < 3 then ( | ||
incr i; | ||
Lwt.return_error (`Retry ()) | ||
) else | ||
Lwt.return_error (`Fatal ()) | ||
in | ||
let msg = "expected fatal error message" in | ||
let expected = `Msg "fatal error '<opaque>' (fatal)" in | ||
let+ actual = Retry.(operation |> on_error |> n_times 5) >|= Result.get_error in | ||
Alcotest.(check' err_msg) ~msg ~expected ~actual | ||
|
||
let test_n_times_exhaustion _switch () = | ||
let operation () = Lwt.return_error (`Retry ()) in | ||
let msg = "expected exhaustion error message" in | ||
let expected = `Msg "retryable error '<opaque>' (exhausted)" in | ||
let+ actual = Retry.(operation |> on_error |> n_times 5) >|= Result.get_error in | ||
Alcotest.(check' err_msg) ~msg ~expected ~actual | ||
|
||
let test_n_times_success _switch () = | ||
let i = ref 0 in | ||
let operation () = | ||
if !i < 3 then ( | ||
incr i; | ||
Lwt.return_error (`Retry ()) | ||
) else | ||
Lwt.return_ok () | ||
in | ||
try Retry.(operation |> on_error |> n_times 5) >|= Result.get_ok | ||
with Invalid_argument _ -> Alcotest.fail "expected Ok result" | ||
|
||
(* test that the sleeps actually do throttle the computations *) | ||
let test_with_sleep _switch () = | ||
let duration _ = 0.1 in | ||
let racing_operation = Lwt_unix.sleep (duration ()) >|= Result.ok in | ||
let operation () = Lwt.return_error (`Retry ()) in | ||
let retries = Retry.(operation |> on_error |> with_sleep ~duration |> n_times 5) in | ||
(* If [with_sleep] is removed the test fails, as expected *) | ||
let msg = "expected racing_operation to complete before the retries with sleeps" in | ||
let expected = Ok () in | ||
let+ actual = Lwt.choose [racing_operation; retries] in | ||
Alcotest.(check' (result unit err_msg)) ~msg ~expected ~actual | ||
|
||
let tests = | ||
[ | ||
Alcotest_lwt.test_case "test_success_without_retry" `Quick test_success_without_retry; | ||
Alcotest_lwt.test_case "test_retries" `Quick test_retries; | ||
Alcotest_lwt.test_case "test_retries_before_fatal_error" `Quick test_retries_before_fatal_error; | ||
Alcotest_lwt.test_case "test_retries_before_success" `Quick test_retries_before_success; | ||
Alcotest_lwt.test_case "test_n_times_fatal" `Quick test_n_times_fatal; | ||
Alcotest_lwt.test_case "test_n_times_exhaustion" `Quick test_n_times_exhaustion; | ||
Alcotest_lwt.test_case "test_n_times_success" `Quick test_n_times_success; | ||
Alcotest_lwt.test_case "test_with_sleep" `Quick test_with_sleep; | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
val tests : unit Alcotest_lwt.test_case list |