Skip to content

Commit

Permalink
Update feature/perf again (xapi-project#6173)
Browse files Browse the repository at this point in the history
Can't solve the conflicts on feature/perf due to branch restrictions,
have to open PR from another branch.
  • Loading branch information
edwintorok authored Dec 12, 2024
2 parents ce5abab + 1ac3f07 commit 0b34302
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 58 deletions.
6 changes: 3 additions & 3 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(library
(public_name xapi-stdext-threads)
(name xapi_stdext_threads)
(modules :standard \ ipq scheduler threadext_test ipq_test)
(modules :standard \ ipq scheduler threadext_test ipq_test scheduler_test)
(libraries
mtime
mtime.clock.os
Expand All @@ -22,8 +22,8 @@
)

(tests
(names threadext_test ipq_test)
(names threadext_test ipq_test scheduler_test)
(package xapi-stdext-threads)
(modules threadext_test ipq_test)
(modules threadext_test ipq_test scheduler_test)
(libraries xapi_stdext_threads alcotest mtime.clock.os mtime fmt threads.posix xapi_stdext_threads_scheduler)
)
120 changes: 70 additions & 50 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ let delay = Delay.make ()

let queue_default = {func= (fun () -> ()); ty= OneShot; name= ""}

let (pending_event : t option ref) = ref None

let (queue : t Ipq.t) = Ipq.create 50 queue_default

let lock = Mutex.create ()
Expand All @@ -48,66 +50,84 @@ module Clock = struct
Mtime.min_stamp
end

let add_to_queue ?(signal = true) name ty start newfunc =
with_lock lock (fun () ->
let ( ++ ) = Clock.add_span in
Ipq.add queue
{
Ipq.ev= {func= newfunc; ty; name}
; Ipq.time= Mtime_clock.now () ++ start
}
) ;
if signal then Delay.signal delay
let add_to_queue name ty start newfunc =
let ( ++ ) = Clock.add_span in
let item =
{Ipq.ev= {func= newfunc; ty; name}; Ipq.time= Mtime_clock.now () ++ start}
in
with_lock lock (fun () -> Ipq.add queue item) ;
Delay.signal delay

let remove_from_queue name =
let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in
if index > -1 then
Ipq.remove queue index

let wait_next sleep =
try ignore (Delay.wait delay sleep)
with e ->
let detailed_msg =
match e with
| Unix.Unix_error (code, _, _) ->
Unix.error_message code
| _ ->
"unknown error"
in
error
"Could not schedule interruptable delay (%s). Falling back to normal \
delay. New events may be missed."
detailed_msg ;
Thread.delay sleep
with_lock lock @@ fun () ->
match !pending_event with
| Some ev when ev.name = name ->
pending_event := None
| Some _ | None ->
let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in
if index > -1 then
Ipq.remove queue index

let add_periodic_pending () =
with_lock lock @@ fun () ->
match !pending_event with
| Some ({ty= Periodic timer; _} as ev) ->
let ( ++ ) = Clock.add_span in
let item = {Ipq.ev; Ipq.time= Mtime_clock.now () ++ timer} in
Ipq.add queue item ;
pending_event := None
| Some {ty= OneShot; _} ->
pending_event := None
| None ->
()

let loop () =
debug "%s started" __MODULE__ ;
try
while true do
let empty = with_lock lock (fun () -> Ipq.is_empty queue) in
if empty then
wait_next 10.0
(* Doesn't happen often - the queue isn't usually empty *)
else
let next = with_lock lock (fun () -> Ipq.maximum queue) in
let now = Mtime_clock.now () in
if next.Ipq.time < now then (
let todo =
(with_lock lock (fun () -> Ipq.pop_maximum queue)).Ipq.ev
in
let now = Mtime_clock.now () in
let deadline, item =
with_lock lock @@ fun () ->
(* empty: wait till we get something *)
if Ipq.is_empty queue then
(Clock.add_span now 10.0, None)
else
let next = Ipq.maximum queue in
if Mtime.is_later next.Ipq.time ~than:now then
(* not expired: wait till time or interrupted *)
(next.Ipq.time, None)
else (
(* remove expired item *)
Ipq.pop_maximum queue |> ignore ;
(* save periodic to be scheduled again *)
if next.Ipq.ev.ty <> OneShot then pending_event := Some next.Ipq.ev ;
(now, Some next.Ipq.ev)
)
in
match item with
| Some todo ->
(try todo.func () with _ -> ()) ;
match todo.ty with
| OneShot ->
()
| Periodic timer ->
add_to_queue ~signal:false todo.name todo.ty timer todo.func
) else (* Sleep until next event. *)
add_periodic_pending ()
| None -> (
(* Sleep until next event. *)
let sleep =
Mtime.(span next.Ipq.time now)
|> Mtime.Span.(add ms)
|> Clock.span_to_s
Mtime.(span deadline now) |> Mtime.Span.(add ms) |> Clock.span_to_s
in
wait_next sleep
try ignore (Delay.wait delay sleep)
with e ->
let detailed_msg =
match e with
| Unix.Unix_error (code, _, _) ->
Unix.error_message code
| _ ->
"unknown error"
in
error
"Could not schedule interruptable delay (%s). Falling back to \
normal delay. New events may be missed."
detailed_msg ;
Thread.delay sleep
)
done
with _ ->
error
Expand Down
3 changes: 1 addition & 2 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ type func_ty =
| OneShot (** Fire just once *)
| Periodic of float (** Fire periodically with a given period in seconds *)

val add_to_queue :
?signal:bool -> string -> func_ty -> float -> (unit -> unit) -> unit
val add_to_queue : string -> func_ty -> float -> (unit -> unit) -> unit
(** Start a new timer. *)

val remove_from_queue : string -> unit
Expand Down
103 changes: 103 additions & 0 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
(*
* Copyright (C) 2024 Cloud Software Group
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)

module Scheduler = Xapi_stdext_threads_scheduler.Scheduler

let started = Atomic.make false

let start_schedule () =
if not (Atomic.exchange started true) then
Thread.create Scheduler.loop () |> ignore

let send event data = Event.(send event data |> sync)

let receive event = Event.(receive event |> sync)

let elapsed_ms cnt =
let elapsed_ns = Mtime_clock.count cnt |> Mtime.Span.to_uint64_ns in
Int64.(div elapsed_ns 1000000L |> to_int)

let is_less = Alcotest.(testable (pp int)) Stdlib.( > )

let test_single () =
let finished = Event.new_channel () in
Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () ->
send finished true
) ;
start_schedule () ;
Alcotest.(check bool) "result" true (receive finished)

let test_remove_self () =
let which = Event.new_channel () in
Scheduler.add_to_queue "self" (Scheduler.Periodic 0.001) 0.001 (fun () ->
(* this should remove the periodic scheduling *)
Scheduler.remove_from_queue "self" ;
(* add an operation to stop the test *)
Scheduler.add_to_queue "stop" Scheduler.OneShot 0.1 (fun () ->
send which "stop"
) ;
send which "self"
) ;
start_schedule () ;
let cnt = Mtime_clock.counter () in
Alcotest.(check string) "same event name" "self" (receive which) ;
Alcotest.(check string) "same event name" "stop" (receive which) ;
let elapsed_ms = elapsed_ms cnt in
Alcotest.check is_less "small time" 300 elapsed_ms

let test_empty () =
let finished = Event.new_channel () in
Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () ->
send finished true
) ;
start_schedule () ;
Alcotest.(check bool) "finished" true (receive finished) ;
(* wait loop to go to wait with no work to do *)
Thread.delay 0.1 ;
Scheduler.add_to_queue "two" Scheduler.OneShot 0.001 (fun () ->
send finished true
) ;
let cnt = Mtime_clock.counter () in
Alcotest.(check bool) "finished" true (receive finished) ;
let elapsed_ms = elapsed_ms cnt in
Alcotest.check is_less "small time" 100 elapsed_ms

