Skip to content

Commit

Permalink
Merge pull request #5404 from Vincent-lau/private/shul2/switch-optional
Browse files Browse the repository at this point in the history
CP-47033: Make message switch concurrent processing optional
  • Loading branch information
Vincent-lau authored Apr 11, 2024
2 parents e3d2299 + 966b636 commit 01487ea
Show file tree
Hide file tree
Showing 12 changed files with 208 additions and 5 deletions.
3 changes: 3 additions & 0 deletions ocaml/message-switch/async/protocol_async.ml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ module M = struct

let iter f t = Deferred.List.iter t ~f

let iter_dontwait f t =
Deferred.don't_wait_for @@ Deferred.List.iter ~how:`Parallel t ~f

let any = Deferred.any

let is_determined = Deferred.is_determined
Expand Down
1 change: 1 addition & 0 deletions ocaml/message-switch/core/dune
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
sexplib
sexplib0
uri
xapi-log
)
(preprocess (pps ppx_deriving_rpc ppx_sexp_conv))
)
Expand Down
92 changes: 92 additions & 0 deletions ocaml/message-switch/core/make.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
open Sexplib.Std
open Protocol

module D = Debug.Make (struct let name = "Message_switch.make" end)

open D

module Connection =
functor
(IO : Cohttp.S.IO)
Expand Down Expand Up @@ -406,4 +410,92 @@ functor
in
let _ = loop c None in
return (Ok t)

let listen_p ~process ~switch:port ~queue:name () =
let token = Printf.sprintf "%d" (Unix.getpid ()) in
let protect_connect path f =
M.connect path >>= fun conn ->
f conn >>= function
| Ok _ as ok ->
return ok
| Error _ as err ->
M.disconnect conn >>= fun () -> return err
in
let reconnect () =
protect_connect port @@ fun request_conn ->
Connection.rpc request_conn (In.Login token) >>|= fun (_ : string) ->
protect_connect port @@ fun reply_conn ->
Connection.rpc reply_conn (In.Login token) >>|= fun (_ : string) ->
return (Ok (request_conn, reply_conn))
in
reconnect () >>|= fun ((request_conn, reply_conn) as c) ->
let request_shutdown = M.Ivar.create () in
let on_shutdown = M.Ivar.create () in
let mutex = M.Mutex.create () in
Connection.rpc request_conn (In.CreatePersistent name) >>|= fun _ ->
let t = {request_shutdown; on_shutdown} in
let reconnect () =
M.disconnect request_conn >>= fun () ->
M.disconnect reply_conn >>= reconnect
in
let rec loop c from =
let transfer = {In.from; timeout; queues= [name]} in
let frame = In.Transfer transfer in
let message = Connection.rpc request_conn frame in
any [map (fun _ -> ()) message; M.Ivar.read request_shutdown]
>>= fun () ->
if is_determined (M.Ivar.read request_shutdown) then (
M.Ivar.fill on_shutdown () ; return (Ok ())
) else
message >>= function
| Error _e ->
M.Mutex.with_lock mutex reconnect >>|= fun c -> loop c from
| Ok raw -> (
let transfer = Out.transfer_of_rpc (Jsonrpc.of_string raw) in
let print_error = function
| Ok (_ : string) ->
return ()
| Error _ as err ->
error "message switch reply received error" ;
ignore @@ error_to_msg err ;
return ()
in
match transfer.Out.messages with
| [] ->
loop c from
| _ :: _ ->
iter_dontwait
(fun (i, m) ->
process m.Message.payload >>= fun response ->
( match m.Message.kind with
| Message.Response _ ->
return () (* configuration error *)
| Message.Request reply_to ->
let request =
In.Send
( reply_to
, {
Message.kind= Message.Response i
; payload= response
}
)
in
M.Mutex.with_lock mutex (fun () ->
Connection.rpc reply_conn request
)
>>= print_error
)
>>= fun () ->
let request = In.Ack i in
M.Mutex.with_lock mutex (fun () ->
Connection.rpc reply_conn request
)
>>= print_error
)
transfer.Out.messages ;
loop c (Some transfer.Out.next)
)
in
let _ = loop c None in
return (Ok t)
end
10 changes: 10 additions & 0 deletions ocaml/message-switch/core/s.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ module type BACKEND = sig

