Skip to content

Commit

Permalink
Merge pull request #5861 from edwintorok/private/edvint/epoll4.0-delay
Browse files Browse the repository at this point in the history
[epoll]: replace duplicate Delay modules with ThreadExt.Delay
  • Loading branch information
edwintorok authored Jul 22, 2024
2 parents 54abab8 + d9590a0 commit b411adc
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 92 deletions.
1 change: 1 addition & 0 deletions message-switch-unix.opam
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ depends: [
"base-threads"
"message-switch-core"
"ppx_deriving_rpc"
"xapi-stdext-unix"
]
synopsis: "A simple store-and-forward message switch"
description: """
Expand Down
1 change: 1 addition & 0 deletions ocaml/message-switch/unix/dune
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
rpclib.core
rpclib.json
threads.posix
xapi-stdext-threads
)
(preprocess (pps ppx_deriving_rpc))
)
Expand Down
66 changes: 1 addition & 65 deletions ocaml/message-switch/unix/protocol_unix_scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,71 +34,7 @@ module Int64Map = Map.Make (struct
let compare = compare
end)

module Delay = struct
(* Concrete type is the ends of a pipe *)
type t = {
(* A pipe is used to wake up a thread blocked in wait: *)
mutable pipe_out: Unix.file_descr option
; mutable pipe_in: Unix.file_descr option
; (* Indicates that a signal arrived before a wait: *)
mutable signalled: bool
; m: Mutex.t
}

let make () =
{pipe_out= None; pipe_in= None; signalled= false; m= Mutex.create ()}

exception Pre_signalled

let wait (x : t) (seconds : float) =
let to_close = ref [] in
let close' fd =
if List.mem fd !to_close then Unix.close fd ;
to_close := List.filter (fun x -> fd <> x) !to_close
in
finally'
(fun () ->
try
let pipe_out =
Mutex.execute x.m (fun () ->
if x.signalled then (
x.signalled <- false ;
raise Pre_signalled
) ;
let pipe_out, pipe_in = Unix.pipe () in
(* these will be unconditionally closed on exit *)
to_close := [pipe_out; pipe_in] ;
x.pipe_out <- Some pipe_out ;
x.pipe_in <- Some pipe_in ;
x.signalled <- false ;
pipe_out
)
in
let r, _, _ = Unix.select [pipe_out] [] [] seconds in
(* flush the single byte from the pipe *)
if r <> [] then ignore (Unix.read pipe_out (Bytes.create 1) 0 1) ;
(* return true if we waited the full length of time, false if we were woken *)
r = []
with Pre_signalled -> false
)
(fun () ->
Mutex.execute x.m (fun () ->
x.pipe_out <- None ;
x.pipe_in <- None ;
List.iter close' !to_close
)
)

let signal (x : t) =
Mutex.execute x.m (fun () ->
match x.pipe_in with
| Some fd ->
ignore (Unix.write fd (Bytes.of_string "X") 0 1)
| None ->
x.signalled <- true
(* If the wait hasn't happened yet then store up the signal *)
)
end
module Delay = Xapi_stdext_threads.Threadext.Delay

type item = {id: int; name: string; fn: unit -> unit}

Expand Down
28 changes: 1 addition & 27 deletions ocaml/xapi-idl/lib/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,7 @@ open D

let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute

module PipeDelay = struct
(* Concrete type is the ends of a pipe *)
type t = {
(* A pipe is used to wake up a thread blocked in wait: *)
pipe_out: Unix.file_descr
; pipe_in: Unix.file_descr
}

let make () =
let pipe_out, pipe_in = Unix.pipe () in
{pipe_out; pipe_in}

let wait (x : t) (seconds : float) =
let timeout = if seconds < 0.0 then 0.0 else seconds in
if Thread.wait_timed_read x.pipe_out timeout then
(* flush the single byte from the pipe *)
let (_ : int) = Unix.read x.pipe_out (Bytes.create 1) 0 1 in
(* return false if we were woken *)
false
else
(* return true if we waited the full length of time, false if we were woken *)
true

let signal (x : t) =
let (_ : int) = Unix.write x.pipe_in (Bytes.of_string "X") 0 1 in
()
end
module PipeDelay = Xapi_stdext_threads.Threadext.Delay

type handle = Mtime.span * int

Expand Down

0 comments on commit b411adc

Please sign in to comment.