Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CP-52821: use Mtime in Xapi_periodic_scheduler #6161

Merged
merged 3 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
(public_name xapi-stdext-threads.scheduler)
(name xapi_stdext_threads_scheduler)
(modules ipq scheduler)
(libraries mtime mtime.clock.os threads.posix unix xapi-log xapi-stdext-threads)
(libraries mtime mtime.clock.os threads.posix unix xapi-log xapi-stdext-threads clock)
)

(tests
Expand Down
14 changes: 7 additions & 7 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.ml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*)
(* Imperative priority queue *)

type 'a event = {ev: 'a; time: Mtime.t}
type 'a event = {ev: 'a; time: Mtime.span}

type 'a t = {default: 'a event; mutable size: int; mutable data: 'a event array}

Expand All @@ -23,7 +23,7 @@ let create n default =
if n <= 0 then
invalid_arg "create"
else
let default = {ev= default; time= Mtime_clock.now ()} in
let default = {ev= default; time= Mtime_clock.elapsed ()} in
{default; size= 0; data= Array.make n default}

let is_empty h = h.size <= 0
Expand All @@ -45,7 +45,7 @@ let add h x =
(* moving [x] up in the heap *)
let rec moveup i =
let fi = (i - 1) / 2 in
if i > 0 && Mtime.is_later d.(fi).time ~than:x.time then (
if i > 0 && Mtime.Span.is_longer d.(fi).time ~than:x.time then (
d.(i) <- d.(fi) ;
moveup fi
) else
Expand All @@ -69,7 +69,7 @@ let remove h s =
(* moving [x] up in the heap *)
let rec moveup i =
let fi = (i - 1) / 2 in
if i > 0 && Mtime.is_later d.(fi).time ~than:x.time then (
if i > 0 && Mtime.Span.is_longer d.(fi).time ~than:x.time then (
d.(i) <- d.(fi) ;
moveup fi
) else
Expand All @@ -83,7 +83,7 @@ let remove h s =
let j' = j + 1 in
if j' < n && d.(j').time < d.(j).time then j' else j
in
if Mtime.is_earlier d.(j).time ~than:x.time then (
if Mtime.Span.is_shorter d.(j).time ~than:x.time then (
d.(i) <- d.(j) ;
movedown j
) else
Expand All @@ -93,7 +93,7 @@ let remove h s =
in
if s = n then
()
else if Mtime.is_later d.(s).time ~than:x.time then
else if Mtime.Span.is_longer d.(s).time ~than:x.time then
moveup s
else
movedown s ;
Expand Down Expand Up @@ -129,7 +129,7 @@ let check h =
let d = h.data in
for i = 1 to h.size - 1 do
let fi = (i - 1) / 2 in
let ordered = Mtime.is_later d.(i).time ~than:d.(fi).time in
let ordered = Mtime.Span.is_longer d.(i).time ~than:d.(fi).time in
assert ordered
done

Expand Down
2 changes: 1 addition & 1 deletion ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.mli
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* GNU Lesser General Public License for more details.
*)

type 'a event = {ev: 'a; time: Mtime.t}
type 'a event = {ev: 'a; time: Mtime.span}

type 'a t

Expand Down
14 changes: 7 additions & 7 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq_test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ module Ipq = Xapi_stdext_threads_scheduler.Ipq
(* test we get "out of bound" exception calling Ipq.remove *)
let test_out_of_index () =
let q = Ipq.create 10 0 in
Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.now ()} ;
Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.elapsed ()} ;
let is_oob = function
| Invalid_argument s when String.ends_with ~suffix:" out of bounds" s ->
true
Expand All @@ -43,18 +43,18 @@ let test_leak () =
let use_array () = array.(0) <- 'a' in
let allocated = Atomic.make true in
Gc.finalise (fun _ -> Atomic.set allocated false) array ;
Ipq.add q {Ipq.ev= use_array; Ipq.time= Mtime_clock.now ()} ;
Ipq.add q {Ipq.ev= use_array; Ipq.time= Mtime_clock.elapsed ()} ;
Ipq.remove q 0 ;
Gc.full_major () ;
Gc.full_major () ;
Alcotest.(check bool) "allocated" false (Atomic.get allocated) ;
Ipq.add q {Ipq.ev= default; Ipq.time= Mtime_clock.now ()}
Ipq.add q {Ipq.ev= default; Ipq.time= Mtime_clock.elapsed ()}

