diff --git a/ocaml/xapi/xapi_cluster.ml b/ocaml/xapi/xapi_cluster.ml
index cfa55fde2c7..355bf175527 100644
--- a/ocaml/xapi/xapi_cluster.ml
+++ b/ocaml/xapi/xapi_cluster.ml
@@ -126,6 +126,7 @@ let create ~__context ~pIF ~cluster_stack ~pool_auto_join ~token_timeout
       | Error error ->
           D.warn
             "Error occurred during Cluster.create. Shutting down cluster daemon" ;
+          Xapi_clustering.Watcher.signal_exit () ;
           Xapi_clustering.Daemon.disable ~__context ;
           handle_error error
   )
@@ -156,6 +157,7 @@ let destroy ~__context ~self =
   Db.Cluster.destroy ~__context ~self ;
   D.debug "Cluster destroyed successfully" ;
   set_ha_cluster_stack ~__context ;
+  Xapi_clustering.Watcher.signal_exit () ;
   Xapi_clustering.Daemon.disable ~__context
 
 (* Get pool master's cluster_host, return network of PIF *)
diff --git a/ocaml/xapi/xapi_cluster_host.ml b/ocaml/xapi/xapi_cluster_host.ml
index c55d789b8d9..9644ca8cd78 100644
--- a/ocaml/xapi/xapi_cluster_host.ml
+++ b/ocaml/xapi/xapi_cluster_host.ml
@@ -261,6 +261,7 @@ let destroy_op ~__context ~self ~force =
       ) ;
       Db.Cluster_host.destroy ~__context ~self ;
       debug "Cluster_host.%s was successful" fn_str ;
+      Xapi_clustering.Watcher.signal_exit () ;
       Xapi_clustering.Daemon.disable ~__context
   | Error error ->
       warn "Error occurred during Cluster_host.%s" fn_str ;
@@ -361,7 +362,7 @@ let enable ~__context ~self =
       in
 
       (* TODO: Pass these through from CLI *)
-      if not !Xapi_clustering.Daemon.enabled then (
+      if not (Xapi_clustering.Daemon.is_enabled ()) then (
        D.debug
          "Cluster_host.enable: xapi-clusterd not running - attempting to start" ;
        Xapi_clustering.Daemon.enable ~__context
diff --git a/ocaml/xapi/xapi_clustering.ml b/ocaml/xapi/xapi_clustering.ml
index 249efa74da1..9f21b4c43c4 100644
--- a/ocaml/xapi/xapi_clustering.ml
+++ b/ocaml/xapi/xapi_clustering.ml
@@ -250,7 +250,9 @@ let assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack
     raise Api_errors.(Server_error (cluster_stack_in_use, [cluster_stack]))
 
 module Daemon = struct
-  let enabled = ref false
+  let enabled = Atomic.make false
+
+  let is_enabled () = Atomic.get enabled
 
   let maybe_call_script ~__context script params =
     match Context.get_test_clusterd_rpc __context with
@@ -283,13 +285,13 @@ module Daemon = struct
             (internal_error, [Printf.sprintf "could not start %s" service])
         )
     ) ;
-    enabled := true ;
+    Atomic.set enabled true ;
     debug "Cluster daemon: enabled & started"
 
   let disable ~__context =
     let port = string_of_int !Xapi_globs.xapi_clusterd_port in
     debug "Disabling and stopping the clustering daemon" ;
-    enabled := false ;
+    Atomic.set enabled false ;
     maybe_call_script ~__context !Xapi_globs.systemctl ["disable"; service] ;
     maybe_call_script ~__context !Xapi_globs.systemctl ["stop"; service] ;
     maybe_call_script ~__context
@@ -309,7 +311,7 @@ end
  * Instead of returning an empty URL which wouldn't work just raise an
  * exception. *)
 let rpc ~__context =
-  if not !Daemon.enabled then
+  if not (Daemon.is_enabled ()) then
     raise
       Api_errors.(
         Server_error
@@ -427,6 +429,8 @@ let compute_corosync_max_host_failures ~__context =
   corosync_ha_max_hosts
 
 module Watcher = struct
+  module Delay = Xapi_stdext_threads.Threadext.Delay
+
   let routine_updates = "routine updates"
 
   let on_corosync_update ~__context ~cluster updates =
@@ -552,14 +556,40 @@ module Watcher = struct
      from corosync represents a consistent snapshot of the current cluster
      state. *)
   let stabilising_period = Mtime.Span.(5 * s)
 
+  (* The delay the watcher waits on; signalling it wakes the watcher early. *)
+  let delay = Delay.make ()
+
+  let finish_watch = Atomic.make false
+
   let cluster_stack_watcher : bool Atomic.t = Atomic.make false
 
+  (* This function records the fact that the watcher should exit, to avoid the
+     race where the cluster is destroyed while the watcher is still
+     waiting/stabilising.
+
+     It is called in two cases: 1. when clustering is to be disabled; 2. when
+     this host is no longer the coordinator. For the second case this is only
+     necessary on a manual designation of a new master, since with HA the old
+     coordinator would have died, and so would this thread on the old
+     coordinator. *)
+  let signal_exit () =
+    D.debug "%s: Signalled to exit cluster watcher" __FUNCTION__ ;
+    Delay.signal delay ;
+    (* set the cluster change watcher back to false as soon as we are signalled
+       to prevent any race conditions *)
+    Atomic.set cluster_change_watcher false ;
+    D.debug
+      "%s: watcher for cluster change exit, reset cluster_change_watcher back \
+       to false"
+      __FUNCTION__ ;
+    Atomic.set finish_watch true
+
   (* we handle unclean hosts join and leave in the watcher, i.e. hosts
      joining and leaving due to network problems, power cut, etc. Join and
      leave initiated by the API will be handled in the API call themselves, but
      they share the same code as the watcher. *)
   let watch_cluster_change ~__context ~host =
-    while !Daemon.enabled do
+    while not (Atomic.get finish_watch) do
       let m =
         Cluster_client.LocalClient.UPDATES.get (rpc ~__context)
          "cluster change watcher call"
@@ -569,9 +599,13 @@ module Watcher = struct
         match find_cluster_host ~__context ~host with
         | Some ch ->
             let cluster = Db.Cluster_host.get_cluster ~__context ~self:ch in
-            if wait then
-              Thread.delay (Clock.Timer.span_to_s stabilising_period) ;
-            on_corosync_update ~__context ~cluster updates
+            if not wait then
+              on_corosync_update ~__context ~cluster updates
+            else if
+              wait
+              && Clock.Timer.span_to_s stabilising_period |> Delay.wait delay
+            then
+              on_corosync_update ~__context ~cluster updates
         | None ->
             ()
       in
@@ -591,55 +625,60 @@ module Watcher = struct
       | exception exn ->
           warn "%s: Got exception %s while query cluster host updates, retrying"
            __FUNCTION__ (Printexc.to_string exn) ;
-          Thread.delay (Clock.Timer.span_to_s cluster_change_interval)
-    done ;
-    Atomic.set cluster_change_watcher false
+          let _ : bool =
+            Clock.Timer.span_to_s cluster_change_interval |> Delay.wait delay
+          in
+          ()
+    done
 
   let watch_cluster_stack_version ~__context ~host =
-    if !Daemon.enabled then
-      match find_cluster_host ~__context ~host with
-      | Some ch ->
-          let cluster_ref = Db.Cluster_host.get_cluster ~__context ~self:ch in
-          let cluster_rec =
-            Db.Cluster.get_record ~__context ~self:cluster_ref
-          in
-          if
-            Cluster_stack.of_version
-              ( cluster_rec.API.cluster_cluster_stack
-              , cluster_rec.API.cluster_cluster_stack_version
-              )
-            = Cluster_stack.Corosync2
-          then (
-            debug "%s: Detected Corosync 2 running as cluster stack"
-              __FUNCTION__ ;
-            let body =
-              "The current cluster stack version of Corosync 2 is out of date, \
-               consider updating to Corosync 3"
-            in
-            let name, priority = Api_messages.cluster_stack_out_of_date in
-            let host_uuid = Db.Host.get_uuid ~__context ~self:host in
-
-            Helpers.call_api_functions ~__context (fun rpc session_id ->
-                let _ : [> `message] Ref.t =
-                  Client.Client.Message.create ~rpc ~session_id ~name ~priority
-                    ~cls:`Host ~obj_uuid:host_uuid ~body
-                in
-                ()
-            )
-          )
-      | None ->
-          debug "%s: No cluster host, no need to watch" __FUNCTION__
+    match find_cluster_host ~__context ~host with
+    | Some ch ->
+        let cluster_ref = Db.Cluster_host.get_cluster ~__context ~self:ch in
+        let cluster_rec = Db.Cluster.get_record ~__context ~self:cluster_ref in
+        if
+          Cluster_stack.of_version
+            ( cluster_rec.API.cluster_cluster_stack
+            , cluster_rec.API.cluster_cluster_stack_version
+            )
+          = Cluster_stack.Corosync2
+        then (
+          debug "%s: Detected Corosync 2 running as cluster stack" __FUNCTION__ ;
+          let body =
+            "The current cluster stack version of Corosync 2 is out of date, \
+             consider updating to Corosync 3"
+          in
+          let name, priority = Api_messages.cluster_stack_out_of_date in
+          let host_uuid = Db.Host.get_uuid ~__context ~self:host in
+
+          Helpers.call_api_functions ~__context (fun rpc session_id ->
+              let _ : [> `message] Ref.t =
+                Client.Client.Message.create ~rpc ~session_id ~name ~priority
+                  ~cls:`Host ~obj_uuid:host_uuid ~body
+              in
+              ()
+          )
+        ) else
+          debug
+            "%s: Detected Corosync 3 as cluster stack, not generating a \
+             warning message"
+            __FUNCTION__
+    | None ->
+        debug "%s: No cluster host, no need to watch" __FUNCTION__
 
   (** [create_as_necessary] will create cluster watchers on the coordinator if
       they are not already created. There is no need to destroy them: once the
       clustering daemon is disabled, these threads will exit as well. *)
   let create_as_necessary ~__context ~host =
-    if Helpers.is_pool_master ~__context ~host then (
+    let is_master = Helpers.is_pool_master ~__context ~host in
+    let daemon_enabled = Daemon.is_enabled () in
+    if is_master && daemon_enabled then (
       if Xapi_cluster_helpers.cluster_health_enabled ~__context then
         if Atomic.compare_and_set cluster_change_watcher false true then (
           debug "%s: create watcher for corosync-notifyd on coordinator"
            __FUNCTION__ ;
+          Atomic.set finish_watch false ;
          let _ : Thread.t =
            Thread.create (fun () -> watch_cluster_change ~__context ~host) ()
          in
@@ -666,5 +705,9 @@ module Watcher = struct
        ) else
          debug "%s: not create watcher for cluster stack as it already exists"
            __FUNCTION__
-    )
+    ) else
+      debug
+        "%s: not creating watcher because is_master is %b and clustering \
+         enabled is %b"
+        __FUNCTION__ is_master daemon_enabled
 end
diff --git a/ocaml/xapi/xapi_clustering.mli b/ocaml/xapi/xapi_clustering.mli
new file mode 100644
index 00000000000..7fceae58118
--- /dev/null
+++ b/ocaml/xapi/xapi_clustering.mli
@@ -0,0 +1,91 @@
+(* Copyright (C) Cloud Software Group Inc.
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU Lesser General Public License as published
+   by the Free Software Foundation; version 2.1 only. with the special
+   exception on linking described in file LICENSE.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU Lesser General Public License for more details.
+*)
+
+val set_ha_cluster_stack : __context:Context.t -> unit
+
+val with_clustering_lock : string -> (unit -> 'a) -> 'a
+
+val pif_of_host :
+  __context:Context.t -> API.ref_network -> API.ref_host -> 'a Ref.t * API.pIF_t
+
+val ip_of_pif : 'a Ref.t * API.pIF_t -> Cluster_interface.address
+
+val assert_pif_prerequisites : 'a Ref.t * API.pIF_t -> unit
+
+val assert_pif_attached_to :
+  __context:Context.t -> host:[`host] Ref.t -> pIF:[`PIF] Ref.t -> unit
+
+val handle_error : Cluster_interface.error -> 'a
+
+val assert_cluster_host_can_be_created :
+  __context:Context.t -> host:'a Ref.t -> unit
+
+val get_required_cluster_stacks :
+  __context:Context.t -> sr_sm_type:string -> string list
+
+val assert_cluster_stack_valid : cluster_stack:string -> unit
+
+val with_clustering_lock_if_needed :
+  __context:Context.t -> sr_sm_type:string -> string -> (unit -> 'a) -> 'a
+
+val with_clustering_lock_if_cluster_exists :
+  __context:Context.t -> string -> (unit -> 'a) -> 'a
+
+val find_cluster_host :
+  __context:Context.t -> host:[`host] Ref.t -> 'a Ref.t option
+
+val get_network_internal :
+  __context:Context.t -> self:[`Cluster] Ref.t -> [`network] Ref.t
+
+val assert_cluster_host_enabled :
+  __context:Context.t -> self:[`Cluster_host] Ref.t -> expected:bool -> unit
+
+val assert_operation_host_target_is_localhost :
+  __context:Context.t -> host:[`host] Ref.t -> unit
+
+val assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack :
+  __context:Context.t -> self:[`Cluster_host] Ref.t -> unit
+
+module Daemon : sig
+  val is_enabled : unit -> bool
+
+  val enable : __context:Context.t -> unit
+
+  val disable : __context:Context.t -> unit
+
+  val restart : __context:Context.t -> unit
+end
+
+val rpc : __context:Context.t -> Rpc.call -> Rpc.response Idl.IdM.t
+
+val maybe_switch_cluster_stack_version :
+     __context:Context.t
+  -> self:'a Ref.t
+  -> cluster_stack:Cluster_interface.Cluster_stack.t
+  -> unit
+
+val assert_cluster_host_is_enabled_for_matching_sms :
+  __context:Context.t -> host:[`host] Ref.t -> sr_sm_type:string -> unit
+
+val is_clustering_disabled_on_host :
+  __context:Context.t -> [`host] Ref.t -> bool
+
+val compute_corosync_max_host_failures : __context:Context.t -> int
+
+module Watcher : sig
+  val on_corosync_update :
+    __context:Context.t -> cluster:[`Cluster] Ref.t -> string list -> unit
+
+  val signal_exit : unit -> unit
+
+  val create_as_necessary : __context:Context.t -> host:[`host] Ref.t -> unit
+end
diff --git a/ocaml/xapi/xapi_observer_components.ml b/ocaml/xapi/xapi_observer_components.ml
index 797c236b248..d3e0587b143 100644
--- a/ocaml/xapi/xapi_observer_components.ml
+++ b/ocaml/xapi/xapi_observer_components.ml
@@ -48,7 +48,9 @@ let all = List.map of_string Constants.observer_components_all
    This does mean that observer will always be enabled for clusterd. *)
 let startup_components () =
   List.filter
-    (function Xapi_clusterd -> !Xapi_clustering.Daemon.enabled | _ -> true)
+    (function
+      | Xapi_clusterd -> Xapi_clustering.Daemon.is_enabled () | _ -> true
+      )
     all
 
 let assert_valid_components components =
diff --git a/ocaml/xapi/xapi_pool_transition.ml b/ocaml/xapi/xapi_pool_transition.ml
index 6ff8f892bd9..8f6a315f591 100644
--- a/ocaml/xapi/xapi_pool_transition.ml
+++ b/ocaml/xapi/xapi_pool_transition.ml
@@ -215,6 +215,8 @@ let become_another_masters_slave master_address =
   if Pool_role.get_role () = new_role then
     debug "We are already a slave of %s; nothing to do" master_address
   else (
+    if Pool_role.is_master () then (* I am the old master *)
+      Xapi_clustering.Watcher.signal_exit () ;
     debug "Setting pool.conf to point to %s" master_address ;
     set_role new_role ;
     run_external_scripts false ;
diff --git a/quality-gate.sh b/quality-gate.sh
index 4c3c1da3a3a..ab741ef3445 100755
--- a/quality-gate.sh
+++ b/quality-gate.sh
@@ -25,7 +25,7 @@ verify-cert () {
 }
 
 mli-files () {
-    N=511
+    N=510
     # do not count ml files from the tests in ocaml/{tests/perftest/quicktest}
     MLIS=$(git ls-files -- '**/*.mli' | grep -vE "ocaml/tests|ocaml/perftest|ocaml/quicktest|ocaml/message-switch/core_test" | xargs -I {} sh -c "echo {} | cut -f 1 -d '.'" \;)
    MLS=$(git ls-files -- '**/*.ml' | grep -vE "ocaml/tests|ocaml/perftest|ocaml/quicktest|ocaml/message-switch/core_test" | xargs -I {} sh -c "echo {} | cut -f 1 -d '.'" \;)
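Note for reviewers: the reworked watcher loop relies on `Delay.wait` from `Xapi_stdext_threads.Threadext` distinguishing a timeout from a wake-up, i.e. returning `true` when the full period elapsed and `false` when `Delay.signal` fired, so that `signal_exit` can cancel both the stabilising sleep and the retry back-off without a stale `on_corosync_update` running against a cluster that is already being destroyed. The sketch below is a minimal, self-contained illustration of that contract, not the library implementation; it only needs the `unix` and `threads` libraries, and the module name `Cancellable_delay` plus the demo thread are invented for the example.

```ocaml
(* A stdlib-only sketch of the signal/wait contract the watcher relies on:
   [wait] returns true when the timeout elapsed and false when [signal] woke
   it early. The real code uses Xapi_stdext_threads.Threadext.Delay instead;
   Cancellable_delay is a made-up name for illustration only. *)
module Cancellable_delay = struct
  type t = {r: Unix.file_descr; w: Unix.file_descr}

  let make () =
    let r, w = Unix.pipe () in
    {r; w}

  (* Wake up any thread currently blocked in [wait]. *)
  let signal t = ignore (Unix.write t.w (Bytes.of_string "x") 0 1)

  (* Block for [seconds]; true = full period elapsed, false = signalled. *)
  let wait t seconds =
    match Unix.select [t.r] [] [] seconds with
    | [], _, _ ->
        true
    | _ ->
        (* drain the byte so a later wait is not woken immediately *)
        ignore (Unix.read t.r (Bytes.create 1) 0 1) ;
        false
end

let () =
  let d = Cancellable_delay.make () in
  let watcher =
    Thread.create
      (fun () ->
        (* Watcher-style use: only act if the stabilising wait completed. *)
        if Cancellable_delay.wait d 5.0 then
          print_endline "stabilising period elapsed: process the update"
        else
          print_endline "signalled to exit: drop the update and stop"
      )
      ()
  in
  Thread.delay 0.2 ;
  Cancellable_delay.signal d ;
  Thread.join watcher
```

The point of the sketch is only that a signalled wait must be observably different from a timed-out one; the same distinction is what the `else if ... |> Delay.wait delay then` branch in `watch_cluster_change` uses to drop the pending update once `signal_exit` has been called.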