diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs index 06bd08f2e4a..721e2c63221 100644 --- a/.git-blame-ignore-revs +++ b/.git-blame-ignore-revs @@ -30,6 +30,8 @@ ff39018fd6d91985f9c893a56928771dfe9fa48d cbb9edb17dfd122c591beb14d1275acc39492335 d6ab15362548b8fe270bd14d5153b8d94e1b15c0 b12cf444edea15da6274975e1b2ca6a7fce2a090 +364c27f5d18ab9dd31825e67a93efabecad06823 +d8b4de9076531dd13bdffa20cc10c72290a52356 # ocp-indent d018d26d6acd4707a23288b327b49e44f732725e diff --git a/Makefile b/Makefile index 53d01a4b063..7f7386bf6b1 100644 --- a/Makefile +++ b/Makefile @@ -150,9 +150,9 @@ install-extra: DUNE_IU_PACKAGES1=-j $(JOBS) --destdir=$(DESTDIR) --prefix=$(PREFIX) --libdir=$(LIBDIR) --mandir=$(MANDIR) DUNE_IU_PACKAGES1+=--libexecdir=$(XENOPSD_LIBEXECDIR) --datadir=$(SDKDIR) DUNE_IU_PACKAGES1+=xapi-client xapi-schema xapi-consts xapi-cli-protocol xapi-datamodel xapi-types -DUNE_IU_PACKAGES1+=xen-api-client xen-api-client-lwt xen-api-client-async rrdd-plugin rrd-transport +DUNE_IU_PACKAGES1+=xen-api-client xen-api-client-lwt rrdd-plugin rrd-transport DUNE_IU_PACKAGES1+=gzip http-lib pciutil sexpr stunnel uuid xml-light2 zstd xapi-compression safe-resources -DUNE_IU_PACKAGES1+=message-switch message-switch-async message-switch-cli message-switch-core message-switch-lwt +DUNE_IU_PACKAGES1+=message-switch message-switch-cli message-switch-core message-switch-lwt DUNE_IU_PACKAGES1+=message-switch-unix xapi-idl forkexec xapi-forkexecd xapi-storage xapi-storage-script xapi-storage-cli DUNE_IU_PACKAGES1+=xapi-nbd varstored-guard xapi-log xapi-open-uri xapi-tracing xapi-tracing-export xapi-expiry-alerts cohttp-posix DUNE_IU_PACKAGES1+=xapi-rrd xapi-inventory clock xapi-sdk diff --git a/dune-project b/dune-project index 36e6e4e5766..4e6e0446c30 100644 --- a/dune-project +++ b/dune-project @@ -61,10 +61,6 @@ ) -(package - (name xen-api-client-async) -) - (package (name xen-api-client) (synopsis "Xen-API client library for remotely-controlling a xapi host") @@ -527,10 +523,6 @@ This package provides an Lwt compatible interface to the library.") (name pciutil) ) -(package - (name message-switch-async) -) - (package (name message-switch-lwt) ) diff --git a/message-switch-async.opam b/message-switch-async.opam deleted file mode 100644 index ac53e522c21..00000000000 --- a/message-switch-async.opam +++ /dev/null @@ -1,35 +0,0 @@ -# This file is generated by dune, edit dune-project instead -license: "LGPL-2.1-only WITH OCaml-LGPL-linking-exception" -opam-version: "2.0" -name: "message-switch-async" -maintainer: "xen-api@lists.xen.org" -authors: [ "xen-api@lists.xen.org" ] -homepage: "https://github.com/xapi-project/xen-api" -bug-reports: "https://github.com/xapi-project/xen-api/issues" -dev-repo: "git+https://github.com/xapi-project/xen-api.git" -tags: [ "org:xapi-project" ] -build: [ - ["./configure" "--prefix" "%{prefix}%"] - [ "dune" "build" "-p" name "-j" jobs ] -] -depends: [ - "ocaml" - "dune" {>= "3.15"} - "odoc" {with-doc} - "async" {>= "v0.9.0"} - "async_kernel" - "async_unix" - "base" - "core" - "core_kernel" - "core_unix" - "cohttp-async" {>= "1.0.2"} - "message-switch-core" -] -synopsis: "A simple store-and-forward message switch" -description: """ -The switch stores messages in queues with well-known names. Clients use -a simple HTTP protocol to enqueue and dequeue messages.""" -url { - src: "https://github.com/xapi-project/xen-api/archive/master.tar.gz" -} diff --git a/message-switch-async.opam.template b/message-switch-async.opam.template deleted file mode 100644 index aaa69dc257e..00000000000 --- a/message-switch-async.opam.template +++ /dev/null @@ -1,33 +0,0 @@ -opam-version: "2.0" -name: "message-switch-async" -maintainer: "xen-api@lists.xen.org" -authors: [ "xen-api@lists.xen.org" ] -homepage: "https://github.com/xapi-project/xen-api" -bug-reports: "https://github.com/xapi-project/xen-api/issues" -dev-repo: "git+https://github.com/xapi-project/xen-api.git" -tags: [ "org:xapi-project" ] -build: [ - ["./configure" "--prefix" "%{prefix}%"] - [ "dune" "build" "-p" name "-j" jobs ] -] -depends: [ - "ocaml" - "dune" {>= "3.15"} - "odoc" {with-doc} - "async" {>= "v0.9.0"} - "async_kernel" - "async_unix" - "base" - "core" - "core_kernel" - "core_unix" - "cohttp-async" {>= "1.0.2"} - "message-switch-core" -] -synopsis: "A simple store-and-forward message switch" -description: """ -The switch stores messages in queues with well-known names. Clients use -a simple HTTP protocol to enqueue and dequeue messages.""" -url { - src: "https://github.com/xapi-project/xen-api/archive/master.tar.gz" -} diff --git a/message-switch.opam b/message-switch.opam index 4ee77fdca5d..f0dcf7ff224 100644 --- a/message-switch.opam +++ b/message-switch.opam @@ -18,11 +18,9 @@ depends: [ "dune" {>= "3.15"} "odoc" {with-doc} "cmdliner" - "cohttp-async" {with-test} "cohttp-lwt-unix" "io-page" {>= "2.4.0"} "lwt_log" - "message-switch-async" {with-test} "message-switch-lwt" "message-switch-unix" "mirage-block-unix" {>= "2.4.0"} diff --git a/message-switch.opam.template b/message-switch.opam.template index 8a898c41747..a33fe27cb3e 100644 --- a/message-switch.opam.template +++ b/message-switch.opam.template @@ -16,11 +16,9 @@ depends: [ "dune" {>= "3.15"} "odoc" {with-doc} "cmdliner" - "cohttp-async" {with-test} "cohttp-lwt-unix" "io-page" {>= "2.4.0"} "lwt_log" - "message-switch-async" {with-test} "message-switch-lwt" "message-switch-unix" "mirage-block-unix" {>= "2.4.0"} diff --git a/ocaml/message-switch/async/dune b/ocaml/message-switch/async/dune deleted file mode 100644 index 89f2c3a5ff4..00000000000 --- a/ocaml/message-switch/async/dune +++ /dev/null @@ -1,17 +0,0 @@ -(library - (name message_switch_async) - (public_name message-switch-async) - (libraries - (re_export async) - (re_export async_unix) - async_kernel - base - cohttp-async - (re_export core) - core_unix - core_kernel - core_unix.time_unix - message-switch-core - ) -) - diff --git a/ocaml/message-switch/async/protocol_async.ml b/ocaml/message-switch/async/protocol_async.ml deleted file mode 100644 index 2bc34621563..00000000000 --- a/ocaml/message-switch/async/protocol_async.ml +++ /dev/null @@ -1,141 +0,0 @@ -(* - * Copyright (c) Citrix Systems Inc. - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - *) - -let whoami () = - Printf.sprintf "%s:%d" (Filename.basename Sys.argv.(0)) (Unix.getpid ()) - -open Core -open Async - -module M = struct - let whoami = whoami - - module IO = struct - include Cohttp_async.Io - - let map f t = Deferred.map ~f t - - let iter f t = Deferred.List.iter t ~f - - let iter_dontwait f t = - Deferred.don't_wait_for @@ Deferred.List.iter ~how:`Parallel t ~f - - let any = Deferred.any - - let all = Deferred.all - - let is_determined = Deferred.is_determined - - let return_unit = Deferred.unit - end - - let connect path = - let maximum_delay = 30. in - let connect () = - let s = Socket.create Socket.Type.unix in - Monitor.try_with ~extract_exn:true (fun () -> - Socket.connect s (Socket.Address.Unix.create path) - ) - >>= function - | Ok _x -> - let fd = Socket.fd s in - let reader = Reader.create fd in - let writer = Writer.create fd in - return (fd, reader, writer) - | Error e -> - Socket.shutdown s `Both ; raise e - in - let rec retry delay = - Monitor.try_with ~extract_exn:true connect >>= function - | Error - (Unix.Unix_error - (Core_unix.(ECONNREFUSED | ECONNABORTED | ENOENT), _, _) - ) -> - let delay = Float.min maximum_delay delay in - Clock.after (Time.Span.of_sec delay) >>= fun () -> - retry (delay +. delay) - | Error e -> - raise e - | Ok (_, reader, writer) -> - return (reader, writer) - in - retry 0.5 - - let disconnect (_, writer) = Writer.close writer - - module Ivar = struct include Ivar end - - module Mutex = struct - type t = {mutable m: bool; c: unit Condition.t} - - let create () = - let m = false in - let c = Condition.create () in - {m; c} - - let with_lock t f = - let rec wait () = - if Bool.(t.m = false) then ( - t.m <- true ; - return () - ) else - Condition.wait t.c >>= wait - in - wait () >>= fun () -> - Monitor.protect f ~finally:(fun () -> - t.m <- false ; - Condition.broadcast t.c () ; - return () - ) - end - - module Condition = struct - open Async_kernel - - type 'a t = 'a Condition.t - - let create = Condition.create - - let wait = Condition.wait - - let broadcast = Condition.broadcast - - let signal = Condition.signal - end - - module Clock = struct - type timer = {cancel: unit Ivar.t} - - let run_after timeout f = - let timer = {cancel= Ivar.create ()} in - let cancelled = Ivar.read timer.cancel in - let sleep = Clock.after (Time.Span.of_sec (Float.of_int timeout)) in - let _ = - Deferred.any [cancelled; sleep] >>= fun () -> - if Deferred.is_determined cancelled then - return () - else - return (f ()) - in - timer - - let cancel t = Ivar.fill t.cancel () - end -end - -module Client = Message_switch_core.Make.Client (M) -module Server = Message_switch_core.Make.Server (M) -module Mtest = Message_switch_core.Mtest.Make (M) diff --git a/ocaml/message-switch/async/protocol_async.mli b/ocaml/message-switch/async/protocol_async.mli deleted file mode 100644 index d18b37b742c..00000000000 --- a/ocaml/message-switch/async/protocol_async.mli +++ /dev/null @@ -1,23 +0,0 @@ -(* - * Copyright (c) Citrix Systems Inc. - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - *) -open Async -open Message_switch_core - -module Client : S.CLIENT with type 'a io = 'a Deferred.t - -module Server : S.SERVER with type 'a io = 'a Deferred.t - -module Mtest : Mtest.MTEST with type 'a io = 'a Deferred.t diff --git a/ocaml/message-switch/core_test/async/client_async_main.ml b/ocaml/message-switch/core_test/async/client_async_main.ml deleted file mode 100644 index daedfe59bae..00000000000 --- a/ocaml/message-switch/core_test/async/client_async_main.ml +++ /dev/null @@ -1,94 +0,0 @@ -(* - * Copyright (c) Citrix Systems Inc. - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - *) - -module P = Printf -open Core -open Async -open Message_switch_async.Protocol_async - -let path = ref "/var/run/message-switch/sock" - -let name = ref "server" - -let payload = ref "hello" - -let timeout = ref None - -let shutdown = "shutdown" - -let ( >>|= ) m f = - m >>= function - | Ok x -> - f x - | Error y -> - let b = Buffer.create 16 in - let fmt = Format.formatter_of_buffer b in - Client.pp_error fmt y ; - Format.pp_print_flush fmt () ; - raise (Failure (Buffer.contents b)) - -let main () = - Client.connect ~switch:!path () >>|= fun t -> - let counter = ref 0 in - let one () = - incr counter ; - Client.rpc ~t ~queue:!name ~body:!payload () >>|= fun _ -> return () - in - let start = Time.now () in - ( match !timeout with - | None -> - one () - | Some t -> - let rec loop () = - let sofar = Time.diff (Time.now ()) start in - if Time.Span.(sofar > of_sec t) then - return () - else - one () >>= fun () -> loop () - in - loop () - ) - >>= fun () -> - let time = Time.diff (Time.now ()) start in - P.printf "Finished %d RPCs in %.02f\n%!" !counter (Time.Span.to_sec time) ; - Client.rpc ~t ~queue:!name ~body:shutdown () >>|= fun _ -> Shutdown.exit 0 - -let _ = - Arg.parse - [ - ( "-path" - , Arg.Set_string path - , Printf.sprintf "path broker listens on (default %s)" !path - ) - ; ( "-name" - , Arg.Set_string name - , Printf.sprintf "name to send message to (default %s)" !name - ) - ; ( "-payload" - , Arg.Set_string payload - , Printf.sprintf "payload of message to send (default %s)" !payload - ) - ; ( "-secs" - , Arg.String (fun x -> timeout := Some (Float.of_string x)) - , Printf.sprintf - "number of seconds to repeat the same message for (default %s)" - (match !timeout with None -> "None" | Some x -> Float.to_string x) - ) - ] - (fun x -> P.fprintf stderr "Ignoring unexpected argument: %s" x) - "Send a message to a name, optionally waiting for a response" ; - let (_ : 'a Deferred.t) = main () in - never_returns (Scheduler.go ()) diff --git a/ocaml/message-switch/core_test/async/dune b/ocaml/message-switch/core_test/async/dune deleted file mode 100644 index 6e690c35e1d..00000000000 --- a/ocaml/message-switch/core_test/async/dune +++ /dev/null @@ -1,21 +0,0 @@ -(executables - (modes exe) - (names - client_async_main - server_async_main - ) - (libraries - async - async_kernel - async_unix - base - base.caml - cohttp-async - core - core_kernel - core_unix - core_unix.time_unix - message-switch-async - ) -) - diff --git a/ocaml/message-switch/core_test/async/server_async_main.ml b/ocaml/message-switch/core_test/async/server_async_main.ml deleted file mode 100644 index cd7984bec27..00000000000 --- a/ocaml/message-switch/core_test/async/server_async_main.ml +++ /dev/null @@ -1,66 +0,0 @@ -(* - * Copyright (c) Citrix Systems Inc. - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - *) - -module P = Printf -open Core -open Async -open Message_switch_async.Protocol_async - -let path = ref "/var/run/message-switch/sock" - -let name = ref "server" - -let concurrent = ref false - -let shutdown = Ivar.create () - -let process = function - | "shutdown" -> - Ivar.fill shutdown () ; return "ok" - | x -> - return x - -let main () = - let (_ : 'a Deferred.t) = - if !concurrent then - Server.listen_p ~process ~switch:!path ~queue:!name () - else - Server.listen ~process ~switch:!path ~queue:!name () - in - Ivar.read shutdown >>= fun () -> - Clock.after (Time.Span.of_sec 1.) >>= fun () -> exit 0 - -let _ = - Arg.parse - [ - ( "-path" - , Arg.Set_string path - , Printf.sprintf "path broker listens on (default %s)" !path - ) - ; ( "-name" - , Arg.Set_string name - , Printf.sprintf "name to send message to (default %s)" !name - ) - ; ( "-concurrent" - , Arg.Set concurrent - , Printf.sprintf "set concurrent processing of messages (default %b)" - !concurrent - ) - ] - (fun x -> P.fprintf stderr "Ignoring unexpected argument: %s" x) - "Respond to RPCs on a name" ; - let (_ : 'a Deferred.t) = main () in - never_returns (Scheduler.go ()) diff --git a/ocaml/message-switch/core_test/basic-rpc-test.sh b/ocaml/message-switch/core_test/basic-rpc-test.sh index bc281c65f45..851c972b831 100755 --- a/ocaml/message-switch/core_test/basic-rpc-test.sh +++ b/ocaml/message-switch/core_test/basic-rpc-test.sh @@ -29,16 +29,6 @@ SERVER=$! lwt/client_main.exe -path "${SPATH}" -secs "${SECS}" wait "${SERVER}" -echo Performance test of Async to Lwt +echo Performance test of Lwt to Unix lwt/server_main.exe -path "${SPATH}" & -SERVER=$! -async/client_async_main.exe -path "${SPATH}" -secs "${SECS}" -wait "${SERVER}" - -echo Performance test of Async to Async -async/server_async_main.exe -path "${SPATH}" & -SERVER=$! -async/client_async_main.exe -path "${SPATH}" -secs "${SECS}" -wait "${SERVER}" - -../cli/main.exe shutdown --path "${SPATH}" +./client_unix_main.exe -path "${SPATH}" -secs "${SECS}" diff --git a/ocaml/message-switch/core_test/concur-rpc-test.sh b/ocaml/message-switch/core_test/concur-rpc-test.sh index 1403946ba5b..c861516f3c0 100755 --- a/ocaml/message-switch/core_test/concur-rpc-test.sh +++ b/ocaml/message-switch/core_test/concur-rpc-test.sh @@ -29,16 +29,15 @@ SERVER=$! lwt/client_main.exe -path "${SPATH}" -secs "${SECS}" wait "${SERVER}" -echo Performance test of Async to Lwt +echo Performance test of Unix to Lwt lwt/server_main.exe -path "${SPATH}" -concurrent & SERVER=$! -async/client_async_main.exe -path "${SPATH}" -secs "${SECS}" +./client_unix_main.exe -path "${SPATH}" -secs "${SECS}" wait "${SERVER}" - -echo Performance test of Async to Async -async/server_async_main.exe -path "${SPATH}" -concurrent & +echo Performance test of Lwt to Unix +./server_unix_main.exe -path "${SPATH}" & SERVER=$! -async/client_async_main.exe -path "${SPATH}" -secs "${SECS}" +lwt/client_main.exe -path "${SPATH}" -secs "${SECS}" wait "${SERVER}" ../cli/main.exe shutdown --path "${SPATH}" diff --git a/ocaml/message-switch/core_test/dune b/ocaml/message-switch/core_test/dune index cda5c5125aa..a7f0396538d 100644 --- a/ocaml/message-switch/core_test/dune +++ b/ocaml/message-switch/core_test/dune @@ -3,33 +3,21 @@ (names client_unix_main server_unix_main - lock_test_async lock_test_lwt ) (modules client_unix_main - server_unix_main - lock_test_async + server_unix_main lock_test_lwt ) (libraries message-switch-unix message-switch-core - message-switch-async message-switch-lwt threads.posix ) ) -(rule - (alias runtest) - (deps - lock_test_async.exe - ) - (action (run ./lock_test_async.exe)) - (package message-switch) -) - (rule (alias runtest) (deps @@ -45,8 +33,6 @@ (deps client_unix_main.exe server_unix_main.exe - async/client_async_main.exe - async/server_async_main.exe lwt/client_main.exe lwt/server_main.exe lwt/link_test_main.exe @@ -80,8 +66,6 @@ (deps client_unix_main.exe server_unix_main.exe - async/client_async_main.exe - async/server_async_main.exe lwt/client_main.exe lwt/server_main.exe lwt/link_test_main.exe diff --git a/ocaml/message-switch/core_test/lock_test_async.ml b/ocaml/message-switch/core_test/lock_test_async.ml deleted file mode 100644 index 85cde8eaecb..00000000000 --- a/ocaml/message-switch/core_test/lock_test_async.ml +++ /dev/null @@ -1,13 +0,0 @@ -open Core -open Async -open Message_switch_async - -let ( >>= ) = Deferred.( >>= ) - -let test_async_lock () = Protocol_async.Mtest.mutex_provides_mutal_exclusion () - -let () = - don't_wait_for - (test_async_lock () >>= fun () -> shutdown 0 ; Deferred.return ()) - -let () = never_returns (Scheduler.go ()) diff --git a/ocaml/xapi-storage-script/dune b/ocaml/xapi-storage-script/dune index a3b86f166b4..06e912ee9bb 100644 --- a/ocaml/xapi-storage-script/dune +++ b/ocaml/xapi-storage-script/dune @@ -1,23 +1,42 @@ +(library + (name private) + (modules lib) + (libraries + fmt + inotify + inotify.lwt + lwt + lwt.unix + rpclib.core + ) + (preprocess (pps ppx_deriving_rpc)) + ) + +(test + (name test_lib) + (modules test_lib) + (libraries alcotest alcotest-lwt lwt fmt private) + ) + (executable (name main) + (modules main) (libraries - async - async_inotify - async_kernel - async_unix base - base.caml - core - core_unix - core_unix.time_unix - - message-switch-async + + fmt + logs + logs.lwt + lwt + lwt.unix + message-switch-lwt message-switch-unix ppx_deriving.runtime + private result rpclib.core rpclib.json - rpclib-async + rpclib-lwt sexplib sexplib0 uri @@ -33,7 +52,7 @@ xapi-stdext-date xapi-storage ) - (preprocess (pps ppx_deriving_rpc ppx_sexp_conv)) + (preprocess (pps ppx_sexp_conv)) ) (install diff --git a/ocaml/xapi-storage-script/lib.ml b/ocaml/xapi-storage-script/lib.ml new file mode 100644 index 00000000000..9c9059432bf --- /dev/null +++ b/ocaml/xapi-storage-script/lib.ml @@ -0,0 +1,260 @@ +(* Copyright (C) Cloud Software Group Inc. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; version 2.1 only. with the special + exception on linking described in file LICENSE. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. +*) + +module StringMap = Map.Make (String) + +module Types = struct + type backtrace = { + error: string + ; (* Python json.dumps and rpclib are not very friendly *) + files: string list + ; lines: int list + } + [@@deriving rpc] + + (* This matches xapi.py:exception *) + type error = {code: string; params: string list; backtrace: backtrace} + [@@deriving rpc] +end + +let ( >>= ) = Lwt.bind + +let return = Lwt_result.return + +let fail = Lwt_result.fail + +let ( // ) = Filename.concat + +module Sys = struct + type file = Regular | Directory | Other | Missing | Unknown + + let file_kind ~follow_symlinks path = + Lwt.try_bind + (fun () -> + ( if follow_symlinks then + Lwt_unix.LargeFile.stat + else + Lwt_unix.LargeFile.lstat + ) + path + ) + (function + | s -> ( + match s.Unix.LargeFile.st_kind with + | Unix.S_REG -> + Lwt.return Regular + | Unix.S_DIR -> + Lwt.return Directory + | _ -> + Lwt.return Other + ) + ) + (function + | Unix.Unix_error (Unix.ENOENT, _, _) -> + Lwt.return Missing + | Unix.Unix_error ((Unix.EACCES | Unix.ELOOP), _, _) -> + Lwt.return Unknown + | e -> + Lwt.fail e + ) + + let access path modes = + Lwt.try_bind + (fun () -> Lwt_unix.access path modes) + Lwt_result.return + (fun exn -> fail (`not_executable (path, exn))) + + let assert_is_executable path = + file_kind ~follow_symlinks:true path >>= function + | Directory | Other | Missing | Unknown -> + fail (`missing path) + | Regular -> ( + access path [Unix.X_OK] >>= function + | Error exn -> + fail exn + | Ok () -> + return () + ) + + let read_file_contents path = + Lwt_io.(with_file ~mode:input ~flags:[O_RDONLY] ~perm:0o000 path read) + + let save ~contents path = + Lwt_io.(with_file ~mode:output path (Fun.flip write contents)) + + let readdir path = + path |> Lwt_unix.files_of_directory |> Lwt_stream.to_list >>= fun listing -> + List.filter (function "." | ".." -> false | _ -> true) listing + |> Lwt.return + + let mkdir_p ?(perm = 0o755) path = + let rec loop acc path = + let create_dir () = Lwt_unix.mkdir path perm in + let create_subdirs () = Lwt_list.iter_s (fun f -> f ()) acc in + Lwt.try_bind create_dir create_subdirs (function + | Unix.(Unix_error (EEXIST, _, _)) -> + (* create directories, parents first *) + create_subdirs () + | Unix.(Unix_error (ENOENT, _, _)) -> + let parent = Filename.dirname path in + loop (create_dir :: acc) parent + | exn -> + let msg = + Printf.sprintf {|Could not create directory "%s" because: %s|} + path (Printexc.to_string exn) + in + Lwt.fail (Failure msg) + ) + in + loop [] path +end + +module Signal = struct + type t = int + + let to_string s = Fmt.(str "%a" Dump.signal s) +end + +module Process = struct + module Output = struct + type exit_or_signal = Exit_non_zero of int | Signal of Signal.t + + type t = { + exit_status: (unit, exit_or_signal) Result.t + ; stdout: string + ; stderr: string + } + + let exit_or_signal_of_unix = function + | Unix.WEXITED 0 -> + Ok () + | WEXITED n -> + Error (Exit_non_zero n) + | WSIGNALED n -> + Error (Signal n) + | WSTOPPED n -> + Error (Signal n) + end + + let with_process ~env ~prog ~args f = + let args = Array.of_list (prog :: args) in + let cmd = (prog, args) in + + let env = + Unix.environment () + |> Array.to_seq + |> Seq.map (fun kv -> + let k, v = Scanf.sscanf kv "%s@=%s" (fun k v -> (k, v)) in + (k, v) + ) + |> StringMap.of_seq + |> StringMap.add_seq (List.to_seq env) + |> StringMap.to_seq + |> Seq.map (fun (k, v) -> Printf.sprintf "%s=%s" k v) + |> Array.of_seq + in + + Lwt_process.with_process_full ~env cmd f + + let close chan () = Lwt_io.close chan + + let send chan data = + Lwt.finalize (fun () -> Lwt_io.write chan data) (close chan) + + let receive chan = Lwt.finalize (fun () -> Lwt_io.read chan) (close chan) + + let run ~env ~prog ~args ~input = + let ( let@ ) f x = f x in + let@ p = with_process ~env ~prog ~args in + let sender = send p#stdin input in + let receiver_out = receive p#stdout in + let receiver_err = receive p#stderr in + Lwt.catch + (fun () -> + let receiver = Lwt.both receiver_out receiver_err in + Lwt.both sender receiver >>= fun ((), (stdout, stderr)) -> + p#status >>= fun status -> + let exit_status = Output.exit_or_signal_of_unix status in + Lwt.return {Output.exit_status; stdout; stderr} + ) + (function + | Lwt.Canceled as exn -> + Lwt.cancel receiver_out ; Lwt.cancel receiver_err ; Lwt.fail exn + | exn -> + Lwt.fail exn + ) +end + +module DirWatcher = struct + type event = Modified of string | Changed + + let create path = + Lwt_inotify.create () >>= fun desc -> + let watches = Hashtbl.create 32 in + let selectors = + Inotify.[S_Close; S_Create; S_Delete; S_Delete_self; S_Modify; S_Move] + in + Lwt_inotify.add_watch desc path selectors >>= fun watch -> + (* Deduplicate the watches by removing the previous one from inotify and + replacing it in the table *) + let maybe_remove = + if Hashtbl.mem watches watch then + Lwt_inotify.rm_watch desc watch + else + Lwt.return_unit + in + maybe_remove >>= fun () -> + Hashtbl.replace watches watch path ; + Lwt.return (watches, desc) + + let read (watches, desc) = + Lwt_inotify.read desc >>= fun (wd, mask, _cookie, filename) -> + let overflowed = + Inotify.int_of_watch wd = -1 && mask = [Inotify.Q_overflow] + in + let watch_path = Hashtbl.find_opt watches wd in + match (overflowed, watch_path) with + | true, _ -> + Lwt.return [Changed] + | _, None -> + Lwt.return [] + | _, Some base_path -> + let path = + match filename with + | None -> + base_path + | Some name -> + base_path // name + in + + List.filter_map + (function + | Inotify.Access + | Attrib + | Isdir + | Open + | Close_nowrite + | Ignored + | Unmount -> + None + | Close_write | Modify | Move_self -> + Some (Modified path) + | Create | Delete | Delete_self | Moved_from | Moved_to | Q_overflow + -> + Some Changed + ) + mask + |> Lwt.return +end + +module Clock = struct let after ~seconds = Lwt_unix.sleep seconds end diff --git a/ocaml/xapi-storage-script/lib.mli b/ocaml/xapi-storage-script/lib.mli new file mode 100644 index 00000000000..a55c4b81fbc --- /dev/null +++ b/ocaml/xapi-storage-script/lib.mli @@ -0,0 +1,98 @@ +(* Copyright (C) Cloud Software Group Inc. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; version 2.1 only. with the special + exception on linking described in file LICENSE. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. +*) + +module Types : sig + type backtrace = {error: string; files: string list; lines: int list} + + val rpc_of_backtrace : backtrace -> Rpc.t + + val backtrace_of_rpc : Rpc.t -> backtrace + + type error = {code: string; params: string list; backtrace: backtrace} + + val rpc_of_error : error -> Rpc.t + + val error_of_rpc : Rpc.t -> error +end + +module Sys : sig + type file = Regular | Directory | Other | Missing | Unknown + + val file_kind : follow_symlinks:bool -> string -> file Lwt.t + + val access : + string + -> Unix.access_permission list + -> (unit, [> `not_executable of string * exn]) result Lwt.t + + val assert_is_executable : + string + -> (unit, [> `missing of string | `not_executable of string * exn]) result + Lwt.t + (** [assert_is_executable path] returns [Ok ()] when [path] is an executable + regular file, [Error `not_executable] when the file is a non-executable + regular file, and [Error `missing] otherwise. The [Errors] return the + queried path as a string. *) + + val read_file_contents : string -> string Lwt.t + + val save : contents:string -> string -> unit Lwt.t + + val readdir : string -> string list Lwt.t + + val mkdir_p : ?perm:int -> string -> unit Lwt.t +end + +module Signal : sig + type t + + val to_string : t -> string +end + +module Process : sig + module Output : sig + type exit_or_signal = Exit_non_zero of int | Signal of Signal.t + + type t = { + exit_status: (unit, exit_or_signal) result + ; stdout: string + ; stderr: string + } + end + + val run : + env:(string * string) list + -> prog:string + -> args:string list + -> input:string + -> Output.t Lwt.t + (** Runs a cli program, writes [input] into its stdin, then closing the fd, + and finally waits for the program to finish and returns the exit status, + its stdout and stderr. *) +end + +module DirWatcher : sig + type event = + | Modified of string (** File contents changed *) + | Changed (** Something in the directory changed, read anew *) + + val create : + string -> ((Inotify.watch, string) Hashtbl.t * Lwt_inotify.t) Lwt.t + + val read : + (Inotify.watch, string) Hashtbl.t * Lwt_inotify.t -> event list Lwt.t +end + +module Clock : sig + val after : seconds:float -> unit Lwt.t +end diff --git a/ocaml/xapi-storage-script/main.ml b/ocaml/xapi-storage-script/main.ml index fb4ac093489..96c68e73a82 100644 --- a/ocaml/xapi-storage-script/main.ml +++ b/ocaml/xapi-storage-script/main.ml @@ -11,18 +11,35 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. *) -module U = Unix module R = Rpc -module B = Backtrace -open Core -open Async -open Xapi_storage_script_types -module Plugin_client = Xapi_storage.Plugin.Plugin (Rpc_async.GenClient ()) -module Volume_client = Xapi_storage.Control.Volume (Rpc_async.GenClient ()) -module Sr_client = Xapi_storage.Control.Sr (Rpc_async.GenClient ()) -module Datapath_client = Xapi_storage.Data.Datapath (Rpc_async.GenClient ()) +module Plugin_client = Xapi_storage.Plugin.Plugin (Rpc_lwt.GenClient ()) +module Volume_client = Xapi_storage.Control.Volume (Rpc_lwt.GenClient ()) +module Sr_client = Xapi_storage.Control.Sr (Rpc_lwt.GenClient ()) +module Datapath_client = Xapi_storage.Data.Datapath (Rpc_lwt.GenClient ()) +open Private.Lib -let ( >>>= ) = Deferred.Result.( >>= ) +let ( >>= ) = Lwt.bind + +let ( let* ) = Lwt.bind + +let ( >>| ) = Fun.flip Lwt.map + +let ( >>>= ) = Lwt_result.bind + +let return = Lwt_result.return + +let fail = Lwt_result.fail + +let ( // ) = Filename.concat + +module Deferred = struct + let errorf fmt = + Printf.ksprintf (fun m -> Lwt.return (Base.Or_error.error_string m)) fmt + + let combine_errors lst = Lwt.all lst >>| Base.Or_error.combine_errors + + let try_with f = Lwt.try_bind f return fail +end type config = {mutable use_observer: bool} @@ -43,7 +60,7 @@ let backend_backtrace_error name args backtrace = | ["Activated_on_another_host"; uuid] -> Errors.Activated_on_another_host uuid | _ -> - let backtrace = rpc_of_backtrace backtrace |> Jsonrpc.to_string in + let backtrace = Types.rpc_of_backtrace backtrace |> Jsonrpc.to_string in Errors.Backend_error_with_backtrace (name, backtrace :: args) let missing_uri () = @@ -58,36 +75,28 @@ let missing_uri () = (* fork_exec_rpc either raises a Fork_exec_error exception or returns a successful RPC response *) let return_rpc typ result = - (* Operator to unwrap the wrapped async return type of ocaml-rpc's Rpc_async *) - let ( >*= ) a b = a |> Rpc_async.T.get >>= b in - Monitor.try_with ~extract_exn:true (fun () -> + Lwt.catch + (fun () -> (* We need to delay the evaluation of [result] until now, because when fork_exec_rpc is called by GenClient.declare, it might immediately raise a Fork_exec_error *) - result () >*= fun result -> - (* In practice we'll always get a successful RPC response here (Ok), - but we still have to transform the Error to make the types match: *) - let result = - Result.map_error result ~f:(fun err -> - backend_error "SCRIPT_RETURNED_RPC_ERROR" - [Rpcmarshal.marshal typ err |> R.to_string] - ) - in - return result - ) - >>= function - | Ok result -> - return result - | Error (Fork_exec_error err) -> - return (Error err) - (* We should not get any other exception from fork_exec_rpc: *) - | Error e -> - return - (Error - (backend_error "SCRIPT_FAILED" - ["Unexpected exception:" ^ Exn.to_string e] - ) + Fun.flip Lwt.map + (Rpc_lwt.T.get (result ())) + (* In practice we'll always get a successful RPC response here (Ok), + but we still have to transform the Error to make the types match: *) + (Base.Result.map_error ~f:(fun err -> + backend_error "SCRIPT_RETURNED_RPC_ERROR" + [Rpcmarshal.marshal typ err |> R.to_string] + ) ) + ) + (function + | Fork_exec_error err -> + fail err + | e -> + let msg = ["Unexpected exception:" ^ Base.Exn.to_string e] in + fail (backend_error "SCRIPT_FAILED" msg) + ) let return_volume_rpc result = return_rpc Xapi_storage.Control.typ_of_exns result @@ -96,37 +105,67 @@ let return_plugin_rpc result = return_rpc Xapi_storage.Common.typ_of_exnt result let return_data_rpc result = return_rpc Xapi_storage.Common.typ_of_exnt result -let use_syslog = ref false - -let log level fmt = - Printf.ksprintf - (fun s -> - if !use_syslog then - (* FIXME: this is synchronous and will block other I/O. - * This should use Log_extended.Syslog, but that brings in Core's Syslog module - * which conflicts with ours *) - Syslog.log Syslog.Daemon level s - else - let w = Lazy.force Writer.stderr in - Writer.write w s ; Writer.newline w +(* Reporter taken from + https://erratique.ch/software/logs/doc/Logs_lwt/index.html#report_ex + under ISC License *) +let lwt_reporter () = + let buf_fmt ~like = + let b = Buffer.create 512 in + ( Fmt.with_buffer ~like b + , fun () -> + let m = Buffer.contents b in + Buffer.reset b ; m ) - fmt + in + let app, app_flush = buf_fmt ~like:Fmt.stdout in + let dst, dst_flush = buf_fmt ~like:Fmt.stderr in + (* The default pretty-printer adds the binary name to the loglines, which + results in appearing twice per logline, override it instead *) + let pp_header = + let pf = Format.fprintf in + let pp_header ppf (l, h) = + if l = Logs.App then + match h with None -> () | Some h -> pf ppf "[%s] " h + else + match h with + | None -> + pf ppf "[%a] " Logs.pp_level l + | Some h -> + pf ppf "[%s] " h + in + pp_header + in + let reporter = Logs.format_reporter ~app ~dst ~pp_header () in + let report src level ~over k msgf = + let k () = + let write () = + match level with + | Logs.App -> + Lwt_io.write Lwt_io.stdout (app_flush ()) + | _ -> + Lwt_io.write Lwt_io.stderr (dst_flush ()) + in + let unblock () = over () |> Lwt.return in + Lwt.finalize write unblock |> Lwt.ignore_result ; + k () + in + reporter.Logs.report src level ~over:(fun () -> ()) k msgf + in + {Logs.report} -let debug fmt = log Syslog.Debug fmt +let debug = Logs_lwt.debug -let info fmt = log Syslog.Info fmt +let info = Logs_lwt.info -let warn fmt = log Syslog.Warning fmt +let warn = Logs_lwt.warn -let error fmt = log Syslog.Err fmt +let error = Logs_lwt.err let pvs_version = "3.0" let supported_api_versions = [pvs_version; "5.0"] -let api_max = List.fold_left ~f:String.max supported_api_versions ~init:"" - -let id x = x +let api_max = List.fold_left Base.String.max "" supported_api_versions (** A function that changes the input to make it compatible with an older script *) @@ -143,7 +182,7 @@ end) : sig (** Module for making the inputs and outputs compatible with the old PVS version of the storage scripts. *) - type device_config = (Core.String.t, string) Core.List.Assoc.t + type device_config = (string * string) list val compat_out_volume : compat_out (** Add the missing [sharable] field to the Dict in [rpc], to ensure the @@ -160,21 +199,20 @@ end) : sig -> ( device_config * compat_in * compat_out , Storage_interface.Errors.error ) - Deferred.Result.t + Lwt_result.t (** Compatiblity for the old PVS version of SR.create, which had signature [uri -> name -> desc -> config -> unit] *) val sr_attach : - device_config - -> (compat_in, Storage_interface.Errors.error) Deferred.Result.t + device_config -> (compat_in, Storage_interface.Errors.error) Lwt_result.t (** Compatiblity for the old PVS version of SR.attach, which had signature [uri -> sr (=string)] *) end = struct - type device_config = (Core.String.t, string) Core.List.Assoc.t + type device_config = (string * string) list let with_pvs_version f rpc = match !V.version with - | Some v when String.(v = pvs_version) -> + | Some v when Base.String.(v = pvs_version) -> f rpc | _ -> rpc @@ -206,7 +244,7 @@ end = struct let add_fields_to_record_list_output fields = with_pvs_version (function | R.Enum l -> - R.Enum (List.map ~f:(add_fields_to_dict fields) l) + R.Enum (List.map (add_fields_to_dict fields) l) | rpc -> rpc ) @@ -221,21 +259,21 @@ end = struct old PVS scripts *) let compat_uri device_config = match !V.version with - | Some version when String.(version = pvs_version) -> ( - match List.Assoc.find ~equal:String.equal device_config "uri" with + | Some version when Base.String.(version = pvs_version) -> ( + match Base.List.Assoc.find ~equal:String.equal device_config "uri" with | None -> - return (Error (missing_uri ())) + fail (missing_uri ()) | Some uri -> - return (Ok (add_param_to_input [("uri", R.String uri)])) + return (add_param_to_input [("uri", R.String uri)]) ) | _ -> - return (Ok id) + return Fun.id let sr_create device_config = compat_uri device_config >>>= fun compat_in -> let compat_out = match !V.version with - | Some v when String.(v = pvs_version) -> ( + | Some v when Base.String.(v = pvs_version) -> ( function (* The PVS version will return nothing *) | R.Null -> @@ -245,55 +283,54 @@ end = struct rpc ) | _ -> - id + Fun.id in - return (Ok (device_config, compat_in, compat_out)) + return (device_config, compat_in, compat_out) let sr_attach = compat_uri end let check_plugin_version_compatible query_result = let Xapi_storage.Plugin.{name; required_api_version; _} = query_result in - if String.(required_api_version <> api_max) then - warn - "Using deprecated SMAPIv3 API version %s, latest is %s. Update your %s \ - plugin!" - required_api_version api_max name ; - if List.mem ~equal:String.equal supported_api_versions required_api_version - then - Deferred.Result.return () + ( if Base.String.(required_api_version <> api_max) then + warn (fun m -> + m + "Using deprecated SMAPIv3 API version %s, latest is %s. Update \ + your %s plugin!" + required_api_version api_max name + ) + else + Lwt.return_unit + ) + >>= fun () -> + if List.mem required_api_version supported_api_versions then + return () else let msg = Printf.sprintf "%s requires unknown SMAPI API version %s, supported: %s" name required_api_version - (String.concat ~sep:"," supported_api_versions) + (String.concat "," supported_api_versions) in - return (Error (Storage_interface.Errors.No_storage_plugin_for_sr msg)) + fail (Storage_interface.Errors.No_storage_plugin_for_sr msg) module RRD = struct - open Message_switch_async.Protocol_async + open Message_switch_lwt.Protocol_lwt let ( >>|= ) m f = - m >>= function - | Ok x -> - f x - | Error y -> - let b = Buffer.create 16 in - let fmt = Format.formatter_of_buffer b in - Client.pp_error fmt y ; - Format.pp_print_flush fmt () ; - raise (Failure (Buffer.contents b)) + m >>= fun x -> + Client.error_to_msg x + |> Result.fold ~ok:f ~error:(function `Msg err -> failwith err) let switch_rpc queue_name string_of_call response_of_string call = Client.connect ~switch:queue_name () >>|= fun t -> Client.rpc ~t ~queue:queue_name ~body:(string_of_call call) () >>|= fun s -> - return (response_of_string s) + Lwt.return (response_of_string s) let rpc = switch_rpc !Rrd_interface.queue_name Jsonrpc.string_of_call Jsonrpc.response_of_string - module Client = Rrd_interface.RPC_API (Rpc_async.GenClient ()) + module Client = Rrd_interface.RPC_API (Rpc_lwt.GenClient ()) end let _nonpersistent = "NONPERSISTENT" @@ -308,49 +345,39 @@ let _is_a_snapshot_key = "is_a_snapshot" let _snapshot_of_key = "snapshot_of" -let is_executable path = - Sys.is_file ~follow_symlinks:true path >>= function - | `No | `Unknown -> - return (Error (`missing path)) - | `Yes -> ( - Unix.access path [`Exec] >>= function - | Error exn -> - return (Error (`not_executable (path, exn))) - | Ok () -> - return (Ok ()) - ) - module Script = struct (** We cache (lowercase script name -> original script name) mapping for the scripts in the root directory of every registered plugin. *) - let name_mapping = String.Table.create ~size:4 () + let name_mapping = Base.Hashtbl.create ~size:4 (module Base.String) let update_mapping ~script_dir = - Sys.readdir script_dir >>| Array.to_list >>| fun files -> + Sys.readdir script_dir >>= fun files -> (* If there are multiple files which map to the same lowercase string, we just take the first one, instead of failing *) let mapping = - List.zip_exn files files - |> String.Caseless.Map.of_alist_reduce ~f:String.min + List.combine files files + |> Base.Map.of_alist_reduce + (module Base.String.Caseless) + ~f:Base.String.min in - Hashtbl.set name_mapping ~key:script_dir ~data:mapping + return @@ Base.Hashtbl.set name_mapping ~key:script_dir ~data:mapping let path ~script_dir ~script_name = let find () = let cached_script_name = - let ( >>?= ) = Option.( >>= ) in - Hashtbl.find name_mapping script_dir >>?= fun mapping -> - Core.String.Caseless.Map.find mapping script_name + let ( let* ) = Option.bind in + let* mapping = Base.Hashtbl.find name_mapping script_dir in + Base.Map.find mapping script_name in let script_name = Option.value cached_script_name ~default:script_name in - let path = Filename.concat script_dir script_name in - is_executable path >>| function Ok () -> Ok path | Error _ as e -> e + let path = script_dir // script_name in + Sys.assert_is_executable path >>>= fun () -> return path in find () >>= function | Ok path -> - return (Ok path) + return path | Error _ -> - update_mapping ~script_dir >>= fun () -> find () + update_mapping ~script_dir >>>= fun () -> find () end let observer_config_dir = @@ -365,14 +392,13 @@ let observer_config_dir = would consist of querying the 'components' field of an observer from the xapi database. *) let observer_is_component_enabled () = - let ( let* ) = ( >>= ) in let is_enabled () = let is_config_file path = Filename.check_suffix path ".observer.conf" in let* files = Sys.readdir observer_config_dir in - return (Array.exists files ~f:is_config_file) + Lwt.return (List.exists is_config_file files) in - let* result = Monitor.try_with ~extract_exn:true is_enabled in - return (Option.value (Result.ok result) ~default:false) + let* result = Deferred.try_with is_enabled in + Lwt.return (Option.value (Result.to_option result) ~default:false) (** Call the script named after the RPC method in the [script_dir] directory. The arguments (not the whole JSON-RPC call) are passed as JSON @@ -394,11 +420,11 @@ let fork_exec_rpc : -> ?compat_out:compat_out -> ?dbg:string -> R.call - -> R.response Deferred.t = - fun ~script_dir ?missing ?(compat_in = id) ?(compat_out = id) ?dbg -> + -> R.response Lwt.t = + fun ~script_dir ?missing ?(compat_in = Fun.id) ?(compat_out = Fun.id) ?dbg -> let invoke_script call script_name : - (R.response, Storage_interface.Errors.error) Deferred.Result.t = - let traceparent = Option.bind dbg ~f:Debug_info.traceparent_of_dbg in + (R.response, Storage_interface.Errors.error) Lwt_result.t = + let traceparent = Option.bind dbg Debug_info.traceparent_of_dbg in let args = ["--json"] in let script_name, args, env = match (traceparent, config.use_observer) with @@ -414,164 +440,139 @@ let fork_exec_rpc : | _ -> (script_name, args, []) in - Process.create ~env:(`Extend env) ~prog:script_name ~args () >>= function - | Error e -> - error "%s failed: %s" script_name (Error.to_string_hum e) ; - return - (Error - (backend_error "SCRIPT_FAILED" [script_name; Error.to_string_hum e]) + (* We pass just the args, not the complete JSON-RPC call. + Currently the Python code generated by rpclib requires all params to + be named - they will be converted into a name->value Python dict. + Rpclib currently puts all named params into a dict, so we expect + params to be a single Dict, if all the params are named. *) + ( match call.R.params with + | [(R.Dict _ as d)] -> + return d + | _ -> + fail + (backend_error "INCORRECT_PARAMETERS" + [ + script_name + ; "All the call parameters should be named and should be in a RPC \ + Dict" + ] ) - | Ok p -> ( - (* Send the request as json on stdin *) - let w = Process.stdin p in - (* We pass just the args, not the complete JSON-RPC call. - Currently the Python code generated by rpclib requires all params to - be named - they will be converted into a name->value Python dict. - Rpclib currently puts all named params into a dict, so we expect - params to be a single Dict, if all the params are named. *) - ( match call.R.params with - | [(R.Dict _ as d)] -> - return (Ok d) - | _ -> - return - (Error - (backend_error "INCORRECT_PARAMETERS" - [ - script_name - ; "All the call parameters should be named and should be \ - in a RPC Dict" - ] - ) - ) + ) + >>>= fun input -> + let input = compat_in input |> Jsonrpc.to_string in + Process.run ~env ~prog:script_name ~args ~input >>= fun output -> + let fail_because ~cause description = + fail + (backend_error "SCRIPT_FAILED" + [ + script_name + ; description + ; cause + ; output.Process.Output.stdout + ; output.Process.Output.stdout + ] ) - >>>= fun args -> - let args = compat_in args in - Writer.write w (Jsonrpc.to_string args) ; - Writer.close w >>= fun () -> - Process.collect_output_and_wait p >>= fun output -> - match output.Process.Output.exit_status with - | Error (`Exit_non_zero code) -> ( - (* Expect an exception and backtrace on stdout *) - match - Or_error.try_with (fun () -> - Jsonrpc.of_string output.Process.Output.stdout - ) - with - | Error _ -> - error "%s failed and printed bad error json: %s" script_name - output.Process.Output.stdout ; - error "%s failed, stderr: %s" script_name - output.Process.Output.stderr ; - return - (Error - (backend_error "SCRIPT_FAILED" - [ - script_name - ; "non-zero exit and bad json on stdout" - ; string_of_int code - ; output.Process.Output.stdout - ; output.Process.Output.stdout - ] - ) - ) - | Ok response -> ( - match Or_error.try_with (fun () -> error_of_rpc response) with - | Error _ -> - error "%s failed and printed bad error json: %s" script_name - output.Process.Output.stdout ; - error "%s failed, stderr: %s" script_name - output.Process.Output.stderr ; - return - (Error - (backend_error "SCRIPT_FAILED" - [ - script_name - ; "non-zero exit and bad json on stdout" - ; string_of_int code - ; output.Process.Output.stdout - ; output.Process.Output.stdout - ] - ) - ) - | Ok x -> - return - (Error (backend_backtrace_error x.code x.params x.backtrace)) - ) + in + match output.Process.Output.exit_status with + | Error (Exit_non_zero code) -> ( + (* Expect an exception and backtrace on stdout *) + match + Base.Or_error.try_with (fun () -> + Jsonrpc.of_string output.Process.Output.stdout ) - | Error (`Signal signal) -> - error "%s caught a signal and failed" script_name ; - return - (Error - (backend_error "SCRIPT_FAILED" - [ - script_name - ; "signalled" - ; Signal.to_string signal - ; output.Process.Output.stdout - ; output.Process.Output.stdout - ] - ) - ) - | Ok () -> ( - (* Parse the json on stdout. We get back a JSON-RPC - value from the scripts, not a complete JSON-RPC response *) - match - Or_error.try_with (fun () -> - Jsonrpc.of_string output.Process.Output.stdout + with + | Error _ -> + error (fun m -> + m "%s failed and printed bad error json: %s" script_name + output.Process.Output.stdout + ) + >>= fun () -> + error (fun m -> + m "%s failed, stderr: %s" script_name output.Process.Output.stderr + ) + >>= fun () -> + fail_because "non-zero exit and bad json on stdout" + ~cause:(string_of_int code) + | Ok response -> ( + match + Base.Or_error.try_with (fun () -> Types.error_of_rpc response) + with + | Error _ -> + error (fun m -> + m "%s failed and printed bad error json: %s" script_name + output.Process.Output.stdout ) - with - | Error _ -> - error "%s succeeded but printed bad json: %s" script_name - output.Process.Output.stdout ; - return - (Error - (backend_error "SCRIPT_FAILED" - [ - script_name - ; "bad json on stdout" - ; output.Process.Output.stdout - ] - ) - ) - | Ok response -> - info "%s succeeded: %s" script_name output.Process.Output.stdout ; - let response = compat_out response in - let response = R.success response in - return (Ok response) - ) + >>= fun () -> + error (fun m -> + m "%s failed, stderr: %s" script_name + output.Process.Output.stderr + ) + >>= fun () -> + fail_because "non-zero exit and bad json on stdout" + ~cause:(string_of_int code) + | Ok x -> + fail (backend_backtrace_error x.code x.params x.backtrace) ) + ) + | Error (Signal signal) -> + error (fun m -> m "%s caught a signal and failed" script_name) + >>= fun () -> fail_because "signalled" ~cause:(Signal.to_string signal) + | Ok () -> ( + (* Parse the json on stdout. We get back a JSON-RPC + value from the scripts, not a complete JSON-RPC response *) + match + Base.Or_error.try_with (fun () -> + Jsonrpc.of_string output.Process.Output.stdout + ) + with + | Error _ -> + error (fun m -> + m "%s succeeded but printed bad json: %s" script_name + output.Process.Output.stdout + ) + >>= fun () -> + fail + (backend_error "SCRIPT_FAILED" + [script_name; "bad json on stdout"; output.Process.Output.stdout] + ) + | Ok response -> + info (fun m -> + m "%s succeeded: %s" script_name output.Process.Output.stdout + ) + >>= fun () -> + let response = compat_out response in + let response = R.success response in + return response + ) in let script_rpc call : - (R.response, Storage_interface.Errors.error) Deferred.Result.t = - info "%s" (Jsonrpc.string_of_call call) ; + (R.response, Storage_interface.Errors.error) Lwt_result.t = + info (fun m -> m "%s" (Jsonrpc.string_of_call call)) >>= fun () -> Script.path ~script_dir ~script_name:call.R.name >>= function | Error (`missing path) -> ( - error "%s is not a file" path ; + error (fun m -> m "%s is not a file" path) >>= fun () -> match missing with | None -> - return - (Error - (backend_error "SCRIPT_MISSING" - [ - path - ; "Check whether the file exists and has correct \ - permissions" - ] - ) + fail + (backend_error "SCRIPT_MISSING" + [ + path + ; "Check whether the file exists and has correct permissions" + ] ) | Some m -> - warn - "Deprecated: script '%s' is missing, treating as no-op. Update \ - your plugin!" - path ; - return (Ok (R.success m)) + warn (fun m -> + m + "Deprecated: script '%s' is missing, treating as no-op. \ + Update your plugin!" + path + ) + >>= fun () -> return (R.success m) ) | Error (`not_executable (path, exn)) -> - error "%s is not executable" path ; - return - (Error - (backend_error "SCRIPT_NOT_EXECUTABLE" [path; Exn.to_string exn]) - ) + error (fun m -> m "%s is not executable" path) >>= fun () -> + fail + (backend_error "SCRIPT_NOT_EXECUTABLE" [path; Base.Exn.to_string exn]) | Ok path -> invoke_script call path in @@ -582,136 +583,145 @@ let fork_exec_rpc : to unmarshal that error. Therefore we either return a successful RPC response, or raise Fork_exec_error with a suitable SMAPIv2 error if the call failed. *) - let rpc : R.call -> R.response Deferred.t = + let rpc : R.call -> R.response Lwt.t = fun call -> script_rpc call >>= fun result -> - Result.map_error ~f:(fun e -> Fork_exec_error e) result - |> Result.ok_exn - |> return + Base.Result.map_error ~f:(fun e -> Fork_exec_error e) result + |> Base.Result.ok_exn + |> Lwt.return in rpc +let string_of_sexp = Sexplib0.Sexp_conv.string_of_sexp + +let sexp_of_string = Sexplib0.Sexp_conv.sexp_of_string + +let list_of_sexp = Sexplib0.Sexp_conv.list_of_sexp + +let sexp_of_list = Sexplib0.Sexp_conv.sexp_of_list + module Attached_SRs = struct type state = {sr: string; uids: string list} [@@deriving sexp] - let sr_table : state String.Table.t ref = ref (String.Table.create ()) + let sr_table : (string, state) Base.Hashtbl.t ref = + ref (Base.Hashtbl.create (module Base.String)) let state_path = ref None let add smapiv2 plugin uids = let key = Storage_interface.Sr.string_of smapiv2 in - Hashtbl.set !sr_table ~key ~data:{sr= plugin; uids} ; + Base.Hashtbl.set !sr_table ~key ~data:{sr= plugin; uids} ; ( match !state_path with | None -> - return () + Lwt.return_unit | Some path -> let contents = - String.Table.sexp_of_t sexp_of_state !sr_table + Base.Hashtbl.sexp_of_t sexp_of_string sexp_of_state !sr_table |> Sexplib.Sexp.to_string in let dir = Filename.dirname path in - Unix.mkdir ~p:() dir >>= fun () -> Writer.save path ~contents + Sys.mkdir_p dir >>= fun () -> Sys.save path ~contents ) - >>= fun () -> return (Ok ()) + >>= fun () -> return () let find smapiv2 = let key = Storage_interface.Sr.string_of smapiv2 in - match Hashtbl.find !sr_table key with + match Base.Hashtbl.find !sr_table key with | None -> let open Storage_interface in - return (Error (Errors.Sr_not_attached key)) + fail (Errors.Sr_not_attached key) | Some {sr; _} -> - return (Ok sr) + return sr let get_uids smapiv2 = let key = Storage_interface.Sr.string_of smapiv2 in - match Hashtbl.find !sr_table key with + match Base.Hashtbl.find !sr_table key with | None -> let open Storage_interface in - return (Error (Errors.Sr_not_attached key)) + fail (Errors.Sr_not_attached key) | Some {uids; _} -> - return (Ok uids) + return uids let remove smapiv2 = let key = Storage_interface.Sr.string_of smapiv2 in - Hashtbl.remove !sr_table key ; - return (Ok ()) + Base.Hashtbl.remove !sr_table key ; + return () let list () = let srs = - Hashtbl.fold !sr_table + Base.Hashtbl.fold !sr_table ~f:(fun ~key ~data:_ ac -> Storage_interface.Sr.of_string key :: ac) ~init:[] in - return (Ok srs) + return srs let reload path = state_path := Some path ; - Sys.is_file ~follow_symlinks:true path >>= function - | `No | `Unknown -> - return () - | `Yes -> - Reader.file_contents path >>= fun contents -> + Sys.file_kind ~follow_symlinks:true path >>= function + | Regular -> + Sys.read_file_contents path >>= fun contents -> sr_table := contents |> Sexplib.Sexp.of_string - |> String.Table.t_of_sexp state_of_sexp ; - return () + |> Base.Hashtbl.Poly.t_of_sexp string_of_sexp state_of_sexp ; + Lwt.return_unit + | _ -> + Lwt.return_unit end module Datapath_plugins = struct - let table = String.Table.create () + let table = Base.Hashtbl.create (module Base.String) let register ~datapath_root datapath_plugin_name = let result = - let script_dir = Filename.concat datapath_root datapath_plugin_name in + let script_dir = datapath_root // datapath_plugin_name in return_plugin_rpc (fun () -> Plugin_client.query (fork_exec_rpc ~script_dir) "register" ) >>>= fun response -> check_plugin_version_compatible response >>= function | Ok () -> - info "Registered datapath plugin %s" datapath_plugin_name ; - Hashtbl.set table ~key:datapath_plugin_name + info (fun m -> m "Registered datapath plugin %s" datapath_plugin_name) + >>= fun () -> + Base.Hashtbl.set table ~key:datapath_plugin_name ~data:(script_dir, response) ; - return (Ok ()) + return () | Error e -> let err_msg = Storage_interface.(rpc_of Errors.error) e |> Jsonrpc.to_string in - info "Failed to register datapath plugin %s: %s" datapath_plugin_name - err_msg ; - return (Error e) + info (fun m -> + m "Failed to register datapath plugin %s: %s" datapath_plugin_name + err_msg + ) + >>= fun () -> fail e in (* We just do not register the plugin if we've encountered any error. In the future we might want to change that, so we keep the error result above. *) - result >>= fun _ -> return () + result >>= fun _ -> Lwt.return_unit let unregister datapath_plugin_name = - Hashtbl.remove table datapath_plugin_name ; - return () + Base.Hashtbl.remove table datapath_plugin_name ; + Lwt.return_unit let supports_feature scheme feature = - match Hashtbl.find table scheme with + match Base.Hashtbl.find table scheme with | None -> false | Some (_script_dir, query_result) -> - List.mem query_result.Xapi_storage.Plugin.features feature - ~equal:String.equal + List.mem feature query_result.Xapi_storage.Plugin.features end let vdi_of_volume x = let find key ~default ~of_string = - match - List.Assoc.find x.Xapi_storage.Control.keys key ~equal:String.equal - with + match List.assoc_opt key x.Xapi_storage.Control.keys with | None -> default | Some v -> v |> of_string in - let find_string = find ~of_string:id in + let find_string = find ~of_string:Fun.id in let open Storage_interface in { vdi= Vdi.of_string x.Xapi_storage.Control.key @@ -739,7 +749,7 @@ let choose_datapath ?(persistent = true) domain response = to name the datapath plugin. *) let possible = List.filter_map - ~f:(fun x -> + (fun x -> let uri = Uri.of_string x in match Uri.scheme uri with | None -> @@ -752,8 +762,8 @@ let choose_datapath ?(persistent = true) domain response = (* We can only use URIs whose schemes correspond to registered plugins *) let possible = List.filter_map - ~f:(fun (scheme, uri) -> - match Hashtbl.find Datapath_plugins.table scheme with + (fun (scheme, uri) -> + match Base.Hashtbl.find Datapath_plugins.table scheme with | Some (script_dir, _query_result) -> Some (script_dir, scheme, uri) | None -> @@ -767,8 +777,8 @@ let choose_datapath ?(persistent = true) domain response = possible else let supports_nonpersistent, others = - List.partition_tf - ~f:(fun (_script_dir, scheme, _uri) -> + List.partition + (fun (_script_dir, scheme, _uri) -> Datapath_plugins.supports_feature scheme _nonpersistent ) possible @@ -777,15 +787,15 @@ let choose_datapath ?(persistent = true) domain response = in match preference_order with | [] -> - return (Error (missing_uri ())) + fail (missing_uri ()) | (script_dir, scheme, u) :: _us -> - return (Ok (fork_exec_rpc ~script_dir, scheme, u, domain)) + return (fork_exec_rpc ~script_dir, scheme, u, domain) (* Bind the implementations *) let bind ~volume_script_dir = (* Each plugin has its own version, see the call to listen where `process` is partially applied. *) - let module S = Storage_interface.StorageAPI (Rpc_async.GenServer ()) in + let module S = Storage_interface.StorageAPI (Rpc_lwt.GenServer ()) in let version = ref None in let volume_rpc = fork_exec_rpc ~script_dir:volume_script_dir in let module Compat = Compat (struct let version = version end) in @@ -812,8 +822,8 @@ let bind ~volume_script_dir = * Volume.set and Volume.unset *) (* TODO handle this properly? *) let missing = - Option.bind !version ~f:(fun v -> - if String.(v = pvs_version) then Some (R.rpc_of_unit ()) else None + Option.bind !version (fun v -> + if String.equal v pvs_version then Some (R.rpc_of_unit ()) else None ) in return_volume_rpc (fun () -> @@ -822,8 +832,8 @@ let bind ~volume_script_dir = in let unset ~dbg ~sr ~vdi ~key = let missing = - Option.bind !version ~f:(fun v -> - if String.(v = pvs_version) then Some (R.rpc_of_unit ()) else None + Option.bind !version (fun v -> + if String.equal v pvs_version then Some (R.rpc_of_unit ()) else None ) in return_volume_rpc (fun () -> @@ -831,36 +841,32 @@ let bind ~volume_script_dir = ) in let update_keys ~dbg ~sr ~key ~value response = - let open Deferred.Result.Monad_infix in match value with | None -> - Deferred.Result.return response + return response | Some value -> set ~dbg ~sr ~vdi:response.Xapi_storage.Control.key ~key ~value - >>= fun () -> - Deferred.Result.return - {response with keys= (key, value) :: response.keys} + >>>= fun () -> + return {response with keys= (key, value) :: response.keys} in let vdi_attach_common dbg sr vdi domain = - let open Deferred.Result.Monad_infix in - Attached_SRs.find sr >>= fun sr -> + Attached_SRs.find sr >>>= fun sr -> (* Discover the URIs using Volume.stat *) - stat ~dbg ~sr ~vdi >>= fun response -> + stat ~dbg ~sr ~vdi >>>= fun response -> (* If we have a clone-on-boot volume then use that instead *) ( match - List.Assoc.find response.Xapi_storage.Control.keys _clone_on_boot_key - ~equal:String.equal + List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys with | None -> - return (Ok response) + return response | Some temporary -> stat ~dbg ~sr ~vdi:temporary ) - >>= fun response -> - choose_datapath domain response >>= fun (rpc, _datapath, uri, domain) -> + >>>= fun response -> + choose_datapath domain response >>>= fun (rpc, _datapath, uri, domain) -> return_data_rpc (fun () -> Datapath_client.attach (rpc ~dbg) dbg uri domain) in - let wrap th = Rpc_async.T.put th in + let wrap th = Rpc_lwt.T.put th in (* the actual API call for this plugin, sharing same version ref across all calls *) let query_impl dbg = let th = @@ -875,13 +881,13 @@ let bind ~volume_script_dir = (* Convert between the xapi-storage interface and the SMAPI *) let features = List.map - ~f:(function "VDI_DESTROY" -> "VDI_DELETE" | x -> x) + (function "VDI_DESTROY" -> "VDI_DELETE" | x -> x) response.Xapi_storage.Plugin.features in (* Look for executable scripts and automatically add capabilities *) let rec loop acc = function | [] -> - return (Ok acc) + return acc | (script_name, capability) :: rest -> ( Script.path ~script_dir:volume_script_dir ~script_name >>= function | Error _ -> @@ -922,13 +928,13 @@ let bind ~volume_script_dir = (* If we have the ability to clone a disk then we can provide clone on boot. *) let features = - if List.mem features "VDI_CLONE" ~equal:String.equal then + if List.mem "VDI_CLONE" features then "VDI_RESET_ON_BOOT/2" :: features else features in let name = response.Xapi_storage.Plugin.name in - Deferred.Result.return + return { Storage_interface.driver= response.Xapi_storage.Plugin.plugin ; name @@ -948,11 +954,10 @@ let bind ~volume_script_dir = S.Query.query query_impl ; let query_diagnostics_impl dbg = let th = - let open Deferred.Result.Monad_infix in return_plugin_rpc (fun () -> Plugin_client.diagnostics (volume_rpc ~dbg) dbg ) - >>= fun response -> Deferred.Result.return response + >>>= fun response -> return response in wrap th in @@ -972,7 +977,7 @@ let bind ~volume_script_dir = >>>= fun stat -> let rec loop acc = function | [] -> - return acc + Lwt.return acc | datasource :: datasources -> ( let uri = Uri.of_string datasource in match Uri.scheme uri with @@ -980,13 +985,13 @@ let bind ~volume_script_dir = let uid = Uri.path_unencoded uri in let uid = if String.length uid > 1 then - String.sub uid ~pos:1 ~len:(String.length uid - 1) + String.sub uid 1 (String.length uid - 1) else uid in RRD.Client.Plugin.Local.register RRD.rpc uid Rrd.Five_Seconds Rrd_interface.V2 - |> Rpc_async.T.get + |> Rpc_lwt.T.get >>= function | Ok _ -> loop (uid :: acc) datasources @@ -999,8 +1004,7 @@ let bind ~volume_script_dir = in loop [] stat.Xapi_storage.Control.datasources >>= fun uids -> (* associate the 'sr' from the plugin with the SR reference passed in *) - Attached_SRs.add sr attach_response uids >>>= fun () -> - Deferred.Result.return () + Attached_SRs.add sr attach_response uids >>>= fun () -> return () in wrap th in @@ -1010,7 +1014,7 @@ let bind ~volume_script_dir = Attached_SRs.find sr >>= function | Error _ -> (* ensure SR.detach is idempotent *) - Deferred.Result.return () + return () | Ok sr' -> return_volume_rpc (fun () -> Sr_client.detach (volume_rpc ~dbg) dbg sr' @@ -1019,7 +1023,7 @@ let bind ~volume_script_dir = Attached_SRs.get_uids sr >>>= fun uids -> let rec loop = function | [] -> - return () + Lwt.return_unit | datasource :: datasources -> ( let uri = Uri.of_string datasource in match Uri.scheme uri with @@ -1027,12 +1031,12 @@ let bind ~volume_script_dir = let uid = Uri.path_unencoded uri in let uid = if String.length uid > 1 then - String.sub uid ~pos:1 ~len:(String.length uid - 1) + String.sub uid 1 (String.length uid - 1) else uid in RRD.Client.Plugin.Local.deregister RRD.rpc uid - |> Rpc_async.T.get + |> Rpc_lwt.T.get >>= function | Ok _ -> loop datasources @@ -1044,8 +1048,7 @@ let bind ~volume_script_dir = ) in loop uids >>= fun () -> - let open Deferred.Result.Monad_infix in - Attached_SRs.remove sr >>= fun () -> Deferred.Result.return response + Attached_SRs.remove sr >>>= fun () -> return response in wrap th in @@ -1061,12 +1064,11 @@ let bind ~volume_script_dir = |> Jsonrpc.to_string in response - |> List.map ~f:(fun probe_result -> + |> List.map (fun probe_result -> let uuid = - List.Assoc.find probe_result.Xapi_storage.Control.configuration - ~equal:String.equal "sr_uuid" + List.assoc_opt "sr_uuid" + probe_result.Xapi_storage.Control.configuration in - let open Deferred.Or_error in let smapiv2_probe ?sr_info () = { Storage_interface.configuration= probe_result.configuration @@ -1082,7 +1084,8 @@ let bind ~volume_script_dir = ) with | _, false, Some _uuid -> - errorf "A configuration with a uuid cannot be incomplete: %a" + Deferred.errorf + "A configuration with a uuid cannot be incomplete: %a" pp_probe_result probe_result | Some sr_stat, true, Some _uuid -> let sr_info = @@ -1109,19 +1112,20 @@ let bind ~volume_script_dir = in return (smapiv2_probe ~sr_info ()) | Some _sr, _, None -> - errorf "A configuration is not attachable without a uuid: %a" + Deferred.errorf + "A configuration is not attachable without a uuid: %a" pp_probe_result probe_result | None, false, None -> return (smapiv2_probe ()) | None, true, _ -> return (smapiv2_probe ()) ) - |> Deferred.Or_error.combine_errors - |> Deferred.Result.map_error ~f:(fun err -> - backend_error "SCRIPT_FAILED" ["SR.probe"; Error.to_string_hum err] + |> Deferred.combine_errors + |> Lwt_result.map_error (fun err -> + backend_error "SCRIPT_FAILED" + ["SR.probe"; Base.Error.to_string_hum err] ) - >>>= fun results -> - Deferred.Result.return (Storage_interface.Probe results) + >>>= fun results -> return (Storage_interface.Probe results) in wrap th in @@ -1136,7 +1140,7 @@ let bind ~volume_script_dir = (volume_rpc ~dbg ~compat_in ~compat_out) dbg uuid device_config name_label description ) - >>>= fun new_device_config -> Deferred.Result.return new_device_config + >>>= fun new_device_config -> return new_device_config in wrap th in @@ -1184,25 +1188,27 @@ let bind ~volume_script_dir = let response = Array.to_list response in (* Filter out volumes which are clone-on-boot transients *) let transients = - List.fold - ~f:(fun set x -> + List.fold_left + (fun set x -> match - List.Assoc.find x.Xapi_storage.Control.keys - _clone_on_boot_key ~equal:String.equal + List.assoc_opt _clone_on_boot_key x.Xapi_storage.Control.keys with | None -> set | Some transient -> - Set.add set transient + Base.Set.add set transient ) - ~init:Core.String.Set.empty response + (Base.Set.empty (module Base.String)) + response in let response = List.filter - ~f:(fun x -> not (Set.mem transients x.Xapi_storage.Control.key)) + (fun x -> + not (Base.Set.mem transients x.Xapi_storage.Control.key) + ) response in - Deferred.Result.return (List.map ~f:vdi_of_volume response) + return (List.map vdi_of_volume response) ) |> wrap in @@ -1212,7 +1218,7 @@ let bind ~volume_script_dir = let get_sr_info sr = return_volume_rpc (fun () -> Sr_client.stat (volume_rpc ~dbg) dbg sr) >>>= fun response -> - Deferred.Result.return + return { Storage_interface.sr_uuid= response.Xapi_storage.Control.uuid ; name_label= response.Xapi_storage.Control.name @@ -1243,42 +1249,52 @@ let bind ~volume_script_dir = let response = Array.to_list response in (* Filter out volumes which are clone-on-boot transients *) let transients = - List.fold - ~f:(fun set x -> + List.fold_left + (fun set x -> match - List.Assoc.find x.Xapi_storage.Control.keys _clone_on_boot_key - ~equal:String.equal + Base.List.Assoc.find x.Xapi_storage.Control.keys + _clone_on_boot_key ~equal:String.equal with | None -> set | Some transient -> - Set.add set transient + Base.Set.add set transient ) - ~init:Core.String.Set.empty response + (Base.Set.empty (module Base.String)) + response in let response = List.filter - ~f:(fun x -> not (Set.mem transients x.Xapi_storage.Control.key)) + (fun x -> not (Base.Set.mem transients x.Xapi_storage.Control.key)) response in - Deferred.Result.return (List.map ~f:vdi_of_volume response, sr_info) + return (List.map vdi_of_volume response, sr_info) in let rec stat_with_retry ?(times = 3) sr = get_sr_info sr >>>= fun sr_info -> match sr_info.health with | Healthy -> - debug "%s sr %s is healthy" __FUNCTION__ sr_uuid ; + let* () = + debug (fun m -> m "%s sr %s is healthy" __FUNCTION__ sr_uuid) + in get_volume_info sr sr_info | Unreachable when times > 0 -> - debug "%s: sr %s is unreachable, remaining %d retries" __FUNCTION__ - sr_uuid times ; - Clock.after Time.Span.second >>= fun () -> + let* () = + debug (fun m -> + m "%s: sr %s is unreachable, remaining %d retries" __FUNCTION__ + sr_uuid times + ) + in + Clock.after ~seconds:1. >>= fun () -> stat_with_retry ~times:(times - 1) sr | health -> - debug "%s: sr unhealthy because it is %s" __FUNCTION__ - (Storage_interface.show_sr_health health) ; - Deferred.Result.fail - Storage_interface.(Errors.Sr_unhealthy (sr_uuid, health)) + let* () = + debug (fun m -> + m "%s: sr unhealthy because it is %s" __FUNCTION__ + (Storage_interface.show_sr_health health) + ) + in + fail Storage_interface.(Errors.Sr_unhealthy (sr_uuid, health)) in Attached_SRs.find sr >>>= stat_with_retry |> wrap in @@ -1295,7 +1311,7 @@ let bind ~volume_script_dir = ) >>>= update_keys ~dbg ~sr ~key:_vdi_type_key ~value:(match vdi_info.ty with "" -> None | s -> Some s) - >>>= fun response -> Deferred.Result.return (vdi_of_volume response) + >>>= fun response -> return (vdi_of_volume response) ) |> wrap in @@ -1306,11 +1322,10 @@ let bind ~volume_script_dir = stat ~dbg ~sr ~vdi >>>= fun response -> (* Destroy any clone-on-boot volume that might exist *) ( match - List.Assoc.find response.Xapi_storage.Control.keys _clone_on_boot_key - ~equal:String.equal + List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys with | None -> - return (Ok ()) + return () | Some _temporary -> (* Destroy the temporary disk we made earlier *) destroy ~dbg ~sr ~vdi @@ -1348,7 +1363,7 @@ let bind ~volume_script_dir = ; snapshot_of= Storage_interface.Vdi.of_string vdi } in - Deferred.Result.return response + return response ) |> wrap in @@ -1359,7 +1374,7 @@ let bind ~volume_script_dir = clone ~dbg ~sr ~vdi: (Storage_interface.Vdi.string_of vdi_info.Storage_interface.vdi) - >>>= fun response -> Deferred.Result.return (vdi_of_volume response) + >>>= fun response -> return (vdi_of_volume response) ) |> wrap in @@ -1394,7 +1409,7 @@ let bind ~volume_script_dir = >>>= fun () -> (* Now call Volume.stat to discover the size *) stat ~dbg ~sr ~vdi >>>= fun response -> - Deferred.Result.return response.Xapi_storage.Control.virtual_size + return response.Xapi_storage.Control.virtual_size ) |> wrap in @@ -1402,8 +1417,7 @@ let bind ~volume_script_dir = let vdi_stat_impl dbg sr vdi' = (let vdi = Storage_interface.Vdi.string_of vdi' in Attached_SRs.find sr >>>= fun sr -> - stat ~dbg ~sr ~vdi >>>= fun response -> - Deferred.Result.return (vdi_of_volume response) + stat ~dbg ~sr ~vdi >>>= fun response -> return (vdi_of_volume response) ) |> wrap in @@ -1413,7 +1427,7 @@ let bind ~volume_script_dir = >>>= (fun sr -> let vdi = location in stat ~dbg ~sr ~vdi >>>= fun response -> - Deferred.Result.return (vdi_of_volume response) + return (vdi_of_volume response) ) |> wrap in @@ -1432,10 +1446,10 @@ let bind ~volume_script_dir = | Nbd {uri} -> Nbd {uri} in - Deferred.Result.return + return { Storage_interface.implementations= - List.map ~f:convert_implementation + List.map convert_implementation response.Xapi_storage.Data.implementations } ) @@ -1450,11 +1464,10 @@ let bind ~volume_script_dir = stat ~dbg ~sr ~vdi >>>= fun response -> (* If we have a clone-on-boot volume then use that instead *) ( match - List.Assoc.find response.Xapi_storage.Control.keys _clone_on_boot_key - ~equal:String.equal + List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys with | None -> - return (Ok response) + return response | Some temporary -> stat ~dbg ~sr ~vdi:temporary ) @@ -1485,11 +1498,10 @@ let bind ~volume_script_dir = (* Discover the URIs using Volume.stat *) stat ~dbg ~sr ~vdi >>>= fun response -> ( match - List.Assoc.find response.Xapi_storage.Control.keys _clone_on_boot_key - ~equal:String.equal + List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys with | None -> - return (Ok response) + return response | Some temporary -> stat ~dbg ~sr ~vdi:temporary ) @@ -1509,11 +1521,10 @@ let bind ~volume_script_dir = (* Discover the URIs using Volume.stat *) stat ~dbg ~sr ~vdi >>>= fun response -> ( match - List.Assoc.find response.Xapi_storage.Control.keys _clone_on_boot_key - ~equal:String.equal + List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys with | None -> - return (Ok response) + return response | Some temporary -> stat ~dbg ~sr ~vdi:temporary ) @@ -1529,7 +1540,7 @@ let bind ~volume_script_dir = >>>= (fun sr -> return_volume_rpc (fun () -> Sr_client.stat (volume_rpc ~dbg) dbg sr) >>>= fun response -> - Deferred.Result.return + return { Storage_interface.sr_uuid= response.Xapi_storage.Control.uuid ; name_label= response.Xapi_storage.Control.name @@ -1573,11 +1584,10 @@ let bind ~volume_script_dir = (* We create a non-persistent disk here with Volume.clone, and store the name of the cloned disk in the metadata of the original. *) ( match - List.Assoc.find response.Xapi_storage.Control.keys _clone_on_boot_key - ~equal:String.equal + List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys with | None -> - Deferred.Result.return () + return () | Some temporary -> (* Destroy the temporary disk we made earlier *) destroy ~dbg ~sr ~vdi:temporary @@ -1587,7 +1597,7 @@ let bind ~volume_script_dir = set ~dbg ~sr ~vdi ~key:_clone_on_boot_key ~value:vdi'.Xapi_storage.Control.key else - Deferred.Result.return () + return () ) |> wrap in @@ -1603,23 +1613,19 @@ let bind ~volume_script_dir = return_data_rpc (fun () -> Datapath_client.close (rpc ~dbg) dbg uri) else match - List.Assoc.find response.Xapi_storage.Control.keys _clone_on_boot_key - ~equal:String.equal + List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys with | None -> - Deferred.Result.return () + return () | Some temporary -> (* Destroy the temporary disk we made earlier *) destroy ~dbg ~sr ~vdi:temporary >>>= fun () -> - unset ~dbg ~sr ~vdi ~key:_clone_on_boot_key >>>= fun () -> - Deferred.Result.return () + unset ~dbg ~sr ~vdi ~key:_clone_on_boot_key >>>= fun () -> return () ) |> wrap in S.VDI.epoch_end vdi_epoch_end_impl ; - let vdi_set_persistent_impl _dbg _sr _vdi _persistent = - Deferred.Result.return () |> wrap - in + let vdi_set_persistent_impl _dbg _sr _vdi _persistent = return () |> wrap in S.VDI.set_persistent vdi_set_persistent_impl ; let dp_destroy2 dbg _dp sr vdi' vm' _allow_leak = (let vdi = Storage_interface.Vdi.string_of vdi' in @@ -1628,11 +1634,10 @@ let bind ~volume_script_dir = (* Discover the URIs using Volume.stat *) stat ~dbg ~sr ~vdi >>>= fun response -> ( match - List.Assoc.find response.Xapi_storage.Control.keys _clone_on_boot_key - ~equal:String.equal + List.assoc_opt _clone_on_boot_key response.Xapi_storage.Control.keys with | None -> - return (Ok response) + return response | Some temporary -> stat ~dbg ~sr ~vdi:temporary ) @@ -1648,12 +1653,12 @@ let bind ~volume_script_dir = in S.DP.destroy2 dp_destroy2 ; let sr_list _dbg = - Attached_SRs.list () >>>= (fun srs -> Deferred.Result.return srs) |> wrap + Attached_SRs.list () >>>= (fun srs -> return srs) |> wrap in S.SR.list sr_list ; (* SR.reset is a no op in SMAPIv3 *) - S.SR.reset (fun _ _ -> Deferred.Result.return () |> wrap) ; - let ( let* ) = ( >>>= ) in + S.SR.reset (fun _ _ -> return () |> wrap) ; + let ( let* ) = Lwt_result.bind in let vdi_enable_cbt_impl dbg sr vdi = wrap @@ @@ -1679,7 +1684,7 @@ let bind ~volume_script_dir = @@ let* sr = Attached_SRs.find sr in let vdi, vdi' = Storage_interface.Vdi.(string_of vdi, string_of vdi') in - let ( let* ) = ( >>= ) in + let ( let* ) = Lwt.bind in let* result = return_volume_rpc (fun () -> (* Negative lengths indicate that we want the full length. *) @@ -1688,7 +1693,7 @@ let bind ~volume_script_dir = ) in let proj_bitmap r = r.Xapi_storage.Control.bitmap in - return (Result.map ~f:proj_bitmap result) + Lwt.return (Result.map proj_bitmap result) in S.VDI.list_changed_blocks vdi_list_changed_blocks_impl ; let vdi_data_destroy_impl dbg sr vdi = @@ -1696,13 +1701,12 @@ let bind ~volume_script_dir = @@ let* sr = Attached_SRs.find sr in let vdi = Storage_interface.Vdi.string_of vdi in - let* response = + let* () = return_volume_rpc (fun () -> Volume_client.data_destroy (volume_rpc ~dbg) dbg sr vdi ) in - let* () = set ~dbg ~sr ~vdi ~key:_vdi_type_key ~value:"cbt_metadata" in - Deferred.Result.return response + set ~dbg ~sr ~vdi ~key:_vdi_type_key ~value:"cbt_metadata" in S.VDI.data_destroy vdi_data_destroy_impl ; let u name _ = failwith ("Unimplemented: " ^ name) in @@ -1739,46 +1743,93 @@ let bind ~volume_script_dir = S.DATA.MIRROR.receive_cancel (u "DATA.MIRROR.receive_cancel") ; S.SR.update_snapshot_info_src (u "SR.update_snapshot_info_src") ; S.DATA.MIRROR.stop (u "DATA.MIRROR.stop") ; - Rpc_async.server S.implementation + Rpc_lwt.server S.implementation let process_smapiv2_requests server txt = let request = Jsonrpc.call_of_string txt in - server request >>= fun response -> - Deferred.return (Jsonrpc.string_of_response response) + let to_err e = + Storage_interface.(rpc_of Errors.error Errors.(Internal_error e)) + in + Lwt.try_bind + (fun () -> server request) + (fun response -> Lwt.return (Jsonrpc.string_of_response response)) + (fun exn -> + Printexc.to_string exn |> to_err |> Jsonrpc.to_string |> Lwt.return + ) (** Active servers, one per sub-directory of the volume_root_dir *) -let servers = String.Table.create () ~size:4 +let servers = Base.Hashtbl.create ~size:4 (module Base.String) (* XXX: need a better error-handling strategy *) -let get_ok = function - | Ok x -> - x - | Error e -> - let b = Buffer.create 16 in - let fmt = Format.formatter_of_buffer b in - Message_switch_unix.Protocol_unix.Server.pp_error fmt e ; - Format.pp_print_flush fmt () ; - failwith (Buffer.contents b) +let get_ok x = + Message_switch_unix.Protocol_unix.Server.error_to_msg x + |> Result.fold ~ok:Fun.id ~error:(function `Msg err -> failwith err) let rec diff a b = match a with | [] -> [] | a :: aa -> - if List.mem b a ~equal:String.( = ) then diff aa b else a :: diff aa b + if List.mem a b then diff aa b else a :: diff aa b (* default false due to bugs in SMAPIv3 plugins, once they are fixed this should be set to true *) let concurrent = ref false -let watch_volume_plugins ~volume_root ~switch_path ~pipe = +type reload = All | Files of string list | Nothing + +let actions_from events = + List.fold_left + (fun acc event -> + match (event, acc) with + | DirWatcher.Modified path, Nothing -> + Files [path] + | Modified path, Files files -> + Files (path :: files) + | Changed, _ | _, All -> + All + ) + Nothing events + +let reload_all root ~create ~destroy = + let* needed = Sys.readdir root in + let got_already = Base.Hashtbl.keys servers in + let* () = Lwt.join (List.map create (diff needed got_already)) in + Lwt.join (List.map destroy (diff got_already needed)) + +let reload_file ~create ~destroy path = + let name = Filename.basename path in + let* () = destroy name in + create name + +let reload root ~create ~destroy = function + | All -> + reload_all root ~create ~destroy + | Files files -> + Lwt_list.iter_p (reload_file ~create ~destroy) files + | Nothing -> + Lwt.return_unit + +let rec watch_loop pipe root ~create ~destroy = + let* () = Lwt_unix.sleep 0.5 in + let* () = + let* events = DirWatcher.read pipe in + reload root ~create ~destroy (actions_from events) + in + watch_loop pipe root ~create ~destroy + +let watch_plugins ~pipe ~root ~create ~destroy = + reload_all root ~create ~destroy >>= fun () -> + watch_loop pipe root ~create ~destroy + +let watch_volume_plugins ~volume_root ~switch_path ~pipe () = let create volume_plugin_name = - if Hashtbl.mem servers volume_plugin_name then - return () - else ( - info "Adding %s" volume_plugin_name ; - let volume_script_dir = Filename.concat volume_root volume_plugin_name in - Message_switch_async.Protocol_async.Server.( + if Base.Hashtbl.mem servers volume_plugin_name then + Lwt.return_unit + else + info (fun m -> m "Adding %s" volume_plugin_name) >>= fun () -> + let volume_script_dir = volume_root // volume_plugin_name in + Message_switch_lwt.Protocol_lwt.Server.( if !concurrent then listen_p else listen ) ~process:(process_smapiv2_requests (bind ~volume_script_dir)) @@ -1787,100 +1838,38 @@ let watch_volume_plugins ~volume_root ~switch_path ~pipe = () >>= fun result -> let server = get_ok result in - Hashtbl.add_exn servers ~key:volume_plugin_name ~data:server ; - return () - ) + Base.Hashtbl.add_exn servers ~key:volume_plugin_name ~data:server ; + Lwt.return_unit in let destroy volume_plugin_name = - info "Removing %s" volume_plugin_name ; - match Hashtbl.find servers volume_plugin_name with + info (fun m -> m "Removing %s" volume_plugin_name) >>= fun () -> + match Base.Hashtbl.find servers volume_plugin_name with | Some t -> - Message_switch_async.Protocol_async.Server.shutdown ~t () >>= fun () -> - Hashtbl.remove servers volume_plugin_name ; - return () + Message_switch_lwt.Protocol_lwt.Server.shutdown ~t () >>= fun () -> + Base.Hashtbl.remove servers volume_plugin_name ; + Lwt.return_unit | None -> - return () - in - let sync () = - Sys.readdir volume_root >>= fun names -> - let needed : string list = Array.to_list names in - let got_already : string list = Hashtbl.keys servers in - Deferred.all_unit (List.map ~f:create (diff needed got_already)) - >>= fun () -> - Deferred.all_unit (List.map ~f:destroy (diff got_already needed)) - in - sync () >>= fun () -> - let open Async_inotify.Event in - let rec loop () = - (Pipe.read pipe >>= function - | `Eof -> - info "Received EOF from inotify event pipe" ; - Shutdown.exit 1 - | `Ok (Created path) | `Ok (Moved (Into path)) -> - create (Filename.basename path) - | `Ok (Unlinked path) | `Ok (Moved (Away path)) -> - destroy (Filename.basename path) - | `Ok (Modified _) -> - return () - | `Ok (Moved (Move (path_a, path_b))) -> - destroy (Filename.basename path_a) >>= fun () -> - create (Filename.basename path_b) - | `Ok Queue_overflow -> - sync () - ) - >>= fun () -> loop () - in - loop () - -let watch_datapath_plugins ~datapath_root ~pipe = - let sync () = - Sys.readdir datapath_root >>= fun names -> - let needed : string list = Array.to_list names in - let got_already : string list = Hashtbl.keys servers in - Deferred.all_unit - (List.map - ~f:(Datapath_plugins.register ~datapath_root) - (diff needed got_already) - ) - >>= fun () -> - Deferred.all_unit - (List.map ~f:Datapath_plugins.unregister (diff got_already needed)) + Lwt.return_unit in - sync () >>= fun () -> - let open Async_inotify.Event in - let rec loop () = - (Pipe.read pipe >>= function - | `Eof -> - info "Received EOF from inotify event pipe" ; - Shutdown.exit 1 - | `Ok (Created path) | `Ok (Moved (Into path)) -> - Datapath_plugins.register ~datapath_root (Filename.basename path) - | `Ok (Unlinked path) | `Ok (Moved (Away path)) -> - Datapath_plugins.unregister (Filename.basename path) - | `Ok (Modified _) -> - return () - | `Ok (Moved (Move (path_a, path_b))) -> - Datapath_plugins.unregister (Filename.basename path_a) >>= fun () -> - Datapath_plugins.register ~datapath_root (Filename.basename path_b) - | `Ok Queue_overflow -> - sync () - ) - >>= fun () -> loop () - in - loop () + watch_plugins ~pipe ~root:volume_root ~create ~destroy + +let watch_datapath_plugins ~datapath_root ~pipe () = + let create = Datapath_plugins.register ~datapath_root in + let destroy = Datapath_plugins.unregister in + watch_plugins ~pipe ~root:datapath_root ~create ~destroy let self_test_plugin ~root_dir plugin = let volume_script_dir = Filename.(concat (concat root_dir "volume") plugin) in let process = process_smapiv2_requests (bind ~volume_script_dir) in let rpc call = call |> Jsonrpc.string_of_call |> process >>= fun r -> - debug "RPC: %s" r ; - return (Jsonrpc.response_of_string r) + debug (fun m -> m "RPC: %s" r) >>= fun () -> + Lwt.return (Jsonrpc.response_of_string r) in - let module Test = Storage_interface.StorageAPI (Rpc_async.GenClient ()) in + let module Test = Storage_interface.StorageAPI (Rpc_lwt.GenClient ()) in let dbg = "debug" in - Monitor.try_with (fun () -> - let open Rpc_async.ErrM in + Deferred.try_with (fun () -> + let open Rpc_lwt.ErrM in Test.Query.query rpc dbg >>= (fun query_result -> Test.Query.diagnostics rpc dbg >>= fun _msg -> @@ -1921,61 +1910,58 @@ let self_test_plugin ~root_dir plugin = Test.VDI.destroy rpc dbg sr vdi_info.vdi >>= fun () -> Test.SR.stat rpc dbg sr >>= fun _sr_info -> Test.SR.scan rpc dbg sr >>= fun _sr_list -> - if List.mem query_result.features "SR_PROBE" ~equal:String.equal - then + if List.mem "SR_PROBE" query_result.features then Test.SR.probe rpc dbg plugin device_config [] >>= fun _result -> return () else return () ) - |> Rpc_async.T.get + |> Rpc_lwt.T.get ) >>= function | Ok x -> - Async.Deferred.return x - | Error _y -> - failwith "self test failed" + Lwt.return x + | Error e -> + failwith (Printf.sprintf "self test failed with %s" (Printexc.to_string e)) let self_test ~root_dir = self_test_plugin ~root_dir "org.xen.xapi.storage.dummyv5" >>= function | Ok () -> - info "test thread shutdown cleanly" ; - Async_unix.exit 0 + info (fun m -> m "test thread shutdown cleanly") >>= fun () -> exit 0 | Error x -> - error "test thread failed with %s" - (Storage_interface.(rpc_of Errors.error) x |> Jsonrpc.to_string) ; - Async_unix.exit 2 + error (fun m -> + m "test thread failed with %s" + (Storage_interface.(rpc_of Errors.error) x |> Jsonrpc.to_string) + ) + >>= fun () -> exit 2 let main ~root_dir ~state_path ~switch_path = Attached_SRs.reload state_path >>= fun () -> - let datapath_root = Filename.concat root_dir "datapath" in - Async_inotify.create ~recursive:false ~watch_new_dirs:false datapath_root - >>= fun (_, _, datapath) -> - let volume_root = Filename.concat root_dir "volume" in - Async_inotify.create ~recursive:false ~watch_new_dirs:false volume_root - >>= fun (_, _, volume) -> - let rec loop () = - Monitor.try_with (fun () -> - Deferred.all_unit - [ - watch_volume_plugins ~volume_root ~switch_path ~pipe:volume - ; watch_datapath_plugins ~datapath_root ~pipe:datapath - ] - ) - >>= function + let datapath_root = root_dir // "datapath" in + DirWatcher.create datapath_root >>= fun datapath -> + let volume_root = root_dir // "volume" in + DirWatcher.create volume_root >>= fun volume -> + let rec retry_loop ((name, promise) as thread) () = + Deferred.try_with promise >>= function | Ok () -> - info "main thread shutdown cleanly" ; - return () + Lwt.return_unit | Error x -> - error "main thread failed with %s" (Exn.to_string x) ; - Clock.after (Time.Span.of_sec 5.) >>= fun () -> loop () + error (fun m -> m "%s thread failed with %s" name (Base.Exn.to_string x)) + >>= fun () -> Clock.after ~seconds:5. >>= retry_loop thread in - loop () + [ + ( "volume plugins" + , watch_volume_plugins ~volume_root ~switch_path ~pipe:volume + ) + ; ("datapath plugins", watch_datapath_plugins ~datapath_root ~pipe:datapath) + ] + |> List.map (fun thread -> retry_loop thread ()) + |> Lwt.join open Xcp_service let description = - String.concat ~sep:" " + String.concat " " [ "Allow xapi storage adapters to be written as individual scripts." ; "To add a storage adapter, create a sub-directory in the --root directory" @@ -2000,7 +1986,7 @@ let register_exn_pretty_printers () = assert false ) -let _ = +let () = register_exn_pretty_printers () ; let root_dir = ref "/var/lib/xapi/storage-scripts" in let state_path = ref "/var/run/nonpersistent/xapi-storage-script/state.db" in @@ -2013,7 +1999,7 @@ let _ = scripts, one sub-directory per queue name" ; essential= true ; path= root_dir - ; perms= [U.X_OK] + ; perms= [Unix.X_OK] } ; { Xcp_service.name= "state" @@ -2043,27 +2029,16 @@ let _ = in configure2 ~name:"xapi-script-storage" ~version:Xapi_version.version ~doc:description ~resources ~options () ; - let run () = - let ( let* ) = ( >>= ) in + + Logs.set_reporter (lwt_reporter ()) ; + Logs.set_level ~all:true (Some Logs.Info) ; + let main = let* observer_enabled = observer_is_component_enabled () in config.use_observer <- observer_enabled ; - let rec loop () = - Monitor.try_with (fun () -> - if !self_test_only then - self_test ~root_dir:!root_dir - else - main ~root_dir:!root_dir ~state_path:!state_path - ~switch_path:!Xcp_client.switch_path - ) - >>= function - | Ok () -> - info "main thread shutdown cleanly" ; - return () - | Error x -> - error "main thread failed with %s" (Exn.to_string x) ; - Clock.after (Time.Span.of_sec 5.) >>= fun () -> loop () - in - loop () + if !self_test_only then + self_test ~root_dir:!root_dir + else + main ~root_dir:!root_dir ~state_path:!state_path + ~switch_path:!Xcp_client.switch_path in - ignore (run ()) ; - never_returns (Scheduler.go ()) + Lwt_main.run main diff --git a/ocaml/xapi-storage-script/test_lib.ml b/ocaml/xapi-storage-script/test_lib.ml new file mode 100644 index 00000000000..e016d1368a4 --- /dev/null +++ b/ocaml/xapi-storage-script/test_lib.ml @@ -0,0 +1,143 @@ +(* Copyright (C) Cloud Software Group Inc. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; version 2.1 only. with the special + exception on linking described in file LICENSE. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. +*) + +module Sys = Private.Lib.Sys +module Signal = Private.Lib.Signal +module Process = Private.Lib.Process + +let ( let* ) = Lwt.bind + +let test_content_rountrip = + let test () = + let contents = "yes" in + let path = Filename.temp_file "" "" in + let* () = Sys.save ~contents path in + let* result = Sys.read_file_contents path in + Alcotest.(check string) "Write and read roundtrip" contents result ; + Lwt.return () + in + ("Write and read file", `Quick, test) + +let test_readdir = + let test () = + let path = Filename.temp_file "" "" in + let filename = Filename.basename path in + let tmpdir = Filename.dirname path in + let* dir_contents = Sys.readdir tmpdir in + let file_present = List.exists (String.equal filename) dir_contents in + Alcotest.(check bool) "Temp file detected" true file_present ; + Lwt.return () + in + ("Read directory", `Quick, test) + +let test_assert_is_exec = + let test name path is_expected = + let* result = Sys.assert_is_executable path in + Alcotest.(check bool) name true (is_expected result) ; + Lwt.return () + in + let test () = + let path = "/missing/path" in + let is_expected = function + | Error (`missing p) -> + Alcotest.(check string) "Missing paths match" path p ; + true + | _ -> + false + in + let* () = test "File is missing" path is_expected in + + let path = Filename.temp_file "" "" in + let is_expected = function + | Error (`not_executable (p, _)) -> + Alcotest.(check string) "Non-exec paths match" path p ; + true + | _ -> + false + in + let* () = test "File is not executable" path is_expected in + + let* () = Lwt_unix.chmod path 0o700 in + let is_expected = function Ok () -> true | _ -> false in + let* () = test "File is now executable" path is_expected in + + Lwt.return () + in + ("Executable file detection", `Quick, test) + +let test_sys = + ("Sys", [test_content_rountrip; test_readdir; test_assert_is_exec]) + +let exit_or_signal_pp ppf es = + match es with + | Process.Output.Signal s -> + Fmt.pf ppf "Signal %s" (Signal.to_string s) + | Process.Output.Exit_non_zero int -> + Fmt.pf ppf "Exit %i" int + +let output_pp = + let module O = Process.Output in + let module Dump = Fmt.Dump in + Dump.record + [ + Dump.field "exit_status" + (fun t -> t.O.exit_status) + (Dump.result ~ok:Fmt.(any "()") ~error:exit_or_signal_pp) + ; Dump.field "stdout" (fun t -> t.O.stdout) Dump.string + ; Dump.field "stderr" (fun t -> t.O.stderr) Dump.string + ] + +let output_c = Alcotest.testable output_pp Stdlib.( = ) + +let test_run_status = + let module P = Process in + let test () = + let* output = P.run ~prog:"true" ~args:[] ~input:"" ~env:[] in + let expected = P.Output.{exit_status= Ok (); stdout= ""; stderr= ""} in + Alcotest.(check output_c) "Exit status is correct" expected output ; + + let* output = P.run ~prog:"false" ~args:[] ~input:"" ~env:[] in + let expected = + P.Output.{exit_status= Error (Exit_non_zero 1); stdout= ""; stderr= ""} + in + Alcotest.(check output_c) "Exit status is correct" expected output ; + + Lwt.return () + in + ("Run's exit status", `Quick, test) + +let test_run_output = + let module P = Process in + let test () = + let content = "@@@@@@" in + let* output = P.run ~prog:"cat" ~args:["-"] ~input:content ~env:[] in + let expected = P.Output.{exit_status= Ok (); stdout= content; stderr= ""} in + Alcotest.(check output_c) "Stdout is correct" expected output ; + + let* output = P.run ~prog:"cat" ~args:[content] ~input:content ~env:[] in + let stderr = + Printf.sprintf "cat: %s: No such file or directory\n" content + in + let expected = + P.Output.{exit_status= Error (Exit_non_zero 1); stdout= ""; stderr} + in + Alcotest.(check output_c) "Stderr is correct" expected output ; + Lwt.return () + in + ("Run output collection", `Quick, test) + +let test_proc = ("Process", [test_run_status; test_run_output]) + +let tests = [test_sys; test_proc] + +let () = Lwt_main.run @@ Alcotest_lwt.run "xapi-storage-script lib" tests diff --git a/ocaml/xapi-storage-script/test_lib.mli b/ocaml/xapi-storage-script/test_lib.mli new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ocaml/xapi-storage-script/xapi_storage_script_types.ml b/ocaml/xapi-storage-script/xapi_storage_script_types.ml deleted file mode 100644 index 9b8d9456ccc..00000000000 --- a/ocaml/xapi-storage-script/xapi_storage_script_types.ml +++ /dev/null @@ -1,25 +0,0 @@ -(* - * Copyright (C) Citrix Systems Inc. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published - * by the Free Software Foundation; version 2.1 only. with the special - * exception on linking described in file LICENSE. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - *) - -type backtrace = { - error: string - ; (* Python json.dumps and rpclib are not very friendly *) - files: string list - ; lines: int list -} -[@@deriving rpc] - -(* This matches xapi.py:exception *) -type error = {code: string; params: string list; backtrace: backtrace} -[@@deriving rpc] diff --git a/ocaml/xen-api-client/async/dune b/ocaml/xen-api-client/async/dune deleted file mode 100644 index a3ed8b645b7..00000000000 --- a/ocaml/xen-api-client/async/dune +++ /dev/null @@ -1,25 +0,0 @@ -(library - (name xen_api_client_async) - (public_name xen-api-client-async) - (libraries - async - async_kernel - async_unix - base - cohttp - core - core_unix - core_unix.time_unix - core_kernel - rpclib.core - rpclib.json - rpclib.xml - uri - xapi-client - xapi-consts - xen-api-client - xmlm - ) - (wrapped false) -) - diff --git a/ocaml/xen-api-client/async/xen_api_async_unix.ml b/ocaml/xen-api-client/async/xen_api_async_unix.ml deleted file mode 100644 index 3e8092c1faf..00000000000 --- a/ocaml/xen-api-client/async/xen_api_async_unix.ml +++ /dev/null @@ -1,134 +0,0 @@ -(* - * Copyright (c) 2012 Anil Madhavapeddy - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - * - *) -open Core -open Async -open Xen_api - -module IO = struct - type 'a t = 'a Deferred.t - - let ( >>= ) = Deferred.( >>= ) - - (* let (>>) m n = m >>= fun _ -> n *) - let return = Deferred.return - - type ic = (unit -> unit Deferred.t) * Reader.t - - type oc = (unit -> unit Deferred.t) * Writer.t - - type conn = unit - - let read_line (_, ic) = - Reader.read_line ic >>| function `Ok s -> Some s | `Eof -> None - - let read (_, ic) len = - let buf = Bytes.create len in - Reader.read ic ~len buf >>| function - | `Ok len' -> - let content = Bytes.sub buf ~pos:0 ~len:len' in - Bytes.to_string content - | `Eof -> - "" - - (* let read_exactly (_, ic) len = - let buf = String.create len in - Reader.really_read ic ~pos:0 ~len buf >>= - function - |`Ok -> return (Some buf) - |`Eof _ -> return None *) - - let write (_, oc) buf = Writer.write oc buf ; return () - - (* let write_line (_, oc) buf = - Writer.write oc buf; - Writer.write oc "\r\n"; - return () *) - - let flush (_, oc) = Async.Writer.flushed oc - - let close ((close1, _), (close2, _)) = close1 () >>= fun () -> close2 () - - let open_connection uri = - match Uri.scheme uri with - | Some "http" -> ( - let port = match Uri.port uri with None -> 80 | Some port -> port in - match Uri.host uri with - | Some host -> - let endp = Host_and_port.create ~host ~port in - Tcp.connect (Tcp.Where_to_connect.of_host_and_port endp) - >>| fun (_, ic, oc) -> - Ok - ( ((fun () -> Reader.close ic), ic) - , ((fun () -> Writer.close oc), oc) - ) - | None -> - return (Error (Failed_to_resolve_hostname "")) - ) - | Some x -> - return (Error (Unsupported_scheme x)) - | None -> - return (Error (Unsupported_scheme "")) - - let sleep s = after (sec s) - - let gettimeofday = Unix.gettimeofday -end - -module M = Make (IO) - -let exn_to_string = function - | Api_errors.Server_error (code, params) -> - Printf.sprintf "%s %s" code (String.concat ~sep:" " params) - | e -> - Printf.sprintf "Caught unexpected exception: %s" (Exn.to_string e) - -let do_it uri string = - let uri = Uri.of_string uri in - let connection = M.make uri in - let ( >>= ) = Deferred.( >>= ) in - Monitor.protect - (fun () -> - M.rpc connection string >>= function - | Ok x -> - return x - | Error e -> - eprintf "Caught: %s\n%!" (exn_to_string e) ; - Exn.reraise e "connection error" - ) - ~finally:(fun () -> M.disconnect connection) - -(* TODO: modify do_it to accept the timeout and remove the warnings *) - -[@@@ocaml.warning "-27"] - -let make ?(timeout = 30.) uri call = - let req = Xmlrpc.string_of_call call in - do_it uri req >>| Xmlrpc.response_of_string - -[@@@ocaml.warning "-27"] - -let make_json ?(timeout = 30.) uri call = - let req = Jsonrpc.string_of_call call in - do_it uri req >>| Jsonrpc.response_of_string - -module Client = Client.ClientF (struct - include Deferred - - let bind a f = bind a ~f -end) - -include Client diff --git a/ocaml/xen-api-client/async/xen_api_async_unix.mli b/ocaml/xen-api-client/async/xen_api_async_unix.mli deleted file mode 100644 index 4d8ac0a2886..00000000000 --- a/ocaml/xen-api-client/async/xen_api_async_unix.mli +++ /dev/null @@ -1,28 +0,0 @@ -(* - * Copyright (C) 2012 Citrix Systems Inc. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published - * by the Free Software Foundation; version 2.1 only. with the special - * exception on linking described in file LICENSE. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - *) - -val make : ?timeout:float -> string -> Rpc.call -> Rpc.response Async.Deferred.t -(** [make ?timeout uri] returns an 'rpc' function which can be - passed to Client.* functions *) - -val make_json : - ?timeout:float -> string -> Rpc.call -> Rpc.response Async.Deferred.t -(** [make_json ?timeout uri] returns an 'rpc' function which can be - passed to Client.* functions *) - -include module type of Client.ClientF (struct - include Async.Deferred - - let bind a f = bind a ~f -end) diff --git a/ocaml/xen-api-client/async_examples/dune b/ocaml/xen-api-client/async_examples/dune deleted file mode 100644 index 7d39e42c902..00000000000 --- a/ocaml/xen-api-client/async_examples/dune +++ /dev/null @@ -1,48 +0,0 @@ -(executable - (modes exe) - (name list_vms) - (modules list_vms) - (libraries - async - async_unix - base - base.caml - core - core_kernel - - xapi-consts - xapi-types - xen-api-client - xen-api-client-async - ) -) - -(executable - (modes exe) - (name event_test) - (modules event_test) - (libraries - async - async_unix - base - base.caml - core - core_kernel - rpclib.json - sexplib0 - xapi-consts - xapi-types - xen-api-client - xen-api-client-async - ) -) - -(alias - (name examples) - (deps - list_vms.exe - event_test.exe - ) - (package xen-api-client-async) -) - diff --git a/ocaml/xen-api-client/async_examples/event_test.ml b/ocaml/xen-api-client/async_examples/event_test.ml deleted file mode 100644 index 7107a8bda8f..00000000000 --- a/ocaml/xen-api-client/async_examples/event_test.ml +++ /dev/null @@ -1,175 +0,0 @@ -(* - * Copyright (C) 2012-2014 Citrix Systems Inc. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published - * by the Free Software Foundation; version 2.1 only. with the special - * exception on linking described in file LICENSE. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - *) - -open Core -open Async -open Xen_api_async_unix - -let uri = ref "http://127.0.0.1/" - -let username = ref "root" - -let password = ref "password" - -let enable_debug = ref false - -let debug fmt = - Printf.ksprintf - (fun txt -> - if !enable_debug then - eprintf "%s\n%!" txt - ) - fmt - -let error fmt = Printf.ksprintf (fun txt -> eprintf "Error: %s\n%!" txt) fmt - -let info fmt = Printf.ksprintf (fun txt -> eprintf "%s\n%!" txt) fmt - -let watch_events rpc session_id = - let open Event_types in - let module StringMap = Map.Make (String) in - let root = ref StringMap.empty in - - let update map ev = - (* type-specific table *) - let ty = - match StringMap.find map ev.ty with - | None -> - StringMap.empty - | Some x -> - x - in - let ty = - match ev.op with - | `add | `_mod -> ( - match ev.snapshot with - | None -> - error "Event contained no snapshot" ; - ty - | Some s -> - StringMap.update ty ev.reference ~f:(fun _ -> s) - ) - | `del -> - StringMap.remove ty ev.reference - in - if StringMap.is_empty ty then - StringMap.remove map ev.ty - else - StringMap.update map ev.ty ~f:(fun _ -> ty) - in - - let compare () = - let open Event_types in - Event.from ~rpc ~session_id ~classes:["*"] ~token:"" ~timeout:0. - >>= fun rpc -> - let e = event_from_of_rpc rpc in - if List.is_empty e.events then error "Empty list of events" ; - let current = List.fold_left ~init:StringMap.empty ~f:update e.events in - Sequence.iter - ~f:(fun (key, diff) -> - match (key, diff) with - | key, `Left _ -> - error "Replica has extra table: %s" key - | key, `Right _ -> - error "Replica has missing table: %s" key - | _, `Unequal (_, _) -> - () - ) - (StringMap.symmetric_diff !root current ~data_equal:(fun _ _ -> true)) ; - List.iter - ~f:(fun key -> - match StringMap.find !root key with - | None -> - error "Table missing in replica: %s" key - | Some root_table -> - let current_table = StringMap.find_exn current key in - Sequence.iter - ~f:(fun (key, diff) -> - match (key, diff) with - | r, `Left rpc -> - error "Replica has extra object: %s: %s" r - (Jsonrpc.to_string rpc) - | r, `Right rpc -> - error "Replica has missing object: %s: %s" r - (Jsonrpc.to_string rpc) - | r, `Unequal (rpc1, rpc2) -> - error "Replica has out-of-sync object: %s: %s <> %s" r - (Jsonrpc.to_string rpc1) (Jsonrpc.to_string rpc2) - ) - (StringMap.symmetric_diff root_table current_table - ~data_equal:(fun a b -> Base.Poly.equal a b - ) - ) - ) - (StringMap.keys current) ; - return () - in - - let rec loop token = - Event.from ~rpc ~session_id ~classes:["*"] ~token ~timeout:30. - >>= fun rpc -> - debug "received event: %s" (Jsonrpc.to_string rpc) ; - let e = event_from_of_rpc rpc in - List.iter ~f:(fun ev -> root := update !root ev) e.events ; - compare () >>= fun () -> - info "object counts: %s" - (String.concat ~sep:", " - (List.map - ~f:(fun key -> - Printf.sprintf "%s (%d)" key - (StringMap.length (StringMap.find_exn !root key)) - ) - (StringMap.keys !root) - ) - ) ; - loop e.token - in - loop "" - -let main () = - let rpc = make !uri in - Session.login_with_password ~rpc ~uname:!username ~pwd:!password - ~version:"1.0" ~originator:"event_test" - >>= fun session_id -> - let a = watch_events rpc session_id in - let b = watch_events rpc session_id in - a >>= fun () -> - b >>= fun () -> - Session.logout ~rpc ~session_id >>= fun () -> shutdown 0 ; return () - -let _ = - Arg.parse - [ - ( "-uri" - , Arg.Set_string uri - , Printf.sprintf "URI of server to connect to (default %s)" !uri - ) - ; ( "-u" - , Arg.Set_string username - , Printf.sprintf "Username to log in with (default %s)" !username - ) - ; ( "-pw" - , Arg.Set_string password - , Printf.sprintf "Password to log in with (default %s)" !password - ) - ; ( "-debug" - , Arg.Set enable_debug - , Printf.sprintf "Enable debug logging (default %b)" !enable_debug - ) - ] - (fun x -> eprintf "Ignoring argument: %s\n" x) - "Simple example which tracks the server state via events" ; - - let (_ : unit Deferred.t) = main () in - never_returns (Scheduler.go ()) diff --git a/ocaml/xen-api-client/async_examples/list_vms.ml b/ocaml/xen-api-client/async_examples/list_vms.ml deleted file mode 100644 index 6aac0feb527..00000000000 --- a/ocaml/xen-api-client/async_examples/list_vms.ml +++ /dev/null @@ -1,56 +0,0 @@ -(* - * Copyright (C) 2012 Citrix Systems Inc. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published - * by the Free Software Foundation; version 2.1 only. with the special - * exception on linking described in file LICENSE. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - *) - -open Core -open Async -open Xen_api_async_unix - -let uri = ref "http://127.0.0.1/" - -let username = ref "root" - -let password = ref "password" - -let main () = - let rpc = make !uri in - Session.login_with_password ~rpc ~uname:!username ~pwd:!password - ~version:"1.0" ~originator:"list_vms" - >>= fun session_id -> - VM.get_all_records ~rpc ~session_id >>= fun vms -> - List.iter - ~f:(fun (_, vm_rec) -> printf "VM %s\n%!" vm_rec.API.vM_name_label) - vms ; - Session.logout ~rpc ~session_id >>= fun () -> shutdown 0 ; return () - -let _ = - Arg.parse - [ - ( "-uri" - , Arg.Set_string uri - , Printf.sprintf "URI of server to connect to (default %s)" !uri - ) - ; ( "-u" - , Arg.Set_string username - , Printf.sprintf "Username to log in with (default %s)" !username - ) - ; ( "-pw" - , Arg.Set_string password - , Printf.sprintf "Password to log in with (default %s)" !password - ) - ] - (fun x -> eprintf "Ignoring argument: %s\n" x) - "Simple example which lists VMs found on a pool" ; - - let (_ : unit Deferred.t) = main () in - never_returns (Scheduler.go ()) diff --git a/quality-gate.sh b/quality-gate.sh index eb2d4daed90..2629645e03a 100755 --- a/quality-gate.sh +++ b/quality-gate.sh @@ -25,7 +25,7 @@ verify-cert () { } mli-files () { - N=508 + N=505 # do not count ml files from the tests in ocaml/{tests/perftest/quicktest} MLIS=$(git ls-files -- '**/*.mli' | grep -vE "ocaml/tests|ocaml/perftest|ocaml/quicktest|ocaml/message-switch/core_test" | xargs -I {} sh -c "echo {} | cut -f 1 -d '.'" \;) MLS=$(git ls-files -- '**/*.ml' | grep -vE "ocaml/tests|ocaml/perftest|ocaml/quicktest|ocaml/message-switch/core_test" | xargs -I {} sh -c "echo {} | cut -f 1 -d '.'" \;) diff --git a/xapi-idl.opam b/xapi-idl.opam index c1fff027077..20c9ea0f1af 100644 --- a/xapi-idl.opam +++ b/xapi-idl.opam @@ -22,7 +22,7 @@ depends: [ "ipaddr" "logs" "lwt" {>= "5.0.0"} - "message-switch-async" {with-test} + "message-switch-lwt" {with-test} "message-switch-core" "message-switch-unix" "mtime" diff --git a/xapi-idl.opam.template b/xapi-idl.opam.template index beea3845af6..5f6105ba5da 100644 --- a/xapi-idl.opam.template +++ b/xapi-idl.opam.template @@ -20,7 +20,7 @@ depends: [ "ipaddr" "logs" "lwt" {>= "5.0.0"} - "message-switch-async" {with-test} + "message-switch-lwt" {with-test} "message-switch-core" "message-switch-unix" "mtime" diff --git a/xapi-storage-script.opam b/xapi-storage-script.opam index a8df41ef405..0a974584ac2 100644 --- a/xapi-storage-script.opam +++ b/xapi-storage-script.opam @@ -14,23 +14,19 @@ depends: [ "ocaml" "dune" {>= "3.15"} "conf-python-3" {with-test} - "xapi-idl" {>= "0.10.0"} - "xapi-storage" - "async" {>= "v0.9.0"} - "async_inotify" - "async_unix" {>= "112.24.00"} - "core" + "base" + "inotify" + "lwt" + "message-switch-lwt" "message-switch-unix" - "message-switch-async" - "rpclib" - "rpclib-async" "ppx_deriving_rpc" "ppx_sexp_conv" + "rpclib" + "rpclib-lwt" + "sexplib0" + "xapi-idl" {>= "0.10.0"} "xapi-stdext-date" -] -# python 2.7 is not enough to ensure the availability of 'python' in these -depexts: [ - ["python"] {os-family = "debian" & with-test} + "xapi-storage" ] synopsis: "A directory full of scripts can be a Xapi storage implementation" description: """ diff --git a/xapi-storage-script.opam.template b/xapi-storage-script.opam.template index b40cc0880b5..d569fda47b8 100644 --- a/xapi-storage-script.opam.template +++ b/xapi-storage-script.opam.template @@ -12,23 +12,19 @@ depends: [ "ocaml" "dune" {>= "3.15"} "conf-python-3" {with-test} - "xapi-idl" {>= "0.10.0"} - "xapi-storage" - "async" {>= "v0.9.0"} - "async_inotify" - "async_unix" {>= "112.24.00"} - "core" + "base" + "inotify" + "lwt" + "message-switch-lwt" "message-switch-unix" - "message-switch-async" - "rpclib" - "rpclib-async" "ppx_deriving_rpc" "ppx_sexp_conv" + "rpclib" + "rpclib-lwt" + "sexplib0" + "xapi-idl" {>= "0.10.0"} "xapi-stdext-date" -] -# python 2.7 is not enough to ensure the availability of 'python' in these -depexts: [ - ["python"] {os-family = "debian" & with-test} + "xapi-storage" ] synopsis: "A directory full of scripts can be a Xapi storage implementation" description: """ diff --git a/xen-api-client-async.opam b/xen-api-client-async.opam deleted file mode 100644 index c283cb6d1e8..00000000000 --- a/xen-api-client-async.opam +++ /dev/null @@ -1,38 +0,0 @@ -# This file is generated by dune, edit dune-project instead - -opam-version: "2.0" -maintainer: "xen-api@lists.xen.org" -authors: [ "David Scott" "Anil Madhavapeddy" "Jerome Maloberti" "John Else" "Jon Ludlam" "Thomas Sanders" "Mike McClurg" ] -license: "LGPL-2.1-only WITH OCaml-LGPL-linking-exception" -homepage: "https://github.com/xapi-project/xen-api" -dev-repo: "git+https://github.com/xapi-project/xen-api.git" -bug-reports: "https://github.com/xapi-project/xen-api/issues" -tags: [ - "org:xapi-project" -] -build: [ - ["dune" "build" "-p" name "-j" jobs] - ["dune" "runtest" "-p" name "-j" jobs] {with-test} -] -depends: [ - "ocaml" - "dune" {>= "3.15"} - "async" {>= "v0.9.0"} - "async_kernel" - "async_unix" - "base" - "base-threads" - "cohttp" {>= "0.22.0"} - "core" - "core_kernel" - "core_unix" - "rpclib" - "uri" - "xen-api-client" - "xmlm" -] -synopsis: - "Xen-API client library for remotely-controlling a xapi host" -url { - src: "https://github.com/xapi-project/xen-api/archive/master.tar.gz" -} diff --git a/xen-api-client-async.opam.template b/xen-api-client-async.opam.template deleted file mode 100644 index 8224d441c1d..00000000000 --- a/xen-api-client-async.opam.template +++ /dev/null @@ -1,36 +0,0 @@ -opam-version: "2.0" -maintainer: "xen-api@lists.xen.org" -authors: [ "David Scott" "Anil Madhavapeddy" "Jerome Maloberti" "John Else" "Jon Ludlam" "Thomas Sanders" "Mike McClurg" ] -license: "LGPL-2.1-only WITH OCaml-LGPL-linking-exception" -homepage: "https://github.com/xapi-project/xen-api" -dev-repo: "git+https://github.com/xapi-project/xen-api.git" -bug-reports: "https://github.com/xapi-project/xen-api/issues" -tags: [ - "org:xapi-project" -] -build: [ - ["dune" "build" "-p" name "-j" jobs] - ["dune" "runtest" "-p" name "-j" jobs] {with-test} -] -depends: [ - "ocaml" - "dune" {>= "3.15"} - "async" {>= "v0.9.0"} - "async_kernel" - "async_unix" - "base" - "base-threads" - "cohttp" {>= "0.22.0"} - "core" - "core_kernel" - "core_unix" - "rpclib" - "uri" - "xen-api-client" - "xmlm" -] -synopsis: - "Xen-API client library for remotely-controlling a xapi host" -url { - src: "https://github.com/xapi-project/xen-api/archive/master.tar.gz" -}