(* test Ipq.is_empty call *)
let test_empty () =
let q = Ipq.create 10 0 in
Alcotest.(check bool) "same value" true (Ipq.is_empty q) ;
Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.now ()} ;
Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.elapsed ()} ;
Alcotest.(check bool) "same value" false (Ipq.is_empty q) ;
Ipq.remove q 0 ;
Alcotest.(check bool) "same value" true (Ipq.is_empty q)
Expand All @@ -75,7 +75,7 @@ let set queue =
Ipq.iter
(fun d ->
let t = d.time in
let t = Mtime.to_uint64_ns t in
let t = Mtime.Span.to_uint64_ns t in
s := Int64Set.add t !s
)
queue ;
Expand All @@ -86,7 +86,7 @@ let test_old () =
let s = ref Int64Set.empty in
let add i =
let ti = Random.int64 1000000L in
let t = Mtime.of_uint64_ns ti in
let t = Mtime.Span.of_uint64_ns ti in
let e = {Ipq.time= t; Ipq.ev= i} in
Ipq.add test e ;
s := Int64Set.add ti !s
Expand Down Expand Up @@ -123,7 +123,7 @@ let test_old () =
let prev = ref 0L in
for _ = 0 to 49 do
let e = Ipq.pop_maximum test in
let t = Mtime.to_uint64_ns e.time in
let t = Mtime.Span.to_uint64_ns e.time in
Alcotest.(check bool)
(Printf.sprintf "%Ld bigger than %Ld" t !prev)
true (t >= !prev) ;
Expand Down
49 changes: 23 additions & 26 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,31 +33,23 @@ let (queue : t Ipq.t) = Ipq.create 50 queue_default

let lock = Mutex.create ()

module Clock = struct
let span s = Mtime.Span.of_uint64_ns (Int64.of_float (s *. 1e9))

let span_to_s span =
Mtime.Span.to_uint64_ns span |> Int64.to_float |> fun ns -> ns /. 1e9

let add_span clock secs =
(* return mix or max available value if the add overflows *)
match Mtime.add_span clock (span secs) with
| Some t ->
t
| None when secs > 0. ->
Mtime.max_stamp
| None ->
Mtime.min_stamp
end

let add_to_queue name ty start newfunc =
let ( ++ ) = Clock.add_span in
let add_to_queue_span name ty start_span newfunc =
let ( ++ ) = Mtime.Span.add in
let item =
{Ipq.ev= {func= newfunc; ty; name}; Ipq.time= Mtime_clock.now () ++ start}
{
Ipq.ev= {func= newfunc; ty; name}
; Ipq.time= Mtime_clock.elapsed () ++ start_span
}
in
with_lock lock (fun () -> Ipq.add queue item) ;
Delay.signal delay

let add_to_queue name ty start newfunc =
let start_span =
Clock.Timer.s_to_span start |> Option.value ~default:Mtime.Span.max_span
in
add_to_queue_span name ty start_span newfunc

