Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update feature/perf again #6173

Merged
merged 15 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(library
(public_name xapi-stdext-threads)
(name xapi_stdext_threads)
(modules :standard \ ipq scheduler threadext_test ipq_test)
(modules :standard \ ipq scheduler threadext_test ipq_test scheduler_test)
(libraries
mtime
mtime.clock.os
Expand All @@ -22,8 +22,8 @@
)

(tests
(names threadext_test ipq_test)
(names threadext_test ipq_test scheduler_test)
(package xapi-stdext-threads)
(modules threadext_test ipq_test)
(modules threadext_test ipq_test scheduler_test)
(libraries xapi_stdext_threads alcotest mtime.clock.os mtime fmt threads.posix xapi_stdext_threads_scheduler)
)
120 changes: 70 additions & 50 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ let delay = Delay.make ()

let queue_default = {func= (fun () -> ()); ty= OneShot; name= ""}

let (pending_event : t option ref) = ref None

let (queue : t Ipq.t) = Ipq.create 50 queue_default

let lock = Mutex.create ()
Expand All @@ -48,66 +50,84 @@ module Clock = struct
Mtime.min_stamp
end

let add_to_queue ?(signal = true) name ty start newfunc =
with_lock lock (fun () ->
let ( ++ ) = Clock.add_span in
Ipq.add queue
{
Ipq.ev= {func= newfunc; ty; name}
; Ipq.time= Mtime_clock.now () ++ start
}
) ;
if signal then Delay.signal delay
let add_to_queue name ty start newfunc =
let ( ++ ) = Clock.add_span in
let item =
{Ipq.ev= {func= newfunc; ty; name}; Ipq.time= Mtime_clock.now () ++ start}
in
with_lock lock (fun () -> Ipq.add queue item) ;
Delay.signal delay

let remove_from_queue name =
let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in
if index > -1 then
Ipq.remove queue index

let wait_next sleep =
try ignore (Delay.wait delay sleep)
with e ->
let detailed_msg =
match e with
| Unix.Unix_error (code, _, _) ->
Unix.error_message code
| _ ->
"unknown error"
in
error
"Could not schedule interruptable delay (%s). Falling back to normal \
delay. New events may be missed."
detailed_msg ;
Thread.delay sleep
with_lock lock @@ fun () ->
match !pending_event with
| Some ev when ev.name = name ->
pending_event := None
| Some _ | None ->
let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in
if index > -1 then
Ipq.remove queue index

let add_periodic_pending () =
with_lock lock @@ fun () ->
match !pending_event with
| Some ({ty= Periodic timer; _} as ev) ->
let ( ++ ) = Clock.add_span in
let item = {Ipq.ev; Ipq.time= Mtime_clock.now () ++ timer} in
Ipq.add queue item ;
pending_event := None
| Some {ty= OneShot; _} ->
pending_event := None
| None ->
()

let loop () =
debug "%s started" __MODULE__ ;
try
while true do
let empty = with_lock lock (fun () -> Ipq.is_empty queue) in
if empty then
wait_next 10.0
(* Doesn't happen often - the queue isn't usually empty *)
else
let next = with_lock lock (fun () -> Ipq.maximum queue) in
let now = Mtime_clock.now () in
if next.Ipq.time < now then (
let todo =
(with_lock lock (fun () -> Ipq.pop_maximum queue)).Ipq.ev
in
let now = Mtime_clock.now () in
let deadline, item =
with_lock lock @@ fun () ->
(* empty: wait till we get something *)
if Ipq.is_empty queue then
(Clock.add_span now 10.0, None)
else
let next = Ipq.maximum queue in
if Mtime.is_later next.Ipq.time ~than:now then
(* not expired: wait till time or interrupted *)
(next.Ipq.time, None)
else (
(* remove expired item *)
Ipq.pop_maximum queue |> ignore ;
(* save periodic to be scheduled again *)
if next.Ipq.ev.ty <> OneShot then pending_event := Some next.Ipq.ev ;
(now, Some next.Ipq.ev)
)
in
match item with
| Some todo ->
(try todo.func () with _ -> ()) ;
match todo.ty with
| OneShot ->
()
| Periodic timer ->
add_to_queue ~signal:false todo.name todo.ty timer todo.func
) else (* Sleep until next event. *)
add_periodic_pending ()
| None -> (
(* Sleep until next event. *)
let sleep =
Mtime.(span next.Ipq.time now)
|> Mtime.Span.(add ms)
|> Clock.span_to_s
Mtime.(span deadline now) |> Mtime.Span.(add ms) |> Clock.span_to_s
in
wait_next sleep
try ignore (Delay.wait delay sleep)
with e ->
let detailed_msg =
match e with
| Unix.Unix_error (code, _, _) ->
Unix.error_message code
| _ ->
"unknown error"
in
error
"Could not schedule interruptable delay (%s). Falling back to \
normal delay. New events may be missed."
detailed_msg ;
Thread.delay sleep
)
done
with _ ->
error
Expand Down
3 changes: 1 addition & 2 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ type func_ty =
| OneShot (** Fire just once *)
| Periodic of float (** Fire periodically with a given period in seconds *)

val add_to_queue :
?signal:bool -> string -> func_ty -> float -> (unit -> unit) -> unit
val add_to_queue : string -> func_ty -> float -> (unit -> unit) -> unit
(** Start a new timer. *)

val remove_from_queue : string -> unit
Expand Down
103 changes: 103 additions & 0 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
(*
* Copyright (C) 2024 Cloud Software Group
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)

module Scheduler = Xapi_stdext_threads_scheduler.Scheduler

let started = Atomic.make false

let start_schedule () =
if not (Atomic.exchange started true) then
Thread.create Scheduler.loop () |> ignore

let send event data = Event.(send event data |> sync)

let receive event = Event.(receive event |> sync)

let elapsed_ms cnt =
let elapsed_ns = Mtime_clock.count cnt |> Mtime.Span.to_uint64_ns in
Int64.(div elapsed_ns 1000000L |> to_int)

let is_less = Alcotest.(testable (pp int)) Stdlib.( > )

let test_single () =
let finished = Event.new_channel () in
Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () ->
send finished true
) ;
start_schedule () ;
Alcotest.(check bool) "result" true (receive finished)

