Skip to content

Commit

Permalink
CP-51574: Add explicit reentrant lock to Db_lock
Browse files Browse the repository at this point in the history
Update Db_lock to use an explicit reentrant (recursive) mutex pattern.

This is a pretty standard implementation. The way it works is that threads
compete to become the "holder" of the lock. The current holder of the lock is
identified using thread identifiers (integers) and read/written from/to
atomically (by way of Atomic.t). If attempting to acquire the lock (i.e.
atomically compare_and_set the holder, with the expected current value as
physically equal to None) fails, then the thread begins to wait. Waiting
threads wait using a condition variable to avoid busy waiting. The lock can be
acquired several times after it is acquired by the calling thread, but
releasing it must ensure that every lock is matched by a corresponding unlock
(the thread must relinquish every "hold" it has on the lock). Upon successfully
releasing its holds on the lock, the thread that is relinquishing control
resets the holder (to None) and uses the condition variable to signal a waiting
thread to wakeup (and attempt to become the holder).

To make this pattern safe and not expose too many details, an interface file
(db_lock.mli) is introduced. This interface does not expose the details of the
underlying lock, but rather the single function:

  val with_lock : (unit -> 'a) -> 'a

which ensures the evaluation of its argument is properly sandwiched between
code that correctly acquires and releases the lock (taking care to avoid
holding onto the lock during exceptional circumstances, e.g. by way of an
exception that is unhandled). Code written to use the database lock currently
only use this interface to acquire the Db_lock, so no other changes are
required.

A follow-up change may be to enforce the same usage pattern for the global
database flush lock, which is seemingly only used in the same way.

Signed-off-by: Colin James <[email protected]>
  • Loading branch information
contificate committed Sep 25, 2024
1 parent bbb642e commit fae9b55
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 47 deletions.
168 changes: 127 additions & 41 deletions ocaml/database/db_lock.ml
Original file line number Diff line number Diff line change
Expand Up @@ -11,60 +11,146 @@
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)
(* Lock shared between client/slave implementations *)

open Xapi_stdext_pervasives.Pervasiveext
module type REENTRANT_LOCK = sig
type t

(* Withlock takes dbcache_mutex, and ref-counts to allow the same thread to re-enter without blocking as many times
as it wants. *)
let dbcache_mutex = Mutex.create ()
(** Timing statistics modified by each thread after the lock is
initially acquired. *)
type statistics = {
mutable max_time: float
; mutable min_time: float
; mutable total_time: float
; mutable acquires: int
}

let time = ref 0.0
val create : unit -> t
(** Creates an instance of a reentrant lock. *)

let n = ref 0
val lock : t -> unit
(** [lock l] acquires the lock [l]. If the calling thread already
holds the lock, the implementation internally increases the number
of "holds" the thread has on the lock. Each call to [lock] must
have a corresponding call to [unlock] or else it is an error. *)

let maxtime = ref neg_infinity
val unlock : t -> unit
(** [unlock l] releases a hold on the lock. If the hold count
becomes 0, the lock is free to be acquired by other threads. It is
an error to call this from a thread that does not hold the lock. *)

let mintime = ref infinity
val statistics : t -> statistics
(** Returns a copy of the internal timing statistics maintained by
the implementation. Calling this has the effect of temporarily
acquiring the lock, as only the lock holder can read or modify the
internal record. *)
end

let thread_reenter_count = ref 0
(** A simple re-entrant lock (recursive mutex). *)
module ReentrantLock : REENTRANT_LOCK = struct
type tid = int

let allow_thread_through_dbcache_mutex = ref None
type statistics = {
mutable max_time: float
; mutable min_time: float
; mutable total_time: float
; mutable acquires: int
}