let remove_from_queue name =
with_lock lock @@ fun () ->
match !pending_event with
Expand All @@ -72,8 +64,11 @@ let add_periodic_pending () =
with_lock lock @@ fun () ->
match !pending_event with
| Some ({ty= Periodic timer; _} as ev) ->
let ( ++ ) = Clock.add_span in
let item = {Ipq.ev; Ipq.time= Mtime_clock.now () ++ timer} in
let ( ++ ) = Mtime.Span.add in
let delta =
Clock.Timer.s_to_span timer |> Option.value ~default:Mtime.Span.max_span
in
let item = {Ipq.ev; Ipq.time= Mtime_clock.elapsed () ++ delta} in
Ipq.add queue item ;
pending_event := None
| Some {ty= OneShot; _} ->
Expand All @@ -85,15 +80,15 @@ let loop () =
debug "%s started" __MODULE__ ;
try
while true do
let now = Mtime_clock.now () in
let now = Mtime_clock.elapsed () in
let deadline, item =
with_lock lock @@ fun () ->
(* empty: wait till we get something *)
if Ipq.is_empty queue then
(Clock.add_span now 10.0, None)
(Mtime.Span.add now Mtime.Span.(10 * s), None)
else
let next = Ipq.maximum queue in
if Mtime.is_later next.Ipq.time ~than:now then
if Mtime.Span.is_longer next.Ipq.time ~than:now then
(* not expired: wait till time or interrupted *)
(next.Ipq.time, None)
else (
Expand All @@ -111,7 +106,9 @@ let loop () =
| None -> (
(* Sleep until next event. *)
let sleep =
Mtime.(span deadline now) |> Mtime.Span.(add ms) |> Clock.span_to_s
Mtime.(Span.abs_diff deadline now)
|> Mtime.Span.(add ms)
|> Clock.Timer.span_to_s
in
try ignore (Delay.wait delay sleep)
with e ->
Expand Down
4 changes: 4 additions & 0 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ type func_ty =
| OneShot (** Fire just once *)
| Periodic of float (** Fire periodically with a given period in seconds *)

val add_to_queue_span :
string -> func_ty -> Mtime.span -> (unit -> unit) -> unit
(** Start a new timer. *)

val add_to_queue : string -> func_ty -> float -> (unit -> unit) -> unit
(** Start a new timer. *)

Expand Down
38 changes: 24 additions & 14 deletions ocaml/xapi/xapi_event.ml
Original file line number Diff line number Diff line change
Expand Up @@ -419,20 +419,25 @@ module From = struct

let session_is_invalid call = with_lock call.m (fun () -> call.session_invalid)

let wait2 call from_id deadline =
let wait2 call from_id timer =
let timeoutname = Printf.sprintf "event_from_timeout_%Ld" call.index in
with_lock m (fun () ->
while
from_id = call.cur_id
&& (not (session_is_invalid call))
&& Unix.gettimeofday () < deadline
&& not (Clock.Timer.has_expired timer)
do
Xapi_stdext_threads_scheduler.Scheduler.add_to_queue timeoutname
Xapi_stdext_threads_scheduler.Scheduler.OneShot
(deadline -. Unix.gettimeofday () +. 0.5)
(fun () -> Condition.broadcast c) ;
Condition.wait c m ;
Xapi_stdext_threads_scheduler.Scheduler.remove_from_queue timeoutname
match Clock.Timer.remaining timer with
| Expired _ ->
()
| Remaining delta ->
Xapi_stdext_threads_scheduler.Scheduler.add_to_queue_span
timeoutname Xapi_stdext_threads_scheduler.Scheduler.OneShot
delta (fun () -> Condition.broadcast c
) ;
Condition.wait c m ;
Xapi_stdext_threads_scheduler.Scheduler.remove_from_queue
timeoutname
done
) ;
if session_is_invalid call then (
Expand Down Expand Up @@ -506,7 +511,7 @@ let rec next ~__context =
else
rpc_of_events relevant

let from_inner __context session subs from from_t deadline =
let from_inner __context session subs from from_t timer =
let open Xapi_database in
let open From in
(* The database tables involved in our subscription *)
Expand Down Expand Up @@ -605,14 +610,14 @@ let from_inner __context session subs from from_t deadline =
&& mods = []
&& deletes = []
&& messages = []
&& Unix.gettimeofday () < deadline
&& not (Clock.Timer.has_expired timer)
then (
last_generation := last ;
(* Cur_id was bumped, but nothing relevent fell out of the db. Therefore the *)
sub.cur_id <- last ;
(* last id the client got is equivalent to the current one *)
last_msg_gen := msg_gen ;
wait2 sub last deadline ;
wait2 sub last timer ;
Thread.delay 0.05 ;
grab_nonempty_range ()
) else
Expand Down Expand Up @@ -705,14 +710,19 @@ let from ~__context ~classes ~token ~timeout =
)
in
let subs = List.map Subscription.of_string classes in
let deadline = Unix.gettimeofday () +. timeout in
let duration =
timeout
|> Clock.Timer.s_to_span
|> Option.value ~default:Mtime.Span.(24 * hour)
in
let timer = Clock.Timer.start ~duration in
(* We need to iterate because it's possible for an empty event set
to be generated if we peek in-between a Modify and a Delete; we'll
miss the Delete event and fail to generate the Modify because the
snapshot can't be taken. *)
let rec loop () =
let event_from = from_inner __context session subs from from_t deadline in
if event_from.events = [] && Unix.gettimeofday () < deadline then (
let event_from = from_inner __context session subs from from_t timer in
if event_from.events = [] && not (Clock.Timer.has_expired timer) then (
debug "suppressing empty event.from" ;
loop ()
) else
Expand Down
Loading