let test_wakeup () =
let which = Event.new_channel () in
(* schedule a long event *)
Scheduler.add_to_queue "long" Scheduler.OneShot 2.0 (fun () ->
send which "long"
) ;
start_schedule () ;
(* wait loop to go to wait with no work to do *)
Thread.delay 0.1 ;
let cnt = Mtime_clock.counter () in
(* schedule a quick event, should wake up the loop *)
Scheduler.add_to_queue "quick" Scheduler.OneShot 0.1 (fun () ->
send which "quick"
) ;
Alcotest.(check string) "same event name" "quick" (receive which) ;
Scheduler.remove_from_queue "long" ;
let elapsed_ms = elapsed_ms cnt in
Alcotest.check is_less "small time" 150 elapsed_ms

let tests =
[
("test_single", `Quick, test_single)
; ("test_remove_self", `Quick, test_remove_self)
; ("test_empty", `Quick, test_empty)
; ("test_wakeup", `Quick, test_wakeup)
]

let () = Alcotest.run "Scheduler" [("generic", tests)]
Empty file.
15 changes: 14 additions & 1 deletion ocaml/xapi/storage_smapiv1_wrapper.ml
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,9 @@ functor
List.fold_left perform_one vdi_t ops

let perform_nolock context ~dbg ~dp ~sr ~vdi ~vm this_op =
debug "%s dp=%s, sr=%s, vdi=%s, vm=%s, op=%s" __FUNCTION__ dp
(s_of_sr sr) (s_of_vdi vdi) (s_of_vm vm)
(Vdi_automaton.string_of_op this_op) ;
match Host.find sr !Host.host with
| None ->
raise (Storage_error (Sr_not_attached (s_of_sr sr)))
Expand All @@ -473,6 +476,15 @@ functor
superstate to superstate'. These may fail: if so we revert the
datapath+VDI state to the most appropriate value. *)
let ops = Vdi_automaton.( - ) superstate superstate' in
debug "%s %s -> %s: %s" __FUNCTION__
(Vdi_automaton.string_of_state superstate)
(Vdi_automaton.string_of_state superstate')
(String.concat ", "
(List.map
(fun (op, _) -> Vdi_automaton.string_of_op op)
ops
)
) ;
side_effects context dbg dp sr sr_t vdi vdi_t vm ops
with e ->
let e =
Expand Down Expand Up @@ -529,7 +541,8 @@ functor
)
with e ->
if not allow_leak then (
ignore (Vdi.add_leaked dp vdi_t) ;
Sr.add_or_replace vdi (Vdi.add_leaked dp vdi_t) sr_t ;
Everything.to_file !host_state_path (Everything.make ()) ;
raise e
) else (
(* allow_leak means we can forget this dp *)
Expand Down
23 changes: 21 additions & 2 deletions ocaml/xapi/xapi_sr.ml
Original file line number Diff line number Diff line change
Expand Up @@ -778,15 +778,34 @@ let scan ~__context ~sr =
Db.VDI.get_records_where ~__context
~expr:(Eq (Field "SR", Literal sr'))
in
(* It is sufficient to just compare the refs in two db_vdis, as this
is what update_vdis uses to determine what to delete *)
let vdis_ref_equal db_vdi1 db_vdi2 =
Listext.List.set_difference (List.map fst db_vdi1)
(List.map fst db_vdi2)
= []
in
let db_vdis_before = find_vdis () in
let vs, sr_info =
C.SR.scan2 (Ref.string_of task)
(Storage_interface.Sr.of_string sr_uuid)
in
let db_vdis_after = find_vdis () in
if limit > 0 && db_vdis_after <> db_vdis_before then
if limit > 0 && not (vdis_ref_equal db_vdis_before db_vdis_after)
then (
debug
"%s detected db change while scanning, before scan vdis %s, \
after scan vdis %s, retry limit left %d"
__FUNCTION__
(List.map (fun (_, v) -> v.vDI_uuid) db_vdis_before
|> String.concat ","
)
(List.map (fun (_, v) -> v.vDI_uuid) db_vdis_after
|> String.concat ","
)
limit ;
(scan_rec [@tailcall]) (limit - 1)
else if limit = 0 then
) else if limit = 0 then
raise
(Api_errors.Server_error
(Api_errors.internal_error, ["SR.scan retry limit exceeded"])
Expand Down

0 comments on commit 0b34302

Please sign in to comment.