diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune index 7fcff9e08c..d8036380cd 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune @@ -1,7 +1,7 @@ (library (public_name xapi-stdext-threads) (name xapi_stdext_threads) - (modules :standard \ ipq scheduler threadext_test ipq_test) + (modules :standard \ ipq scheduler threadext_test ipq_test scheduler_test) (libraries mtime mtime.clock.os @@ -22,8 +22,8 @@ ) (tests - (names threadext_test ipq_test) + (names threadext_test ipq_test scheduler_test) (package xapi-stdext-threads) - (modules threadext_test ipq_test) + (modules threadext_test ipq_test scheduler_test) (libraries xapi_stdext_threads alcotest mtime.clock.os mtime fmt threads.posix xapi_stdext_threads_scheduler) ) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml index 2a4c2c5df1..a544ed79bb 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml @@ -27,6 +27,8 @@ let delay = Delay.make () let queue_default = {func= (fun () -> ()); ty= OneShot; name= ""} +let (pending_event : t option ref) = ref None + let (queue : t Ipq.t) = Ipq.create 50 queue_default let lock = Mutex.create () @@ -48,66 +50,84 @@ module Clock = struct Mtime.min_stamp end -let add_to_queue ?(signal = true) name ty start newfunc = - with_lock lock (fun () -> - let ( ++ ) = Clock.add_span in - Ipq.add queue - { - Ipq.ev= {func= newfunc; ty; name} - ; Ipq.time= Mtime_clock.now () ++ start - } - ) ; - if signal then Delay.signal delay +let add_to_queue name ty start newfunc = + let ( ++ ) = Clock.add_span in + let item = + {Ipq.ev= {func= newfunc; ty; name}; Ipq.time= Mtime_clock.now () ++ start} + in + with_lock lock (fun () -> Ipq.add queue item) ; + Delay.signal delay let remove_from_queue name = - let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in - if index > -1 then - Ipq.remove queue index - -let wait_next sleep = - try ignore (Delay.wait delay sleep) - with e -> - let detailed_msg = - match e with - | Unix.Unix_error (code, _, _) -> - Unix.error_message code - | _ -> - "unknown error" - in - error - "Could not schedule interruptable delay (%s). Falling back to normal \ - delay. New events may be missed." - detailed_msg ; - Thread.delay sleep + with_lock lock @@ fun () -> + match !pending_event with + | Some ev when ev.name = name -> + pending_event := None + | Some _ | None -> + let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in + if index > -1 then + Ipq.remove queue index + +let add_periodic_pending () = + with_lock lock @@ fun () -> + match !pending_event with + | Some ({ty= Periodic timer; _} as ev) -> + let ( ++ ) = Clock.add_span in + let item = {Ipq.ev; Ipq.time= Mtime_clock.now () ++ timer} in + Ipq.add queue item ; + pending_event := None + | Some {ty= OneShot; _} -> + pending_event := None + | None -> + () let loop () = debug "%s started" __MODULE__ ; try while true do - let empty = with_lock lock (fun () -> Ipq.is_empty queue) in - if empty then - wait_next 10.0 - (* Doesn't happen often - the queue isn't usually empty *) - else - let next = with_lock lock (fun () -> Ipq.maximum queue) in - let now = Mtime_clock.now () in - if next.Ipq.time < now then ( - let todo = - (with_lock lock (fun () -> Ipq.pop_maximum queue)).Ipq.ev - in + let now = Mtime_clock.now () in + let deadline, item = + with_lock lock @@ fun () -> + (* empty: wait till we get something *) + if Ipq.is_empty queue then + (Clock.add_span now 10.0, None) + else + let next = Ipq.maximum queue in + if Mtime.is_later next.Ipq.time ~than:now then + (* not expired: wait till time or interrupted *) + (next.Ipq.time, None) + else ( + (* remove expired item *) + Ipq.pop_maximum queue |> ignore ; + (* save periodic to be scheduled again *) + if next.Ipq.ev.ty <> OneShot then pending_event := Some next.Ipq.ev ; + (now, Some next.Ipq.ev) + ) + in + match item with + | Some todo -> (try todo.func () with _ -> ()) ; - match todo.ty with - | OneShot -> - () - | Periodic timer -> - add_to_queue ~signal:false todo.name todo.ty timer todo.func - ) else (* Sleep until next event. *) + add_periodic_pending () + | None -> ( + (* Sleep until next event. *) let sleep = - Mtime.(span next.Ipq.time now) - |> Mtime.Span.(add ms) - |> Clock.span_to_s + Mtime.(span deadline now) |> Mtime.Span.(add ms) |> Clock.span_to_s in - wait_next sleep + try ignore (Delay.wait delay sleep) + with e -> + let detailed_msg = + match e with + | Unix.Unix_error (code, _, _) -> + Unix.error_message code + | _ -> + "unknown error" + in + error + "Could not schedule interruptable delay (%s). Falling back to \ + normal delay. New events may be missed." + detailed_msg ; + Thread.delay sleep + ) done with _ -> error diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli index b087a35c5c..d4d19b1f79 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli @@ -18,8 +18,7 @@ type func_ty = | OneShot (** Fire just once *) | Periodic of float (** Fire periodically with a given period in seconds *) -val add_to_queue : - ?signal:bool -> string -> func_ty -> float -> (unit -> unit) -> unit +val add_to_queue : string -> func_ty -> float -> (unit -> unit) -> unit (** Start a new timer. *) val remove_from_queue : string -> unit diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml new file mode 100644 index 0000000000..0a4a847403 --- /dev/null +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml @@ -0,0 +1,103 @@ +(* + * Copyright (C) 2024 Cloud Software Group + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +module Scheduler = Xapi_stdext_threads_scheduler.Scheduler + +let started = Atomic.make false + +let start_schedule () = + if not (Atomic.exchange started true) then + Thread.create Scheduler.loop () |> ignore + +let send event data = Event.(send event data |> sync) + +let receive event = Event.(receive event |> sync) + +let elapsed_ms cnt = + let elapsed_ns = Mtime_clock.count cnt |> Mtime.Span.to_uint64_ns in + Int64.(div elapsed_ns 1000000L |> to_int) + +let is_less = Alcotest.(testable (pp int)) Stdlib.( > ) + +let test_single () = + let finished = Event.new_channel () in + Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () -> + send finished true + ) ; + start_schedule () ; + Alcotest.(check bool) "result" true (receive finished) + +let test_remove_self () = + let which = Event.new_channel () in + Scheduler.add_to_queue "self" (Scheduler.Periodic 0.001) 0.001 (fun () -> + (* this should remove the periodic scheduling *) + Scheduler.remove_from_queue "self" ; + (* add an operation to stop the test *) + Scheduler.add_to_queue "stop" Scheduler.OneShot 0.1 (fun () -> + send which "stop" + ) ; + send which "self" + ) ; + start_schedule () ; + let cnt = Mtime_clock.counter () in + Alcotest.(check string) "same event name" "self" (receive which) ; + Alcotest.(check string) "same event name" "stop" (receive which) ; + let elapsed_ms = elapsed_ms cnt in + Alcotest.check is_less "small time" 300 elapsed_ms + +let test_empty () = + let finished = Event.new_channel () in + Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () -> + send finished true + ) ; + start_schedule () ; + Alcotest.(check bool) "finished" true (receive finished) ; + (* wait loop to go to wait with no work to do *) + Thread.delay 0.1 ; + Scheduler.add_to_queue "two" Scheduler.OneShot 0.001 (fun () -> + send finished true + ) ; + let cnt = Mtime_clock.counter () in + Alcotest.(check bool) "finished" true (receive finished) ; + let elapsed_ms = elapsed_ms cnt in + Alcotest.check is_less "small time" 100 elapsed_ms + +let test_wakeup () = + let which = Event.new_channel () in + (* schedule a long event *) + Scheduler.add_to_queue "long" Scheduler.OneShot 2.0 (fun () -> + send which "long" + ) ; + start_schedule () ; + (* wait loop to go to wait with no work to do *) + Thread.delay 0.1 ; + let cnt = Mtime_clock.counter () in + (* schedule a quick event, should wake up the loop *) + Scheduler.add_to_queue "quick" Scheduler.OneShot 0.1 (fun () -> + send which "quick" + ) ; + Alcotest.(check string) "same event name" "quick" (receive which) ; + Scheduler.remove_from_queue "long" ; + let elapsed_ms = elapsed_ms cnt in + Alcotest.check is_less "small time" 150 elapsed_ms + +let tests = + [ + ("test_single", `Quick, test_single) + ; ("test_remove_self", `Quick, test_remove_self) + ; ("test_empty", `Quick, test_empty) + ; ("test_wakeup", `Quick, test_wakeup) + ] + +let () = Alcotest.run "Scheduler" [("generic", tests)] diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.mli b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.mli new file mode 100644 index 0000000000..e69de29bb2 diff --git a/ocaml/xapi/storage_smapiv1_wrapper.ml b/ocaml/xapi/storage_smapiv1_wrapper.ml index ae1f21f72f..55067efd9d 100644 --- a/ocaml/xapi/storage_smapiv1_wrapper.ml +++ b/ocaml/xapi/storage_smapiv1_wrapper.ml @@ -453,6 +453,9 @@ functor List.fold_left perform_one vdi_t ops let perform_nolock context ~dbg ~dp ~sr ~vdi ~vm this_op = + debug "%s dp=%s, sr=%s, vdi=%s, vm=%s, op=%s" __FUNCTION__ dp + (s_of_sr sr) (s_of_vdi vdi) (s_of_vm vm) + (Vdi_automaton.string_of_op this_op) ; match Host.find sr !Host.host with | None -> raise (Storage_error (Sr_not_attached (s_of_sr sr))) @@ -473,6 +476,15 @@ functor superstate to superstate'. These may fail: if so we revert the datapath+VDI state to the most appropriate value. *) let ops = Vdi_automaton.( - ) superstate superstate' in + debug "%s %s -> %s: %s" __FUNCTION__ + (Vdi_automaton.string_of_state superstate) + (Vdi_automaton.string_of_state superstate') + (String.concat ", " + (List.map + (fun (op, _) -> Vdi_automaton.string_of_op op) + ops + ) + ) ; side_effects context dbg dp sr sr_t vdi vdi_t vm ops with e -> let e = @@ -529,7 +541,8 @@ functor ) with e -> if not allow_leak then ( - ignore (Vdi.add_leaked dp vdi_t) ; + Sr.add_or_replace vdi (Vdi.add_leaked dp vdi_t) sr_t ; + Everything.to_file !host_state_path (Everything.make ()) ; raise e ) else ( (* allow_leak means we can forget this dp *) diff --git a/ocaml/xapi/xapi_sr.ml b/ocaml/xapi/xapi_sr.ml index 12ab2bef92..a40a644ba0 100644 --- a/ocaml/xapi/xapi_sr.ml +++ b/ocaml/xapi/xapi_sr.ml @@ -778,15 +778,34 @@ let scan ~__context ~sr = Db.VDI.get_records_where ~__context ~expr:(Eq (Field "SR", Literal sr')) in + (* It is sufficient to just compare the refs in two db_vdis, as this + is what update_vdis uses to determine what to delete *) + let vdis_ref_equal db_vdi1 db_vdi2 = + Listext.List.set_difference (List.map fst db_vdi1) + (List.map fst db_vdi2) + = [] + in let db_vdis_before = find_vdis () in let vs, sr_info = C.SR.scan2 (Ref.string_of task) (Storage_interface.Sr.of_string sr_uuid) in let db_vdis_after = find_vdis () in - if limit > 0 && db_vdis_after <> db_vdis_before then + if limit > 0 && not (vdis_ref_equal db_vdis_before db_vdis_after) + then ( + debug + "%s detected db change while scanning, before scan vdis %s, \ + after scan vdis %s, retry limit left %d" + __FUNCTION__ + (List.map (fun (_, v) -> v.vDI_uuid) db_vdis_before + |> String.concat "," + ) + (List.map (fun (_, v) -> v.vDI_uuid) db_vdis_after + |> String.concat "," + ) + limit ; (scan_rec [@tailcall]) (limit - 1) - else if limit = 0 then + ) else if limit = 0 then raise (Api_errors.Server_error (Api_errors.internal_error, ["SR.scan retry limit exceeded"])