From 601154563fedaac60c68df7aa03b5b13d058a555 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Sun, 21 Apr 2024 20:18:39 +0100 Subject: [PATCH 1/5] CP-49158: [prep] Add Task completion latency benchmark MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit No functional change. Signed-off-by: Edwin Török --- ocaml/tests/bench/bench_throttle2.ml | 86 ++++++++++++++++++++++++++++ ocaml/tests/bench/dune | 4 +- ocaml/xapi/taskHelper.mli | 3 + 3 files changed, 91 insertions(+), 2 deletions(-) create mode 100644 ocaml/tests/bench/bench_throttle2.ml diff --git a/ocaml/tests/bench/bench_throttle2.ml b/ocaml/tests/bench/bench_throttle2.ml new file mode 100644 index 00000000000..50582eff4cc --- /dev/null +++ b/ocaml/tests/bench/bench_throttle2.ml @@ -0,0 +1,86 @@ +open Bechamel + +let () = + Suite_init.harness_init () ; + Debug.set_level Syslog.Warning + +let __context, _ = Test_event_common.event_setup_common () + +let allocate_tasks n = + ( __context + , Array.init n @@ fun i -> + let label = Printf.sprintf "task %d" i in + Xapi_task.create ~__context ~label ~description:"test task" + ) + +let free_tasks (__context, tasks) = + let () = + tasks |> Array.iter @@ fun self -> Xapi_task.destroy ~__context ~self + in + () + +let set_pending tasks = + tasks + |> Array.iter @@ fun self -> + Xapi_task.set_status ~__context ~self ~value:`pending + +let run_tasks _n (__context, tasks) = + set_pending tasks ; + let () = + tasks + |> Array.iter @@ fun self -> + Xapi_task.set_status ~__context ~self ~value:`success + in + tasks |> Array.iter @@ fun t -> Helpers.Task.wait_for ~__context ~tasks:[t] + +let run_tasks' _n (__context, tasks) = + set_pending tasks ; + let () = + tasks + |> Array.iter @@ fun self -> + Xapi_task.set_status ~__context ~self ~value:`success + in + Helpers.Task.wait_for ~__context ~tasks:(Array.to_list tasks) + +module D = Debug.Make (struct let name = __MODULE__ end) + +let run_tasks'' n (__context, tasks) = + set_pending tasks ; + let finished = Atomic.make 0 in + let (t : Thread.t) = + Thread.create + (fun () -> + for _ = 1 to 10 do + Thread.yield () + done ; + tasks + |> Array.iter @@ fun self -> + Xapi_task.set_status ~__context ~self ~value:`success ; + Atomic.incr finished + ) + () + in + Helpers.Task.wait_for ~__context ~tasks:(Array.to_list tasks) ; + let f = Atomic.get finished in + assert (f = n || f = n - 1) ; + Thread.join t + +let benchmarks = + Test.make_grouped ~name:"Task latency" + [ + Test.make_indexed_with_resource ~name:"task complete+wait latency" + ~args:[1; 10; 100] Test.multiple ~allocate:allocate_tasks + ~free:free_tasks (fun n -> Staged.stage (run_tasks n) + ) + ; Test.make_indexed_with_resource ~name:"task complete+wait all latency" + ~args:[1; 10; 100] Test.multiple ~allocate:allocate_tasks + ~free:free_tasks (fun n -> Staged.stage (run_tasks' n) + ) + ; Test.make_indexed_with_resource + ~name:"task complete+wait all latency (thread)" ~args:[1; 10; 100] + Test.multiple ~allocate:allocate_tasks ~free:free_tasks (fun n -> + Staged.stage (run_tasks'' n) + ) + ] + +let () = Bechamel_simple_cli.cli benchmarks diff --git a/ocaml/tests/bench/dune b/ocaml/tests/bench/dune index dcd61813e1e..10cffadb857 100644 --- a/ocaml/tests/bench/dune +++ b/ocaml/tests/bench/dune @@ -1,4 +1,4 @@ (executables - (names bench_tracing bench_uuid) - (libraries tracing bechamel bechamel-notty notty.unix tracing_export threads.posix fmt notty uuid) + (names bench_tracing bench_uuid bench_throttle2) + (libraries tracing bechamel bechamel-notty notty.unix tracing_export threads.posix fmt notty uuid xapi_aux tests_common log xapi_internal) ) diff --git a/ocaml/xapi/taskHelper.mli b/ocaml/xapi/taskHelper.mli index dc5d76cf65b..1c4d5381586 100644 --- a/ocaml/xapi/taskHelper.mli +++ b/ocaml/xapi/taskHelper.mli @@ -36,6 +36,9 @@ val set_result : __context:Context.t -> Rpc.t option -> unit val status_is_completed : [> `cancelled | `failure | `success] -> bool +val status_to_string : + [< `pending | `success | `failure | `cancelling | `cancelled] -> string + val complete : __context:Context.t -> Rpc.t option -> unit val set_cancellable : __context:Context.t -> unit From f713c79f0fc4827c4f4ade4ee4c66881351f6548 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Sun, 21 Apr 2024 20:38:19 +0100 Subject: [PATCH 2/5] CP-51692: feat(use-event-next): introduce use-event-next configuration flag MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is more efficient: we can watch a single task, instead of everything in the DB. Feature-flag: use-event-next No functional change. Signed-off-by: Edwin Török --- ocaml/xapi-consts/constants.ml | 3 +++ ocaml/xapi/xapi_globs.ml | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/ocaml/xapi-consts/constants.ml b/ocaml/xapi-consts/constants.ml index 2c7fc49e179..d3ee0bf8531 100644 --- a/ocaml/xapi-consts/constants.ml +++ b/ocaml/xapi-consts/constants.ml @@ -275,6 +275,9 @@ let owner_key = "owner" (* set in VBD other-config to indicate that clients can delete the attached VDI on VM uninstall if they want.. *) +(* xapi-cli-server doesn't link xapi-globs *) +let use_event_next = ref true + (* the time taken to wait before restarting in a different mode for pool eject/join operations *) let fuse_time = ref 10. diff --git a/ocaml/xapi/xapi_globs.ml b/ocaml/xapi/xapi_globs.ml index efdcabfbdb6..0c061731924 100644 --- a/ocaml/xapi/xapi_globs.ml +++ b/ocaml/xapi/xapi_globs.ml @@ -1395,6 +1395,11 @@ let other_options = , (fun () -> string_of_bool !Db_globs.idempotent_map) , "True if the add_to_ API calls should be idempotent" ) + ; ( "use-event-next" + , Arg.Set Constants.use_event_next + , (fun () -> string_of_bool !Constants.use_event_next) + , "Use deprecated Event.next instead of Event.from" + ) ; ( "nvidia_multi_vgpu_enabled_driver_versions" , Arg.String (fun x -> From 3a36ed99e9a00272f5c39855a98315da46e70f9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Sun, 21 Apr 2024 20:38:19 +0100 Subject: [PATCH 3/5] CP-52625: workaround Rpc.Int32 parsing bug MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit int32_of_rpc doesn't accept Int32 as input, just Int because none of the current deserializers actually produce an Int32. (Int32 is only used by serializers to emit something different). This is an upstream ocaml-rpc bug that should be fixed, meanwhile convert Rpc.Int32 to Rpc.Int, so that the 'fake_rpc' inside XAPI can use Event.from. Otherwise you get this error: ``` Expected int32, got 'I32(0) ``` Signed-off-by: Edwin Török --- ocaml/xapi-types/event_types.ml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/ocaml/xapi-types/event_types.ml b/ocaml/xapi-types/event_types.ml index fcd8840e59f..83c82b0bc8d 100644 --- a/ocaml/xapi-types/event_types.ml +++ b/ocaml/xapi-types/event_types.ml @@ -77,6 +77,24 @@ let rec rpc_of_event_from e = ; ("token", rpc_of_token e.token) ] +(* xmlrpc and jsonrpc would map Int32 to Int, but int32_of_rpc can't actually parse + an Int32 back as an int32... this is a bug in ocaml-rpc that should be fixed. + meanwhile work it around by mapping Rpc.Int32 to Rpc.Int upon receiving the message + (it is only Rpc.Int32 for backward compat with non-XAPI Xmlrpc clients) +*) + +let rec fixup_int32 = function + | Rpc.Dict dict -> + Rpc.Dict (List.map fixup_kv dict) + | Rpc.Int32 i -> + Rpc.Int (Int64.of_int32 i) + | rpc -> + rpc + +and fixup_kv (k, v) = (k, fixup_int32 v) + +let event_from_of_rpc rpc = rpc |> fixup_int32 |> event_from_of_rpc + (** Return result of an events.from call *) open Printf From e40c1ae0938b7c337d740c101bc644c75642f16a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Sun, 21 Apr 2024 20:38:19 +0100 Subject: [PATCH 4/5] CP-51692: feat(use-event-next): cli_util: use Event.from instead of Event.next MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is more efficient: we can watch a single task, instead of everything in the DB. Feature-flag: use-event-next Signed-off-by: Edwin Török --- ocaml/xapi-cli-server/cli_util.ml | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/ocaml/xapi-cli-server/cli_util.ml b/ocaml/xapi-cli-server/cli_util.ml index 48fd9392ef5..75c4f30360f 100644 --- a/ocaml/xapi-cli-server/cli_util.ml +++ b/ocaml/xapi-cli-server/cli_util.ml @@ -42,21 +42,41 @@ exception Cli_failure of string (** call [callback task_record] on every update to the task, until it completes or fails *) let track callback rpc (session_id : API.ref_session) task = - let classes = ["task"] in + let use_event_next = !Constants.use_event_next in + let classes = + if use_event_next then + ["task"] + else + [Printf.sprintf "task/%s" (Ref.string_of task)] + in finally (fun () -> let finished = ref false in while not !finished do - Client.Event.register ~rpc ~session_id ~classes ; + if use_event_next then + Client.Event.register ~rpc ~session_id ~classes ; try (* Need to check once after registering to avoid a race *) finished := Client.Task.get_status ~rpc ~session_id ~self:task <> `pending ; + let token = ref "" in while not !finished do let events = - Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id) + if use_event_next then + let events = + Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id) + in + List.map Event_helper.record_of_event events + else + let event_from = + Event_types.event_from_of_rpc + (Client.Event.from ~rpc ~session_id ~classes ~token:!token + ~timeout:30. + ) + in + token := event_from.token ; + List.map Event_helper.record_of_event event_from.events in - let events = List.map Event_helper.record_of_event events in List.iter (function | Event_helper.Task (t, Some t_rec) when t = task -> From ace50ae4506fa2bbe771c6b5ff0ad4653d329821 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Sun, 21 Apr 2024 20:43:49 +0100 Subject: [PATCH 5/5] CP-51692: feat(use-event-next): xe event-wait: use Event.from instead of Event.next MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Feature flag: use-event-next Signed-off-by: Edwin Török --- ocaml/xapi-cli-server/cli_operations.ml | 195 +++++++++++++----------- 1 file changed, 104 insertions(+), 91 deletions(-) diff --git a/ocaml/xapi-cli-server/cli_operations.ml b/ocaml/xapi-cli-server/cli_operations.ml index 1e8ba0f3b37..4f61e843140 100644 --- a/ocaml/xapi-cli-server/cli_operations.ml +++ b/ocaml/xapi-cli-server/cli_operations.ml @@ -2848,8 +2848,6 @@ exception Finished let event_wait_gen rpc session_id classname record_matches = (* Immediately register *) let classes = [classname] in - Client.Event.register ~rpc ~session_id ~classes ; - debug "Registered for events" ; (* Check to see if the condition is already satisfied - get all objects of whatever class specified... *) let poll () = let current_tbls = @@ -2930,96 +2928,111 @@ let event_wait_gen rpc session_id classname record_matches = in List.exists record_matches all_recs in - finally - (fun () -> - if not (poll ()) then - try - while true do - try - let events = - Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id) - in - let doevent event = - let tbl = - match Event_helper.record_of_event event with - | Event_helper.VM (r, Some x) -> - let record = vm_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.VDI (r, Some x) -> - let record = vdi_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.SR (r, Some x) -> - let record = sr_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.Host (r, Some x) -> - let record = host_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.Network (r, Some x) -> - let record = net_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.VIF (r, Some x) -> - let record = vif_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.PIF (r, Some x) -> - let record = pif_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.VBD (r, Some x) -> - let record = vbd_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.PBD (r, Some x) -> - let record = pbd_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.Pool (r, Some x) -> - let record = pool_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.Task (r, Some x) -> - let record = task_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.VMSS (r, Some x) -> - let record = vmss_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.Secret (r, Some x) -> - let record = secret_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | _ -> - failwith - ("Cli listening for class '" - ^ classname - ^ "' not currently implemented" - ) - in - let record = - List.map (fun r -> (r.name, fun () -> safe_get_field r)) tbl - in - if record_matches record then raise Finished + let use_event_next = !Constants.use_event_next in + let run () = + if not (poll ()) then + try + let token = ref "" in + while true do + let events = + if use_event_next then + Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id) + else + let event_from = + Event_types.event_from_of_rpc + (Client.Event.from ~rpc ~session_id ~timeout:30. ~token:!token + ~classes + ) in - List.iter doevent - (List.filter (fun e -> e.Event_types.snapshot <> None) events) - with - | Api_errors.Server_error (code, _) - when code = Api_errors.events_lost - -> - debug "Got EVENTS_LOST; reregistering" ; - Client.Event.unregister ~rpc ~session_id ~classes ; - Client.Event.register ~rpc ~session_id ~classes ; - if poll () then raise Finished - done - with Finished -> () - ) - (fun () -> Client.Event.unregister ~rpc ~session_id ~classes) + token := event_from.token ; + event_from.events + in + let doevent event = + let tbl = + match Event_helper.record_of_event event with + | Event_helper.VM (r, Some x) -> + let record = vm_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.VDI (r, Some x) -> + let record = vdi_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.SR (r, Some x) -> + let record = sr_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.Host (r, Some x) -> + let record = host_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.Network (r, Some x) -> + let record = net_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.VIF (r, Some x) -> + let record = vif_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.PIF (r, Some x) -> + let record = pif_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.VBD (r, Some x) -> + let record = vbd_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.PBD (r, Some x) -> + let record = pbd_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.Pool (r, Some x) -> + let record = pool_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.Task (r, Some x) -> + let record = task_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.VMSS (r, Some x) -> + let record = vmss_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.Secret (r, Some x) -> + let record = secret_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | _ -> + failwith + ("Cli listening for class '" + ^ classname + ^ "' not currently implemented" + ) + in + let record = + List.map (fun r -> (r.name, fun () -> safe_get_field r)) tbl + in + if record_matches record then raise_notrace Finished + in + List.iter doevent + (List.filter (fun e -> e.Event_types.snapshot <> None) events) + done + with + | Api_errors.Server_error (code, _) + when code = Api_errors.events_lost && use_event_next -> + debug "Got EVENTS_LOST; reregistering" ; + Client.Event.unregister ~rpc ~session_id ~classes ; + Client.Event.register ~rpc ~session_id ~classes ; + if poll () then raise Finished + | Finished -> + () + in + if use_event_next then ( + Client.Event.register ~rpc ~session_id ~classes ; + debug "Registered for events" ; + finally run (fun () -> Client.Event.unregister ~rpc ~session_id ~classes) + ) else + run () (* We're done. Unregister and finish *)