From de10750fc5a165a680bae77f8b994e557201994f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Thu, 5 Dec 2024 17:01:25 +0000 Subject: [PATCH] CP-52821: xapi_periodic_scheduler: use Mtime.span instead of Mtime.t MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Avoids dealing with overflow Signed-off-by: Edwin Török --- .../lib/xapi-stdext-threads/ipq.ml | 14 ++++++------- .../lib/xapi-stdext-threads/ipq.mli | 2 +- .../lib/xapi-stdext-threads/ipq_test.ml | 14 ++++++------- .../lib/xapi-stdext-threads/scheduler.ml | 20 ++++++++----------- 4 files changed, 23 insertions(+), 27 deletions(-) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.ml index 4cf29ed3d9..7293ae625e 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.ml @@ -13,7 +13,7 @@ *) (* Imperative priority queue *) -type 'a event = {ev: 'a; time: Mtime.t} +type 'a event = {ev: 'a; time: Mtime.span} type 'a t = {default: 'a event; mutable size: int; mutable data: 'a event array} @@ -23,7 +23,7 @@ let create n default = if n <= 0 then invalid_arg "create" else - let default = {ev= default; time= Mtime_clock.now ()} in + let default = {ev= default; time= Mtime_clock.elapsed ()} in {default; size= 0; data= Array.make n default} let is_empty h = h.size <= 0 @@ -45,7 +45,7 @@ let add h x = (* moving [x] up in the heap *) let rec moveup i = let fi = (i - 1) / 2 in - if i > 0 && Mtime.is_later d.(fi).time ~than:x.time then ( + if i > 0 && Mtime.Span.is_longer d.(fi).time ~than:x.time then ( d.(i) <- d.(fi) ; moveup fi ) else @@ -69,7 +69,7 @@ let remove h s = (* moving [x] up in the heap *) let rec moveup i = let fi = (i - 1) / 2 in - if i > 0 && Mtime.is_later d.(fi).time ~than:x.time then ( + if i > 0 && Mtime.Span.is_longer d.(fi).time ~than:x.time then ( d.(i) <- d.(fi) ; moveup fi ) else @@ -83,7 +83,7 @@ let remove h s = let j' = j + 1 in if j' < n && d.(j').time < d.(j).time then j' else j in - if Mtime.is_earlier d.(j).time ~than:x.time then ( + if Mtime.Span.is_shorter d.(j).time ~than:x.time then ( d.(i) <- d.(j) ; movedown j ) else @@ -93,7 +93,7 @@ let remove h s = in if s = n then () - else if Mtime.is_later d.(s).time ~than:x.time then + else if Mtime.Span.is_longer d.(s).time ~than:x.time then moveup s else movedown s ; @@ -129,7 +129,7 @@ let check h = let d = h.data in for i = 1 to h.size - 1 do let fi = (i - 1) / 2 in - let ordered = Mtime.is_later d.(i).time ~than:d.(fi).time in + let ordered = Mtime.Span.is_longer d.(i).time ~than:d.(fi).time in assert ordered done diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.mli b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.mli index b7c4974e64..19f8bf1e33 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.mli +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.mli @@ -12,7 +12,7 @@ * GNU Lesser General Public License for more details. *) -type 'a event = {ev: 'a; time: Mtime.t} +type 'a event = {ev: 'a; time: Mtime.span} type 'a t diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq_test.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq_test.ml index e8e64093e1..a9cc2611da 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq_test.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq_test.ml @@ -17,7 +17,7 @@ module Ipq = Xapi_stdext_threads_scheduler.Ipq (* test we get "out of bound" exception calling Ipq.remove *) let test_out_of_index () = let q = Ipq.create 10 0 in - Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.now ()} ; + Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.elapsed ()} ; let is_oob = function | Invalid_argument s when String.ends_with ~suffix:" out of bounds" s -> true @@ -43,18 +43,18 @@ let test_leak () = let use_array () = array.(0) <- 'a' in let allocated = Atomic.make true in Gc.finalise (fun _ -> Atomic.set allocated false) array ; - Ipq.add q {Ipq.ev= use_array; Ipq.time= Mtime_clock.now ()} ; + Ipq.add q {Ipq.ev= use_array; Ipq.time= Mtime_clock.elapsed ()} ; Ipq.remove q 0 ; Gc.full_major () ; Gc.full_major () ; Alcotest.(check bool) "allocated" false (Atomic.get allocated) ; - Ipq.add q {Ipq.ev= default; Ipq.time= Mtime_clock.now ()} + Ipq.add q {Ipq.ev= default; Ipq.time= Mtime_clock.elapsed ()} (* test Ipq.is_empty call *) let test_empty () = let q = Ipq.create 10 0 in Alcotest.(check bool) "same value" true (Ipq.is_empty q) ; - Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.now ()} ; + Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.elapsed ()} ; Alcotest.(check bool) "same value" false (Ipq.is_empty q) ; Ipq.remove q 0 ; Alcotest.(check bool) "same value" true (Ipq.is_empty q) @@ -75,7 +75,7 @@ let set queue = Ipq.iter (fun d -> let t = d.time in - let t = Mtime.to_uint64_ns t in + let t = Mtime.Span.to_uint64_ns t in s := Int64Set.add t !s ) queue ; @@ -86,7 +86,7 @@ let test_old () = let s = ref Int64Set.empty in let add i = let ti = Random.int64 1000000L in - let t = Mtime.of_uint64_ns ti in + let t = Mtime.Span.of_uint64_ns ti in let e = {Ipq.time= t; Ipq.ev= i} in Ipq.add test e ; s := Int64Set.add ti !s @@ -123,7 +123,7 @@ let test_old () = let prev = ref 0L in for _ = 0 to 49 do let e = Ipq.pop_maximum test in - let t = Mtime.to_uint64_ns e.time in + let t = Mtime.Span.to_uint64_ns e.time in Alcotest.(check bool) (Printf.sprintf "%Ld bigger than %Ld" t !prev) true (t >= !prev) ; diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml index 8332e0897a..27cf306995 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml @@ -33,16 +33,12 @@ let (queue : t Ipq.t) = Ipq.create 50 queue_default let lock = Mutex.create () -let add_span clock span = - (* return max value if the add overflows: spans are unsigned integers *) - match Mtime.add_span clock span with Some t -> t | None -> Mtime.max_stamp - let add_to_queue_span name ty start_span newfunc = - let ( ++ ) = add_span in + let ( ++ ) = Mtime.Span.add in let item = { Ipq.ev= {func= newfunc; ty; name} - ; Ipq.time= Mtime_clock.now () ++ start_span + ; Ipq.time= Mtime_clock.elapsed () ++ start_span } in with_lock lock (fun () -> Ipq.add queue item) ; @@ -68,11 +64,11 @@ let add_periodic_pending () = with_lock lock @@ fun () -> match !pending_event with | Some ({ty= Periodic timer; _} as ev) -> - let ( ++ ) = add_span in + let ( ++ ) = Mtime.Span.add in let delta = Clock.Timer.s_to_span timer |> Option.value ~default:Mtime.Span.max_span in - let item = {Ipq.ev; Ipq.time= Mtime_clock.now () ++ delta} in + let item = {Ipq.ev; Ipq.time= Mtime_clock.elapsed () ++ delta} in Ipq.add queue item ; pending_event := None | Some {ty= OneShot; _} -> @@ -84,15 +80,15 @@ let loop () = debug "%s started" __MODULE__ ; try while true do - let now = Mtime_clock.now () in + let now = Mtime_clock.elapsed () in let deadline, item = with_lock lock @@ fun () -> (* empty: wait till we get something *) if Ipq.is_empty queue then - (add_span now Mtime.Span.(10 * s), None) + (Mtime.Span.add now Mtime.Span.(10 * s), None) else let next = Ipq.maximum queue in - if Mtime.is_later next.Ipq.time ~than:now then + if Mtime.Span.is_longer next.Ipq.time ~than:now then (* not expired: wait till time or interrupted *) (next.Ipq.time, None) else ( @@ -110,7 +106,7 @@ let loop () = | None -> ( (* Sleep until next event. *) let sleep = - Mtime.(span deadline now) + Mtime.(Span.abs_diff deadline now) |> Mtime.Span.(add ms) |> Clock.Timer.span_to_s in