let with_lock f =
let me = Thread.id (Thread.self ()) in
let do_with_lock () =
let now = Unix.gettimeofday () in
Mutex.lock dbcache_mutex ;
let now2 = Unix.gettimeofday () in
let delta = now2 -. now in
time := !time +. delta ;
n := !n + 1 ;
maxtime := max !maxtime delta ;
mintime := min !mintime delta ;
allow_thread_through_dbcache_mutex := Some me ;
thread_reenter_count := 1 ;
finally f (fun () ->
thread_reenter_count := !thread_reenter_count - 1 ;
if !thread_reenter_count = 0 then (
allow_thread_through_dbcache_mutex := None ;
Mutex.unlock dbcache_mutex
type t = {
holder: tid option Atomic.t (* The holder of the lock *)
; mutable holds: int (* How many holds the holder has on the lock *)
; lock: Mutex.t (* Barrier to signal waiting threads *)
; condition: Condition.t
(* Waiting threads are signalled via this condition to reattempt to acquire the lock *)
; statistics: statistics (* Bookkeeping of time taken to acquire lock *)
}

let create_statistics () =
{max_time= neg_infinity; min_time= infinity; total_time= 0.; acquires= 0}

let create () =
{
holder= Atomic.make None
; holds= 0
; lock= Mutex.create ()
; condition= Condition.create ()
; statistics= create_statistics ()
}

let current_tid () = Thread.(self () |> id)

let lock l =
let me = current_tid () in
match Atomic.get l.holder with
| Some tid when tid = me ->
l.holds <- l.holds + 1
| _ ->
let intended = Some me in
let counter = Mtime_clock.counter () in
Mutex.lock l.lock ;
while not (Atomic.compare_and_set l.holder None intended) do
Condition.wait l.condition l.lock
done ;
let stats = l.statistics in
let delta = Clock.Timer.span_to_s (Mtime_clock.count counter) in
stats.total_time <- stats.total_time +. delta ;
stats.min_time <- Float.min delta stats.min_time ;
stats.max_time <- Float.max delta stats.max_time ;
stats.acquires <- stats.acquires + 1 ;
Mutex.unlock l.lock ;
l.holds <- 1

let unlock l =
let me = current_tid () in
match Atomic.get l.holder with
| Some tid when tid = me ->
l.holds <- l.holds - 1 ;
if l.holds = 0 then (
let () = Atomic.set l.holder None in
Mutex.lock l.lock ;
Condition.signal l.condition ;
Mutex.unlock l.lock
)
)
in
match !allow_thread_through_dbcache_mutex with
| None ->
do_with_lock ()
| Some id ->
if id = me then (
thread_reenter_count := !thread_reenter_count + 1 ;
finally f (fun () -> thread_reenter_count := !thread_reenter_count - 1)
) else
do_with_lock ()
| _ ->
failwith
(Printf.sprintf "%s: Calling thread does not hold the lock!"
__MODULE__
)

let statistics l =
lock l ;
let stats =
(* Force a deep copy of the mutable fields *)
let ({acquires; _} as original) = l.statistics in
{original with acquires}
in
unlock l ; stats
end

(* The top-level database lock that writers must acquire. *)
let db_lock = ReentrantLock.create ()

(* Global flush lock: all db flushes are performed holding this lock *)
(* When we want to prevent the database from being flushed for a period
(e.g. when doing a host backup in the OEM product) then we acquire this lock *)
let global_flush_mutex = Mutex.create ()

let report () = (!n, !time /. float_of_int !n, !mintime, !maxtime)
let with_lock f =
let open Xapi_stdext_pervasives.Pervasiveext in
ReentrantLock.(
lock db_lock ;
finally f (fun () -> unlock db_lock)
)

type report = {count: int; avg_time: float; min_time: float; max_time: float}

let report () =
let ReentrantLock.{max_time; min_time; total_time; acquires} =
ReentrantLock.statistics db_lock
in
{
count= acquires
; avg_time= total_time /. float_of_int acquires
; min_time
; max_time
}
24 changes: 24 additions & 0 deletions ocaml/database/db_lock.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
(*
* Copyright (c) Cloud Software Group
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)

val global_flush_mutex : Mutex.t

val with_lock : (unit -> 'a) -> 'a
(** [with_lock f] executes [f] in a context where the calling thread
holds the database lock. It is safe to nest such calls as the
underlying lock is reentrant (a recursive mutex). *)

type report = {count: int; avg_time: float; min_time: float; max_time: float}

val report : unit -> report
3 changes: 3 additions & 0 deletions ocaml/database/dune
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
(libraries
forkexec
gzip
mtime
mtime.clock.os
clock
rpclib.core
rpclib.json
safe-resources
Expand Down
11 changes: 6 additions & 5 deletions ocaml/xapi/xapi_diagnostics.ml
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ let gc_stats ~__context ~host:_ =

let db_stats ~__context =
(* Use Printf.sprintf to keep format *)
let n, avgtime, min, max = Xapi_database.Db_lock.report () in
let open Xapi_database in
let Db_lock.{count; avg_time; min_time; max_time} = Db_lock.report () in
[
("n", Printf.sprintf "%d" n)
; ("avgtime", Printf.sprintf "%f" avgtime)
; ("min", Printf.sprintf "%f" min)
; ("max", Printf.sprintf "%f" max)
("n", Printf.sprintf "%d" count)
; ("avgtime", Printf.sprintf "%f" avg_time)
; ("min", Printf.sprintf "%f" min_time)
; ("max", Printf.sprintf "%f" max_time)
]

let network_stats ~__context ~host:_ ~params =
Expand Down
2 changes: 1 addition & 1 deletion quality-gate.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ verify-cert () {
}

mli-files () {
N=510
N=509
# do not count ml files from the tests in ocaml/{tests/perftest/quicktest}
MLIS=$(git ls-files -- '**/*.mli' | grep -vE "ocaml/tests|ocaml/perftest|ocaml/quicktest|ocaml/message-switch/core_test" | xargs -I {} sh -c "echo {} | cut -f 1 -d '.'" \;)
MLS=$(git ls-files -- '**/*.ml' | grep -vE "ocaml/tests|ocaml/perftest|ocaml/quicktest|ocaml/message-switch/core_test" | xargs -I {} sh -c "echo {} | cut -f 1 -d '.'" \;)
Expand Down

0 comments on commit fae9b55

Please sign in to comment.