Skip to content

Commit

Permalink
Merge pull request #2126 from metanivek/chunked_suffix_gc
Browse files Browse the repository at this point in the history
  • Loading branch information
metanivek authored Nov 3, 2022
2 parents 3dd2231 + 26bc500 commit 98e0ed0
Show file tree
Hide file tree
Showing 13 changed files with 469 additions and 247 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
- Detecting control file corruption with a checksum (#2119, @art-w)
- Change on-disk layout of the suffix from a single file to a multiple,
chunked file design (#2115, @metanivek)
- Modify GC to work with new chunked suffix. See `examples/gc.ml` for a
demonstration of how it works with the new `split` function. (#2126,
@metanivek)

### Fixed

Expand Down
103 changes: 69 additions & 34 deletions examples/gc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ module Repo_config = struct
end

(** Utility for creating commit info *)
let info fmt = Irmin_unix.info ~author:"pack example" fmt
let info fmt key value = Irmin_unix.info ~author:"pack example" fmt key value ()

(** Utility for computing the size of a directory *)
let rec megabytes_of_path path =
Expand All @@ -95,54 +95,89 @@ let rec megabytes_of_path path =
0. (Sys.readdir path)
else float_of_int Unix.((stat path).st_size) /. 1e6

(** Demonstrate running GC on all commits before the head *)
let gc_all_but_head repo branch =
let* head = Store.Head.get branch in
let head_key = Store.Commit.key head in
let finished = function
| Ok stats ->
let duration = Irmin_pack_unix.Stats.Latest_gc.total_duration stats in
let finalise_duration =
Irmin_pack_unix.Stats.Latest_gc.finalise_duration stats
(** A utility module for tracking the latest commit and the commit we will want
to run GC for. *)
module Tracker = struct
type t = {
mutable latest_commit : Store.commit option;
mutable next_gc_commit : Store.commit option;
}

let v () = { latest_commit = None; next_gc_commit = None }
let update_latest_commit t commit = t.latest_commit <- Some commit

let latest_parents t =
match t.latest_commit with None -> [] | Some c -> Store.Commit.parents c

let latest_tree t =
match t.latest_commit with
| None -> Store.Tree.empty ()
| Some c -> Store.Commit.tree c

let mark_next_gc_commit t = t.next_gc_commit <- t.latest_commit
end

(** Demonstrate running GC on a previous commit aligned to the end of a chunk
for ideal GC space reclamation. *)
let run_gc repo tracker =
let* () =
match Tracker.(tracker.next_gc_commit) with
| None -> Lwt.return_unit
| Some commit -> (
let finished = function
| Ok stats ->
let duration =
Irmin_pack_unix.Stats.Latest_gc.total_duration stats
in
let finalise_duration =
Irmin_pack_unix.Stats.Latest_gc.finalise_duration stats
in
Printf.printf
"GC finished in %.4fs. Finalise took %.4fs. Size of repo: \
%.2fMB.\n"
duration finalise_duration
(megabytes_of_path Repo_config.root)
|> Lwt.return
| Error (`Msg err) -> print_endline err |> Lwt.return
in
Printf.printf
"GC finished in %.4fs. Finalise took %.4fs. Size of repo: %.2fMB.\n"
duration finalise_duration
(megabytes_of_path Repo_config.root)
|> Lwt.return
| Error (`Msg err) -> print_endline err |> Lwt.return
(* Launch GC *)
let commit_key = Store.Commit.key commit in
let+ launched = Store.Gc.run ~finished repo commit_key in
match launched with
| Ok false ->
Printf.printf "GC did not launch. Already running? %B\n"
(Store.Gc.is_finished repo = false)
| Ok true ->
Printf.printf "GC started. Size of repo: %.2fMB\n"
(megabytes_of_path Repo_config.root)
| Error (`Msg err) -> print_endline err)
in
let+ launched = Store.Gc.run ~finished repo head_key in
match launched with
| Ok false ->
Printf.printf "GC did not launch. Already running? %B\n"
(Store.Gc.is_finished repo = false)
| Ok true ->
Printf.printf "GC started. Size of repo: %.2fMB\n"
(megabytes_of_path Repo_config.root)
| Error (`Msg err) -> print_endline err
(* Create new split and mark the latest commit to be the next GC commit. *)
let () = Store.split repo in
Tracker.mark_next_gc_commit tracker |> Lwt.return

let main () =
let run_gc = true in
let num_of_commits = 200_000 in
let gc_every = 5_000 in
let gc_every = 1_000 in
let* repo = Store.Repo.v Repo_config.config in
let* main_branch = Store.main repo in
let tracker = Tracker.v () in
(* Create commits *)
let* _ =
let rec loop i n =
let key = Printf.sprintf "hello%d" i in
let key = "hello" in
let value = Printf.sprintf "packfile%d" i in
let* _ =
Store.set ~info:(info "add %s = %s" key value) main_branch [ key ] value
let* tree = Store.Tree.add (Tracker.latest_tree tracker) [ key ] value in
let parents = Tracker.latest_parents tracker in
let* commit =
Store.Commit.v repo ~info:(info "add %s = %s" key value) ~parents tree
in
Tracker.update_latest_commit tracker commit;
let* _ =
if run_gc && i mod gc_every = 0 then gc_all_but_head repo main_branch
else Lwt.return_unit
if i mod gc_every = 0 then run_gc repo tracker else Lwt.return_unit
in
if i >= n then Lwt.return_unit else loop (i + 1) n
in
loop 0 num_of_commits
loop 1 num_of_commits
in
(* A GC may still be running. Wait for GC to finish before ending the process *)
let* _ = Store.Gc.wait repo in
Expand Down
61 changes: 59 additions & 2 deletions src/irmin-pack/unix/chunked_suffix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
Raises `Read_out_of_bounds exception. *)

val fold :
(acc:'a -> is_appendable:bool -> chunk:chunk -> 'a) -> 'a -> t -> 'a

val open_ :
start_idx:int ->
chunk_num:int ->
Expand All @@ -74,6 +77,13 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
(unit, [> add_new_error ]) result

val length : t -> int63
(** [length t] is the length of bytes for all chunks *)

val start_idx : t -> int
(** [start_idx t] is the idx of the first chunk *)

val count : t -> int
(** [count t] is the number of chunks *)
end = struct
type t = { mutable chunks : chunk Array.t }

Expand All @@ -100,6 +110,14 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct

let is_legacy chunk_idx = chunk_idx = 0

let fold f acc t =
let appendable_idx = (appendable t).idx in
Array.fold_left
(fun acc chunk ->
let is_appendable = chunk.idx = appendable_idx in
f ~acc ~is_appendable ~chunk)
acc t.chunks

let open_ ~start_idx ~chunk_num ~open_chunk =
let off_acc = ref Int63.zero in
let create_chunk i =
Expand Down Expand Up @@ -170,6 +188,9 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
let length t =
let open Int63.Syntax in
Array.fold_left (fun sum c -> sum + Ao.end_poff c.ao) Int63.zero t.chunks

let count t = Array.length t.chunks
let start_idx t = t.chunks.(0).idx
end

type t = { inventory : Inventory.t; root : string; dead_header_size : int }
Expand Down Expand Up @@ -234,13 +255,40 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
let+ inventory = Inventory.open_ ~start_idx ~chunk_num ~open_chunk in
{ inventory; root; dead_header_size }

let start_idx t = Inventory.start_idx t.inventory
let chunk_num t = Inventory.count t.inventory
let appendable_ao t = (Inventory.appendable t.inventory).ao
let end_poff t = appendable_ao t |> Ao.end_poff
let length t = Inventory.length t.inventory

let read_exn t ~off ~len buf =
let chunk, poff = Inventory.find ~off t.inventory in
Ao.read_exn chunk.ao ~off:poff ~len buf
let rec read progress_off suffix_off len_requested =
let open Int63.Syntax in
(* Find chunk with [suffix_off] and calculate length we can read. *)
let chunk, poff = Inventory.find ~off:suffix_off t.inventory in
let chunk_end_poff = Ao.end_poff chunk.ao in
let read_end_poff = poff + len_requested in
let len_read =
if read_end_poff > chunk_end_poff then chunk_end_poff - poff
else len_requested
in

(* Perform read. If this is the first read, we can use [buf]; otherwise,
we create a new buffer and transfer after the read. *)
let len_i = Int63.to_int len_read in
let is_first_read = progress_off = Int63.zero in
let ao_buf = if is_first_read then buf else Bytes.create len_i in
Ao.read_exn chunk.ao ~off:poff ~len:len_i ao_buf;
if not is_first_read then
Bytes.blit ao_buf 0 buf (Int63.to_int progress_off) len_i;

(* Read more if any is [rem]aining. *)
let rem = len_requested - len_read in
if rem > Int63.zero then
read (progress_off + len_read) (suffix_off + len_read) rem
else ()
in
read Int63.zero off (Int63.of_int len)

let append_exn t s = Ao.append_exn (appendable_ao t) s

Expand Down Expand Up @@ -277,4 +325,13 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct

let readonly t = appendable_ao t |> Ao.readonly
let auto_flush_threshold t = appendable_ao t |> Ao.auto_flush_threshold

let fold_chunks f acc t =
Inventory.fold
(fun ~acc ~is_appendable ~chunk ->
let len = Ao.end_poff chunk.ao in
let start_suffix_off = chunk.suffix_off in
let end_suffix_off = Int63.Syntax.(start_suffix_off + len) in
f ~acc ~idx:chunk.idx ~start_suffix_off ~end_suffix_off ~is_appendable)
acc t.inventory
end
13 changes: 13 additions & 0 deletions src/irmin-pack/unix/chunked_suffix_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ module type S = sig
t ->
(unit, [> add_new_error ]) result

val start_idx : t -> int
val chunk_num : t -> int
val close : t -> (unit, [> Io.close_error | `Pending_flush ]) result
val empty_buffer : t -> bool
val flush : t -> (unit, [> Io.write_error ]) result
Expand All @@ -98,6 +100,17 @@ module type S = sig
val refresh_end_poff : t -> int63 -> (unit, [> `Rw_not_allowed ]) result
val readonly : t -> bool
val auto_flush_threshold : t -> int option

val fold_chunks :
(acc:'a ->
idx:int ->
start_suffix_off:int63 ->
end_suffix_off:int63 ->
is_appendable:bool ->
'a) ->
'a ->
t ->
'a
end

module type Sigs = sig
Expand Down
37 changes: 27 additions & 10 deletions src/irmin-pack/unix/dispatcher.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
module Control = Fm.Control

type t = { fm : Fm.t }
type location = Prefix | Suffix [@@deriving irmin]
type location = Prefix | Suffix [@@deriving irmin ~pp]

type accessor = { poff : int63; len : int63; location : location }
[@@deriving irmin]
Expand Down Expand Up @@ -66,27 +66,41 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
assert false
| Gced { suffix_start_offset; _ } -> suffix_start_offset

let suffix_dead_bytes t =
let pl = Control.payload (Fm.control t.fm) in
match pl.status with
| Payload.From_v1_v2_post_upgrade _ | Used_non_minimal_indexing_strategy
| No_gc_yet ->
Int63.zero
| T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9 | T10 | T11 | T12 | T13 | T14
| T15 ->
assert false
| Gced { suffix_dead_bytes; _ } -> suffix_dead_bytes

(* The suffix only know the real offsets, it is in the dispatcher that global
offsets are translated into real ones (i.e. in prefix or suffix offsets). *)
let end_offset t =
let open Int63.Syntax in
Suffix.length (Fm.suffix t.fm) + suffix_start_offset t
Suffix.length (Fm.suffix t.fm) + suffix_start_offset t - suffix_dead_bytes t

module Suffix_arithmetic = struct
(* Adjust the read in suffix, as the global offset [off] is
[off] = [suffix_start_offset] + [suffix_offset]. *)
let poff_of_off t off =
[off] = [suffix_start_offset] + [soff] - [suffix_dead_bytes]. *)
let soff_of_off t off =
let open Int63.Syntax in
let suffix_start_offset = suffix_start_offset t in
off - suffix_start_offset
let suffix_dead_bytes = suffix_dead_bytes t in
off - suffix_start_offset + suffix_dead_bytes

let off_of_poff t suffix_off =
let off_of_soff t soff =
let open Int63.Syntax in
let suffix_start_offset = suffix_start_offset t in
suffix_off + suffix_start_offset
let suffix_dead_bytes = suffix_dead_bytes t in
suffix_start_offset + soff - suffix_dead_bytes
end

let offset_of_suffix_poff = Suffix_arithmetic.off_of_poff
let offset_of_soff = Suffix_arithmetic.off_of_soff
let soff_of_offset = Suffix_arithmetic.soff_of_off

module Prefix_arithmetic = struct
(* Find the last chunk which is before [off_start] (or at [off_start]). If no
Expand Down Expand Up @@ -162,7 +176,7 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
if entry_end_offset > end_offset t then
raise (Errors.Pack_error `Read_out_of_bounds)
else
let poff = Suffix_arithmetic.poff_of_off t off in
let poff = Suffix_arithmetic.soff_of_off t off in
{ poff; len; location = Suffix }

let v_in_prefix_exn mapping ~off ~len =
Expand All @@ -184,7 +198,7 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
else if bytes_after_off > max_len then max_len
else bytes_after_off
in
let poff = Suffix_arithmetic.poff_of_off t off in
let poff = Suffix_arithmetic.soff_of_off t off in
{ poff; len; location = Suffix }

let v_range_in_prefix_exn t ~off ~min_len ~max_len =
Expand All @@ -211,6 +225,9 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
end

let read_exn t { poff; len; location } buf =
[%log.debug
"read_exn in %a at %a for %a" (Irmin.Type.pp location_t) location Int63.pp
poff Int63.pp len];
assert (len <= Int63.of_int Stdlib.max_int);
(* This assetion cannot be triggered because:
Expand Down
10 changes: 7 additions & 3 deletions src/irmin-pack/unix/dispatcher_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,13 @@ module type S = sig
(** [suffix_start_offset] is the offsets of the first pack entry in the
suffix. All pack entries in the prefix fit below [suffix_start_offset]. *)

val offset_of_suffix_poff : t -> int63 -> int63
(** [offset_of_suffix_poff t suffix_off] converts a suffix offset into a
(global) offset. *)
val offset_of_soff : t -> int63 -> int63
(** [offset_of_soff t suffix_off] converts a suffix offset into a (global)
offset. *)

val soff_of_offset : t -> int63 -> int63
(** [soff_of_offset t global_offset] converts a global offset to a suffix
offset. *)

val read_bytes_exn : t -> f:(string -> unit) -> off:int63 -> len:int63 -> unit
(** [read_bytes_exn] reads a slice of the global offset space defined by [off]
Expand Down
Loading

0 comments on commit 98e0ed0

Please sign in to comment.