Skip to content

Commit

Permalink
irmin-pack: update GC for chunked suffix
Browse files Browse the repository at this point in the history
  • Loading branch information
metanivek committed Nov 3, 2022
1 parent 3dd2231 commit 7d166c3
Show file tree
Hide file tree
Showing 11 changed files with 397 additions and 213 deletions.
61 changes: 59 additions & 2 deletions src/irmin-pack/unix/chunked_suffix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
Raises `Read_out_of_bounds exception. *)

val fold :
(acc:'a -> is_appendable:bool -> chunk:chunk -> 'a) -> 'a -> t -> 'a

val open_ :
start_idx:int ->
chunk_num:int ->
Expand All @@ -74,6 +77,13 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
(unit, [> add_new_error ]) result

val length : t -> int63
(** [length t] is the length of bytes for all chunks *)

val start_idx : t -> int
(** [start_idx t] is the idx of the first chunk *)

val count : t -> int
(** [count t] is the number of chunks *)
end = struct
type t = { mutable chunks : chunk Array.t }

Expand All @@ -100,6 +110,14 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct

let is_legacy chunk_idx = chunk_idx = 0

let fold f acc t =
let appendable_idx = (appendable t).idx in
Array.fold_left
(fun acc chunk ->
let is_appendable = chunk.idx = appendable_idx in
f ~acc ~is_appendable ~chunk)
acc t.chunks

let open_ ~start_idx ~chunk_num ~open_chunk =
let off_acc = ref Int63.zero in
let create_chunk i =
Expand Down Expand Up @@ -170,6 +188,9 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
let length t =
let open Int63.Syntax in
Array.fold_left (fun sum c -> sum + Ao.end_poff c.ao) Int63.zero t.chunks

let count t = Array.length t.chunks
let start_idx t = t.chunks.(0).idx
end

type t = { inventory : Inventory.t; root : string; dead_header_size : int }
Expand Down Expand Up @@ -234,13 +255,40 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
let+ inventory = Inventory.open_ ~start_idx ~chunk_num ~open_chunk in
{ inventory; root; dead_header_size }

let start_idx t = Inventory.start_idx t.inventory
let chunk_num t = Inventory.count t.inventory
let appendable_ao t = (Inventory.appendable t.inventory).ao
let end_poff t = appendable_ao t |> Ao.end_poff
let length t = Inventory.length t.inventory

let read_exn t ~off ~len buf =
let chunk, poff = Inventory.find ~off t.inventory in
Ao.read_exn chunk.ao ~off:poff ~len buf
let rec read progress_off suffix_off len_requested =
let open Int63.Syntax in
(* Find chunk with [suffix_off] and calculate length we can read. *)
let chunk, poff = Inventory.find ~off:suffix_off t.inventory in
let chunk_end_poff = Ao.end_poff chunk.ao in
let read_end_poff = poff + len_requested in
let len_read =
if read_end_poff > chunk_end_poff then chunk_end_poff - poff
else len_requested
in

(* Perform read. If this is the first read, we can use [buf]; otherwise,
we create a new buffer and transfer after the read. *)
let len_i = Int63.to_int len_read in
let is_first_read = progress_off = Int63.zero in
let ao_buf = if is_first_read then buf else Bytes.create len_i in
Ao.read_exn chunk.ao ~off:poff ~len:len_i ao_buf;
if not is_first_read then
Bytes.blit ao_buf 0 buf (Int63.to_int progress_off) len_i;

(* Read more if any is [rem]aining. *)
let rem = len_requested - len_read in
if rem > Int63.zero then
read (progress_off + len_read) (suffix_off + len_read) rem
else ()
in
read Int63.zero off (Int63.of_int len)

let append_exn t s = Ao.append_exn (appendable_ao t) s

Expand Down Expand Up @@ -277,4 +325,13 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct

let readonly t = appendable_ao t |> Ao.readonly
let auto_flush_threshold t = appendable_ao t |> Ao.auto_flush_threshold

let fold_chunks f acc t =
Inventory.fold
(fun ~acc ~is_appendable ~chunk ->
let len = Ao.end_poff chunk.ao in
let start_suffix_off = chunk.suffix_off in
let end_suffix_off = Int63.Syntax.(start_suffix_off + len) in
f ~acc ~idx:chunk.idx ~start_suffix_off ~end_suffix_off ~is_appendable)
acc t.inventory
end
13 changes: 13 additions & 0 deletions src/irmin-pack/unix/chunked_suffix_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ module type S = sig
t ->
(unit, [> add_new_error ]) result

val start_idx : t -> int
val chunk_num : t -> int
val close : t -> (unit, [> Io.close_error | `Pending_flush ]) result
val empty_buffer : t -> bool
val flush : t -> (unit, [> Io.write_error ]) result
Expand All @@ -98,6 +100,17 @@ module type S = sig
val refresh_end_poff : t -> int63 -> (unit, [> `Rw_not_allowed ]) result
val readonly : t -> bool
val auto_flush_threshold : t -> int option

val fold_chunks :
(acc:'a ->
idx:int ->
start_suffix_off:int63 ->
end_suffix_off:int63 ->
is_appendable:bool ->
'a) ->
'a ->
t ->
'a
end

module type Sigs = sig
Expand Down
37 changes: 27 additions & 10 deletions src/irmin-pack/unix/dispatcher.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
module Control = Fm.Control

type t = { fm : Fm.t }
type location = Prefix | Suffix [@@deriving irmin]
type location = Prefix | Suffix [@@deriving irmin ~pp]

type accessor = { poff : int63; len : int63; location : location }
[@@deriving irmin]
Expand Down Expand Up @@ -66,27 +66,41 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
assert false
| Gced { suffix_start_offset; _ } -> suffix_start_offset

let suffix_dead_bytes t =
let pl = Control.payload (Fm.control t.fm) in
match pl.status with
| Payload.From_v1_v2_post_upgrade _ | Used_non_minimal_indexing_strategy
| No_gc_yet ->
Int63.zero
| T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9 | T10 | T11 | T12 | T13 | T14
| T15 ->
assert false
| Gced { suffix_dead_bytes; _ } -> suffix_dead_bytes

(* The suffix only know the real offsets, it is in the dispatcher that global
offsets are translated into real ones (i.e. in prefix or suffix offsets). *)
let end_offset t =
let open Int63.Syntax in
Suffix.length (Fm.suffix t.fm) + suffix_start_offset t
Suffix.length (Fm.suffix t.fm) + suffix_start_offset t - suffix_dead_bytes t

module Suffix_arithmetic = struct
(* Adjust the read in suffix, as the global offset [off] is
[off] = [suffix_start_offset] + [suffix_offset]. *)
let poff_of_off t off =
[off] = [suffix_start_offset] + [soff] - [suffix_dead_bytes]. *)
let soff_of_off t off =
let open Int63.Syntax in
let suffix_start_offset = suffix_start_offset t in
off - suffix_start_offset
let suffix_dead_bytes = suffix_dead_bytes t in
off - suffix_start_offset + suffix_dead_bytes

let off_of_poff t suffix_off =
let off_of_soff t soff =
let open Int63.Syntax in
let suffix_start_offset = suffix_start_offset t in
suffix_off + suffix_start_offset
let suffix_dead_bytes = suffix_dead_bytes t in
suffix_start_offset + soff - suffix_dead_bytes
end

let offset_of_suffix_poff = Suffix_arithmetic.off_of_poff
let offset_of_soff = Suffix_arithmetic.off_of_soff
let soff_of_offset = Suffix_arithmetic.soff_of_off

module Prefix_arithmetic = struct
(* Find the last chunk which is before [off_start] (or at [off_start]). If no
Expand Down Expand Up @@ -162,7 +176,7 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
if entry_end_offset > end_offset t then
raise (Errors.Pack_error `Read_out_of_bounds)
else
let poff = Suffix_arithmetic.poff_of_off t off in
let poff = Suffix_arithmetic.soff_of_off t off in
{ poff; len; location = Suffix }

let v_in_prefix_exn mapping ~off ~len =
Expand All @@ -184,7 +198,7 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
else if bytes_after_off > max_len then max_len
else bytes_after_off
in
let poff = Suffix_arithmetic.poff_of_off t off in
let poff = Suffix_arithmetic.soff_of_off t off in
{ poff; len; location = Suffix }

let v_range_in_prefix_exn t ~off ~min_len ~max_len =
Expand All @@ -211,6 +225,9 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
end

let read_exn t { poff; len; location } buf =
[%log.debug
"read_exn in %a at %a for %a" (Irmin.Type.pp location_t) location Int63.pp
poff Int63.pp len];
assert (len <= Int63.of_int Stdlib.max_int);
(* This assetion cannot be triggered because:
Expand Down
10 changes: 7 additions & 3 deletions src/irmin-pack/unix/dispatcher_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,13 @@ module type S = sig
(** [suffix_start_offset] is the offsets of the first pack entry in the
suffix. All pack entries in the prefix fit below [suffix_start_offset]. *)

val offset_of_suffix_poff : t -> int63 -> int63
(** [offset_of_suffix_poff t suffix_off] converts a suffix offset into a
(global) offset. *)
val offset_of_soff : t -> int63 -> int63
(** [offset_of_soff t suffix_off] converts a suffix offset into a (global)
offset. *)

val soff_of_offset : t -> int63 -> int63
(** [soff_of_offset t global_offset] converts a global offset to a suffix
offset. *)

val read_bytes_exn : t -> f:(string -> unit) -> off:int63 -> len:int63 -> unit
(** [read_bytes_exn] reads a slice of the global offset space defined by [off]
Expand Down
44 changes: 12 additions & 32 deletions src/irmin-pack/unix/file_manager.ml
Original file line number Diff line number Diff line change
Expand Up @@ -410,16 +410,11 @@ struct
let chunk_start_idx0 = pl0.chunk_start_idx in
let chunk_start_idx1 = pl1.chunk_start_idx in
let* () =
if
chunk_num0 <> chunk_num1
|| chunk_start_idx0 <> chunk_start_idx1
(* TODO: remove this last check, kept for passing the tests. *)
|| gen0 <> gen1
if chunk_num0 <> chunk_num1 || chunk_start_idx0 <> chunk_start_idx1
then
let end_poff = pl1.suffix_end_poff in
(* TODO: remove this max, kept for passing the tests. *)
let chunk_start_idx = max chunk_start_idx1 gen1 in
reopen_suffix t ~chunk_start_idx ~end_poff ~chunk_num:chunk_num1
reopen_suffix t ~chunk_start_idx:chunk_start_idx1 ~end_poff
~chunk_num:chunk_num1
else Ok ()
in
if gen0 = gen1 then Ok ()
Expand Down Expand Up @@ -692,33 +687,22 @@ struct
| `Unknown_major_pack_version _ ) as e ->
e)

let swap t ~generation ~new_suffix_start_offset ~new_suffix_end_offset
~latest_gc_target_offset =
let swap t ~generation ~suffix_start_offset ~chunk_start_idx ~chunk_num
~suffix_dead_bytes ~latest_gc_target_offset =
let open Result_syntax in
[%log.debug
"Gc in main: swap %d %#d %#d" generation
(Int63.to_int new_suffix_start_offset)
(Int63.to_int new_suffix_end_offset)];
"Gc in main: swap gen %d; suffix start %a; chunk start idx %d; chunk num \
%d; suffix dead bytes %a"
generation Int63.pp suffix_start_offset chunk_start_idx chunk_num Int63.pp
suffix_dead_bytes];
let c0 = Mtime_clock.counter () in
let pl = Control.payload t.control in
(* Opening the suffix requires passing it its length. We compute it from the
global offsets

TODO: this calculation of [suffix_end_poff] only works with one chunk.
this code will be removed once we have chunk GC.
*)
let suffix_end_poff =
let open Int63.Syntax in
new_suffix_end_offset - new_suffix_start_offset
in
(* Step 1. Reopen files *)
let* () = reopen_prefix t ~generation in
let* () = reopen_mapping t ~generation in
(* TODO: remove this max, kept for the tests. *)
let chunk_start_idx = max pl.chunk_start_idx generation in
let* () =
reopen_suffix t ~chunk_start_idx ~end_poff:suffix_end_poff
~chunk_num:pl.chunk_num
reopen_suffix t ~chunk_start_idx ~chunk_num ~end_poff:pl.suffix_end_poff
in
let span1 = Mtime_clock.count c0 |> Mtime.Span.to_us in

Expand All @@ -734,19 +718,16 @@ struct
| T14 | T15 ->
assert false
| Gced _ | No_gc_yet ->
let suffix_start_offset = new_suffix_start_offset in
Gced
{
suffix_start_offset;
generation;
latest_gc_target_offset;
suffix_dead_bytes = Int63.zero;
suffix_dead_bytes;
}
in

(* TODO using generation for starting chunk idx works since we only have
only one chunk at a time. this will change once we have chunk based GC. *)
{ pl with status; suffix_end_poff; chunk_start_idx = generation }
{ pl with status; chunk_start_idx; chunk_num }
in
[%log.debug "GC: writing new control_file"];
Control.set_payload t.control pl
Expand Down Expand Up @@ -780,7 +761,6 @@ struct
(* Unreachable *)
assert false

(* TODO : this does not work yet with GC. *)
let split t =
let open Result_syntax in
(* Step 1. Create a new chunk file *)
Expand Down
13 changes: 8 additions & 5 deletions src/irmin-pack/unix/file_manager_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,16 @@ module type S = sig
val swap :
t ->
generation:int ->
new_suffix_start_offset:int63 ->
new_suffix_end_offset:int63 ->
suffix_start_offset:int63 ->
chunk_start_idx:int ->
chunk_num:int ->
suffix_dead_bytes:int63 ->
latest_gc_target_offset:int63 ->
(unit, [> Errs.t ]) result
(** Swaps to using files from the GC [generation]. The offsets
[new_suffix_start_offset] and [new_suffix_end_offset] are used to properly
load the suffix. The control file is also updated. *)
(** Swaps to using files from the GC [generation]. The values
[suffix_start_offset], [chunk_start_idx], [chunk_num], and
[suffix_dead_bytes] are used to properly load and read the suffix after a
GC. The control file is also updated on disk. *)

val readonly : t -> bool
val generation : t -> int
Expand Down
Loading

0 comments on commit 7d166c3

Please sign in to comment.