Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

irmin-pack: update GC to work with chunked suffix #2126

Merged
merged 3 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
- Detecting control file corruption with a checksum (#2119, @art-w)
- Change on-disk layout of the suffix from a single file to a multiple,
chunked file design (#2115, @metanivek)
- Modify GC to work with new chunked suffix. See `examples/gc.ml` for a
demonstration of how it works with the new `split` function. (#2126,
@metanivek)

### Fixed

Expand Down
103 changes: 69 additions & 34 deletions examples/gc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ module Repo_config = struct
end

(** Utility for creating commit info *)
let info fmt = Irmin_unix.info ~author:"pack example" fmt
let info fmt key value = Irmin_unix.info ~author:"pack example" fmt key value ()

(** Utility for computing the size of a directory *)
let rec megabytes_of_path path =
Expand All @@ -95,54 +95,89 @@ let rec megabytes_of_path path =
0. (Sys.readdir path)
else float_of_int Unix.((stat path).st_size) /. 1e6

(** Demonstrate running GC on all commits before the head *)
let gc_all_but_head repo branch =
let* head = Store.Head.get branch in
let head_key = Store.Commit.key head in
let finished = function
| Ok stats ->
let duration = Irmin_pack_unix.Stats.Latest_gc.total_duration stats in
let finalise_duration =
Irmin_pack_unix.Stats.Latest_gc.finalise_duration stats
(** A utility module for tracking the latest commit and the commit we will want
to run GC for. *)
module Tracker = struct
type t = {
mutable latest_commit : Store.commit option;
mutable next_gc_commit : Store.commit option;
}

let v () = { latest_commit = None; next_gc_commit = None }
let update_latest_commit t commit = t.latest_commit <- Some commit

let latest_parents t =
match t.latest_commit with None -> [] | Some c -> Store.Commit.parents c

let latest_tree t =
match t.latest_commit with
| None -> Store.Tree.empty ()
| Some c -> Store.Commit.tree c

let mark_next_gc_commit t = t.next_gc_commit <- t.latest_commit
end

(** Demonstrate running GC on a previous commit aligned to the end of a chunk
for ideal GC space reclamation. *)
let run_gc repo tracker =
let* () =
match Tracker.(tracker.next_gc_commit) with
| None -> Lwt.return_unit
| Some commit -> (
let finished = function
| Ok stats ->
let duration =
Irmin_pack_unix.Stats.Latest_gc.total_duration stats
in
let finalise_duration =
Irmin_pack_unix.Stats.Latest_gc.finalise_duration stats
in
Printf.printf
"GC finished in %.4fs. Finalise took %.4fs. Size of repo: \
%.2fMB.\n"
duration finalise_duration
(megabytes_of_path Repo_config.root)
|> Lwt.return
| Error (`Msg err) -> print_endline err |> Lwt.return
in
Printf.printf
"GC finished in %.4fs. Finalise took %.4fs. Size of repo: %.2fMB.\n"
duration finalise_duration
(megabytes_of_path Repo_config.root)
|> Lwt.return
| Error (`Msg err) -> print_endline err |> Lwt.return
(* Launch GC *)
let commit_key = Store.Commit.key commit in
let+ launched = Store.Gc.run ~finished repo commit_key in
match launched with
| Ok false ->
Printf.printf "GC did not launch. Already running? %B\n"
(Store.Gc.is_finished repo = false)
| Ok true ->
Printf.printf "GC started. Size of repo: %.2fMB\n"
(megabytes_of_path Repo_config.root)
| Error (`Msg err) -> print_endline err)
in
let+ launched = Store.Gc.run ~finished repo head_key in
match launched with
| Ok false ->
Printf.printf "GC did not launch. Already running? %B\n"
(Store.Gc.is_finished repo = false)
| Ok true ->
Printf.printf "GC started. Size of repo: %.2fMB\n"
(megabytes_of_path Repo_config.root)
| Error (`Msg err) -> print_endline err
(* Create new split and mark the latest commit to be the next GC commit. *)
let () = Store.split repo in
Tracker.mark_next_gc_commit tracker |> Lwt.return

let main () =
let run_gc = true in
let num_of_commits = 200_000 in
let gc_every = 5_000 in
let gc_every = 1_000 in
let* repo = Store.Repo.v Repo_config.config in
let* main_branch = Store.main repo in
let tracker = Tracker.v () in
(* Create commits *)
let* _ =
let rec loop i n =
let key = Printf.sprintf "hello%d" i in
let key = "hello" in
let value = Printf.sprintf "packfile%d" i in
let* _ =
Store.set ~info:(info "add %s = %s" key value) main_branch [ key ] value
let* tree = Store.Tree.add (Tracker.latest_tree tracker) [ key ] value in
let parents = Tracker.latest_parents tracker in
let* commit =
Store.Commit.v repo ~info:(info "add %s = %s" key value) ~parents tree
in
Tracker.update_latest_commit tracker commit;
let* _ =
if run_gc && i mod gc_every = 0 then gc_all_but_head repo main_branch
else Lwt.return_unit
if i mod gc_every = 0 then run_gc repo tracker else Lwt.return_unit
in
if i >= n then Lwt.return_unit else loop (i + 1) n
in
loop 0 num_of_commits
loop 1 num_of_commits
in
(* A GC may still be running. Wait for GC to finish before ending the process *)
let* _ = Store.Gc.wait repo in
Expand Down
61 changes: 59 additions & 2 deletions src/irmin-pack/unix/chunked_suffix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
Raises `Read_out_of_bounds exception. *)

val fold :
(acc:'a -> is_appendable:bool -> chunk:chunk -> 'a) -> 'a -> t -> 'a

val open_ :
start_idx:int ->
chunk_num:int ->
Expand All @@ -74,6 +77,13 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
(unit, [> add_new_error ]) result

val length : t -> int63
(** [length t] is the length of bytes for all chunks *)

val start_idx : t -> int
(** [start_idx t] is the idx of the first chunk *)

val count : t -> int
(** [count t] is the number of chunks *)
end = struct
type t = { mutable chunks : chunk Array.t }

Expand All @@ -100,6 +110,14 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct

let is_legacy chunk_idx = chunk_idx = 0

let fold f acc t =
let appendable_idx = (appendable t).idx in
Array.fold_left
(fun acc chunk ->
let is_appendable = chunk.idx = appendable_idx in
f ~acc ~is_appendable ~chunk)
acc t.chunks

let open_ ~start_idx ~chunk_num ~open_chunk =
let off_acc = ref Int63.zero in
let create_chunk i =
Expand Down Expand Up @@ -170,6 +188,9 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
let length t =
let open Int63.Syntax in
Array.fold_left (fun sum c -> sum + Ao.end_poff c.ao) Int63.zero t.chunks

let count t = Array.length t.chunks
let start_idx t = t.chunks.(0).idx
end

type t = { inventory : Inventory.t; root : string; dead_header_size : int }
Expand Down Expand Up @@ -234,13 +255,40 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
let+ inventory = Inventory.open_ ~start_idx ~chunk_num ~open_chunk in
{ inventory; root; dead_header_size }

let start_idx t = Inventory.start_idx t.inventory
let chunk_num t = Inventory.count t.inventory
let appendable_ao t = (Inventory.appendable t.inventory).ao
let end_poff t = appendable_ao t |> Ao.end_poff
let length t = Inventory.length t.inventory

let read_exn t ~off ~len buf =
let chunk, poff = Inventory.find ~off t.inventory in
Ao.read_exn chunk.ao ~off:poff ~len buf
let rec read progress_off suffix_off len_requested =
let open Int63.Syntax in
(* Find chunk with [suffix_off] and calculate length we can read. *)
let chunk, poff = Inventory.find ~off:suffix_off t.inventory in
let chunk_end_poff = Ao.end_poff chunk.ao in
let read_end_poff = poff + len_requested in
let len_read =
if read_end_poff > chunk_end_poff then chunk_end_poff - poff
else len_requested
in

(* Perform read. If this is the first read, we can use [buf]; otherwise,
we create a new buffer and transfer after the read. *)
let len_i = Int63.to_int len_read in
let is_first_read = progress_off = Int63.zero in
let ao_buf = if is_first_read then buf else Bytes.create len_i in
icristescu marked this conversation as resolved.
Show resolved Hide resolved
Ao.read_exn chunk.ao ~off:poff ~len:len_i ao_buf;
if not is_first_read then
Bytes.blit ao_buf 0 buf (Int63.to_int progress_off) len_i;

(* Read more if any is [rem]aining. *)
let rem = len_requested - len_read in
if rem > Int63.zero then
read (progress_off + len_read) (suffix_off + len_read) rem
else ()
in
read Int63.zero off (Int63.of_int len)

let append_exn t s = Ao.append_exn (appendable_ao t) s

Expand Down Expand Up @@ -277,4 +325,13 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct

let readonly t = appendable_ao t |> Ao.readonly
let auto_flush_threshold t = appendable_ao t |> Ao.auto_flush_threshold

let fold_chunks f acc t =
Inventory.fold
(fun ~acc ~is_appendable ~chunk ->
let len = Ao.end_poff chunk.ao in
let start_suffix_off = chunk.suffix_off in
let end_suffix_off = Int63.Syntax.(start_suffix_off + len) in
f ~acc ~idx:chunk.idx ~start_suffix_off ~end_suffix_off ~is_appendable)
acc t.inventory
end
13 changes: 13 additions & 0 deletions src/irmin-pack/unix/chunked_suffix_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ module type S = sig
t ->
(unit, [> add_new_error ]) result

val start_idx : t -> int
val chunk_num : t -> int
val close : t -> (unit, [> Io.close_error | `Pending_flush ]) result
val empty_buffer : t -> bool
val flush : t -> (unit, [> Io.write_error ]) result
Expand All @@ -98,6 +100,17 @@ module type S = sig
val refresh_end_poff : t -> int63 -> (unit, [> `Rw_not_allowed ]) result
val readonly : t -> bool
val auto_flush_threshold : t -> int option

val fold_chunks :
(acc:'a ->
idx:int ->
start_suffix_off:int63 ->
end_suffix_off:int63 ->
is_appendable:bool ->
'a) ->
'a ->
t ->
'a
end

module type Sigs = sig
Expand Down
37 changes: 27 additions & 10 deletions src/irmin-pack/unix/dispatcher.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
module Control = Fm.Control

type t = { fm : Fm.t }
type location = Prefix | Suffix [@@deriving irmin]
type location = Prefix | Suffix [@@deriving irmin ~pp]

type accessor = { poff : int63; len : int63; location : location }
[@@deriving irmin]
Expand Down Expand Up @@ -66,27 +66,41 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
assert false
| Gced { suffix_start_offset; _ } -> suffix_start_offset

let suffix_dead_bytes t =
let pl = Control.payload (Fm.control t.fm) in
match pl.status with
| Payload.From_v1_v2_post_upgrade _ | Used_non_minimal_indexing_strategy
| No_gc_yet ->
Int63.zero
| T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9 | T10 | T11 | T12 | T13 | T14
| T15 ->
assert false
| Gced { suffix_dead_bytes; _ } -> suffix_dead_bytes

(* The suffix only know the real offsets, it is in the dispatcher that global
offsets are translated into real ones (i.e. in prefix or suffix offsets). *)
let end_offset t =
let open Int63.Syntax in
Suffix.length (Fm.suffix t.fm) + suffix_start_offset t
Suffix.length (Fm.suffix t.fm) + suffix_start_offset t - suffix_dead_bytes t

module Suffix_arithmetic = struct
(* Adjust the read in suffix, as the global offset [off] is
[off] = [suffix_start_offset] + [suffix_offset]. *)
let poff_of_off t off =
[off] = [suffix_start_offset] + [soff] - [suffix_dead_bytes]. *)
let soff_of_off t off =
let open Int63.Syntax in
let suffix_start_offset = suffix_start_offset t in
off - suffix_start_offset
let suffix_dead_bytes = suffix_dead_bytes t in
off - suffix_start_offset + suffix_dead_bytes

let off_of_poff t suffix_off =
let off_of_soff t soff =
let open Int63.Syntax in
let suffix_start_offset = suffix_start_offset t in
suffix_off + suffix_start_offset
let suffix_dead_bytes = suffix_dead_bytes t in
suffix_start_offset + soff - suffix_dead_bytes
end

let offset_of_suffix_poff = Suffix_arithmetic.off_of_poff
let offset_of_soff = Suffix_arithmetic.off_of_soff
let soff_of_offset = Suffix_arithmetic.soff_of_off

module Prefix_arithmetic = struct
(* Find the last chunk which is before [off_start] (or at [off_start]). If no
Expand Down Expand Up @@ -162,7 +176,7 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
if entry_end_offset > end_offset t then
raise (Errors.Pack_error `Read_out_of_bounds)
else
let poff = Suffix_arithmetic.poff_of_off t off in
let poff = Suffix_arithmetic.soff_of_off t off in
{ poff; len; location = Suffix }

let v_in_prefix_exn mapping ~off ~len =
Expand All @@ -184,7 +198,7 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
else if bytes_after_off > max_len then max_len
else bytes_after_off
in
let poff = Suffix_arithmetic.poff_of_off t off in
let poff = Suffix_arithmetic.soff_of_off t off in
{ poff; len; location = Suffix }

let v_range_in_prefix_exn t ~off ~min_len ~max_len =
Expand All @@ -211,6 +225,9 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
end

let read_exn t { poff; len; location } buf =
[%log.debug
"read_exn in %a at %a for %a" (Irmin.Type.pp location_t) location Int63.pp
poff Int63.pp len];
assert (len <= Int63.of_int Stdlib.max_int);
(* This assetion cannot be triggered because:
Expand Down
10 changes: 7 additions & 3 deletions src/irmin-pack/unix/dispatcher_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,13 @@ module type S = sig
(** [suffix_start_offset] is the offsets of the first pack entry in the
suffix. All pack entries in the prefix fit below [suffix_start_offset]. *)

val offset_of_suffix_poff : t -> int63 -> int63
(** [offset_of_suffix_poff t suffix_off] converts a suffix offset into a
(global) offset. *)
val offset_of_soff : t -> int63 -> int63
(** [offset_of_soff t suffix_off] converts a suffix offset into a (global)
offset. *)

val soff_of_offset : t -> int63 -> int63
(** [soff_of_offset t global_offset] converts a global offset to a suffix
offset. *)

val read_bytes_exn : t -> f:(string -> unit) -> off:int63 -> len:int63 -> unit
(** [read_bytes_exn] reads a slice of the global offset space defined by [off]
Expand Down
Loading