val iter : ('a -> unit t) -> 'a list -> unit t

val iter_dontwait : ('a -> unit t) -> 'a list -> unit

val any : 'a t list -> 'a t

val is_determined : 'a t -> bool
Expand Down Expand Up @@ -89,6 +91,14 @@ module type SERVER = sig
(** Connect to [switch] and start processing messages on [queue] via function
[process] *)

val listen_p :
process:(string -> string io)
-> switch:string
-> queue:string
-> unit
-> t result io
(** same as above, but processes requests concurrently *)

val shutdown : t:t -> unit -> unit io
(** [shutdown t] shutdown a server *)
end
Expand Down
12 changes: 11 additions & 1 deletion ocaml/message-switch/core_test/async/server_async_main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ let path = ref "/var/run/message-switch/sock"

let name = ref "server"

let concurrent = ref false

let shutdown = Ivar.create ()

let process = function
Expand All @@ -33,7 +35,10 @@ let process = function

let main () =
let (_ : 'a Deferred.t) =
Server.listen ~process ~switch:!path ~queue:!name ()
if !concurrent then
Server.listen_p ~process ~switch:!path ~queue:!name ()
else
Server.listen ~process ~switch:!path ~queue:!name ()
in
Ivar.read shutdown >>= fun () ->
Clock.after (Time.Span.of_sec 1.) >>= fun () -> exit 0
Expand All @@ -49,6 +54,11 @@ let _ =
, Arg.Set_string name
, Printf.sprintf "name to send message to (default %s)" !name
)
; ( "-concurrent"
, Arg.Set concurrent
, Printf.sprintf "set concurrent processing of messages (default %b)"
!concurrent
)
]
(fun x -> P.fprintf stderr "Ignoring unexpected argument: %s" x)
"Respond to RPCs on a name" ;
Expand Down
6 changes: 4 additions & 2 deletions ocaml/message-switch/core_test/basic-rpc-test.sh
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#!/bin/bash
set -e

SPATH=${TMPDIR:-/tmp}/sock
SWITCHPATH=${TMPDIR:-/tmp}/switch
SPATH=${TMPDIR:-/tmp}/sock_s
SWITCHPATH=${TMPDIR:-/tmp}/switch_s


rm -rf ${SWITCHPATH} && mkdir -p ${SWITCHPATH}

echo Test message switch serial processing

echo Checking the switch can start late
./server_unix_main.exe -path $SPATH &
sleep 1
Expand Down
45 changes: 45 additions & 0 deletions ocaml/message-switch/core_test/concur-rpc-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/bin/bash
set -e

SPATH="${TMPDIR:-/tmp}/sock_p-$$"
SWITCHPATH="${TMPDIR:-/tmp}/switch_p-$$"

trap "cleanup" TERM INT

function cleanup {
rm -rf "${SWITCHPATH}"
}

rm -rf "${SWITCHPATH}" && mkdir -p "${SWITCHPATH}"

echo Test message switch concurrent processing

echo Checking the switch can start late
test -x ./server_unix_main.exe || exit 1
./server_unix_main.exe -path "$SPATH" &
sleep 1
test -x ../switch/switch_main.exe && test -x ./client_unix_main.exe || exit 1
../switch/switch_main.exe --path "$SPATH" --statedir "${SWITCHPATH}" &
./client_unix_main.exe -path "$SPATH" -secs 5
sleep 2

echo Performance test of Lwt to Lwt
test -x lwt/server_main.exe && test -x lwt/client_main.exe || exit 1
lwt/server_main.exe -path "$SPATH" -concurrent &
lwt/client_main.exe -path "$SPATH" -secs 5
sleep 2

echo Performance test of Async to Lwt
test -x lwt/server_main.exe && test -x async/client_async_main.exe || exit 1
lwt/server_main.exe -path "$SPATH" -concurrent &
async/client_async_main.exe -path "$SPATH" -secs 5
sleep 2

echo Performance test of Async to Async
test -x async/server_async_main.exe && test -x async/client_async_main.exe || exit 1
async/server_async_main.exe -path "$SPATH" -concurrent &
async/client_async_main.exe -path "$SPATH" -secs 5
sleep 2

../cli/main.exe shutdown --path "$SPATH"
sleep 2
17 changes: 17 additions & 0 deletions ocaml/message-switch/core_test/dune
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,20 @@
(package message-switch)
)

(rule
(alias runtest)
(deps
client_unix_main.exe
server_unix_main.exe
async/client_async_main.exe
async/server_async_main.exe
lwt/client_main.exe
lwt/server_main.exe
lwt/link_test_main.exe
../switch/switch_main.exe
../cli/main.exe
)
(action (run ./concur-rpc-test.sh))
(package message-switch)
)

16 changes: 14 additions & 2 deletions ocaml/message-switch/core_test/lwt/server_main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ let path = ref "/var/run/message-switch/sock"

let name = ref "server"

let concurrent = ref false

let t, u = Lwt.task ()

let process = function
Expand All @@ -29,8 +31,13 @@ let process = function
return x

let main () =
Message_switch_lwt.Protocol_lwt.Server.listen ~process ~switch:!path
~queue:!name ()
( if !concurrent then
Message_switch_lwt.Protocol_lwt.Server.listen_p ~process ~switch:!path
~queue:!name ()
else
Message_switch_lwt.Protocol_lwt.Server.listen ~process ~switch:!path
~queue:!name ()
)
>>= fun _ ->
t >>= fun () -> Lwt_unix.sleep 1.

Expand All @@ -45,6 +52,11 @@ let _ =
, Arg.Set_string name
, Printf.sprintf "name to send message to (default %s)" !name
)
; ( "-concurrent"
, Arg.Set concurrent
, Printf.sprintf "set concurrent processing of messages (default %b)"
!concurrent
)
]
(fun x -> Printf.fprintf stderr "Ignoring unexpected argument: %s" x)
"Respond to RPCs on a name" ;
Expand Down
2 changes: 2 additions & 0 deletions ocaml/message-switch/lwt/protocol_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ module M = struct

let iter = Lwt_list.iter_s

let iter_dontwait f lst = Lwt.async (fun () -> Lwt_list.iter_p f lst)

let any = Lwt.choose

let is_determined t = Lwt.state t <> Lwt.Sleep
Expand Down
7 changes: 7 additions & 0 deletions ocaml/message-switch/switch/switch_main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ module Lwt_result = struct
let ( >>= ) m f = m >>= fun x -> f (Stdlib.Result.get_ok x)
end

let exn_hook e =
let bt = Printexc.get_raw_backtrace () in
error "Caught exception in Lwt.async: %s" (Printexc.to_string e) ;
error "backtrace: %s" (Printexc.raw_backtrace_to_string bt)

let () = Lwt.async_exception_hook := exn_hook

let make_server config trace_config =
let open Config in
info "Started server on %s" config.path ;
Expand Down
2 changes: 2 additions & 0 deletions ocaml/message-switch/unix/protocol_unix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -546,5 +546,7 @@ module Server = struct
let (_ : Thread.t) = thread_forever (loop connections) None in
Ok ()

let listen_p = listen

let shutdown ~t:_ () = failwith "Shutdown is unimplemented"
end

0 comments on commit 01487ea

Please sign in to comment.