Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CP-51692: use Event.from instead of Event.next #6125

Merged
merged 5 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions ocaml/tests/bench/bench_throttle2.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
open Bechamel

let () =
Suite_init.harness_init () ;
Debug.set_level Syslog.Warning

let __context, _ = Test_event_common.event_setup_common ()

let allocate_tasks n =
( __context
, Array.init n @@ fun i ->
let label = Printf.sprintf "task %d" i in
Xapi_task.create ~__context ~label ~description:"test task"
)

let free_tasks (__context, tasks) =
let () =
tasks |> Array.iter @@ fun self -> Xapi_task.destroy ~__context ~self
in
()

let set_pending tasks =
tasks
|> Array.iter @@ fun self ->
Xapi_task.set_status ~__context ~self ~value:`pending

let run_tasks _n (__context, tasks) =
set_pending tasks ;
let () =
tasks
|> Array.iter @@ fun self ->
Xapi_task.set_status ~__context ~self ~value:`success
in
tasks |> Array.iter @@ fun t -> Helpers.Task.wait_for ~__context ~tasks:[t]

let run_tasks' _n (__context, tasks) =
set_pending tasks ;
let () =
tasks
|> Array.iter @@ fun self ->
Xapi_task.set_status ~__context ~self ~value:`success
in
Helpers.Task.wait_for ~__context ~tasks:(Array.to_list tasks)

module D = Debug.Make (struct let name = __MODULE__ end)

let run_tasks'' n (__context, tasks) =
set_pending tasks ;
let finished = Atomic.make 0 in
let (t : Thread.t) =
Thread.create
(fun () ->
for _ = 1 to 10 do
Thread.yield ()
done ;
tasks
|> Array.iter @@ fun self ->
Xapi_task.set_status ~__context ~self ~value:`success ;
Atomic.incr finished
)
()
in
Helpers.Task.wait_for ~__context ~tasks:(Array.to_list tasks) ;
let f = Atomic.get finished in
assert (f = n || f = n - 1) ;
Thread.join t

let benchmarks =
Test.make_grouped ~name:"Task latency"
[
Test.make_indexed_with_resource ~name:"task complete+wait latency"
~args:[1; 10; 100] Test.multiple ~allocate:allocate_tasks
~free:free_tasks (fun n -> Staged.stage (run_tasks n)
)
; Test.make_indexed_with_resource ~name:"task complete+wait all latency"
~args:[1; 10; 100] Test.multiple ~allocate:allocate_tasks
~free:free_tasks (fun n -> Staged.stage (run_tasks' n)
)
; Test.make_indexed_with_resource
~name:"task complete+wait all latency (thread)" ~args:[1; 10; 100]
Test.multiple ~allocate:allocate_tasks ~free:free_tasks (fun n ->
Staged.stage (run_tasks'' n)
)
]

let () = Bechamel_simple_cli.cli benchmarks
4 changes: 2 additions & 2 deletions ocaml/tests/bench/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(executables
(names bench_tracing bench_uuid)
(libraries tracing bechamel bechamel-notty notty.unix tracing_export threads.posix fmt notty uuid)
(names bench_tracing bench_uuid bench_throttle2)
(libraries tracing bechamel bechamel-notty notty.unix tracing_export threads.posix fmt notty uuid xapi_aux tests_common log xapi_internal)
)
195 changes: 104 additions & 91 deletions ocaml/xapi-cli-server/cli_operations.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2848,8 +2848,6 @@ exception Finished
let event_wait_gen rpc session_id classname record_matches =
(* Immediately register *)
let classes = [classname] in
Client.Event.register ~rpc ~session_id ~classes ;
debug "Registered for events" ;
(* Check to see if the condition is already satisfied - get all objects of whatever class specified... *)
let poll () =
let current_tbls =
Expand Down Expand Up @@ -2930,96 +2928,111 @@ let event_wait_gen rpc session_id classname record_matches =
in
List.exists record_matches all_recs
in
finally
(fun () ->
if not (poll ()) then
try
while true do
try
let events =
Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id)
in
let doevent event =
let tbl =
match Event_helper.record_of_event event with
| Event_helper.VM (r, Some x) ->
let record = vm_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.VDI (r, Some x) ->
let record = vdi_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.SR (r, Some x) ->
let record = sr_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.Host (r, Some x) ->
let record = host_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.Network (r, Some x) ->
let record = net_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.VIF (r, Some x) ->
let record = vif_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.PIF (r, Some x) ->
let record = pif_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.VBD (r, Some x) ->
let record = vbd_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.PBD (r, Some x) ->
let record = pbd_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.Pool (r, Some x) ->
let record = pool_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.Task (r, Some x) ->
let record = task_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.VMSS (r, Some x) ->
let record = vmss_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.Secret (r, Some x) ->
let record = secret_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| _ ->
failwith
("Cli listening for class '"
^ classname
^ "' not currently implemented"
)
in
let record =
List.map (fun r -> (r.name, fun () -> safe_get_field r)) tbl
in
if record_matches record then raise Finished
let use_event_next = !Constants.use_event_next in
let run () =
if not (poll ()) then
try
let token = ref "" in
while true do
let events =
if use_event_next then
Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id)
else
let event_from =
Event_types.event_from_of_rpc
(Client.Event.from ~rpc ~session_id ~timeout:30. ~token:!token
~classes
)
in
List.iter doevent
(List.filter (fun e -> e.Event_types.snapshot <> None) events)
with
| Api_errors.Server_error (code, _)
when code = Api_errors.events_lost
->
debug "Got EVENTS_LOST; reregistering" ;
Client.Event.unregister ~rpc ~session_id ~classes ;
Client.Event.register ~rpc ~session_id ~classes ;
if poll () then raise Finished
done
with Finished -> ()
)
(fun () -> Client.Event.unregister ~rpc ~session_id ~classes)
token := event_from.token ;
event_from.events
in
let doevent event =
let tbl =
match Event_helper.record_of_event event with
| Event_helper.VM (r, Some x) ->
let record = vm_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.VDI (r, Some x) ->
let record = vdi_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.SR (r, Some x) ->
let record = sr_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.Host (r, Some x) ->
let record = host_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.Network (r, Some x) ->
let record = net_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.VIF (r, Some x) ->
let record = vif_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.PIF (r, Some x) ->
let record = pif_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.VBD (r, Some x) ->
let record = vbd_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.PBD (r, Some x) ->
let record = pbd_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.Pool (r, Some x) ->
let record = pool_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.Task (r, Some x) ->
let record = task_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.VMSS (r, Some x) ->
let record = vmss_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| Event_helper.Secret (r, Some x) ->
let record = secret_record rpc session_id r in
record.setrefrec (r, x) ;
record.fields
| _ ->
failwith
("Cli listening for class '"
^ classname
^ "' not currently implemented"
)
in
let record =
List.map (fun r -> (r.name, fun () -> safe_get_field r)) tbl
in
if record_matches record then raise_notrace Finished
in
List.iter doevent
(List.filter (fun e -> e.Event_types.snapshot <> None) events)
done
with
| Api_errors.Server_error (code, _)
when code = Api_errors.events_lost && use_event_next ->
debug "Got EVENTS_LOST; reregistering" ;
Client.Event.unregister ~rpc ~session_id ~classes ;
Client.Event.register ~rpc ~session_id ~classes ;
if poll () then raise Finished
| Finished ->
()
in
if use_event_next then (
Client.Event.register ~rpc ~session_id ~classes ;
debug "Registered for events" ;
finally run (fun () -> Client.Event.unregister ~rpc ~session_id ~classes)
) else
run ()

