diff --git a/ocaml/xapi-storage-script/main.ml b/ocaml/xapi-storage-script/main.ml index e29f4937ab..33f29b4efe 100644 --- a/ocaml/xapi-storage-script/main.ml +++ b/ocaml/xapi-storage-script/main.ml @@ -814,924 +814,1003 @@ let choose_datapath ?(persistent = true) domain response = | (script_dir, scheme, u) :: _us -> return (fork_exec_rpc ~script_dir, scheme, u, domain) -(* Bind the implementations *) -let bind ~volume_script_dir = - (* Each plugin has its own version, see the call to listen - where `process` is partially applied. *) - let module S = Storage_interface.StorageAPI (Rpc_lwt.GenServer ()) in - let version = ref None in - let volume_rpc = fork_exec_rpc ~script_dir:volume_script_dir in - let module Compat = Compat (struct let version = version end) in - let stat ~dbg ~sr ~vdi = - (* TODO add default value to sharable? *) - return_volume_rpc (fun () -> - Volume_client.stat - (volume_rpc ~dbg ~compat_out:Compat.compat_out_volume) - dbg sr vdi - ) - in - let clone ~dbg ~sr ~vdi = - return_volume_rpc (fun () -> - Volume_client.clone (volume_rpc ~dbg) dbg sr vdi - ) - in - let destroy ~dbg ~sr ~vdi = - return_volume_rpc (fun () -> - Volume_client.destroy (volume_rpc ~dbg) dbg sr vdi - ) - in - let set ~dbg ~sr ~vdi ~key ~value = - (* this is wrong, we loose the VDI type, but old pvsproxy didn't have - * Volume.set and Volume.unset *) - (* TODO handle this properly? *) - let missing = - Option.bind !version (fun v -> - if String.equal v pvs_version then Some (R.rpc_of_unit ()) else None - ) - in - return_volume_rpc (fun () -> - Volume_client.set (volume_rpc ~dbg ?missing) dbg sr vdi key value - ) - in - let unset ~dbg ~sr ~vdi ~key = - let missing = - Option.bind !version (fun v -> - if String.equal v pvs_version then Some (R.rpc_of_unit ()) else None - ) - in - return_volume_rpc (fun () -> - Volume_client.unset (volume_rpc ~dbg ?missing) dbg sr vdi key - ) - in - let update_keys ~dbg ~sr ~key ~value response = - match value with - | None -> - return response - | Some value -> - set ~dbg ~sr ~vdi:response.Xapi_storage.Control.key ~key ~value - >>>= fun () -> - return {response with keys= (key, value) :: response.keys} - in - let vdi_attach_common dbg sr vdi domain = - Attached_SRs.find sr >>>= fun sr -> - (* Discover the URIs using Volume.stat *) - stat ~dbg ~sr ~vdi >>>= fun response -> - (* If we have a clone-on-boot volume then use that instead *) - ( match - List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys - with - | None -> - return response - | Some temporary -> - stat ~dbg ~sr ~vdi:temporary - ) - >>>= fun response -> - choose_datapath domain response >>>= fun (rpc, _datapath, uri, domain) -> - return_data_rpc (fun () -> Datapath_client.attach (rpc ~dbg) dbg uri domain) - in - let wrap th = Rpc_lwt.T.put th in - (* the actual API call for this plugin, sharing same version ref across all calls *) - let query_impl dbg = - let th = - return_plugin_rpc (fun () -> Plugin_client.query (volume_rpc ~dbg) dbg) - >>>= fun response -> - let required_api_version = - response.Xapi_storage.Plugin.required_api_version +let wrap = Rpc_lwt.T.put + +let volume_rpc ~volume_script_dir = fork_exec_rpc ~script_dir:volume_script_dir + +let version = ref None + +(** This module contains the metadata needed for translations to SMAPIv3 to work*) +module type META = sig + val volume_script_dir : string + + val version : string option ref + (** This field will be populated once Query.query is called when each plugin + is registered, after which it will be used in the [Compat] module, which is used + in various volume function implementations. + It is an alias to the global reference cell declared above *) +end + +module QueryImpl = +functor + (M : META) + -> + struct + let volume_rpc = volume_rpc ~volume_script_dir:M.volume_script_dir + + (* the actual API call for this plugin, sharing same version ref across all calls *) + let query_impl dbg = + let th = + return_plugin_rpc (fun () -> Plugin_client.query (volume_rpc ~dbg) dbg) + >>>= fun response -> + let required_api_version = + response.Xapi_storage.Plugin.required_api_version + in + (* the first call to a plugin must be a Query.query that sets the version *) + version := Some required_api_version ; + check_plugin_version_compatible response >>>= fun () -> + (* Convert between the xapi-storage interface and the SMAPI *) + let features = + List.map + (function "VDI_DESTROY" -> "VDI_DELETE" | x -> x) + response.Xapi_storage.Plugin.features + in + (* Look for executable scripts and automatically add capabilities *) + let rec loop acc = function + | [] -> + return acc + | (script_name, capability) :: rest -> ( + Script.path ~script_dir:M.volume_script_dir ~script_name + >>= function + | Error _ -> + loop acc rest + | Ok _ -> + loop (capability :: acc) rest + ) + in + loop [] + [ + ("SR.attach", "SR_ATTACH") + ; ("SR.create", "SR_CREATE") + ; ("SR.destroy", "SR_DELETE") + ; ("SR.detach", "SR_DETACH") + ; ("SR.ls", "SR_SCAN") + ; ("SR.stat", "SR_UPDATE") + ; ("SR.probe", "SR_PROBE") + ; ("Volume.create", "VDI_CREATE") + ; ("Volume.clone", "VDI_CLONE") + ; ("Volume.snapshot", "VDI_SNAPSHOT") + ; ("Volume.resize", "VDI_RESIZE") + ; ("Volume.destroy", "VDI_DELETE") + ; ("Volume.stat", "VDI_UPDATE") + ] + >>>= fun x -> + let features = features @ x in + (* Add the features we always have *) + let features = + features + @ [ + "VDI_ATTACH" + ; "VDI_DETACH" + ; "VDI_ACTIVATE" + ; "VDI_DEACTIVATE" + ; "VDI_INTRODUCE" + ] + in + (* If we have the ability to clone a disk then we can provide + clone on boot. *) + let features = + if List.mem "VDI_CLONE" features then + "VDI_RESET_ON_BOOT/2" :: features + else + features + in + let name = response.Xapi_storage.Plugin.name in + return + { + Storage_interface.driver= response.Xapi_storage.Plugin.plugin + ; name + ; description= response.Xapi_storage.Plugin.description + ; vendor= response.Xapi_storage.Plugin.vendor + ; copyright= response.Xapi_storage.Plugin.copyright + ; version= response.Xapi_storage.Plugin.version + ; required_api_version + ; features + ; configuration= response.Xapi_storage.Plugin.configuration + ; required_cluster_stack= + response.Xapi_storage.Plugin.required_cluster_stack + } in - (* the first call to a plugin must be a Query.query that sets the version *) - version := Some required_api_version ; - check_plugin_version_compatible response >>>= fun () -> - (* Convert between the xapi-storage interface and the SMAPI *) - let features = - List.map - (function "VDI_DESTROY" -> "VDI_DELETE" | x -> x) - response.Xapi_storage.Plugin.features + wrap th + + let query_diagnostics_impl dbg = + let th = + return_plugin_rpc (fun () -> + Plugin_client.diagnostics (volume_rpc ~dbg) dbg + ) + >>>= fun response -> return response in - (* Look for executable scripts and automatically add capabilities *) - let rec loop acc = function - | [] -> - return acc - | (script_name, capability) :: rest -> ( - Script.path ~script_dir:volume_script_dir ~script_name >>= function - | Error _ -> - loop acc rest - | Ok _ -> - loop (capability :: acc) rest - ) + wrap th + end + +module SRImpl = +functor + (M : META) + -> + struct + let volume_rpc = volume_rpc ~volume_script_dir:M.volume_script_dir + + module Compat = Compat (struct let version = M.version end) + + let sr_attach_impl dbg sr device_config = + let th = + Compat.sr_attach device_config >>>= fun compat_in -> + return_volume_rpc (fun () -> + Sr_client.attach (volume_rpc ~dbg ~compat_in) dbg device_config + ) + >>>= fun attach_response -> + (* Stat the SR to look for datasources *) + (* SR.stat should take the attached URI *) + return_volume_rpc (fun () -> + Sr_client.stat (volume_rpc ~dbg) dbg attach_response + ) + >>>= fun stat -> + let rec loop acc = function + | [] -> + Lwt.return acc + | datasource :: datasources -> ( + let uri = Uri.of_string datasource in + match Uri.scheme uri with + | Some "xeno+shm" -> ( + let uid = Uri.path_unencoded uri in + let uid = + if String.length uid > 1 then + String.sub uid 1 (String.length uid - 1) + else + uid + in + RRD.Client.Plugin.Local.register RRD.rpc uid Rrd.Five_Seconds + Rrd_interface.V2 + |> Rpc_lwt.T.get + >>= function + | Ok _ -> + loop (uid :: acc) datasources + | Error x -> + raise Rrd_interface.(Rrdd_error x) + ) + | _ -> + loop acc datasources + ) + in + loop [] stat.Xapi_storage.Control.datasources >>= fun uids -> + (* associate the 'sr' from the plugin with the SR reference passed in *) + Attached_SRs.add sr attach_response uids >>>= fun () -> return () in - loop [] - [ - ("SR.attach", "SR_ATTACH") - ; ("SR.create", "SR_CREATE") - ; ("SR.destroy", "SR_DELETE") - ; ("SR.detach", "SR_DETACH") - ; ("SR.ls", "SR_SCAN") - ; ("SR.stat", "SR_UPDATE") - ; ("SR.probe", "SR_PROBE") - ; ("Volume.create", "VDI_CREATE") - ; ("Volume.clone", "VDI_CLONE") - ; ("Volume.snapshot", "VDI_SNAPSHOT") - ; ("Volume.resize", "VDI_RESIZE") - ; ("Volume.destroy", "VDI_DELETE") - ; ("Volume.stat", "VDI_UPDATE") - ] - >>>= fun x -> - let features = features @ x in - (* Add the features we always have *) - let features = - features - @ [ - "VDI_ATTACH" - ; "VDI_DETACH" - ; "VDI_ACTIVATE" - ; "VDI_DEACTIVATE" - ; "VDI_INTRODUCE" - ] + wrap th + + let sr_detach_impl dbg sr = + let th = + Attached_SRs.find sr >>= function + | Error _ -> + (* ensure SR.detach is idempotent *) + return () + | Ok sr' -> + return_volume_rpc (fun () -> + Sr_client.detach (volume_rpc ~dbg) dbg sr' + ) + >>>= fun response -> + Attached_SRs.get_uids sr >>>= fun uids -> + let rec loop = function + | [] -> + Lwt.return_unit + | datasource :: datasources -> ( + let uri = Uri.of_string datasource in + match Uri.scheme uri with + | Some "xeno+shm" -> ( + let uid = Uri.path_unencoded uri in + let uid = + if String.length uid > 1 then + String.sub uid 1 (String.length uid - 1) + else + uid + in + RRD.Client.Plugin.Local.deregister RRD.rpc uid + |> Rpc_lwt.T.get + >>= function + | Ok _ -> + loop datasources + | Error x -> + raise Rrd_interface.(Rrdd_error x) + ) + | _ -> + loop datasources + ) + in + loop uids >>= fun () -> + Attached_SRs.remove sr >>>= fun () -> return response in - (* If we have the ability to clone a disk then we can provide - clone on boot. *) - let features = - if List.mem "VDI_CLONE" features then - "VDI_RESET_ON_BOOT/2" :: features - else - features + wrap th + + let sr_probe_impl dbg _queue device_config _sm_config = + let th = + return_volume_rpc (fun () -> + Sr_client.probe (volume_rpc ~dbg) dbg device_config + ) + >>>= fun response -> + let pp_probe_result () probe_result = + Rpcmarshal.marshal Xapi_storage.Control.typ_of_probe_result + probe_result + |> Jsonrpc.to_string + in + response + |> List.map (fun probe_result -> + let uuid = + List.assoc_opt "sr_uuid" + probe_result.Xapi_storage.Control.configuration + in + let smapiv2_probe ?sr_info () = + { + Storage_interface.configuration= probe_result.configuration + ; complete= probe_result.complete + ; sr= sr_info + ; extra_info= probe_result.extra_info + } + in + match + ( probe_result.Xapi_storage.Control.sr + , probe_result.Xapi_storage.Control.complete + , uuid + ) + with + | _, false, Some _uuid -> + Deferred.errorf + "A configuration with a uuid cannot be incomplete: %a" + pp_probe_result probe_result + | Some sr_stat, true, Some _uuid -> + let sr_info = + { + Storage_interface.name_label= + sr_stat.Xapi_storage.Control.name + ; sr_uuid= sr_stat.Xapi_storage.Control.uuid + ; name_description= + sr_stat.Xapi_storage.Control.description + ; total_space= sr_stat.Xapi_storage.Control.total_space + ; free_space= sr_stat.Xapi_storage.Control.free_space + ; clustered= sr_stat.Xapi_storage.Control.clustered + ; health= + ( match sr_stat.Xapi_storage.Control.health with + | Xapi_storage.Control.Healthy _ -> + Healthy + | Xapi_storage.Control.Recovering _ -> + Recovering + | Xapi_storage.Control.Unreachable _ -> + Unreachable + | Xapi_storage.Control.Unavailable _ -> + Unavailable + ) + } + in + return (smapiv2_probe ~sr_info ()) + | Some _sr, _, None -> + Deferred.errorf + "A configuration is not attachable without a uuid: %a" + pp_probe_result probe_result + | None, false, None -> + return (smapiv2_probe ()) + | None, true, _ -> + return (smapiv2_probe ()) + ) + |> Deferred.combine_errors + |> Lwt_result.map_error (fun err -> + backend_error "SCRIPT_FAILED" + ["SR.probe"; Base.Error.to_string_hum err] + ) + >>>= fun results -> return (Storage_interface.Probe results) in - let name = response.Xapi_storage.Plugin.name in - return - { - Storage_interface.driver= response.Xapi_storage.Plugin.plugin - ; name - ; description= response.Xapi_storage.Plugin.description - ; vendor= response.Xapi_storage.Plugin.vendor - ; copyright= response.Xapi_storage.Plugin.copyright - ; version= response.Xapi_storage.Plugin.version - ; required_api_version - ; features - ; configuration= response.Xapi_storage.Plugin.configuration - ; required_cluster_stack= - response.Xapi_storage.Plugin.required_cluster_stack - } - in - wrap th - in - S.Query.query query_impl ; - let query_diagnostics_impl dbg = - let th = - return_plugin_rpc (fun () -> - Plugin_client.diagnostics (volume_rpc ~dbg) dbg + wrap th + + let sr_create_impl dbg sr_uuid name_label description device_config _size = + let th = + let uuid = Storage_interface.Sr.string_of sr_uuid in + Compat.sr_create device_config + >>>= fun (device_config, compat_in, compat_out) -> + return_volume_rpc (fun () -> + Sr_client.create + (volume_rpc ~dbg ~compat_in ~compat_out) + dbg uuid device_config name_label description + ) + >>>= fun new_device_config -> return new_device_config + in + wrap th + + let sr_set_name_label_impl dbg sr new_name_label = + Attached_SRs.find sr + >>>= (fun sr -> + return_volume_rpc (fun () -> + Sr_client.set_name (volume_rpc ~dbg) dbg sr new_name_label + ) + ) + |> wrap + + let sr_set_name_description_impl dbg sr new_name_description = + Attached_SRs.find sr + >>>= (fun sr -> + return_volume_rpc (fun () -> + Sr_client.set_description (volume_rpc ~dbg) dbg sr + new_name_description + ) + ) + |> wrap + + let sr_destroy_impl dbg sr = + Attached_SRs.find sr + >>>= (fun sr -> + return_volume_rpc (fun () -> + Sr_client.destroy (volume_rpc ~dbg) dbg sr + ) + ) + |> wrap + + let sr_scan_impl dbg sr = + Attached_SRs.find sr + >>>= (fun sr -> + return_volume_rpc (fun () -> + Sr_client.ls + (volume_rpc ~dbg ~compat_out:Compat.compat_out_volumes) + dbg sr + ) + >>>= fun response -> + let response = Array.to_list response in + (* Filter out volumes which are clone-on-boot transients *) + let transients = + List.fold_left + (fun set x -> + match + List.assoc_opt _clone_on_boot_key + x.Xapi_storage.Control.keys + with + | None -> + set + | Some transient -> + Base.Set.add set transient + ) + (Base.Set.empty (module Base.String)) + response + in + let response = + List.filter + (fun x -> + not (Base.Set.mem transients x.Xapi_storage.Control.key) + ) + response + in + return (List.map vdi_of_volume response) + ) + |> wrap + + let sr_scan2_impl dbg sr = + let sr_uuid = Storage_interface.Sr.string_of sr in + let get_sr_info sr = + return_volume_rpc (fun () -> Sr_client.stat (volume_rpc ~dbg) dbg sr) + >>>= fun response -> + return + { + Storage_interface.sr_uuid= response.Xapi_storage.Control.uuid + ; name_label= response.Xapi_storage.Control.name + ; name_description= response.Xapi_storage.Control.description + ; total_space= response.Xapi_storage.Control.total_space + ; free_space= response.Xapi_storage.Control.free_space + ; clustered= response.Xapi_storage.Control.clustered + ; health= + ( match response.Xapi_storage.Control.health with + | Xapi_storage.Control.Healthy _ -> + Healthy + | Xapi_storage.Control.Recovering _ -> + Recovering + | Xapi_storage.Control.Unreachable _ -> + Unreachable + | Xapi_storage.Control.Unavailable _ -> + Unavailable + ) + } + in + let get_volume_info sr sr_info = + return_volume_rpc (fun () -> + Sr_client.ls + (volume_rpc ~dbg ~compat_out:Compat.compat_out_volumes) + dbg sr + ) + >>>= fun response -> + let response = Array.to_list response in + (* Filter out volumes which are clone-on-boot transients *) + let transients = + List.fold_left + (fun set x -> + match + Base.List.Assoc.find x.Xapi_storage.Control.keys + _clone_on_boot_key ~equal:String.equal + with + | None -> + set + | Some transient -> + Base.Set.add set transient + ) + (Base.Set.empty (module Base.String)) + response + in + let response = + List.filter + (fun x -> not (Base.Set.mem transients x.Xapi_storage.Control.key)) + response + in + return (List.map vdi_of_volume response, sr_info) + in + let rec stat_with_retry ?(times = 3) sr = + get_sr_info sr >>>= fun sr_info -> + match sr_info.health with + | Healthy -> + let* () = + debug (fun m -> m "%s sr %s is healthy" __FUNCTION__ sr_uuid) + in + get_volume_info sr sr_info + | Unreachable when times > 0 -> + let* () = + debug (fun m -> + m "%s: sr %s is unreachable, remaining %d retries" + __FUNCTION__ sr_uuid times + ) + in + Clock.after ~seconds:1. >>= fun () -> + stat_with_retry ~times:(times - 1) sr + | health -> + let* () = + debug (fun m -> + m "%s: sr unhealthy because it is %s" __FUNCTION__ + (Storage_interface.show_sr_health health) + ) + in + fail Storage_interface.(Errors.Sr_unhealthy (sr_uuid, health)) + in + Attached_SRs.find sr >>>= stat_with_retry |> wrap + + let sr_stat_impl dbg sr = + Attached_SRs.find sr + >>>= (fun sr -> + return_volume_rpc (fun () -> + Sr_client.stat (volume_rpc ~dbg) dbg sr + ) + >>>= fun response -> + return + { + Storage_interface.sr_uuid= response.Xapi_storage.Control.uuid + ; name_label= response.Xapi_storage.Control.name + ; name_description= response.Xapi_storage.Control.description + ; total_space= response.Xapi_storage.Control.total_space + ; free_space= response.Xapi_storage.Control.free_space + ; clustered= response.Xapi_storage.Control.clustered + ; health= + ( match response.Xapi_storage.Control.health with + | Xapi_storage.Control.Healthy _ -> + Healthy + | Xapi_storage.Control.Recovering _ -> + Recovering + | Xapi_storage.Control.Unreachable _ -> + Unreachable + | Xapi_storage.Control.Unavailable _ -> + Unavailable + ) + } + ) + |> wrap + + let sr_list _dbg = Attached_SRs.list () >>>= (fun srs -> return srs) |> wrap + + let sr_reset _ _ = return () |> wrap + end + +module VDIImpl = +functor + (M : META) + -> + struct + let volume_rpc = volume_rpc ~volume_script_dir:M.volume_script_dir + + module Compat = Compat (struct let version = M.version end) + + let set ~dbg ~sr ~vdi ~key ~value = + (* this is wrong, we loose the VDI type, but old pvsproxy didn't have + * Volume.set and Volume.unset *) + (* TODO handle this properly? *) + let missing = + Option.bind !version (fun v -> + if String.equal v pvs_version then Some (R.rpc_of_unit ()) else None + ) + in + return_volume_rpc (fun () -> + Volume_client.set (volume_rpc ~dbg ?missing) dbg sr vdi key value ) - >>>= fun response -> return response - in - wrap th - in - S.Query.diagnostics query_diagnostics_impl ; - let sr_attach_impl dbg sr device_config = - let th = - Compat.sr_attach device_config >>>= fun compat_in -> + + let unset ~dbg ~sr ~vdi ~key = + let missing = + Option.bind !version (fun v -> + if String.equal v pvs_version then Some (R.rpc_of_unit ()) else None + ) + in return_volume_rpc (fun () -> - Sr_client.attach (volume_rpc ~dbg ~compat_in) dbg device_config + Volume_client.unset (volume_rpc ~dbg ?missing) dbg sr vdi key ) - >>>= fun attach_response -> - (* Stat the SR to look for datasources *) - (* SR.stat should take the attached URI *) + + let stat ~dbg ~sr ~vdi = + (* TODO add default value to sharable? *) return_volume_rpc (fun () -> - Sr_client.stat (volume_rpc ~dbg) dbg attach_response + Volume_client.stat + (volume_rpc ~dbg ~compat_out:Compat.compat_out_volume) + dbg sr vdi ) - >>>= fun stat -> - let rec loop acc = function - | [] -> - Lwt.return acc - | datasource :: datasources -> ( - let uri = Uri.of_string datasource in - match Uri.scheme uri with - | Some "xeno+shm" -> ( - let uid = Uri.path_unencoded uri in - let uid = - if String.length uid > 1 then - String.sub uid 1 (String.length uid - 1) - else - uid - in - RRD.Client.Plugin.Local.register RRD.rpc uid Rrd.Five_Seconds - Rrd_interface.V2 - |> Rpc_lwt.T.get - >>= function - | Ok _ -> - loop (uid :: acc) datasources - | Error x -> - raise Rrd_interface.(Rrdd_error x) - ) - | _ -> - loop acc datasources - ) - in - loop [] stat.Xapi_storage.Control.datasources >>= fun uids -> - (* associate the 'sr' from the plugin with the SR reference passed in *) - Attached_SRs.add sr attach_response uids >>>= fun () -> return () - in - wrap th - in - S.SR.attach sr_attach_impl ; - let sr_detach_impl dbg sr = - let th = - Attached_SRs.find sr >>= function - | Error _ -> - (* ensure SR.detach is idempotent *) - return () - | Ok sr' -> - return_volume_rpc (fun () -> - Sr_client.detach (volume_rpc ~dbg) dbg sr' - ) - >>>= fun response -> - Attached_SRs.get_uids sr >>>= fun uids -> - let rec loop = function - | [] -> - Lwt.return_unit - | datasource :: datasources -> ( - let uri = Uri.of_string datasource in - match Uri.scheme uri with - | Some "xeno+shm" -> ( - let uid = Uri.path_unencoded uri in - let uid = - if String.length uid > 1 then - String.sub uid 1 (String.length uid - 1) - else - uid - in - RRD.Client.Plugin.Local.deregister RRD.rpc uid - |> Rpc_lwt.T.get - >>= function - | Ok _ -> - loop datasources - | Error x -> - raise Rrd_interface.(Rrdd_error x) - ) - | _ -> - loop datasources - ) - in - loop uids >>= fun () -> - Attached_SRs.remove sr >>>= fun () -> return response - in - wrap th - in - S.SR.detach sr_detach_impl ; - let sr_probe_impl dbg _queue device_config _sm_config = - let th = + + let update_keys ~dbg ~sr ~key ~value response = + match value with + | None -> + return response + | Some value -> + set ~dbg ~sr ~vdi:response.Xapi_storage.Control.key ~key ~value + >>>= fun () -> + return {response with keys= (key, value) :: response.keys} + + let clone ~dbg ~sr ~vdi = + return_volume_rpc (fun () -> + Volume_client.clone (volume_rpc ~dbg) dbg sr vdi + ) + + let destroy ~dbg ~sr ~vdi = return_volume_rpc (fun () -> - Sr_client.probe (volume_rpc ~dbg) dbg device_config + Volume_client.destroy (volume_rpc ~dbg) dbg sr vdi + ) + + let vdi_attach_common dbg sr vdi domain = + Attached_SRs.find sr >>>= fun sr -> + (* Discover the URIs using Volume.stat *) + stat ~dbg ~sr ~vdi >>>= fun response -> + (* If we have a clone-on-boot volume then use that instead *) + ( match + List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys + with + | None -> + return response + | Some temporary -> + stat ~dbg ~sr ~vdi:temporary ) >>>= fun response -> - let pp_probe_result () probe_result = - Rpcmarshal.marshal Xapi_storage.Control.typ_of_probe_result probe_result - |> Jsonrpc.to_string - in - response - |> List.map (fun probe_result -> - let uuid = - List.assoc_opt "sr_uuid" - probe_result.Xapi_storage.Control.configuration + choose_datapath domain response >>>= fun (rpc, _datapath, uri, domain) -> + return_data_rpc (fun () -> + Datapath_client.attach (rpc ~dbg) dbg uri domain + ) + + let vdi_create_impl dbg sr (vdi_info : Storage_interface.vdi_info) = + Attached_SRs.find sr + >>>= (fun sr -> + return_volume_rpc (fun () -> + Volume_client.create + (volume_rpc ~dbg ~compat_out:Compat.compat_out_volume) + dbg sr vdi_info.Storage_interface.name_label + vdi_info.name_description vdi_info.virtual_size + vdi_info.sharable + ) + >>>= update_keys ~dbg ~sr ~key:_vdi_type_key + ~value:(match vdi_info.ty with "" -> None | s -> Some s) + >>>= fun response -> return (vdi_of_volume response) + ) + |> wrap + + let vdi_destroy_impl dbg sr vdi' = + (let vdi = Storage_interface.Vdi.string_of vdi' in + Attached_SRs.find sr >>>= fun sr -> + stat ~dbg ~sr ~vdi >>>= fun response -> + (* Destroy any clone-on-boot volume that might exist *) + ( match + List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys + with + | None -> + return () + | Some _temporary -> + (* Destroy the temporary disk we made earlier *) + destroy ~dbg ~sr ~vdi + ) + >>>= fun () -> destroy ~dbg ~sr ~vdi + ) + |> wrap + + let vdi_snapshot_impl dbg sr vdi_info = + Attached_SRs.find sr + >>>= (fun sr -> + let vdi = + Storage_interface.Vdi.string_of vdi_info.Storage_interface.vdi in - let smapiv2_probe ?sr_info () = + return_volume_rpc (fun () -> + Volume_client.snapshot (volume_rpc ~dbg) dbg sr vdi + ) + >>>= fun response -> + let now = Xapi_stdext_date.Date.(to_rfc3339 (now ())) in + set ~dbg ~sr ~vdi:response.Xapi_storage.Control.key + ~key:_snapshot_time_key ~value:now + >>>= fun () -> + set ~dbg ~sr ~vdi:response.Xapi_storage.Control.key + ~key:_is_a_snapshot_key ~value:(string_of_bool true) + >>>= fun () -> + set ~dbg ~sr ~vdi:response.Xapi_storage.Control.key + ~key:_snapshot_of_key ~value:vdi + >>>= fun () -> + let response = { - Storage_interface.configuration= probe_result.configuration - ; complete= probe_result.complete - ; sr= sr_info - ; extra_info= probe_result.extra_info + (vdi_of_volume response) with + snapshot_time= now + ; is_a_snapshot= true + ; snapshot_of= Storage_interface.Vdi.of_string vdi } in - match - ( probe_result.Xapi_storage.Control.sr - , probe_result.Xapi_storage.Control.complete - , uuid - ) - with - | _, false, Some _uuid -> - Deferred.errorf - "A configuration with a uuid cannot be incomplete: %a" - pp_probe_result probe_result - | Some sr_stat, true, Some _uuid -> - let sr_info = - { - Storage_interface.name_label= - sr_stat.Xapi_storage.Control.name - ; sr_uuid= sr_stat.Xapi_storage.Control.uuid - ; name_description= sr_stat.Xapi_storage.Control.description - ; total_space= sr_stat.Xapi_storage.Control.total_space - ; free_space= sr_stat.Xapi_storage.Control.free_space - ; clustered= sr_stat.Xapi_storage.Control.clustered - ; health= - ( match sr_stat.Xapi_storage.Control.health with - | Xapi_storage.Control.Healthy _ -> - Healthy - | Xapi_storage.Control.Recovering _ -> - Recovering - | Xapi_storage.Control.Unreachable _ -> - Unreachable - | Xapi_storage.Control.Unavailable _ -> - Unavailable - ) - } - in - return (smapiv2_probe ~sr_info ()) - | Some _sr, _, None -> - Deferred.errorf - "A configuration is not attachable without a uuid: %a" - pp_probe_result probe_result - | None, false, None -> - return (smapiv2_probe ()) - | None, true, _ -> - return (smapiv2_probe ()) - ) - |> Deferred.combine_errors - |> Lwt_result.map_error (fun err -> - backend_error "SCRIPT_FAILED" - ["SR.probe"; Base.Error.to_string_hum err] - ) - >>>= fun results -> return (Storage_interface.Probe results) - in - wrap th - in - S.SR.probe sr_probe_impl ; - let sr_create_impl dbg sr_uuid name_label description device_config _size = - let th = - let uuid = Storage_interface.Sr.string_of sr_uuid in - Compat.sr_create device_config - >>>= fun (device_config, compat_in, compat_out) -> - return_volume_rpc (fun () -> - Sr_client.create - (volume_rpc ~dbg ~compat_in ~compat_out) - dbg uuid device_config name_label description - ) - >>>= fun new_device_config -> return new_device_config - in - wrap th - in - S.SR.create sr_create_impl ; - let sr_set_name_label_impl dbg sr new_name_label = - Attached_SRs.find sr - >>>= (fun sr -> - return_volume_rpc (fun () -> - Sr_client.set_name (volume_rpc ~dbg) dbg sr new_name_label + return response ) - ) - |> wrap - in - S.SR.set_name_label sr_set_name_label_impl ; - let sr_set_name_description_impl dbg sr new_name_description = - Attached_SRs.find sr - >>>= (fun sr -> - return_volume_rpc (fun () -> - Sr_client.set_description (volume_rpc ~dbg) dbg sr - new_name_description - ) - ) - |> wrap - in - S.SR.set_name_description sr_set_name_description_impl ; - let sr_destroy_impl dbg sr = - Attached_SRs.find sr - >>>= (fun sr -> - return_volume_rpc (fun () -> - Sr_client.destroy (volume_rpc ~dbg) dbg sr + |> wrap + + let vdi_clone_impl dbg sr vdi_info = + Attached_SRs.find sr + >>>= (fun sr -> + clone ~dbg ~sr + ~vdi: + (Storage_interface.Vdi.string_of vdi_info.Storage_interface.vdi) + >>>= fun response -> return (vdi_of_volume response) ) - ) - |> wrap - in - S.SR.destroy sr_destroy_impl ; - let sr_scan_impl dbg sr = - Attached_SRs.find sr - >>>= (fun sr -> - return_volume_rpc (fun () -> - Sr_client.ls - (volume_rpc ~dbg ~compat_out:Compat.compat_out_volumes) - dbg sr - ) - >>>= fun response -> - let response = Array.to_list response in - (* Filter out volumes which are clone-on-boot transients *) - let transients = - List.fold_left - (fun set x -> - match - List.assoc_opt _clone_on_boot_key x.Xapi_storage.Control.keys - with - | None -> - set - | Some transient -> - Base.Set.add set transient - ) - (Base.Set.empty (module Base.String)) - response - in - let response = - List.filter - (fun x -> - not (Base.Set.mem transients x.Xapi_storage.Control.key) - ) - response - in - return (List.map vdi_of_volume response) - ) - |> wrap - in - S.SR.scan sr_scan_impl ; - let sr_scan2_impl dbg sr = - let sr_uuid = Storage_interface.Sr.string_of sr in - let get_sr_info sr = - return_volume_rpc (fun () -> Sr_client.stat (volume_rpc ~dbg) dbg sr) - >>>= fun response -> - return - { - Storage_interface.sr_uuid= response.Xapi_storage.Control.uuid - ; name_label= response.Xapi_storage.Control.name - ; name_description= response.Xapi_storage.Control.description - ; total_space= response.Xapi_storage.Control.total_space - ; free_space= response.Xapi_storage.Control.free_space - ; clustered= response.Xapi_storage.Control.clustered - ; health= - ( match response.Xapi_storage.Control.health with - | Xapi_storage.Control.Healthy _ -> - Healthy - | Xapi_storage.Control.Recovering _ -> - Recovering - | Xapi_storage.Control.Unreachable _ -> - Unreachable - | Xapi_storage.Control.Unavailable _ -> - Unavailable - ) - } - in - let get_volume_info sr sr_info = - return_volume_rpc (fun () -> - Sr_client.ls - (volume_rpc ~dbg ~compat_out:Compat.compat_out_volumes) - dbg sr + |> wrap + + let vdi_set_name_label_impl dbg sr vdi' new_name_label = + (let vdi = Storage_interface.Vdi.string_of vdi' in + Attached_SRs.find sr >>>= fun sr -> + return_volume_rpc (fun () -> + Volume_client.set_name (volume_rpc ~dbg) dbg sr vdi new_name_label + ) ) - >>>= fun response -> - let response = Array.to_list response in - (* Filter out volumes which are clone-on-boot transients *) - let transients = - List.fold_left - (fun set x -> - match - Base.List.Assoc.find x.Xapi_storage.Control.keys - _clone_on_boot_key ~equal:String.equal - with - | None -> - set - | Some transient -> - Base.Set.add set transient - ) - (Base.Set.empty (module Base.String)) - response - in - let response = - List.filter - (fun x -> not (Base.Set.mem transients x.Xapi_storage.Control.key)) - response - in - return (List.map vdi_of_volume response, sr_info) - in - let rec stat_with_retry ?(times = 3) sr = - get_sr_info sr >>>= fun sr_info -> - match sr_info.health with - | Healthy -> - let* () = - debug (fun m -> m "%s sr %s is healthy" __FUNCTION__ sr_uuid) - in - get_volume_info sr sr_info - | Unreachable when times > 0 -> - let* () = - debug (fun m -> - m "%s: sr %s is unreachable, remaining %d retries" __FUNCTION__ - sr_uuid times - ) - in - Clock.after ~seconds:1. >>= fun () -> - stat_with_retry ~times:(times - 1) sr - | health -> - let* () = - debug (fun m -> - m "%s: sr unhealthy because it is %s" __FUNCTION__ - (Storage_interface.show_sr_health health) - ) - in - fail Storage_interface.(Errors.Sr_unhealthy (sr_uuid, health)) - in - Attached_SRs.find sr >>>= stat_with_retry |> wrap - in - S.SR.scan2 sr_scan2_impl ; - let vdi_create_impl dbg sr (vdi_info : Storage_interface.vdi_info) = - Attached_SRs.find sr - >>>= (fun sr -> - return_volume_rpc (fun () -> - Volume_client.create - (volume_rpc ~dbg ~compat_out:Compat.compat_out_volume) - dbg sr vdi_info.Storage_interface.name_label - vdi_info.name_description vdi_info.virtual_size - vdi_info.sharable - ) - >>>= update_keys ~dbg ~sr ~key:_vdi_type_key - ~value:(match vdi_info.ty with "" -> None | s -> Some s) - >>>= fun response -> return (vdi_of_volume response) - ) - |> wrap - in - S.VDI.create vdi_create_impl ; - let vdi_destroy_impl dbg sr vdi' = - (let vdi = Storage_interface.Vdi.string_of vdi' in - Attached_SRs.find sr >>>= fun sr -> - stat ~dbg ~sr ~vdi >>>= fun response -> - (* Destroy any clone-on-boot volume that might exist *) - ( match - List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys - with - | None -> - return () - | Some _temporary -> - (* Destroy the temporary disk we made earlier *) - destroy ~dbg ~sr ~vdi - ) - >>>= fun () -> destroy ~dbg ~sr ~vdi - ) - |> wrap - in - S.VDI.destroy vdi_destroy_impl ; - let vdi_snapshot_impl dbg sr vdi_info = - Attached_SRs.find sr - >>>= (fun sr -> - let vdi = - Storage_interface.Vdi.string_of vdi_info.Storage_interface.vdi - in - return_volume_rpc (fun () -> - Volume_client.snapshot (volume_rpc ~dbg) dbg sr vdi + |> wrap + + let vdi_set_name_description_impl dbg sr vdi' new_name_description = + (let vdi = Storage_interface.Vdi.string_of vdi' in + Attached_SRs.find sr >>>= fun sr -> + return_volume_rpc (fun () -> + Volume_client.set_description (volume_rpc ~dbg) dbg sr vdi + new_name_description + ) + ) + |> wrap + + let vdi_resize_impl dbg sr vdi' new_size = + (let vdi = Storage_interface.Vdi.string_of vdi' in + Attached_SRs.find sr >>>= fun sr -> + return_volume_rpc (fun () -> + Volume_client.resize (volume_rpc ~dbg) dbg sr vdi new_size + ) + >>>= fun () -> + (* Now call Volume.stat to discover the size *) + stat ~dbg ~sr ~vdi >>>= fun response -> + return response.Xapi_storage.Control.virtual_size + ) + |> wrap + + let vdi_stat_impl dbg sr vdi' = + (let vdi = Storage_interface.Vdi.string_of vdi' in + Attached_SRs.find sr >>>= fun sr -> + stat ~dbg ~sr ~vdi >>>= fun response -> return (vdi_of_volume response) + ) + |> wrap + + let vdi_introduce_impl dbg sr _uuid _sm_config location = + Attached_SRs.find sr + >>>= (fun sr -> + let vdi = location in + stat ~dbg ~sr ~vdi >>>= fun response -> + return (vdi_of_volume response) ) - >>>= fun response -> - let now = Xapi_stdext_date.Date.(to_rfc3339 (now ())) in - set ~dbg ~sr ~vdi:response.Xapi_storage.Control.key - ~key:_snapshot_time_key ~value:now - >>>= fun () -> - set ~dbg ~sr ~vdi:response.Xapi_storage.Control.key - ~key:_is_a_snapshot_key ~value:(string_of_bool true) - >>>= fun () -> - set ~dbg ~sr ~vdi:response.Xapi_storage.Control.key - ~key:_snapshot_of_key ~value:vdi - >>>= fun () -> - let response = - { - (vdi_of_volume response) with - snapshot_time= now - ; is_a_snapshot= true - ; snapshot_of= Storage_interface.Vdi.of_string vdi - } - in + |> wrap + + let vdi_attach3_impl dbg dp sr vdi' vm' _readwrite = + (let vdi = Storage_interface.Vdi.string_of vdi' in + let domain = domain_of ~dp ~vm' in + vdi_attach_common dbg sr vdi domain >>>= fun response -> + let convert_implementation = function + | Xapi_storage.Data.XenDisk {params; extra; backend_type} -> + Storage_interface.XenDisk {params; extra; backend_type} + | BlockDevice {path} -> + BlockDevice {path} + | File {path} -> + File {path} + | Nbd {uri} -> + Nbd {uri} + in + return + { + Storage_interface.implementations= + List.map convert_implementation + response.Xapi_storage.Data.implementations + } + ) + |> wrap + + let vdi_activate_common dbg dp sr vdi' vm' readonly = + (let vdi = Storage_interface.Vdi.string_of vdi' in + let domain = domain_of ~dp ~vm' in + Attached_SRs.find sr >>>= fun sr -> + (* Discover the URIs using Volume.stat *) + stat ~dbg ~sr ~vdi >>>= fun response -> + (* If we have a clone-on-boot volume then use that instead *) + ( match + List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys + with + | None -> return response - ) - |> wrap - in - S.VDI.snapshot vdi_snapshot_impl ; - let vdi_clone_impl dbg sr vdi_info = - Attached_SRs.find sr - >>>= (fun sr -> - clone ~dbg ~sr - ~vdi: - (Storage_interface.Vdi.string_of vdi_info.Storage_interface.vdi) - >>>= fun response -> return (vdi_of_volume response) - ) - |> wrap - in - S.VDI.clone vdi_clone_impl ; - let vdi_set_name_label_impl dbg sr vdi' new_name_label = - (let vdi = Storage_interface.Vdi.string_of vdi' in - Attached_SRs.find sr >>>= fun sr -> - return_volume_rpc (fun () -> - Volume_client.set_name (volume_rpc ~dbg) dbg sr vdi new_name_label - ) - ) - |> wrap - in - S.VDI.set_name_label vdi_set_name_label_impl ; - let vdi_set_name_description_impl dbg sr vdi' new_name_description = - (let vdi = Storage_interface.Vdi.string_of vdi' in - Attached_SRs.find sr >>>= fun sr -> - return_volume_rpc (fun () -> - Volume_client.set_description (volume_rpc ~dbg) dbg sr vdi - new_name_description - ) - ) - |> wrap - in - S.VDI.set_name_description vdi_set_name_description_impl ; - let vdi_resize_impl dbg sr vdi' new_size = - (let vdi = Storage_interface.Vdi.string_of vdi' in - Attached_SRs.find sr >>>= fun sr -> - return_volume_rpc (fun () -> - Volume_client.resize (volume_rpc ~dbg) dbg sr vdi new_size - ) - >>>= fun () -> - (* Now call Volume.stat to discover the size *) - stat ~dbg ~sr ~vdi >>>= fun response -> - return response.Xapi_storage.Control.virtual_size - ) - |> wrap - in - S.VDI.resize vdi_resize_impl ; - let vdi_stat_impl dbg sr vdi' = - (let vdi = Storage_interface.Vdi.string_of vdi' in - Attached_SRs.find sr >>>= fun sr -> - stat ~dbg ~sr ~vdi >>>= fun response -> return (vdi_of_volume response) - ) - |> wrap - in - S.VDI.stat vdi_stat_impl ; - let vdi_introduce_impl dbg sr _uuid _sm_config location = - Attached_SRs.find sr - >>>= (fun sr -> - let vdi = location in - stat ~dbg ~sr ~vdi >>>= fun response -> - return (vdi_of_volume response) - ) - |> wrap - in - S.VDI.introduce vdi_introduce_impl ; - let vdi_attach3_impl dbg dp sr vdi' vm' _readwrite = - (let vdi = Storage_interface.Vdi.string_of vdi' in - let domain = domain_of ~dp ~vm' in - vdi_attach_common dbg sr vdi domain >>>= fun response -> - let convert_implementation = function - | Xapi_storage.Data.XenDisk {params; extra; backend_type} -> - Storage_interface.XenDisk {params; extra; backend_type} - | BlockDevice {path} -> - BlockDevice {path} - | File {path} -> - File {path} - | Nbd {uri} -> - Nbd {uri} - in - return - { - Storage_interface.implementations= - List.map convert_implementation - response.Xapi_storage.Data.implementations - } - ) - |> wrap - in - S.VDI.attach3 vdi_attach3_impl ; - let vdi_activate_common dbg dp sr vdi' vm' readonly = - (let vdi = Storage_interface.Vdi.string_of vdi' in - let domain = domain_of ~dp ~vm' in - Attached_SRs.find sr >>>= fun sr -> - (* Discover the URIs using Volume.stat *) - stat ~dbg ~sr ~vdi >>>= fun response -> - (* If we have a clone-on-boot volume then use that instead *) - ( match - List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys - with - | None -> - return response - | Some temporary -> - stat ~dbg ~sr ~vdi:temporary - ) - >>>= fun response -> - choose_datapath domain response >>>= fun (rpc, _datapath, uri, domain) -> - return_data_rpc (fun () -> - let rpc = rpc ~dbg in - if readonly then - Datapath_client.activate_readonly rpc dbg uri domain - else - Datapath_client.activate rpc dbg uri domain - ) - ) - |> wrap - in - let vdi_activate3_impl dbg dp sr vdi' vm' = - vdi_activate_common dbg dp sr vdi' vm' false - in - S.VDI.activate3 vdi_activate3_impl ; - let vdi_activate_readonly_impl dbg dp sr vdi' vm' = - vdi_activate_common dbg dp sr vdi' vm' true - in - S.VDI.activate_readonly vdi_activate_readonly_impl ; - let vdi_deactivate_impl dbg dp sr vdi' vm' = - (let vdi = Storage_interface.Vdi.string_of vdi' in - let domain = domain_of ~dp ~vm' in - Attached_SRs.find sr >>>= fun sr -> - (* Discover the URIs using Volume.stat *) - stat ~dbg ~sr ~vdi >>>= fun response -> - ( match - List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys - with - | None -> - return response - | Some temporary -> - stat ~dbg ~sr ~vdi:temporary - ) - >>>= fun response -> - choose_datapath domain response >>>= fun (rpc, _datapath, uri, domain) -> - return_data_rpc (fun () -> - Datapath_client.deactivate (rpc ~dbg) dbg uri domain - ) - ) - |> wrap - in - S.VDI.deactivate vdi_deactivate_impl ; - let vdi_detach_impl dbg dp sr vdi' vm' = - (let vdi = Storage_interface.Vdi.string_of vdi' in - let domain = domain_of ~dp ~vm' in - Attached_SRs.find sr >>>= fun sr -> - (* Discover the URIs using Volume.stat *) - stat ~dbg ~sr ~vdi >>>= fun response -> - ( match - List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys - with - | None -> - return response - | Some temporary -> - stat ~dbg ~sr ~vdi:temporary - ) - >>>= fun response -> - choose_datapath domain response >>>= fun (rpc, _datapath, uri, domain) -> - return_data_rpc (fun () -> Datapath_client.detach (rpc ~dbg) dbg uri domain) - ) - |> wrap - in - S.VDI.detach vdi_detach_impl ; - let sr_stat_impl dbg sr = - Attached_SRs.find sr - >>>= (fun sr -> - return_volume_rpc (fun () -> Sr_client.stat (volume_rpc ~dbg) dbg sr) - >>>= fun response -> - return - { - Storage_interface.sr_uuid= response.Xapi_storage.Control.uuid - ; name_label= response.Xapi_storage.Control.name - ; name_description= response.Xapi_storage.Control.description - ; total_space= response.Xapi_storage.Control.total_space - ; free_space= response.Xapi_storage.Control.free_space - ; clustered= response.Xapi_storage.Control.clustered - ; health= - ( match response.Xapi_storage.Control.health with - | Xapi_storage.Control.Healthy _ -> - Healthy - | Xapi_storage.Control.Recovering _ -> - Recovering - | Xapi_storage.Control.Unreachable _ -> - Unreachable - | Xapi_storage.Control.Unavailable _ -> - Unavailable - ) - } - ) - |> wrap - in - S.SR.stat sr_stat_impl ; - let vdi_epoch_begin_impl dbg sr vdi' vm' persistent = - (let vdi = Storage_interface.Vdi.string_of vdi' in - let domain = Storage_interface.Vm.string_of vm' in - Attached_SRs.find sr >>>= fun sr -> - (* Discover the URIs using Volume.stat *) - stat ~dbg ~sr ~vdi >>>= fun response -> - choose_datapath ~persistent domain response - >>>= fun (rpc, datapath, uri, _domain) -> - (* If non-persistent and the datapath plugin supports NONPERSISTENT - then we delegate this to the datapath plugin. Otherwise we will - make a temporary clone now and attach/detach etc this file. *) - if Datapath_plugins.supports_feature datapath _nonpersistent then - (* We delegate handling non-persistent disks to the datapath plugin. *) + | Some temporary -> + stat ~dbg ~sr ~vdi:temporary + ) + >>>= fun response -> + choose_datapath domain response >>>= fun (rpc, _datapath, uri, domain) -> return_data_rpc (fun () -> - Datapath_client.open_ (rpc ~dbg) dbg uri persistent + let rpc = rpc ~dbg in + if readonly then + Datapath_client.activate_readonly rpc dbg uri domain + else + Datapath_client.activate rpc dbg uri domain ) - else if not persistent then - (* We create a non-persistent disk here with Volume.clone, and store - the name of the cloned disk in the metadata of the original. *) + ) + |> wrap + + let vdi_activate3_impl dbg dp sr vdi' vm' = + vdi_activate_common dbg dp sr vdi' vm' false + + let vdi_activate_readonly_impl dbg dp sr vdi' vm' = + vdi_activate_common dbg dp sr vdi' vm' true + + let vdi_deactivate_impl dbg dp sr vdi' vm' = + (let vdi = Storage_interface.Vdi.string_of vdi' in + let domain = domain_of ~dp ~vm' in + Attached_SRs.find sr >>>= fun sr -> + (* Discover the URIs using Volume.stat *) + stat ~dbg ~sr ~vdi >>>= fun response -> ( match List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys with | None -> - return () + return response | Some temporary -> - (* Destroy the temporary disk we made earlier *) - destroy ~dbg ~sr ~vdi:temporary + stat ~dbg ~sr ~vdi:temporary ) - >>>= fun () -> - clone ~dbg ~sr ~vdi >>>= fun vdi' -> - set ~dbg ~sr ~vdi ~key:_clone_on_boot_key - ~value:vdi'.Xapi_storage.Control.key - else - return () - ) - |> wrap - in - S.VDI.epoch_begin vdi_epoch_begin_impl ; - let vdi_epoch_end_impl dbg sr vdi' vm' = - (let vdi = Storage_interface.Vdi.string_of vdi' in - let domain = Storage_interface.Vm.string_of vm' in - Attached_SRs.find sr >>>= fun sr -> - (* Discover the URIs using Volume.stat *) - stat ~dbg ~sr ~vdi >>>= fun response -> - choose_datapath domain response >>>= fun (rpc, datapath, uri, _domain) -> - if Datapath_plugins.supports_feature datapath _nonpersistent then - return_data_rpc (fun () -> Datapath_client.close (rpc ~dbg) dbg uri) - else - match - List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys - with + >>>= fun response -> + choose_datapath domain response >>>= fun (rpc, _datapath, uri, domain) -> + return_data_rpc (fun () -> + Datapath_client.deactivate (rpc ~dbg) dbg uri domain + ) + ) + |> wrap + + let vdi_detach_impl dbg dp sr vdi' vm' = + (let vdi = Storage_interface.Vdi.string_of vdi' in + let domain = domain_of ~dp ~vm' in + Attached_SRs.find sr >>>= fun sr -> + (* Discover the URIs using Volume.stat *) + stat ~dbg ~sr ~vdi >>>= fun response -> + ( match + List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys + with | None -> - return () + return response | Some temporary -> - (* Destroy the temporary disk we made earlier *) - destroy ~dbg ~sr ~vdi:temporary >>>= fun () -> - unset ~dbg ~sr ~vdi ~key:_clone_on_boot_key >>>= fun () -> return () - ) - |> wrap - in - S.VDI.epoch_end vdi_epoch_end_impl ; - let vdi_set_persistent_impl _dbg _sr _vdi _persistent = return () |> wrap in - S.VDI.set_persistent vdi_set_persistent_impl ; - let dp_destroy2 dbg dp sr vdi' vm' _allow_leak = - (let vdi = Storage_interface.Vdi.string_of vdi' in - let domain = domain_of ~dp ~vm' in - Attached_SRs.find sr >>>= fun sr -> - (* Discover the URIs using Volume.stat *) - stat ~dbg ~sr ~vdi >>>= fun response -> - ( match - List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys - with - | None -> - return response - | Some temporary -> - stat ~dbg ~sr ~vdi:temporary - ) - >>>= fun response -> - choose_datapath domain response >>>= fun (rpc, _datapath, uri, domain) -> - return_data_rpc (fun () -> - Datapath_client.deactivate (rpc ~dbg) dbg uri domain - ) - >>>= fun () -> - return_data_rpc (fun () -> Datapath_client.detach (rpc ~dbg) dbg uri domain) - ) - |> wrap - in - S.DP.destroy2 dp_destroy2 ; - let sr_list _dbg = - Attached_SRs.list () >>>= (fun srs -> return srs) |> wrap - in - S.SR.list sr_list ; - (* SR.reset is a no op in SMAPIv3 *) - S.SR.reset (fun _ _ -> return () |> wrap) ; - let ( let* ) = Lwt_result.bind in - let vdi_enable_cbt_impl dbg sr vdi = - wrap - @@ - let* sr = Attached_SRs.find sr in - let vdi = Storage_interface.Vdi.string_of vdi in - return_volume_rpc (fun () -> - Volume_client.enable_cbt (volume_rpc ~dbg) dbg sr vdi - ) - in - S.VDI.enable_cbt vdi_enable_cbt_impl ; - let vdi_disable_cbt_impl dbg sr vdi = - wrap - @@ - let* sr = Attached_SRs.find sr in - let vdi = Storage_interface.Vdi.string_of vdi in - return_volume_rpc (fun () -> - Volume_client.disable_cbt (volume_rpc ~dbg) dbg sr vdi - ) - in - S.VDI.disable_cbt vdi_disable_cbt_impl ; - let vdi_list_changed_blocks_impl dbg sr vdi vdi' = - wrap - @@ - let* sr = Attached_SRs.find sr in - let vdi, vdi' = Storage_interface.Vdi.(string_of vdi, string_of vdi') in - let ( let* ) = Lwt.bind in - let* result = + stat ~dbg ~sr ~vdi:temporary + ) + >>>= fun response -> + choose_datapath domain response >>>= fun (rpc, _datapath, uri, domain) -> + return_data_rpc (fun () -> + Datapath_client.detach (rpc ~dbg) dbg uri domain + ) + ) + |> wrap + + let vdi_epoch_begin_impl dbg sr vdi' vm' persistent = + (let vdi = Storage_interface.Vdi.string_of vdi' in + let domain = Storage_interface.Vm.string_of vm' in + Attached_SRs.find sr >>>= fun sr -> + (* Discover the URIs using Volume.stat *) + stat ~dbg ~sr ~vdi >>>= fun response -> + choose_datapath ~persistent domain response + >>>= fun (rpc, datapath, uri, _domain) -> + (* If non-persistent and the datapath plugin supports NONPERSISTENT + then we delegate this to the datapath plugin. Otherwise we will + make a temporary clone now and attach/detach etc this file. *) + if Datapath_plugins.supports_feature datapath _nonpersistent then + (* We delegate handling non-persistent disks to the datapath plugin. *) + return_data_rpc (fun () -> + Datapath_client.open_ (rpc ~dbg) dbg uri persistent + ) + else if not persistent then + (* We create a non-persistent disk here with Volume.clone, and store + the name of the cloned disk in the metadata of the original. *) + ( match + List.assoc_opt _clone_on_boot_key + response.Xapi_storage.Control.keys + with + | None -> + return () + | Some temporary -> + (* Destroy the temporary disk we made earlier *) + destroy ~dbg ~sr ~vdi:temporary + ) + >>>= fun () -> + clone ~dbg ~sr ~vdi >>>= fun vdi' -> + set ~dbg ~sr ~vdi ~key:_clone_on_boot_key + ~value:vdi'.Xapi_storage.Control.key + else + return () + ) + |> wrap + + let vdi_epoch_end_impl dbg sr vdi' vm' = + (let vdi = Storage_interface.Vdi.string_of vdi' in + let domain = Storage_interface.Vm.string_of vm' in + Attached_SRs.find sr >>>= fun sr -> + (* Discover the URIs using Volume.stat *) + stat ~dbg ~sr ~vdi >>>= fun response -> + choose_datapath domain response >>>= fun (rpc, datapath, uri, _domain) -> + if Datapath_plugins.supports_feature datapath _nonpersistent then + return_data_rpc (fun () -> Datapath_client.close (rpc ~dbg) dbg uri) + else + match + List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys + with + | None -> + return () + | Some temporary -> + (* Destroy the temporary disk we made earlier *) + destroy ~dbg ~sr ~vdi:temporary >>>= fun () -> + unset ~dbg ~sr ~vdi ~key:_clone_on_boot_key >>>= fun () -> + return () + ) + |> wrap + + let vdi_set_persistent_impl _dbg _sr _vdi _persistent = return () |> wrap + + let ( let* ) = Lwt_result.bind + + let vdi_enable_cbt_impl dbg sr vdi = + wrap + @@ + let* sr = Attached_SRs.find sr in + let vdi = Storage_interface.Vdi.string_of vdi in return_volume_rpc (fun () -> - (* Negative lengths indicate that we want the full length. *) - Volume_client.list_changed_blocks (volume_rpc ~dbg) dbg sr vdi vdi' 0L - (-1) + Volume_client.enable_cbt (volume_rpc ~dbg) dbg sr vdi ) - in - let proj_bitmap r = r.Xapi_storage.Control.bitmap in - Lwt.return (Result.map proj_bitmap result) - in - S.VDI.list_changed_blocks vdi_list_changed_blocks_impl ; - let vdi_data_destroy_impl dbg sr vdi = - wrap - @@ - let* sr = Attached_SRs.find sr in - let vdi = Storage_interface.Vdi.string_of vdi in - let* () = + + let vdi_disable_cbt_impl dbg sr vdi = + wrap + @@ + let* sr = Attached_SRs.find sr in + let vdi = Storage_interface.Vdi.string_of vdi in return_volume_rpc (fun () -> - Volume_client.data_destroy (volume_rpc ~dbg) dbg sr vdi + Volume_client.disable_cbt (volume_rpc ~dbg) dbg sr vdi ) - in - set ~dbg ~sr ~vdi ~key:_vdi_type_key ~value:"cbt_metadata" - in - S.VDI.data_destroy vdi_data_destroy_impl ; + + let vdi_list_changed_blocks_impl dbg sr vdi vdi' = + wrap + @@ + let* sr = Attached_SRs.find sr in + let vdi, vdi' = Storage_interface.Vdi.(string_of vdi, string_of vdi') in + let ( let* ) = Lwt.bind in + let* result = + return_volume_rpc (fun () -> + (* Negative lengths indicate that we want the full length. *) + Volume_client.list_changed_blocks (volume_rpc ~dbg) dbg sr vdi vdi' + 0L (-1) + ) + in + let proj_bitmap r = r.Xapi_storage.Control.bitmap in + Lwt.return (Result.map proj_bitmap result) + + let vdi_data_destroy_impl dbg sr vdi = + wrap + @@ + let* sr = Attached_SRs.find sr in + let vdi = Storage_interface.Vdi.string_of vdi in + let* () = + return_volume_rpc (fun () -> + Volume_client.data_destroy (volume_rpc ~dbg) dbg sr vdi + ) + in + set ~dbg ~sr ~vdi ~key:_vdi_type_key ~value:"cbt_metadata" + end + +module DPImpl = +functor + (M : META) + -> + struct + module VDI = VDIImpl (M) + + let dp_destroy2 dbg dp sr vdi' vm' _allow_leak = + (let vdi = Storage_interface.Vdi.string_of vdi' in + let domain = domain_of ~dp ~vm' in + Attached_SRs.find sr >>>= fun sr -> + (* Discover the URIs using Volume.stat *) + VDI.stat ~dbg ~sr ~vdi >>>= fun response -> + ( match + List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys + with + | None -> + return response + | Some temporary -> + VDI.stat ~dbg ~sr ~vdi:temporary + ) + >>>= fun response -> + choose_datapath domain response >>>= fun (rpc, _datapath, uri, domain) -> + return_data_rpc (fun () -> + Datapath_client.deactivate (rpc ~dbg) dbg uri domain + ) + >>>= fun () -> + return_data_rpc (fun () -> + Datapath_client.detach (rpc ~dbg) dbg uri domain + ) + ) + |> wrap + end + +(* Bind the implementations *) +let bind ~volume_script_dir = + (* Each plugin has its own version, see the call to listen + where `process` is partially applied. *) + let module S = Storage_interface.StorageAPI (Rpc_lwt.GenServer ()) in + let module RuntimeMeta = struct + let volume_script_dir = volume_script_dir + + let version = version + end in + let module Query = QueryImpl (RuntimeMeta) in + S.Query.query Query.query_impl ; + S.Query.diagnostics Query.query_diagnostics_impl ; + + let module SR = SRImpl (RuntimeMeta) in + S.SR.attach SR.sr_attach_impl ; + S.SR.detach SR.sr_detach_impl ; + S.SR.probe SR.sr_probe_impl ; + S.SR.create SR.sr_create_impl ; + S.SR.set_name_label SR.sr_set_name_label_impl ; + S.SR.set_name_description SR.sr_set_name_description_impl ; + S.SR.destroy SR.sr_destroy_impl ; + S.SR.scan SR.sr_scan_impl ; + S.SR.scan2 SR.sr_scan2_impl ; + S.SR.stat SR.sr_stat_impl ; + S.SR.list SR.sr_list ; + (* SR.reset is a no op in SMAPIv3 *) + S.SR.reset SR.sr_reset ; + + let module VDI = VDIImpl (RuntimeMeta) in + S.VDI.create VDI.vdi_create_impl ; + S.VDI.destroy VDI.vdi_destroy_impl ; + S.VDI.snapshot VDI.vdi_snapshot_impl ; + S.VDI.clone VDI.vdi_clone_impl ; + S.VDI.set_name_label VDI.vdi_set_name_label_impl ; + S.VDI.set_name_description VDI.vdi_set_name_description_impl ; + S.VDI.resize VDI.vdi_resize_impl ; + S.VDI.stat VDI.vdi_stat_impl ; + S.VDI.introduce VDI.vdi_introduce_impl ; + S.VDI.attach3 VDI.vdi_attach3_impl ; + S.VDI.activate3 VDI.vdi_activate3_impl ; + S.VDI.activate_readonly VDI.vdi_activate_readonly_impl ; + S.VDI.deactivate VDI.vdi_deactivate_impl ; + S.VDI.detach VDI.vdi_detach_impl ; + S.VDI.epoch_begin VDI.vdi_epoch_begin_impl ; + S.VDI.epoch_end VDI.vdi_epoch_end_impl ; + S.VDI.set_persistent VDI.vdi_set_persistent_impl ; + S.VDI.enable_cbt VDI.vdi_enable_cbt_impl ; + S.VDI.disable_cbt VDI.vdi_disable_cbt_impl ; + S.VDI.list_changed_blocks VDI.vdi_list_changed_blocks_impl ; + S.VDI.data_destroy VDI.vdi_data_destroy_impl ; + + let module DP = DPImpl (RuntimeMeta) in + S.DP.destroy2 DP.dp_destroy2 ; + let u name _ = failwith ("Unimplemented: " ^ name) in S.get_by_name (u "get_by_name") ; S.VDI.compose (u "VDI.compose") ;