let test_remove_self () =
let which = Event.new_channel () in
Scheduler.add_to_queue "self" (Scheduler.Periodic 0.001) 0.001 (fun () ->
(* this should remove the periodic scheduling *)
Scheduler.remove_from_queue "self" ;
(* add an operation to stop the test *)
Scheduler.add_to_queue "stop" Scheduler.OneShot 0.1 (fun () ->
send which "stop"
) ;
send which "self"
) ;
start_schedule () ;
let cnt = Mtime_clock.counter () in
Alcotest.(check string) "same event name" "self" (receive which) ;
Alcotest.(check string) "same event name" "stop" (receive which) ;
let elapsed_ms = elapsed_ms cnt in
Alcotest.check is_less "small time" 300 elapsed_ms

let test_empty () =
let finished = Event.new_channel () in
Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () ->
send finished true
) ;
start_schedule () ;
Alcotest.(check bool) "finished" true (receive finished) ;
(* wait loop to go to wait with no work to do *)
Thread.delay 0.1 ;
Scheduler.add_to_queue "two" Scheduler.OneShot 0.001 (fun () ->
send finished true
) ;
let cnt = Mtime_clock.counter () in
Alcotest.(check bool) "finished" true (receive finished) ;
let elapsed_ms = elapsed_ms cnt in
Alcotest.check is_less "small time" 100 elapsed_ms

