Skip to content

Commit

Permalink
irmin-pack: userland mmap
Browse files Browse the repository at this point in the history
  • Loading branch information
art-w committed Apr 18, 2023
1 parent 63e6e74 commit 794b7f9
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 113 deletions.
4 changes: 3 additions & 1 deletion src/irmin-pack/unix/file_manager.ml
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ struct
else
let mapping = Layout.mapping ~generation ~root in
let data = Layout.prefix ~root ~generation in
let+ prefix = Sparse.open_ro ~mapping ~data in
let* mapping_size = Io.size_of_path mapping in
let mapping_size = Int63.to_int mapping_size in
let+ prefix = Sparse.open_ro ~mapping_size ~mapping ~data in
Some prefix

let reopen_prefix t ~generation =
Expand Down
36 changes: 15 additions & 21 deletions src/irmin-pack/unix/gc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,16 @@ module Make (Args : Gc_args.S) = struct
latest_gc_target_offset;
}

let swap_and_purge t removable_chunk_num modified_volume suffix_params =
let swap_and_purge t (gc_results : Worker.gc_results) =
let removable_chunk_num = List.length gc_results.removable_chunk_idxs in
let { generation; latest_gc_target_offset; _ } = t in
let Worker.
{
start_offset = suffix_start_offset;
chunk_start_idx;
dead_bytes = suffix_dead_bytes;
} =
suffix_params
gc_results.suffix_params
in
(* Calculate chunk num in main process since more chunks could have been
added while GC was running. GC process only tells us how many chunks are
Expand All @@ -126,8 +127,9 @@ module Make (Args : Gc_args.S) = struct
is guaranteed by the GC process. *)
assert (chunk_num >= 1);

Fm.swap t.fm ~generation ~suffix_start_offset ~chunk_start_idx ~chunk_num
~suffix_dead_bytes ~latest_gc_target_offset ~volume:modified_volume
Fm.swap t.fm ~generation ~mapping_size:gc_results.mapping_size
~suffix_start_offset ~chunk_start_idx ~chunk_num ~suffix_dead_bytes
~latest_gc_target_offset ~volume:gc_results.modified_volume

