diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune index d8036380cd..5d61f52cfc 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune @@ -18,7 +18,7 @@ (public_name xapi-stdext-threads.scheduler) (name xapi_stdext_threads_scheduler) (modules ipq scheduler) - (libraries mtime mtime.clock.os threads.posix unix xapi-log xapi-stdext-threads) + (libraries mtime mtime.clock.os threads.posix unix xapi-log xapi-stdext-threads clock) ) (tests diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.ml index 4cf29ed3d9..7293ae625e 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.ml @@ -13,7 +13,7 @@ *) (* Imperative priority queue *) -type 'a event = {ev: 'a; time: Mtime.t} +type 'a event = {ev: 'a; time: Mtime.span} type 'a t = {default: 'a event; mutable size: int; mutable data: 'a event array} @@ -23,7 +23,7 @@ let create n default = if n <= 0 then invalid_arg "create" else - let default = {ev= default; time= Mtime_clock.now ()} in + let default = {ev= default; time= Mtime_clock.elapsed ()} in {default; size= 0; data= Array.make n default} let is_empty h = h.size <= 0 @@ -45,7 +45,7 @@ let add h x = (* moving [x] up in the heap *) let rec moveup i = let fi = (i - 1) / 2 in - if i > 0 && Mtime.is_later d.(fi).time ~than:x.time then ( + if i > 0 && Mtime.Span.is_longer d.(fi).time ~than:x.time then ( d.(i) <- d.(fi) ; moveup fi ) else @@ -69,7 +69,7 @@ let remove h s = (* moving [x] up in the heap *) let rec moveup i = let fi = (i - 1) / 2 in - if i > 0 && Mtime.is_later d.(fi).time ~than:x.time then ( + if i > 0 && Mtime.Span.is_longer d.(fi).time ~than:x.time then ( d.(i) <- d.(fi) ; moveup fi ) else @@ -83,7 +83,7 @@ let remove h s = let j' = j + 1 in if j' < n && d.(j').time < d.(j).time then j' else j in - if Mtime.is_earlier d.(j).time ~than:x.time then ( + if Mtime.Span.is_shorter d.(j).time ~than:x.time then ( d.(i) <- d.(j) ; movedown j ) else @@ -93,7 +93,7 @@ let remove h s = in if s = n then () - else if Mtime.is_later d.(s).time ~than:x.time then + else if Mtime.Span.is_longer d.(s).time ~than:x.time then moveup s else movedown s ; @@ -129,7 +129,7 @@ let check h = let d = h.data in for i = 1 to h.size - 1 do let fi = (i - 1) / 2 in - let ordered = Mtime.is_later d.(i).time ~than:d.(fi).time in + let ordered = Mtime.Span.is_longer d.(i).time ~than:d.(fi).time in assert ordered done diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.mli b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.mli index b7c4974e64..19f8bf1e33 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.mli +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.mli @@ -12,7 +12,7 @@ * GNU Lesser General Public License for more details. *) -type 'a event = {ev: 'a; time: Mtime.t} +type 'a event = {ev: 'a; time: Mtime.span} type 'a t diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq_test.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq_test.ml index e8e64093e1..a9cc2611da 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq_test.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq_test.ml @@ -17,7 +17,7 @@ module Ipq = Xapi_stdext_threads_scheduler.Ipq (* test we get "out of bound" exception calling Ipq.remove *) let test_out_of_index () = let q = Ipq.create 10 0 in - Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.now ()} ; + Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.elapsed ()} ; let is_oob = function | Invalid_argument s when String.ends_with ~suffix:" out of bounds" s -> true @@ -43,18 +43,18 @@ let test_leak () = let use_array () = array.(0) <- 'a' in let allocated = Atomic.make true in Gc.finalise (fun _ -> Atomic.set allocated false) array ; - Ipq.add q {Ipq.ev= use_array; Ipq.time= Mtime_clock.now ()} ; + Ipq.add q {Ipq.ev= use_array; Ipq.time= Mtime_clock.elapsed ()} ; Ipq.remove q 0 ; Gc.full_major () ; Gc.full_major () ; Alcotest.(check bool) "allocated" false (Atomic.get allocated) ; - Ipq.add q {Ipq.ev= default; Ipq.time= Mtime_clock.now ()} + Ipq.add q {Ipq.ev= default; Ipq.time= Mtime_clock.elapsed ()} (* test Ipq.is_empty call *) let test_empty () = let q = Ipq.create 10 0 in Alcotest.(check bool) "same value" true (Ipq.is_empty q) ; - Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.now ()} ; + Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.elapsed ()} ; Alcotest.(check bool) "same value" false (Ipq.is_empty q) ; Ipq.remove q 0 ; Alcotest.(check bool) "same value" true (Ipq.is_empty q) @@ -75,7 +75,7 @@ let set queue = Ipq.iter (fun d -> let t = d.time in - let t = Mtime.to_uint64_ns t in + let t = Mtime.Span.to_uint64_ns t in s := Int64Set.add t !s ) queue ; @@ -86,7 +86,7 @@ let test_old () = let s = ref Int64Set.empty in let add i = let ti = Random.int64 1000000L in - let t = Mtime.of_uint64_ns ti in + let t = Mtime.Span.of_uint64_ns ti in let e = {Ipq.time= t; Ipq.ev= i} in Ipq.add test e ; s := Int64Set.add ti !s @@ -123,7 +123,7 @@ let test_old () = let prev = ref 0L in for _ = 0 to 49 do let e = Ipq.pop_maximum test in - let t = Mtime.to_uint64_ns e.time in + let t = Mtime.Span.to_uint64_ns e.time in Alcotest.(check bool) (Printf.sprintf "%Ld bigger than %Ld" t !prev) true (t >= !prev) ; diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml index a544ed79bb..27cf306995 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml @@ -33,31 +33,23 @@ let (queue : t Ipq.t) = Ipq.create 50 queue_default let lock = Mutex.create () -module Clock = struct - let span s = Mtime.Span.of_uint64_ns (Int64.of_float (s *. 1e9)) - - let span_to_s span = - Mtime.Span.to_uint64_ns span |> Int64.to_float |> fun ns -> ns /. 1e9 - - let add_span clock secs = - (* return mix or max available value if the add overflows *) - match Mtime.add_span clock (span secs) with - | Some t -> - t - | None when secs > 0. -> - Mtime.max_stamp - | None -> - Mtime.min_stamp -end - -let add_to_queue name ty start newfunc = - let ( ++ ) = Clock.add_span in +let add_to_queue_span name ty start_span newfunc = + let ( ++ ) = Mtime.Span.add in let item = - {Ipq.ev= {func= newfunc; ty; name}; Ipq.time= Mtime_clock.now () ++ start} + { + Ipq.ev= {func= newfunc; ty; name} + ; Ipq.time= Mtime_clock.elapsed () ++ start_span + } in with_lock lock (fun () -> Ipq.add queue item) ; Delay.signal delay +let add_to_queue name ty start newfunc = + let start_span = + Clock.Timer.s_to_span start |> Option.value ~default:Mtime.Span.max_span + in + add_to_queue_span name ty start_span newfunc + let remove_from_queue name = with_lock lock @@ fun () -> match !pending_event with @@ -72,8 +64,11 @@ let add_periodic_pending () = with_lock lock @@ fun () -> match !pending_event with | Some ({ty= Periodic timer; _} as ev) -> - let ( ++ ) = Clock.add_span in - let item = {Ipq.ev; Ipq.time= Mtime_clock.now () ++ timer} in + let ( ++ ) = Mtime.Span.add in + let delta = + Clock.Timer.s_to_span timer |> Option.value ~default:Mtime.Span.max_span + in + let item = {Ipq.ev; Ipq.time= Mtime_clock.elapsed () ++ delta} in Ipq.add queue item ; pending_event := None | Some {ty= OneShot; _} -> @@ -85,15 +80,15 @@ let loop () = debug "%s started" __MODULE__ ; try while true do - let now = Mtime_clock.now () in + let now = Mtime_clock.elapsed () in let deadline, item = with_lock lock @@ fun () -> (* empty: wait till we get something *) if Ipq.is_empty queue then - (Clock.add_span now 10.0, None) + (Mtime.Span.add now Mtime.Span.(10 * s), None) else let next = Ipq.maximum queue in - if Mtime.is_later next.Ipq.time ~than:now then + if Mtime.Span.is_longer next.Ipq.time ~than:now then (* not expired: wait till time or interrupted *) (next.Ipq.time, None) else ( @@ -111,7 +106,9 @@ let loop () = | None -> ( (* Sleep until next event. *) let sleep = - Mtime.(span deadline now) |> Mtime.Span.(add ms) |> Clock.span_to_s + Mtime.(Span.abs_diff deadline now) + |> Mtime.Span.(add ms) + |> Clock.Timer.span_to_s in try ignore (Delay.wait delay sleep) with e -> diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli index d4d19b1f79..53a7c764dc 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli @@ -18,6 +18,10 @@ type func_ty = | OneShot (** Fire just once *) | Periodic of float (** Fire periodically with a given period in seconds *) +val add_to_queue_span : + string -> func_ty -> Mtime.span -> (unit -> unit) -> unit +(** Start a new timer. *) + val add_to_queue : string -> func_ty -> float -> (unit -> unit) -> unit (** Start a new timer. *) diff --git a/ocaml/xapi/xapi_event.ml b/ocaml/xapi/xapi_event.ml index af2a610523..94b7c4bd9b 100644 --- a/ocaml/xapi/xapi_event.ml +++ b/ocaml/xapi/xapi_event.ml @@ -419,20 +419,25 @@ module From = struct let session_is_invalid call = with_lock call.m (fun () -> call.session_invalid) - let wait2 call from_id deadline = + let wait2 call from_id timer = let timeoutname = Printf.sprintf "event_from_timeout_%Ld" call.index in with_lock m (fun () -> while from_id = call.cur_id && (not (session_is_invalid call)) - && Unix.gettimeofday () < deadline + && not (Clock.Timer.has_expired timer) do - Xapi_stdext_threads_scheduler.Scheduler.add_to_queue timeoutname - Xapi_stdext_threads_scheduler.Scheduler.OneShot - (deadline -. Unix.gettimeofday () +. 0.5) - (fun () -> Condition.broadcast c) ; - Condition.wait c m ; - Xapi_stdext_threads_scheduler.Scheduler.remove_from_queue timeoutname + match Clock.Timer.remaining timer with + | Expired _ -> + () + | Remaining delta -> + Xapi_stdext_threads_scheduler.Scheduler.add_to_queue_span + timeoutname Xapi_stdext_threads_scheduler.Scheduler.OneShot + delta (fun () -> Condition.broadcast c + ) ; + Condition.wait c m ; + Xapi_stdext_threads_scheduler.Scheduler.remove_from_queue + timeoutname done ) ; if session_is_invalid call then ( @@ -506,7 +511,7 @@ let rec next ~__context = else rpc_of_events relevant -let from_inner __context session subs from from_t deadline = +let from_inner __context session subs from from_t timer = let open Xapi_database in let open From in (* The database tables involved in our subscription *) @@ -605,14 +610,14 @@ let from_inner __context session subs from from_t deadline = && mods = [] && deletes = [] && messages = [] - && Unix.gettimeofday () < deadline + && not (Clock.Timer.has_expired timer) then ( last_generation := last ; (* Cur_id was bumped, but nothing relevent fell out of the db. Therefore the *) sub.cur_id <- last ; (* last id the client got is equivalent to the current one *) last_msg_gen := msg_gen ; - wait2 sub last deadline ; + wait2 sub last timer ; Thread.delay 0.05 ; grab_nonempty_range () ) else @@ -705,14 +710,19 @@ let from ~__context ~classes ~token ~timeout = ) in let subs = List.map Subscription.of_string classes in - let deadline = Unix.gettimeofday () +. timeout in + let duration = + timeout + |> Clock.Timer.s_to_span + |> Option.value ~default:Mtime.Span.(24 * hour) + in + let timer = Clock.Timer.start ~duration in (* We need to iterate because it's possible for an empty event set to be generated if we peek in-between a Modify and a Delete; we'll miss the Delete event and fail to generate the Modify because the snapshot can't be taken. *) let rec loop () = - let event_from = from_inner __context session subs from from_t deadline in - if event_from.events = [] && Unix.gettimeofday () < deadline then ( + let event_from = from_inner __context session subs from from_t timer in + if event_from.events = [] && not (Clock.Timer.has_expired timer) then ( debug "suppressing empty event.from" ; loop () ) else