From 7d166c332f14479338819866830238a1a6b57570 Mon Sep 17 00:00:00 2001 From: metanivek Date: Fri, 21 Oct 2022 14:16:56 -0400 Subject: [PATCH 1/3] irmin-pack: update GC for chunked suffix --- src/irmin-pack/unix/chunked_suffix.ml | 61 +++++++- src/irmin-pack/unix/chunked_suffix_intf.ml | 13 ++ src/irmin-pack/unix/dispatcher.ml | 37 +++-- src/irmin-pack/unix/dispatcher_intf.ml | 10 +- src/irmin-pack/unix/file_manager.ml | 44 ++---- src/irmin-pack/unix/file_manager_intf.ml | 13 +- src/irmin-pack/unix/gc.ml | 112 ++++++--------- src/irmin-pack/unix/gc_worker.ml | 144 +++++++++++-------- src/irmin-pack/unix/gc_worker.mli | 15 +- src/irmin-pack/unix/import.ml | 8 ++ test/irmin-pack/test_gc.ml | 153 +++++++++++++++++---- 11 files changed, 397 insertions(+), 213 deletions(-) diff --git a/src/irmin-pack/unix/chunked_suffix.ml b/src/irmin-pack/unix/chunked_suffix.ml index ca78f4b989..f35da88443 100644 --- a/src/irmin-pack/unix/chunked_suffix.ml +++ b/src/irmin-pack/unix/chunked_suffix.ml @@ -52,6 +52,9 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct Raises `Read_out_of_bounds exception. *) + val fold : + (acc:'a -> is_appendable:bool -> chunk:chunk -> 'a) -> 'a -> t -> 'a + val open_ : start_idx:int -> chunk_num:int -> @@ -74,6 +77,13 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct (unit, [> add_new_error ]) result val length : t -> int63 + (** [length t] is the length of bytes for all chunks *) + + val start_idx : t -> int + (** [start_idx t] is the idx of the first chunk *) + + val count : t -> int + (** [count t] is the number of chunks *) end = struct type t = { mutable chunks : chunk Array.t } @@ -100,6 +110,14 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct let is_legacy chunk_idx = chunk_idx = 0 + let fold f acc t = + let appendable_idx = (appendable t).idx in + Array.fold_left + (fun acc chunk -> + let is_appendable = chunk.idx = appendable_idx in + f ~acc ~is_appendable ~chunk) + acc t.chunks + let open_ ~start_idx ~chunk_num ~open_chunk = let off_acc = ref Int63.zero in let create_chunk i = @@ -170,6 +188,9 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct let length t = let open Int63.Syntax in Array.fold_left (fun sum c -> sum + Ao.end_poff c.ao) Int63.zero t.chunks + + let count t = Array.length t.chunks + let start_idx t = t.chunks.(0).idx end type t = { inventory : Inventory.t; root : string; dead_header_size : int } @@ -234,13 +255,40 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct let+ inventory = Inventory.open_ ~start_idx ~chunk_num ~open_chunk in { inventory; root; dead_header_size } + let start_idx t = Inventory.start_idx t.inventory + let chunk_num t = Inventory.count t.inventory let appendable_ao t = (Inventory.appendable t.inventory).ao let end_poff t = appendable_ao t |> Ao.end_poff let length t = Inventory.length t.inventory let read_exn t ~off ~len buf = - let chunk, poff = Inventory.find ~off t.inventory in - Ao.read_exn chunk.ao ~off:poff ~len buf + let rec read progress_off suffix_off len_requested = + let open Int63.Syntax in + (* Find chunk with [suffix_off] and calculate length we can read. *) + let chunk, poff = Inventory.find ~off:suffix_off t.inventory in + let chunk_end_poff = Ao.end_poff chunk.ao in + let read_end_poff = poff + len_requested in + let len_read = + if read_end_poff > chunk_end_poff then chunk_end_poff - poff + else len_requested + in + + (* Perform read. If this is the first read, we can use [buf]; otherwise, + we create a new buffer and transfer after the read. *) + let len_i = Int63.to_int len_read in + let is_first_read = progress_off = Int63.zero in + let ao_buf = if is_first_read then buf else Bytes.create len_i in + Ao.read_exn chunk.ao ~off:poff ~len:len_i ao_buf; + if not is_first_read then + Bytes.blit ao_buf 0 buf (Int63.to_int progress_off) len_i; + + (* Read more if any is [rem]aining. *) + let rem = len_requested - len_read in + if rem > Int63.zero then + read (progress_off + len_read) (suffix_off + len_read) rem + else () + in + read Int63.zero off (Int63.of_int len) let append_exn t s = Ao.append_exn (appendable_ao t) s @@ -277,4 +325,13 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct let readonly t = appendable_ao t |> Ao.readonly let auto_flush_threshold t = appendable_ao t |> Ao.auto_flush_threshold + + let fold_chunks f acc t = + Inventory.fold + (fun ~acc ~is_appendable ~chunk -> + let len = Ao.end_poff chunk.ao in + let start_suffix_off = chunk.suffix_off in + let end_suffix_off = Int63.Syntax.(start_suffix_off + len) in + f ~acc ~idx:chunk.idx ~start_suffix_off ~end_suffix_off ~is_appendable) + acc t.inventory end diff --git a/src/irmin-pack/unix/chunked_suffix_intf.ml b/src/irmin-pack/unix/chunked_suffix_intf.ml index bca010ede8..570d6ea0b2 100644 --- a/src/irmin-pack/unix/chunked_suffix_intf.ml +++ b/src/irmin-pack/unix/chunked_suffix_intf.ml @@ -78,6 +78,8 @@ module type S = sig t -> (unit, [> add_new_error ]) result + val start_idx : t -> int + val chunk_num : t -> int val close : t -> (unit, [> Io.close_error | `Pending_flush ]) result val empty_buffer : t -> bool val flush : t -> (unit, [> Io.write_error ]) result @@ -98,6 +100,17 @@ module type S = sig val refresh_end_poff : t -> int63 -> (unit, [> `Rw_not_allowed ]) result val readonly : t -> bool val auto_flush_threshold : t -> int option + + val fold_chunks : + (acc:'a -> + idx:int -> + start_suffix_off:int63 -> + end_suffix_off:int63 -> + is_appendable:bool -> + 'a) -> + 'a -> + t -> + 'a end module type Sigs = sig diff --git a/src/irmin-pack/unix/dispatcher.ml b/src/irmin-pack/unix/dispatcher.ml index 9d46867211..510fe8e367 100644 --- a/src/irmin-pack/unix/dispatcher.ml +++ b/src/irmin-pack/unix/dispatcher.ml @@ -29,7 +29,7 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) : module Control = Fm.Control type t = { fm : Fm.t } - type location = Prefix | Suffix [@@deriving irmin] + type location = Prefix | Suffix [@@deriving irmin ~pp] type accessor = { poff : int63; len : int63; location : location } [@@deriving irmin] @@ -66,27 +66,41 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) : assert false | Gced { suffix_start_offset; _ } -> suffix_start_offset + let suffix_dead_bytes t = + let pl = Control.payload (Fm.control t.fm) in + match pl.status with + | Payload.From_v1_v2_post_upgrade _ | Used_non_minimal_indexing_strategy + | No_gc_yet -> + Int63.zero + | T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9 | T10 | T11 | T12 | T13 | T14 + | T15 -> + assert false + | Gced { suffix_dead_bytes; _ } -> suffix_dead_bytes + (* The suffix only know the real offsets, it is in the dispatcher that global offsets are translated into real ones (i.e. in prefix or suffix offsets). *) let end_offset t = let open Int63.Syntax in - Suffix.length (Fm.suffix t.fm) + suffix_start_offset t + Suffix.length (Fm.suffix t.fm) + suffix_start_offset t - suffix_dead_bytes t module Suffix_arithmetic = struct (* Adjust the read in suffix, as the global offset [off] is - [off] = [suffix_start_offset] + [suffix_offset]. *) - let poff_of_off t off = + [off] = [suffix_start_offset] + [soff] - [suffix_dead_bytes]. *) + let soff_of_off t off = let open Int63.Syntax in let suffix_start_offset = suffix_start_offset t in - off - suffix_start_offset + let suffix_dead_bytes = suffix_dead_bytes t in + off - suffix_start_offset + suffix_dead_bytes - let off_of_poff t suffix_off = + let off_of_soff t soff = let open Int63.Syntax in let suffix_start_offset = suffix_start_offset t in - suffix_off + suffix_start_offset + let suffix_dead_bytes = suffix_dead_bytes t in + suffix_start_offset + soff - suffix_dead_bytes end - let offset_of_suffix_poff = Suffix_arithmetic.off_of_poff + let offset_of_soff = Suffix_arithmetic.off_of_soff + let soff_of_offset = Suffix_arithmetic.soff_of_off module Prefix_arithmetic = struct (* Find the last chunk which is before [off_start] (or at [off_start]). If no @@ -162,7 +176,7 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) : if entry_end_offset > end_offset t then raise (Errors.Pack_error `Read_out_of_bounds) else - let poff = Suffix_arithmetic.poff_of_off t off in + let poff = Suffix_arithmetic.soff_of_off t off in { poff; len; location = Suffix } let v_in_prefix_exn mapping ~off ~len = @@ -184,7 +198,7 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) : else if bytes_after_off > max_len then max_len else bytes_after_off in - let poff = Suffix_arithmetic.poff_of_off t off in + let poff = Suffix_arithmetic.soff_of_off t off in { poff; len; location = Suffix } let v_range_in_prefix_exn t ~off ~min_len ~max_len = @@ -211,6 +225,9 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) : end let read_exn t { poff; len; location } buf = + [%log.debug + "read_exn in %a at %a for %a" (Irmin.Type.pp location_t) location Int63.pp + poff Int63.pp len]; assert (len <= Int63.of_int Stdlib.max_int); (* This assetion cannot be triggered because: diff --git a/src/irmin-pack/unix/dispatcher_intf.ml b/src/irmin-pack/unix/dispatcher_intf.ml index e209222ff8..86560bca24 100644 --- a/src/irmin-pack/unix/dispatcher_intf.ml +++ b/src/irmin-pack/unix/dispatcher_intf.ml @@ -84,9 +84,13 @@ module type S = sig (** [suffix_start_offset] is the offsets of the first pack entry in the suffix. All pack entries in the prefix fit below [suffix_start_offset]. *) - val offset_of_suffix_poff : t -> int63 -> int63 - (** [offset_of_suffix_poff t suffix_off] converts a suffix offset into a - (global) offset. *) + val offset_of_soff : t -> int63 -> int63 + (** [offset_of_soff t suffix_off] converts a suffix offset into a (global) + offset. *) + + val soff_of_offset : t -> int63 -> int63 + (** [soff_of_offset t global_offset] converts a global offset to a suffix + offset. *) val read_bytes_exn : t -> f:(string -> unit) -> off:int63 -> len:int63 -> unit (** [read_bytes_exn] reads a slice of the global offset space defined by [off] diff --git a/src/irmin-pack/unix/file_manager.ml b/src/irmin-pack/unix/file_manager.ml index 90749322da..5092918832 100644 --- a/src/irmin-pack/unix/file_manager.ml +++ b/src/irmin-pack/unix/file_manager.ml @@ -410,16 +410,11 @@ struct let chunk_start_idx0 = pl0.chunk_start_idx in let chunk_start_idx1 = pl1.chunk_start_idx in let* () = - if - chunk_num0 <> chunk_num1 - || chunk_start_idx0 <> chunk_start_idx1 - (* TODO: remove this last check, kept for passing the tests. *) - || gen0 <> gen1 + if chunk_num0 <> chunk_num1 || chunk_start_idx0 <> chunk_start_idx1 then let end_poff = pl1.suffix_end_poff in - (* TODO: remove this max, kept for passing the tests. *) - let chunk_start_idx = max chunk_start_idx1 gen1 in - reopen_suffix t ~chunk_start_idx ~end_poff ~chunk_num:chunk_num1 + reopen_suffix t ~chunk_start_idx:chunk_start_idx1 ~end_poff + ~chunk_num:chunk_num1 else Ok () in if gen0 = gen1 then Ok () @@ -692,33 +687,22 @@ struct | `Unknown_major_pack_version _ ) as e -> e) - let swap t ~generation ~new_suffix_start_offset ~new_suffix_end_offset - ~latest_gc_target_offset = + let swap t ~generation ~suffix_start_offset ~chunk_start_idx ~chunk_num + ~suffix_dead_bytes ~latest_gc_target_offset = let open Result_syntax in [%log.debug - "Gc in main: swap %d %#d %#d" generation - (Int63.to_int new_suffix_start_offset) - (Int63.to_int new_suffix_end_offset)]; + "Gc in main: swap gen %d; suffix start %a; chunk start idx %d; chunk num \ + %d; suffix dead bytes %a" + generation Int63.pp suffix_start_offset chunk_start_idx chunk_num Int63.pp + suffix_dead_bytes]; let c0 = Mtime_clock.counter () in let pl = Control.payload t.control in - (* Opening the suffix requires passing it its length. We compute it from the - global offsets - TODO: this calculation of [suffix_end_poff] only works with one chunk. - this code will be removed once we have chunk GC. - *) - let suffix_end_poff = - let open Int63.Syntax in - new_suffix_end_offset - new_suffix_start_offset - in (* Step 1. Reopen files *) let* () = reopen_prefix t ~generation in let* () = reopen_mapping t ~generation in - (* TODO: remove this max, kept for the tests. *) - let chunk_start_idx = max pl.chunk_start_idx generation in let* () = - reopen_suffix t ~chunk_start_idx ~end_poff:suffix_end_poff - ~chunk_num:pl.chunk_num + reopen_suffix t ~chunk_start_idx ~chunk_num ~end_poff:pl.suffix_end_poff in let span1 = Mtime_clock.count c0 |> Mtime.Span.to_us in @@ -734,19 +718,16 @@ struct | T14 | T15 -> assert false | Gced _ | No_gc_yet -> - let suffix_start_offset = new_suffix_start_offset in Gced { suffix_start_offset; generation; latest_gc_target_offset; - suffix_dead_bytes = Int63.zero; + suffix_dead_bytes; } in - (* TODO using generation for starting chunk idx works since we only have - only one chunk at a time. this will change once we have chunk based GC. *) - { pl with status; suffix_end_poff; chunk_start_idx = generation } + { pl with status; chunk_start_idx; chunk_num } in [%log.debug "GC: writing new control_file"]; Control.set_payload t.control pl @@ -780,7 +761,6 @@ struct (* Unreachable *) assert false - (* TODO : this does not work yet with GC. *) let split t = let open Result_syntax in (* Step 1. Create a new chunk file *) diff --git a/src/irmin-pack/unix/file_manager_intf.ml b/src/irmin-pack/unix/file_manager_intf.ml index 329983d73e..65b65e0e53 100644 --- a/src/irmin-pack/unix/file_manager_intf.ml +++ b/src/irmin-pack/unix/file_manager_intf.ml @@ -241,13 +241,16 @@ module type S = sig val swap : t -> generation:int -> - new_suffix_start_offset:int63 -> - new_suffix_end_offset:int63 -> + suffix_start_offset:int63 -> + chunk_start_idx:int -> + chunk_num:int -> + suffix_dead_bytes:int63 -> latest_gc_target_offset:int63 -> (unit, [> Errs.t ]) result - (** Swaps to using files from the GC [generation]. The offsets - [new_suffix_start_offset] and [new_suffix_end_offset] are used to properly - load the suffix. The control file is also updated. *) + (** Swaps to using files from the GC [generation]. The values + [suffix_start_offset], [chunk_start_idx], [chunk_num], and + [suffix_dead_bytes] are used to properly load and read the suffix after a + GC. The control file is also updated on disk. *) val readonly : t -> bool val generation : t -> int diff --git a/src/irmin-pack/unix/gc.ml b/src/irmin-pack/unix/gc.ml index 93d704a5c2..8445fd275e 100644 --- a/src/irmin-pack/unix/gc.ml +++ b/src/irmin-pack/unix/gc.ml @@ -29,7 +29,7 @@ module Make (Args : Gc_args.S) = struct generation : int; task : Async.t; unlink : bool; - offset : int63; + new_suffix_start_offset : int63; resolver : (Stats.Latest_gc.stats, Errs.t) result Lwt.u; promise : (Stats.Latest_gc.stats, Errs.t) result Lwt.t; dispatcher : Dispatcher.t; @@ -44,7 +44,7 @@ module Make (Args : Gc_args.S) = struct let v ~root ~generation ~unlink ~dispatcher ~fm ~contents ~node ~commit commit_key = - let offset, latest_gc_target_offset = + let new_suffix_start_offset, latest_gc_target_offset = let state : _ Pack_key.state = Pack_key.inspect commit_key in match state with | Direct x -> @@ -55,19 +55,14 @@ module Make (Args : Gc_args.S) = struct assert false in let partial_stats = - let commit_offset = offset in + let commit_offset = latest_gc_target_offset in let before_suffix_start_offset = Dispatcher.suffix_start_offset dispatcher in let before_suffix_end_offset = Dispatcher.end_offset dispatcher in - let after_suffix_start_offset = - offset - (* This will not just be [offset] anymore when the commit is moved the - the prefix file. *) - in Gc_stats.Main.create "worker startup" ~commit_offset ~before_suffix_start_offset ~before_suffix_end_offset - ~after_suffix_start_offset + ~after_suffix_start_offset:new_suffix_start_offset in let unlink_result_file () = let result_file = Irmin_pack.Layout.V4.gc_result ~root ~generation in @@ -83,13 +78,13 @@ module Make (Args : Gc_args.S) = struct (* Unlink next gc's result file, in case it is on disk, for instance after a failed gc. *) unlink_result_file (); - (* function to track durations *) (* internal promise for gc *) let promise, resolver = Lwt.wait () in (* start worker task *) let task = Async.async (fun () -> - Worker.run_and_output_result root commit_key offset ~generation) + Worker.run_and_output_result root commit_key new_suffix_start_offset + ~generation) in let partial_stats = Gc_stats.Main.finish_current_step partial_stats "before finalise" @@ -99,7 +94,7 @@ module Make (Args : Gc_args.S) = struct root; generation; unlink; - offset; + new_suffix_start_offset; task; promise; resolver; @@ -113,49 +108,29 @@ module Make (Args : Gc_args.S) = struct latest_gc_target_offset; } - let open_new_suffix ~end_poff { root; generation; _ } = - let open Result_syntax in - let path = Irmin_pack.Layout.V4.suffix_chunk ~root ~chunk_idx:generation in - (* As the new suffix is necessarily in V3, the dead_header_size is - 0. *) - let dead_header_size = 0 in - let auto_flush_threshold = 1_000_000 in - let* suffix = - Ao.open_rw ~path ~end_poff ~dead_header_size - ~auto_flush_procedure:`Internal ~auto_flush_threshold - in - Ok suffix - - let transfer_latest_newies ~new_suffix_start_offset ~new_suffix_end_offset t = - [%log.debug "Gc in main: transfer latest newies"]; + let swap_and_purge t removable_chunk_num suffix_params = let open Result_syntax in - let open Int63.Syntax in - let old_suffix_end_offset = Dispatcher.end_offset t.dispatcher in - let remaining = old_suffix_end_offset - new_suffix_end_offset in - (* When opening the suffix we need to provide a physical offset. We compute - it from the global ones. *) - let suffix_end_poff = new_suffix_end_offset - new_suffix_start_offset in - let* new_suffix = open_new_suffix ~end_poff:suffix_end_poff t in - Errors.finalise (fun _ -> - Ao.close new_suffix - |> Errs.log_if_error "GC: Close suffix after copy latest newies") - @@ fun () -> - let append_exn = Ao.append_exn new_suffix in - let flush_and_raise () = Ao.flush new_suffix |> Errs.raise_if_error in - let* () = - Errs.catch (fun () -> - Dispatcher.read_bytes_exn t.dispatcher ~f:append_exn - ~off:new_suffix_end_offset ~len:remaining; - flush_and_raise ()) + let { generation; latest_gc_target_offset; _ } = t in + let Worker. + { + start_offset = suffix_start_offset; + chunk_start_idx; + dead_bytes = suffix_dead_bytes; + } = + suffix_params in - Ok old_suffix_end_offset + (* Calculate chunk num in main process since more chunks could have been + added while GC was running. GC process only tells us how many chunks are + to be removed. *) + let suffix = Fm.suffix t.fm in + let chunk_num = Fm.Suffix.chunk_num suffix - removable_chunk_num in + (* Assert that we have at least one chunk (the appendable chunk), which + is guaranteed by the GC process. *) + assert (chunk_num >= 1); - let swap_and_purge ~new_suffix_start_offset ~new_suffix_end_offset t = - let open Result_syntax in let* () = - Fm.swap t.fm ~generation:t.generation ~new_suffix_start_offset - ~new_suffix_end_offset - ~latest_gc_target_offset:t.latest_gc_target_offset + Fm.swap t.fm ~generation ~suffix_start_offset ~chunk_start_idx ~chunk_num + ~suffix_dead_bytes ~latest_gc_target_offset in (* No need to purge dict here, as it is global to the store. *) @@ -167,14 +142,16 @@ module Make (Args : Gc_args.S) = struct Commit_store.purge_lru t.commit; Ok () - let unlink_all { root; generation; _ } = + let unlink_all { root; generation; _ } removable_chunk_idxs = let result = let open Result_syntax in - (* Unlink previous suffix. *) - let suffix = - Irmin_pack.Layout.V4.suffix_chunk ~root ~chunk_idx:(generation - 1) + (* Unlink suffix chunks *) + let* () = + removable_chunk_idxs + |> List.iter_result @@ fun chunk_idx -> + let path = Irmin_pack.Layout.V4.suffix_chunk ~root ~chunk_idx in + Io.unlink path in - let* () = Io.unlink suffix in let* () = if generation >= 2 then (* Unlink previous prefix. *) @@ -257,31 +234,22 @@ module Make (Args : Gc_args.S) = struct let result = let open Result_syntax in match (status, gc_output) with - | `Success, Ok worker_stats -> - let new_suffix_end_offset_before = - Stats.Latest_gc.new_suffix_end_offset_before_finalise - worker_stats - in - let partial_stats = - Gc_stats.Main.finish_current_step partial_stats - "copy latest newies" - in - let* new_suffix_end_offset = - transfer_latest_newies ~new_suffix_start_offset:t.offset - ~new_suffix_end_offset:new_suffix_end_offset_before t - in + | ( `Success, + Ok { suffix_params; removable_chunk_idxs; stats = worker_stats } + ) -> let partial_stats = Gc_stats.Main.finish_current_step partial_stats "swap and purge" in let* () = - swap_and_purge ~new_suffix_start_offset:t.offset - ~new_suffix_end_offset t + swap_and_purge t + (List.length removable_chunk_idxs) + suffix_params in let partial_stats = Gc_stats.Main.finish_current_step partial_stats "unlink" in - if t.unlink then unlink_all t; + if t.unlink then unlink_all t removable_chunk_idxs; let partial_stats = let after_suffix_end_offset = diff --git a/src/irmin-pack/unix/gc_worker.ml b/src/irmin-pack/unix/gc_worker.ml index 1cf1d42aca..6d1c6c4cd5 100644 --- a/src/irmin-pack/unix/gc_worker.ml +++ b/src/irmin-pack/unix/gc_worker.ml @@ -105,10 +105,6 @@ module Make (Args : Gc_args.S) = struct resulting string, which we pass to write_exn. This usage is safe. *) write_exn ~off:accessor.poff ~len (Bytes.unsafe_to_string buffer) - let create_new_suffix ~root ~generation = - let path = Irmin_pack.Layout.V4.suffix_chunk ~root ~chunk_idx:generation in - Ao.create_rw_exn ~path - let report_old_file_sizes ~root ~generation stats = let open Result_syntax in let* prefix_size = @@ -122,7 +118,23 @@ module Make (Args : Gc_args.S) = struct stats := Gc_stats.Worker.add_file_size !stats "old_prefix" prefix_size; stats := Gc_stats.Worker.add_file_size !stats "old_mapping" mapping_size - let run ~generation root commit_key new_prefix_end_offset = + type suffix_params = { + start_offset : int63; + chunk_start_idx : int; + dead_bytes : int63; + } + [@@deriving irmin] + + type gc_results = { + suffix_params : suffix_params; + removable_chunk_idxs : int list; + stats : Stats.Latest_gc.worker; + } + [@@deriving irmin] + + type gc_output = (gc_results, Args.Errs.t) result [@@deriving irmin] + + let run ~generation root commit_key new_suffix_start_offset = let open Result_syntax in let config = Irmin_pack.Conf.init ~fresh:false ~readonly:true ~lru_size:0 root @@ -227,8 +239,6 @@ module Make (Args : Gc_args.S) = struct Gc_stats.Worker.add_file_size !stats "prefix" (Ao.end_poff prefix); Ao.close prefix |> Errs.log_if_error "GC: Close prefix") @@ fun () -> - (); - (* Step 5. Transfer to the new prefix, flush and close. *) [%log.debug "GC: transfering to the new prefix"]; stats := Gc_stats.Worker.finish_current_step !stats "prefix: transfer"; @@ -267,57 +277,74 @@ module Make (Args : Gc_args.S) = struct (Commit_value.parents commit) in - (* Step 6. Create the new suffix and prepare 2 functions for read and write - operations. *) - stats := Gc_stats.Worker.finish_current_step !stats "suffix: start"; - [%log.debug "GC: creating new suffix"]; - let suffix = create_new_suffix ~root ~generation in - Errors.finalise_exn (fun _outcome -> - Ao.fsync suffix - >>= (fun _ -> Ao.close suffix) - |> Errs.log_if_error "GC: Close suffix") - @@ fun () -> - let append_exn = Ao.append_exn suffix in - - (* Step 7. Transfer to the next suffix. *) - [%log.debug "GC: transfering to the new suffix"]; - stats := Gc_stats.Worker.finish_current_step !stats "suffix: transfer"; - let num_iterations = 7 in - (* [transfer_loop] is needed because after garbage collection there may be new objects - at the end of the suffix file that need to be copied over *) - let rec transfer_loop i ~off = - if i = 0 then off - else - let () = Fm.reload fm |> Errs.raise_if_error in - let pl : Payload.t = Fm.Control.payload (Fm.control fm) in - let end_offset = - Dispatcher.offset_of_suffix_poff dispatcher pl.suffix_end_poff - in - let len = Int63.Syntax.(end_offset - off) in - [%log.debug - "GC: transfer_loop iteration %d, offset %a, length %a" - (num_iterations - i + 1) - Int63.pp off Int63.pp len]; - stats := Gc_stats.Worker.add_suffix_transfer !stats len; - let () = Dispatcher.read_bytes_exn dispatcher ~f:append_exn ~off ~len in - (* Check how many bytes are left, [4096*5] is selected because it is roughly the - number of bytes that requires a read from the block device on ext4 *) - if Int63.to_int len < 4096 * 5 then end_offset - else - let off = Int63.Syntax.(off + len) in - transfer_loop ~off (i - 1) - in - let new_end_suffix_offset = - transfer_loop ~off:new_prefix_end_offset num_iterations + (* Step 6. Calculate post-GC suffix parameters. *) + let suffix_params, removable_chunk_idxs = + stats := + Gc_stats.Worker.finish_current_step !stats + "suffix: calculate new values"; + let suffix = Fm.suffix fm in + let soff = Dispatcher.soff_of_offset dispatcher new_suffix_start_offset in + (* Step 6.1. Calculate chunks that we have GCed. *) + let open struct + type chunk = { idx : int; end_suffix_off : int63 } + end in + let removable_chunks = + match Fm.Suffix.chunk_num suffix with + | 1 -> [] (* We never remove a single chunk. *) + | _ -> + Fm.Suffix.fold_chunks + (fun ~acc ~idx ~start_suffix_off ~end_suffix_off ~is_appendable -> + (* Remove chunks that end at or before our new split point. + This will leave the chunk that starts with (or contains) the + split point but remove all previous chunks. + We never remove empty or appendable chunks. *) + let is_empty = start_suffix_off = end_suffix_off in + let ends_with_or_is_before_soff = end_suffix_off <= soff in + let is_removable = + (not is_appendable) + && (not is_empty) + && ends_with_or_is_before_soff + in + if is_removable then { idx; end_suffix_off } :: acc else acc) + [] suffix + in + (* Step 6.2. Calculate the new chunk starting idx. *) + let chunk_start_idx = + match removable_chunks with + | [] -> Fm.Suffix.start_idx suffix + | last_removed_chunk :: _ -> succ last_removed_chunk.idx + in + (* Step 6.3. Calculate new dead bytes at the beginning of the suffix. *) + let suffix_dead_bytes = + match removable_chunks with + (* If no chunks are GCed, the dead bytes are equivalent to the physical + offset of new suffix offset. *) + | [] -> Dispatcher.soff_of_offset dispatcher new_suffix_start_offset + (* Otherwise, it is the difference between the last chunk removed's end offset + and the new start offset. *) + | last_removed_chunk :: _ -> + let removed_end_offset = + last_removed_chunk.end_suffix_off + |> Dispatcher.offset_of_soff dispatcher + in + Int63.Syntax.(new_suffix_start_offset - removed_end_offset) + in + (* Step 6.4. Assertions and record construction. *) + assert (Int63.Syntax.(suffix_dead_bytes >= Int63.zero)); + let removable_chunk_idxs = + removable_chunks |> List.map (fun c -> c.idx) + in + ( { + start_offset = new_suffix_start_offset; + dead_bytes = suffix_dead_bytes; + chunk_start_idx; + }, + removable_chunk_idxs ) in - stats := Gc_stats.Worker.add_file_size !stats "suffix" new_end_suffix_offset; - Ao.flush suffix |> Errs.raise_if_error; - (* Step 8. Finalise stats and return. *) - Gc_stats.Worker.finalise !stats - - type gc_output = (Stats.Latest_gc.worker, Args.Errs.t) result - [@@deriving irmin] + (* Step 7. Finalise stats and return. *) + let stats = Gc_stats.Worker.finalise !stats in + { suffix_params; removable_chunk_idxs; stats } let write_gc_output ~root ~generation output = let open Result_syntax in @@ -330,10 +357,11 @@ module Make (Args : Gc_args.S) = struct (* No one catches errors when this function terminates. Write the result in a file and terminate. *) - let run_and_output_result ~generation root commit_key new_prefix_end_offset = + let run_and_output_result ~generation root commit_key new_suffix_start_offset + = let result = Errs.catch (fun () -> - run ~generation root commit_key new_prefix_end_offset) + run ~generation root commit_key new_suffix_start_offset) in let write_result = write_gc_output ~root ~generation result in write_result |> Errs.log_if_error "writing gc output" diff --git a/src/irmin-pack/unix/gc_worker.mli b/src/irmin-pack/unix/gc_worker.mli index fcafc34bb1..d7e359502c 100644 --- a/src/irmin-pack/unix/gc_worker.mli +++ b/src/irmin-pack/unix/gc_worker.mli @@ -25,7 +25,20 @@ module Make (Args : Gc_args.S) : sig val run_and_output_result : generation:int -> string -> Args.key -> int63 -> unit - type gc_output = (Stats.Latest_gc.worker, Args.Errs.t) result + type suffix_params = { + start_offset : int63; + chunk_start_idx : int; + dead_bytes : int63; + } [@@deriving irmin] + + type gc_results = { + suffix_params : suffix_params; + removable_chunk_idxs : int list; + stats : Stats.Latest_gc.worker; + } + [@@deriving irmin] + + type gc_output = (gc_results, Args.Errs.t) result [@@deriving irmin] end with module Args := Args diff --git a/src/irmin-pack/unix/import.ml b/src/irmin-pack/unix/import.ml index f272f81c4e..db51857912 100644 --- a/src/irmin-pack/unix/import.ml +++ b/src/irmin-pack/unix/import.ml @@ -34,6 +34,14 @@ module Array = struct loop 0 end +module List = struct + include List + + let rec iter_result f = function + | [] -> Ok () + | hd :: tl -> Result.bind (f hd) (fun () -> iter_result f tl) +end + module Int63 = struct include Optint.Int63 diff --git a/test/irmin-pack/test_gc.ml b/test/irmin-pack/test_gc.ml index 4ee3031161..1547cc196c 100644 --- a/test/irmin-pack/test_gc.ml +++ b/test/irmin-pack/test_gc.ml @@ -356,7 +356,7 @@ module Gc = struct let* () = S.Repo.close t.repo in Alcotest.(check bool) "RW cleaned up" false - (Sys.file_exists (Filename.concat store_name "store.0.suffix")); + (Sys.file_exists (Filename.concat store_name "store.0.prefix")); let* t = init ~readonly:false ~fresh:false ~root:store_name () in let* () = check_1 t c1 in let* () = check_2 t c2 in @@ -619,7 +619,6 @@ module Gc = struct "test" [ ("mapping", 72); - ("suffix", 531); ("prefix", 316); ("sorted", 128); ("reachable", 128); @@ -924,34 +923,62 @@ module Split = struct let* () = S.Repo.close t.repo in S.Repo.close ro_t.repo - let check_preexisting_commit t = - let s = "22e159de13b427226e5901defd17f0c14e744205" in - let h = - match Irmin.Type.(of_string Schema.Hash.t) s with + let load_commit t h = + let hash = + match Irmin.Type.(of_string Schema.Hash.t) h with | Error (`Msg s) -> Alcotest.failf "failed hash_of_string %s" s - | Ok h -> h + | Ok hash -> hash in - let* c = S.Commit.of_hash t.repo h in - match c with - | None -> Alcotest.failf "Commit %s not found" s - | Some commit -> - let tree = S.Commit.tree commit in - let+ got = S.Tree.find tree [ "step-n01"; "step-b01" ] in - Alcotest.(check (option string)) "find blob" (Some "b01") got + let+ commit = S.Commit.of_hash t.repo hash in + match commit with + | None -> Alcotest.failf "Commit %s not found" h + | Some commit -> commit - let two_splits_v3_migrated_store () = + let check_preexisting_commit t = + let h = "22e159de13b427226e5901defd17f0c14e744205" in + let* commit = load_commit t h in + let tree = S.Commit.tree commit in + let+ got = S.Tree.find tree [ "step-n01"; "step-b01" ] in + Alcotest.(check (option string)) "find blob" (Some "b01") got + + let v3_migrated_store_splits_and_gc () = let root = create_test_env () in let* t = init ~readonly:false ~fresh:false ~root () in - let () = S.split t.repo in + let* c0 = load_commit t "22e159de13b427226e5901defd17f0c14e744205" in let* t, c1 = commit_1 t in let () = S.split t.repo in let* t = checkout_exn t c1 in let* t, c2 = commit_2 t in + let () = S.split t.repo in [%log.debug - "chunk1 consists of the preexisting V3 suffix, created chunk2 and chunk3"]; + "chunk0 consists of the preexisting V3 suffix and c1, chunk1 is c2"]; + let* () = check_preexisting_commit t in let* () = check_1 t c1 in let* () = check_2 t c2 in + [%log.debug "GC at c0"]; + let* () = start_gc ~unlink:true t c0 in + let* () = finalise_gc t in let* () = check_preexisting_commit t in + let* () = check_1 t c1 in + let* () = check_2 t c2 in + Alcotest.(check bool) + "Chunk0 still exists" true + (Sys.file_exists (Filename.concat t.root "store.0.suffix")); + [%log.debug "GC at c1"]; + let* () = start_gc ~unlink:true t c1 in + let* () = finalise_gc t in + let* () = check_not_found t c0 "removed c0" in + let* () = check_1 t c1 in + let* () = check_2 t c2 in + Alcotest.(check bool) + "Chunk0 removed" false + (Sys.file_exists (Filename.concat t.root "store.0.suffix")); + [%log.debug "GC at c2"]; + let* () = start_gc ~unlink:true t c2 in + let* () = finalise_gc t in + let* () = check_not_found t c0 "removed c0" in + let* () = check_not_found t c1 "removed c1" in + let* () = check_2 t c2 in S.Repo.close t.repo let close_and_split () = @@ -988,26 +1015,48 @@ module Split = struct let* t, c3 = commit_3 t in let* () = start_gc t c3 in let* () = finalise_gc t in + let () = S.split t.repo in + let* t = checkout_exn t c3 in + let* t, c4 = commit_4 t in + let* () = check_not_found t c1 "removed c1" in + let* () = check_not_found t c2 "removed c2" in + let* () = check_3 t c3 in + let* () = check_4 t c4 in + S.Repo.close t.repo - (* TODO: for now gc create an empty chunk, to replace by simple call to - split.*) - let () = - Alcotest.check_raises "To remove" - (Irmin_pack_unix.Errors.Pack_error `Multiple_empty_chunks) (fun () -> - S.split t.repo) - in + let multi_split_and_gc () = + (* This test primarily checks that dead byte calculation + happens correctly by testing GCs on chunks past the first + one. When the calculation is incorrect, exceptions are thrown + when attempting to lookup keys in the store. *) + let* t = init () in + let* t, c1 = commit_1 t in + let () = S.split t.repo in + + let* t = checkout_exn t c1 in + let* t, c2 = commit_2 t in + + let () = S.split t.repo in + let* () = start_gc t c1 in + let* () = finalise_gc t in + + let* t = checkout_exn t c2 in + let* t, c3 = commit_3 t in + + let () = S.split t.repo in + let* () = start_gc t c2 in + let* () = finalise_gc t in - (* let () = S.split t.repo in *) let* t = checkout_exn t c3 in let* t, c4 = commit_4 t in + let* () = check_not_found t c1 "removed c1" in - let* () = check_not_found t c2 "removed c2" in + let* () = check_2 t c2 in let* () = check_3 t c3 in let* () = check_4 t c4 in S.Repo.close t.repo - (* TODO : following tests not working for now.*) - let _split_and_gc () = + let split_and_gc () = let* t = init () in let* t, c1 = commit_1 t in let () = S.split t.repo in @@ -1019,7 +1068,7 @@ module Split = struct let* () = check_not_found t c1 "removed c1" in S.Repo.close t.repo - let _another_split_and_gc () = + let another_split_and_gc () = let* t = init () in let* t, c1 = commit_1 t in let () = S.split t.repo in @@ -1031,12 +1080,56 @@ module Split = struct let* () = check_2 t c2 in S.Repo.close t.repo + let split_during_gc () = + let* t = init () in + let* t, c1 = commit_1 t in + let* () = start_gc t c1 in + let () = S.split t.repo in + let* t = checkout_exn t c1 in + let* t, c2 = commit_2 t in + let* () = finalise_gc t in + let* () = check_1 t c1 in + let* () = check_2 t c2 in + S.Repo.close t.repo + + let commits_and_splits_during_gc () = + (* This test primarily ensures that chunk num is calculated + correctly by intentionally creating chunks during a GC. *) + let* t = init () in + let* t, c1 = commit_1 t in + + let () = S.split t.repo in + let* t = checkout_exn t c1 in + let* t, c2 = commit_2 t in + + let* () = start_gc t c2 in + let () = S.split t.repo in + + let* t = checkout_exn t c2 in + let* t, c3 = commit_3 t in + + let () = S.split t.repo in + let* t = checkout_exn t c3 in + let* t, c4 = commit_4 t in + + let* () = finalise_gc t in + let* () = check_not_found t c1 "removed c1" in + let* () = check_2 t c2 in + let* () = check_3 t c3 in + let* () = check_4 t c4 in + S.Repo.close t.repo + let tests = [ tc "Test two splits" two_splits; tc "Test two splits for ro" ro_two_splits; - tc "Test splits on V3 store" two_splits_v3_migrated_store; + tc "Test splits and GC on V3 store" v3_migrated_store_splits_and_gc; tc "Test split and close" close_and_split; tc "Test two gc followed by split" two_gc_then_split; + tc "Test split and GC" split_and_gc; + tc "Test multi split and GC" multi_split_and_gc; + tc "Test another split and GC" another_split_and_gc; + tc "Test split during GC" split_during_gc; + tc "Test commits and splits during GC" commits_and_splits_during_gc; ] end From ab394e3fdfa3d4574da9b2fa143566d95745521c Mon Sep 17 00:00:00 2001 From: metanivek Date: Tue, 1 Nov 2022 15:53:16 -0400 Subject: [PATCH 2/3] examples: update GC example for chunked suffix --- examples/gc.ml | 103 +++++++++++++++++++++++++++++++++---------------- 1 file changed, 69 insertions(+), 34 deletions(-) diff --git a/examples/gc.ml b/examples/gc.ml index 506cb96ddd..6ee1a384a4 100644 --- a/examples/gc.ml +++ b/examples/gc.ml @@ -85,7 +85,7 @@ module Repo_config = struct end (** Utility for creating commit info *) -let info fmt = Irmin_unix.info ~author:"pack example" fmt +let info fmt key value = Irmin_unix.info ~author:"pack example" fmt key value () (** Utility for computing the size of a directory *) let rec megabytes_of_path path = @@ -95,54 +95,89 @@ let rec megabytes_of_path path = 0. (Sys.readdir path) else float_of_int Unix.((stat path).st_size) /. 1e6 -(** Demonstrate running GC on all commits before the head *) -let gc_all_but_head repo branch = - let* head = Store.Head.get branch in - let head_key = Store.Commit.key head in - let finished = function - | Ok stats -> - let duration = Irmin_pack_unix.Stats.Latest_gc.total_duration stats in - let finalise_duration = - Irmin_pack_unix.Stats.Latest_gc.finalise_duration stats +(** A utility module for tracking the latest commit and the commit we will want + to run GC for. *) +module Tracker = struct + type t = { + mutable latest_commit : Store.commit option; + mutable next_gc_commit : Store.commit option; + } + + let v () = { latest_commit = None; next_gc_commit = None } + let update_latest_commit t commit = t.latest_commit <- Some commit + + let latest_parents t = + match t.latest_commit with None -> [] | Some c -> Store.Commit.parents c + + let latest_tree t = + match t.latest_commit with + | None -> Store.Tree.empty () + | Some c -> Store.Commit.tree c + + let mark_next_gc_commit t = t.next_gc_commit <- t.latest_commit +end + +(** Demonstrate running GC on a previous commit aligned to the end of a chunk + for ideal GC space reclamation. *) +let run_gc repo tracker = + let* () = + match Tracker.(tracker.next_gc_commit) with + | None -> Lwt.return_unit + | Some commit -> ( + let finished = function + | Ok stats -> + let duration = + Irmin_pack_unix.Stats.Latest_gc.total_duration stats + in + let finalise_duration = + Irmin_pack_unix.Stats.Latest_gc.finalise_duration stats + in + Printf.printf + "GC finished in %.4fs. Finalise took %.4fs. Size of repo: \ + %.2fMB.\n" + duration finalise_duration + (megabytes_of_path Repo_config.root) + |> Lwt.return + | Error (`Msg err) -> print_endline err |> Lwt.return in - Printf.printf - "GC finished in %.4fs. Finalise took %.4fs. Size of repo: %.2fMB.\n" - duration finalise_duration - (megabytes_of_path Repo_config.root) - |> Lwt.return - | Error (`Msg err) -> print_endline err |> Lwt.return + (* Launch GC *) + let commit_key = Store.Commit.key commit in + let+ launched = Store.Gc.run ~finished repo commit_key in + match launched with + | Ok false -> + Printf.printf "GC did not launch. Already running? %B\n" + (Store.Gc.is_finished repo = false) + | Ok true -> + Printf.printf "GC started. Size of repo: %.2fMB\n" + (megabytes_of_path Repo_config.root) + | Error (`Msg err) -> print_endline err) in - let+ launched = Store.Gc.run ~finished repo head_key in - match launched with - | Ok false -> - Printf.printf "GC did not launch. Already running? %B\n" - (Store.Gc.is_finished repo = false) - | Ok true -> - Printf.printf "GC started. Size of repo: %.2fMB\n" - (megabytes_of_path Repo_config.root) - | Error (`Msg err) -> print_endline err + (* Create new split and mark the latest commit to be the next GC commit. *) + let () = Store.split repo in + Tracker.mark_next_gc_commit tracker |> Lwt.return let main () = - let run_gc = true in let num_of_commits = 200_000 in - let gc_every = 5_000 in + let gc_every = 1_000 in let* repo = Store.Repo.v Repo_config.config in - let* main_branch = Store.main repo in + let tracker = Tracker.v () in (* Create commits *) let* _ = let rec loop i n = - let key = Printf.sprintf "hello%d" i in + let key = "hello" in let value = Printf.sprintf "packfile%d" i in - let* _ = - Store.set ~info:(info "add %s = %s" key value) main_branch [ key ] value + let* tree = Store.Tree.add (Tracker.latest_tree tracker) [ key ] value in + let parents = Tracker.latest_parents tracker in + let* commit = + Store.Commit.v repo ~info:(info "add %s = %s" key value) ~parents tree in + Tracker.update_latest_commit tracker commit; let* _ = - if run_gc && i mod gc_every = 0 then gc_all_but_head repo main_branch - else Lwt.return_unit + if i mod gc_every = 0 then run_gc repo tracker else Lwt.return_unit in if i >= n then Lwt.return_unit else loop (i + 1) n in - loop 0 num_of_commits + loop 1 num_of_commits in (* A GC may still be running. Wait for GC to finish before ending the process *) let* _ = Store.Gc.wait repo in From 26bc5002367deb6f2a2c887a586085e8d1cd675f Mon Sep 17 00:00:00 2001 From: metanivek Date: Thu, 3 Nov 2022 11:41:48 -0400 Subject: [PATCH 3/3] Add changelog entry for chunk-based GC --- CHANGES.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 25c7daf09e..4d61ded13d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -17,6 +17,9 @@ - Detecting control file corruption with a checksum (#2119, @art-w) - Change on-disk layout of the suffix from a single file to a multiple, chunked file design (#2115, @metanivek) + - Modify GC to work with new chunked suffix. See `examples/gc.ml` for a + demonstration of how it works with the new `split` function. (#2126, + @metanivek) ### Fixed