From 3c400a66700b2f36ad5fe4b3d5aeb0489632fddd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Sun, 21 Apr 2024 20:43:49 +0100 Subject: [PATCH] CP-51692: feat(use-event-next): xe event-wait: use Event.from instead of Event.next MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Feature flag: use-event-next Signed-off-by: Edwin Török --- ocaml/xapi-cli-server/cli_operations.ml | 195 +++++++++++++----------- 1 file changed, 104 insertions(+), 91 deletions(-) diff --git a/ocaml/xapi-cli-server/cli_operations.ml b/ocaml/xapi-cli-server/cli_operations.ml index 1e8ba0f3b3..5870e6e41b 100644 --- a/ocaml/xapi-cli-server/cli_operations.ml +++ b/ocaml/xapi-cli-server/cli_operations.ml @@ -2848,8 +2848,6 @@ exception Finished let event_wait_gen rpc session_id classname record_matches = (* Immediately register *) let classes = [classname] in - Client.Event.register ~rpc ~session_id ~classes ; - debug "Registered for events" ; (* Check to see if the condition is already satisfied - get all objects of whatever class specified... *) let poll () = let current_tbls = @@ -2930,96 +2928,111 @@ let event_wait_gen rpc session_id classname record_matches = in List.exists record_matches all_recs in - finally - (fun () -> - if not (poll ()) then - try - while true do - try - let events = - Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id) - in - let doevent event = - let tbl = - match Event_helper.record_of_event event with - | Event_helper.VM (r, Some x) -> - let record = vm_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.VDI (r, Some x) -> - let record = vdi_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.SR (r, Some x) -> - let record = sr_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.Host (r, Some x) -> - let record = host_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.Network (r, Some x) -> - let record = net_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.VIF (r, Some x) -> - let record = vif_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.PIF (r, Some x) -> - let record = pif_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.VBD (r, Some x) -> - let record = vbd_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.PBD (r, Some x) -> - let record = pbd_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.Pool (r, Some x) -> - let record = pool_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.Task (r, Some x) -> - let record = task_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.VMSS (r, Some x) -> - let record = vmss_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | Event_helper.Secret (r, Some x) -> - let record = secret_record rpc session_id r in - record.setrefrec (r, x) ; - record.fields - | _ -> - failwith - ("Cli listening for class '" - ^ classname - ^ "' not currently implemented" - ) - in - let record = - List.map (fun r -> (r.name, fun () -> safe_get_field r)) tbl - in - if record_matches record then raise Finished + let use_event_next = !Constants.cli_use_event_next in + let run () = + if not (poll ()) then + try + let token = ref "" in + while true do + let events = + if use_event_next then + Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id) + else + let event_from = + Event_types.event_from_of_rpc + (Client.Event.from ~rpc ~session_id ~timeout:30. ~token:!token + ~classes + ) in - List.iter doevent - (List.filter (fun e -> e.Event_types.snapshot <> None) events) - with - | Api_errors.Server_error (code, _) - when code = Api_errors.events_lost - -> - debug "Got EVENTS_LOST; reregistering" ; - Client.Event.unregister ~rpc ~session_id ~classes ; - Client.Event.register ~rpc ~session_id ~classes ; - if poll () then raise Finished - done - with Finished -> () - ) - (fun () -> Client.Event.unregister ~rpc ~session_id ~classes) + token := event_from.token ; + event_from.events + in + let doevent event = + let tbl = + match Event_helper.record_of_event event with + | Event_helper.VM (r, Some x) -> + let record = vm_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.VDI (r, Some x) -> + let record = vdi_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.SR (r, Some x) -> + let record = sr_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.Host (r, Some x) -> + let record = host_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.Network (r, Some x) -> + let record = net_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.VIF (r, Some x) -> + let record = vif_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.PIF (r, Some x) -> + let record = pif_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.VBD (r, Some x) -> + let record = vbd_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.PBD (r, Some x) -> + let record = pbd_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.Pool (r, Some x) -> + let record = pool_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.Task (r, Some x) -> + let record = task_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.VMSS (r, Some x) -> + let record = vmss_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | Event_helper.Secret (r, Some x) -> + let record = secret_record rpc session_id r in + record.setrefrec (r, x) ; + record.fields + | _ -> + failwith + ("Cli listening for class '" + ^ classname + ^ "' not currently implemented" + ) + in + let record = + List.map (fun r -> (r.name, fun () -> safe_get_field r)) tbl + in + if record_matches record then raise_notrace Finished + in + List.iter doevent + (List.filter (fun e -> e.Event_types.snapshot <> None) events) + done + with + | Api_errors.Server_error (code, _) + when code = Api_errors.events_lost && use_event_next -> + debug "Got EVENTS_LOST; reregistering" ; + Client.Event.unregister ~rpc ~session_id ~classes ; + Client.Event.register ~rpc ~session_id ~classes ; + if poll () then raise Finished + | Finished -> + () + in + if use_event_next then ( + Client.Event.register ~rpc ~session_id ~classes ; + debug "Registered for events" ; + finally run (fun () -> Client.Event.unregister ~rpc ~session_id ~classes) + ) else + run () (* We're done. Unregister and finish *)