Skip to content

Commit

Permalink
CP-51574: Add explicit reentrant lock to Db_lock (#6011)
Browse files Browse the repository at this point in the history
Update `Db_lock` to use an explicit reentrant (recursive) mutex pattern.

This is a pretty standard implementation. The way it works is that
threads compete to become the "holder" of the lock. The current holder
of the lock is identified using thread identifiers (integers) and
read/written from/to atomically (by way of `Atomic.t`). If attempting to
acquire the lock (i.e. atomically `compare_and_set` the holder, with the
expected current value as physically equal to `None`) fails, then the
thread begins to wait. Waiting threads wait using a condition variable
to avoid busy waiting. The lock can be acquired several times after it
is acquired by the calling thread, but releasing it must ensure that
every lock is matched by a corresponding unlock (the thread must
relinquish every "hold" it has on the lock). Upon successfully releasing
its holds on the lock, the thread that is relinquishing control resets
the holder (to `None`) and uses the condition variable to signal a
waiting thread to wakeup (and attempt to become the holder).

To make this pattern safe and not expose too many details, an interface
file (`db_lock.mli`) is introduced. This interface does not expose the
details of the underlying lock, but rather the single function:

```ocaml
val with_lock : (unit -> 'a) -> 'a
```

which ensures the invocation of its argument is properly sandwiched
between code that correctly acquires and releases the lock (taking care
to avoid holding onto the lock during exceptional circumstances, e.g. by
way of an exception that is unhandled). Code written to use the database
lock currently only use this interface to acquire the `Db_lock`, so no
other changes are required.

A follow-up change may be to enforce the same usage pattern for the
global database flush lock, which is seemingly only used in the same
way.

-----

Ring3: BST+BVT (205351) all looks fine (so far) except an SDK-related
test, which appears to be a known issue. I have ran the same suites on
the same code a few times, with everything green except known issues.

This is really a code cleanup effort, as opposed to new functionality.
The current code is an ad-hoc implementation of this idea.

The hope is that the style moving forward would be to use atomics and
locks, so that we can begin to tackle things that would not be
domain-safe in OCaml 5. For example, the current implementation queries
`allow_thread_through_dbcache_mutex` (a `ref`) non-atomically.

Some code could probably be moved around but I'm unwilling to do any
serious functional changes as it would be rather easy to introduce
careless errors. This is the kind of simple implementation that you will
find in textbooks.
  • Loading branch information
contificate authored Sep 25, 2024
2 parents bbb642e + fae9b55 commit b5dbe55
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 47 deletions.
168 changes: 127 additions & 41 deletions ocaml/database/db_lock.ml
Original file line number Diff line number Diff line change
Expand Up @@ -11,60 +11,146 @@
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)
(* Lock shared between client/slave implementations *)

open Xapi_stdext_pervasives.Pervasiveext
module type REENTRANT_LOCK = sig
type t

(* Withlock takes dbcache_mutex, and ref-counts to allow the same thread to re-enter without blocking as many times
as it wants. *)
let dbcache_mutex = Mutex.create ()
(** Timing statistics modified by each thread after the lock is
initially acquired. *)
type statistics = {
mutable max_time: float
; mutable min_time: float
; mutable total_time: float
; mutable acquires: int
}

let time = ref 0.0
val create : unit -> t
(** Creates an instance of a reentrant lock. *)

let n = ref 0
val lock : t -> unit
(** [lock l] acquires the lock [l]. If the calling thread already
holds the lock, the implementation internally increases the number
of "holds" the thread has on the lock. Each call to [lock] must
have a corresponding call to [unlock] or else it is an error. *)

let maxtime = ref neg_infinity
val unlock : t -> unit
(** [unlock l] releases a hold on the lock. If the hold count
becomes 0, the lock is free to be acquired by other threads. It is
an error to call this from a thread that does not hold the lock. *)

let mintime = ref infinity
val statistics : t -> statistics
(** Returns a copy of the internal timing statistics maintained by
the implementation. Calling this has the effect of temporarily
acquiring the lock, as only the lock holder can read or modify the
internal record. *)
end

let thread_reenter_count = ref 0
(** A simple re-entrant lock (recursive mutex). *)
module ReentrantLock : REENTRANT_LOCK = struct
type tid = int

let allow_thread_through_dbcache_mutex = ref None
type statistics = {
mutable max_time: float
; mutable min_time: float
; mutable total_time: float
; mutable acquires: int
}