let test_wakeup () =
let which = Event.new_channel () in
(* schedule a long event *)
Scheduler.add_to_queue "long" Scheduler.OneShot 2.0 (fun () ->
send which "long"
) ;
start_schedule () ;
(* wait loop to go to wait with no work to do *)
Thread.delay 0.1 ;
let cnt = Mtime_clock.counter () in
(* schedule a quick event, should wake up the loop *)
Scheduler.add_to_queue "quick" Scheduler.OneShot 0.1 (fun () ->
send which "quick"
) ;
Alcotest.(check string) "same event name" "quick" (receive which) ;
Scheduler.remove_from_queue "long" ;
let elapsed_ms = elapsed_ms cnt in
Alcotest.check is_less "small time" 150 elapsed_ms

let tests =
[
("test_single", `Quick, test_single)
; ("test_remove_self", `Quick, test_remove_self)
; ("test_empty", `Quick, test_empty)
; ("test_wakeup", `Quick, test_wakeup)
]

let () = Alcotest.run "Scheduler" [("generic", tests)]
Empty file.
15 changes: 14 additions & 1 deletion ocaml/xapi/storage_smapiv1_wrapper.ml
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,9 @@ functor
List.fold_left perform_one vdi_t ops

let perform_nolock context ~dbg ~dp ~sr ~vdi ~vm this_op =
debug "%s dp=%s, sr=%s, vdi=%s, vm=%s, op=%s" __FUNCTION__ dp
(s_of_sr sr) (s_of_vdi vdi) (s_of_vm vm)
(Vdi_automaton.string_of_op this_op) ;
match Host.find sr !Host.host with
| None ->
raise (Storage_error (Sr_not_attached (s_of_sr sr)))
Expand All @@ -473,6 +476,15 @@ functor
superstate to superstate'. These may fail: if so we revert the
datapath+VDI state to the most appropriate value. *)
let ops = Vdi_automaton.( - ) superstate superstate' in
debug "%s %s -> %s: %s" __FUNCTION__
(Vdi_automaton.string_of_state superstate)
(Vdi_automaton.string_of_state superstate')
(String.concat ", "
(List.map
(fun (op, _) -> Vdi_automaton.string_of_op op)
ops
)
) ;
side_effects context dbg dp sr sr_t vdi vdi_t vm ops
with e ->
let e =
Expand Down Expand Up @@ -529,7 +541,8 @@ functor
)
with e ->
if not allow_leak then (
ignore (Vdi.add_leaked dp vdi_t) ;
Sr.add_or_replace vdi (Vdi.add_leaked dp vdi_t) sr_t ;
Everything.to_file !host_state_path (Everything.make ()) ;
raise e
) else (
(* allow_leak means we can forget this dp *)
Expand Down
23 changes: 21 additions & 2 deletions ocaml/xapi/xapi_sr.ml
Original file line number Diff line number Diff line change
Expand Up @@ -778,15 +778,34 @@ let scan ~__context ~sr =
Db.VDI.get_records_where ~__context
~expr:(Eq (Field "SR", Literal sr'))
in
(* It is sufficient to just compare the refs in two db_vdis, as this
is what update_vdis uses to determine what to delete *)
let vdis_ref_equal db_vdi1 db_vdi2 =
Listext.List.set_difference (List.map fst db_vdi1)
(List.map fst db_vdi2)
= []
in
let db_vdis_before = find_vdis () in
let vs, sr_info =
C.SR.scan2 (Ref.string_of task)
(Storage_interface.Sr.of_string sr_uuid)
in
let db_vdis_after = find_vdis () in
if limit > 0 && db_vdis_after <> db_vdis_before then
if limit > 0 && not (vdis_ref_equal db_vdis_before db_vdis_after)
then (
debug
"%s detected db change while scanning, before scan vdis %s, \
after scan vdis %s, retry limit left %d"
__FUNCTION__
(List.map (fun (_, v) -> v.vDI_uuid) db_vdis_before
|> String.concat ","
)
(List.map (fun (_, v) -> v.vDI_uuid) db_vdis_after
|> String.concat ","
)
limit ;
(scan_rec [@tailcall]) (limit - 1)
else if limit = 0 then
) else if limit = 0 then
raise
(Api_errors.Server_error
(Api_errors.internal_error, ["SR.scan retry limit exceeded"])
Expand Down
Loading