(* We're done. Unregister and finish *)

Expand Down
28 changes: 24 additions & 4 deletions ocaml/xapi-cli-server/cli_util.ml
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,41 @@ exception Cli_failure of string

(** call [callback task_record] on every update to the task, until it completes or fails *)
let track callback rpc (session_id : API.ref_session) task =
let classes = ["task"] in
let use_event_next = !Constants.use_event_next in
let classes =
if use_event_next then
["task"]
else
[Printf.sprintf "task/%s" (Ref.string_of task)]
in
finally
(fun () ->
let finished = ref false in
while not !finished do
Client.Event.register ~rpc ~session_id ~classes ;
if use_event_next then
Client.Event.register ~rpc ~session_id ~classes ;
try
(* Need to check once after registering to avoid a race *)
finished :=
Client.Task.get_status ~rpc ~session_id ~self:task <> `pending ;
let token = ref "" in
while not !finished do
let events =
Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id)
if use_event_next then
let events =
Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id)
in
List.map Event_helper.record_of_event events
else
let event_from =
Event_types.event_from_of_rpc
(Client.Event.from ~rpc ~session_id ~classes ~token:!token
~timeout:30.
)
in
token := event_from.token ;
List.map Event_helper.record_of_event event_from.events
in
let events = List.map Event_helper.record_of_event events in
List.iter
(function
| Event_helper.Task (t, Some t_rec) when t = task ->
Expand Down
3 changes: 3 additions & 0 deletions ocaml/xapi-consts/constants.ml
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,9 @@ let owner_key = "owner"

(* set in VBD other-config to indicate that clients can delete the attached VDI on VM uninstall if they want.. *)

(* xapi-cli-server doesn't link xapi-globs *)
let use_event_next = ref true

(* the time taken to wait before restarting in a different mode for pool eject/join operations *)
let fuse_time = ref 10.

Expand Down
18 changes: 18 additions & 0 deletions ocaml/xapi-types/event_types.ml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,24 @@ let rec rpc_of_event_from e =
; ("token", rpc_of_token e.token)
]

(* xmlrpc and jsonrpc would map Int32 to Int, but int32_of_rpc can't actually parse
an Int32 back as an int32... this is a bug in ocaml-rpc that should be fixed.
meanwhile work it around by mapping Rpc.Int32 to Rpc.Int upon receiving the message
(it is only Rpc.Int32 for backward compat with non-XAPI Xmlrpc clients)
*)

let rec fixup_int32 = function
| Rpc.Dict dict ->
Rpc.Dict (List.map fixup_kv dict)
| Rpc.Int32 i ->
Rpc.Int (Int64.of_int32 i)
| rpc ->
rpc

and fixup_kv (k, v) = (k, fixup_int32 v)

let event_from_of_rpc rpc = rpc |> fixup_int32 |> event_from_of_rpc

(** Return result of an events.from call *)

open Printf
Expand Down
3 changes: 3 additions & 0 deletions ocaml/xapi/taskHelper.mli
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ val set_result : __context:Context.t -> Rpc.t option -> unit

val status_is_completed : [> `cancelled | `failure | `success] -> bool

val status_to_string :
[< `pending | `success | `failure | `cancelling | `cancelled] -> string

val complete : __context:Context.t -> Rpc.t option -> unit

val set_cancellable : __context:Context.t -> unit
Expand Down
Loading
Loading