let with_lock f =
let me = Thread.id (Thread.self ()) in
let do_with_lock () =
let now = Unix.gettimeofday () in
Mutex.lock dbcache_mutex ;
let now2 = Unix.gettimeofday () in
let delta = now2 -. now in
time := !time +. delta ;
n := !n + 1 ;
maxtime := max !maxtime delta ;
mintime := min !mintime delta ;
allow_thread_through_dbcache_mutex := Some me ;
thread_reenter_count := 1 ;
finally f (fun () ->
thread_reenter_count := !thread_reenter_count - 1 ;
if !thread_reenter_count = 0 then (
allow_thread_through_dbcache_mutex := None ;
Mutex.unlock dbcache_mutex
type t = {
holder: tid option Atomic.t (* The holder of the lock *)
; mutable holds: int (* How many holds the holder has on the lock *)
; lock: Mutex.t (* Barrier to signal waiting threads *)
; condition: Condition.t
(* Waiting threads are signalled via this condition to reattempt to acquire the lock *)
; statistics: statistics (* Bookkeeping of time taken to acquire lock *)
}

let create_statistics () =
{max_time= neg_infinity; min_time= infinity; total_time= 0.; acquires= 0}

let create () =
{
holder= Atomic.make None
; holds= 0
; lock= Mutex.create ()
; condition= Condition.create ()
; statistics= create_statistics ()
}

let current_tid () = Thread.(self () |> id)

let lock l =
let me = current_tid () in
match Atomic.get l.holder with
| Some tid when tid = me ->
l.holds <- l.holds + 1
| _ ->
let intended = Some me in
let counter = Mtime_clock.counter () in
Mutex.lock l.lock ;
while not (Atomic.compare_and_set l.holder None intended) do
Condition.wait l.condition l.lock
done ;
let stats = l.statistics in
let delta = Clock.Timer.span_to_s (Mtime_clock.count counter) in
stats.total_time <- stats.total_time +. delta ;
stats.min_time <- Float.min delta stats.min_time ;
stats.max_time <- Float.max delta stats.max_time ;
stats.acquires <- stats.acquires + 1 ;
Mutex.unlock l.lock ;
l.holds <- 1

let unlock l =
let me = current_tid () in
match Atomic.get l.holder with
| Some tid when tid = me ->
l.holds <- l.holds - 1 ;
if l.holds = 0 then (
let () = Atomic.set l.holder None in
Mutex.lock l.lock ;
Condition.signal l.condition ;
Mutex.unlock l.lock
)
)
in
match !allow_thread_through_dbcache_mutex with
| None ->
do_with_lock ()
| Some id ->
if id = me then (
thread_reenter_count := !thread_reenter_count + 1 ;
finally f (fun () -> thread_reenter_count := !thread_reenter_count - 1)
) else
do_with_lock ()
| _ ->
failwith
(Printf.sprintf "%s: Calling thread does not hold the lock!"
__MODULE__
)

let statistics l =
lock l ;
let stats =
(* Force a deep copy of the mutable fields *)
let ({acquires; _} as original) = l.statistics in
{original with acquires}
in
unlock l ; stats
end

(* The top-level database lock that writers must acquire. *)
let db_lock = ReentrantLock.create ()

(* Global flush lock: all db flushes are performed holding this lock *)
(* When we want to prevent the database from being flushed for a period
(e.g. when doing a host backup in the OEM product) then we acquire this lock *)
let global_flush_mutex = Mutex.create ()

let report () = (!n, !time /. float_of_int !n, !mintime, !maxtime)
let with_lock f =
let open Xapi_stdext_pervasives.Pervasiveext in
ReentrantLock.(
lock db_lock ;
finally f (fun () -> unlock db_lock)
)

type report = {count: int; avg_time: float; min_time: float; max_time: float}

let report () =
let ReentrantLock.{max_time; min_time; total_time; acquires} =
ReentrantLock.statistics db_lock
in
{
count= acquires
; avg_time= total_time /. float_of_int acquires
; min_time
; max_time
}
24 changes: 24 additions & 0 deletions ocaml/database/db_lock.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
(*
* Copyright (c) Cloud Software Group
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)

val global_flush_mutex : Mutex.t

val with_lock : (unit -> 'a) -> 'a
(** [with_lock f] executes [f] in a context where the calling thread
holds the database lock. It is safe to nest such calls as the
underlying lock is reentrant (a recursive mutex). *)

type report = {count: int; avg_time: float; min_time: float; max_time: float}

val report : unit -> report
3 changes: 3 additions & 0 deletions ocaml/database/dune
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
(libraries
forkexec
gzip
mtime
mtime.clock.os
clock
rpclib.core
rpclib.json
safe-resources
Expand Down
11 changes: 6 additions & 5 deletions ocaml/xapi/xapi_diagnostics.ml
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ let gc_stats ~__context ~host:_ =

let db_stats ~__context =
(* Use Printf.sprintf to keep format *)
let n, avgtime, min, max = Xapi_database.Db_lock.report () in
let open Xapi_database in
let Db_lock.{count; avg_time; min_time; max_time} = Db_lock.report () in
[
("n", Printf.sprintf "%d" n)
; ("avgtime", Printf.sprintf "%f" avgtime)
; ("min", Printf.sprintf "%f" min)
; ("max", Printf.sprintf "%f" max)
("n", Printf.sprintf "%d" count)
; ("avgtime", Printf.sprintf "%f" avg_time)
; ("min", Printf.sprintf "%f" min_time)
; ("max", Printf.sprintf "%f" max_time)
]

let network_stats ~__context ~host:_ ~params =
Expand Down
2 changes: 1 addition & 1 deletion quality-gate.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ verify-cert () {
}

mli-files () {
N=510
N=509
# do not count ml files from the tests in ocaml/{tests/perftest/quicktest}
MLIS=$(git ls-files -- '**/*.mli' | grep -vE "ocaml/tests|ocaml/perftest|ocaml/quicktest|ocaml/message-switch/core_test" | xargs -I {} sh -c "echo {} | cut -f 1 -d '.'" \;)
MLS=$(git ls-files -- '**/*.ml' | grep -vE "ocaml/tests|ocaml/perftest|ocaml/quicktest|ocaml/message-switch/core_test" | xargs -I {} sh -c "echo {} | cut -f 1 -d '.'" \;)
Expand Down

0 comments on commit b5dbe55

Please sign in to comment.