let unlink_all { root; generation; _ } removable_chunk_idxs =
(* Unlink suffix chunks *)
Expand Down Expand Up @@ -231,33 +233,22 @@ module Make (Args : Gc_args.S) = struct
let result =
let open Result_syntax in
match (status, gc_output) with
| ( `Success,
Ok
{
suffix_params;
removable_chunk_idxs;
stats = worker_stats;
modified_volume;
} ) ->
| `Success, Ok gc_results ->
let partial_stats =
Gc_stats.Main.finish_current_step partial_stats
"swap and purge"
in
let* () =
swap_and_purge t
(List.length removable_chunk_idxs)
modified_volume suffix_params
in
let* () = swap_and_purge t gc_results in
let partial_stats =
Gc_stats.Main.finish_current_step partial_stats "unlink"
in
if t.unlink then unlink_all t removable_chunk_idxs;
if t.unlink then unlink_all t gc_results.removable_chunk_idxs;

let stats =
let after_suffix_end_offset =
Dispatcher.end_offset t.dispatcher
in
Gc_stats.Main.finalise partial_stats worker_stats
Gc_stats.Main.finalise partial_stats gc_results.stats
~after_suffix_end_offset
in
Stats.report_latest_gc stats;
Expand Down Expand Up @@ -289,8 +280,11 @@ module Make (Args : Gc_args.S) = struct
let* status = Async.await t.task in
let gc_output = read_gc_output ~root:t.root ~generation:t.generation in
match (status, gc_output) with
| `Success, Ok _ ->
Lwt.return (t.latest_gc_target_offset, t.new_suffix_start_offset)
| `Success, Ok gc_results ->
Lwt.return
( t.latest_gc_target_offset,
t.new_suffix_start_offset,
gc_results.mapping_size )
| _ ->
let r = gc_errors status gc_output |> Errs.raise_if_error in
Lwt.return r
Expand Down
25 changes: 18 additions & 7 deletions src/irmin-pack/unix/gc_worker.ml
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ module Make (Args : Gc_args.S) = struct

type gc_results = {
suffix_params : suffix_params;
mapping_size : int63;
removable_chunk_idxs : int list;
modified_volume : Lower.volume_identifier option;
stats : Stats.Latest_gc.worker;
Expand Down Expand Up @@ -288,14 +289,14 @@ module Make (Args : Gc_args.S) = struct
Irmin_pack.Layout.V4.mapping ~root:new_files_path ~generation
in
let data = Irmin_pack.Layout.V4.prefix ~root:new_files_path ~generation in
let () =
let mapping_size =
let prefix = Sparse.Ao.create ~mapping ~data |> Errs.raise_if_error in
(* Step 5. Transfer to the new prefix, flush and close. *)
[%log.debug "GC: transfering to the new prefix"];
stats := Gc_stats.Worker.finish_current_step !stats "prefix: transfer";
Errors.finalise_exn (fun _outcome ->
Errors.finalise_exn (fun outcome ->
Sparse.Ao.flush prefix
>>= (fun _ -> Sparse.Ao.close prefix)
>>= (fun _ -> Sparse.Ao.close prefix >>= fun _ -> Ok outcome)
|> Errs.log_if_error "GC: Close prefix after data copy")
@@ fun () ->
(* Step 5.1. Transfer all. *)
Expand All @@ -304,15 +305,18 @@ module Make (Args : Gc_args.S) = struct
let len = Int63.of_int len in
let str = Dispatcher.read_seq_exn dispatcher ~off ~len in
Sparse.Ao.append_seq_exn prefix ~off str)
live_entries
live_entries;
Int63.to_int (Sparse.Ao.mapping_size prefix)
in
(* Step 5.2. Update the parent commits to be dangling: reopen the new
prefix, this time in write-only as we have to
modify data inside the file. *)
stats :=
Gc_stats.Worker.finish_current_step !stats
"prefix: rewrite commit parents";
let prefix = Sparse.Wo.open_wo ~mapping ~data |> Errs.raise_if_error in
let prefix =
Sparse.Wo.open_wo ~mapping_size ~mapping ~data |> Errs.raise_if_error
in
Errors.finalise_exn (fun _outcome ->
Sparse.Wo.fsync prefix
>>= (fun _ -> Sparse.Wo.close prefix)
Expand All @@ -326,7 +330,7 @@ module Make (Args : Gc_args.S) = struct
let () = report_new_file_sizes ~root ~generation stats |> ignore in

(* Step 6. Calculate post-GC suffix parameters. *)
let suffix_params, removable_chunk_idxs =
let suffix_params, mapping_size, removable_chunk_idxs =
stats :=
Gc_stats.Worker.finish_current_step !stats
"suffix: calculate new values";
Expand Down Expand Up @@ -387,6 +391,7 @@ module Make (Args : Gc_args.S) = struct
dead_bytes = suffix_dead_bytes;
chunk_start_idx;
},
mapping_size,
removable_chunk_idxs )
in

Expand Down Expand Up @@ -421,7 +426,13 @@ module Make (Args : Gc_args.S) = struct

(* Step 8. Finalise stats and return. *)
let stats = Gc_stats.Worker.finalise !stats in
{ suffix_params; removable_chunk_idxs; modified_volume; stats }
{
suffix_params;
mapping_size;
removable_chunk_idxs;
modified_volume;
stats;
}

let write_gc_output ~root ~generation output =
let open Result_syntax in
Expand Down
1 change: 1 addition & 0 deletions src/irmin-pack/unix/gc_worker.mli
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ module Make (Args : Gc_args.S) : sig

type gc_results = {
suffix_params : suffix_params;
mapping_size : int63;
removable_chunk_idxs : int list;
modified_volume : Lower.volume_identifier option;
stats : Stats.Latest_gc.worker;
Expand Down
5 changes: 3 additions & 2 deletions src/irmin-pack/unix/lower.ml
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,15 @@ module Make_volume (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct

let open_ = function
| Empty _ -> Ok () (* Opening an empty volume is a no-op *)
| Nonempty ({ path = root; sparse; _ } as t) -> (
| Nonempty ({ path = root; sparse; control; _ } as t) -> (
match sparse with
| Some _ -> Ok () (* Sparse file is already open *)
| None ->
let open Result_syntax in
let mapping = Layout.mapping ~root in
let data = Layout.data ~root in
let+ sparse = Sparse.open_ro ~mapping ~data in
let mapping_size = Int63.to_int control.Payload.mapping_end_poff in
let+ sparse = Sparse.open_ro ~mapping_size ~mapping ~data in
t.sparse <- Some sparse)

let close = function
Expand Down
Loading

0 comments on commit 794b7f9

Please sign in to comment.