Skip to content

Commit

Permalink
Merge pull request #2232 from art-w/no-mmap
Browse files Browse the repository at this point in the history
  • Loading branch information
metanivek authored Apr 19, 2023
2 parents 20048d1 + ccaa77e commit 03ad27f
Show file tree
Hide file tree
Showing 14 changed files with 227 additions and 174 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
- Unhandled exceptions in GC worker process are now reported as a failure
(#2163, @metanivek)
- Fix the silent mode for the integrity checks. (#2179, @icristescu)
- Fix file descriptor leak caused by `mmap`. (#2232, @art-w)

## 3.6.1 (2023-03-15)

Expand Down
22 changes: 21 additions & 1 deletion src/irmin-pack/unix/control_file.ml
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ module Serde = struct
generation = x.generation;
latest_gc_target_offset = x.suffix_start_offset;
suffix_dead_bytes = Int63.zero;
mapping_end_poff = None;
}
| T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9 | T10 | T11 | T12 | T13
| T14 | T15 ->
Expand All @@ -200,14 +201,33 @@ module Serde = struct
volume_num = 0;
}

let upgrade_status_from_v4 = function
| Payload.Upper.V4.From_v1_v2_post_upgrade x ->
Latest.From_v1_v2_post_upgrade x
| No_gc_yet -> No_gc_yet
| Used_non_minimal_indexing_strategy -> Used_non_minimal_indexing_strategy
| Gced x ->
Gced
{
suffix_start_offset = x.suffix_start_offset;
generation = x.generation;
latest_gc_target_offset = x.latest_gc_target_offset;
suffix_dead_bytes = x.suffix_dead_bytes;
mapping_end_poff = None;
}
| T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9 | T10 | T11 | T12 | T13 | T14
| T15 ->
(* Unreachable *)
assert false

let upgrade_from_v4 (pl : Payload.Upper.V4.t) : payload =
{
dict_end_poff = pl.dict_end_poff;
appendable_chunk_poff = pl.appendable_chunk_poff;
checksum = Int63.zero;
chunk_start_idx = pl.chunk_start_idx;
chunk_num = pl.chunk_num;
status = pl.status;
status = upgrade_status_from_v4 pl.status;
upgraded_from = Some (Version.to_int `V4);
volume_num = 0;
}
Expand Down
7 changes: 5 additions & 2 deletions src/irmin-pack/unix/control_file_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,16 @@ module Payload = struct
end

module V5 = struct
type gced = V4.gced = {
type gced = {
suffix_start_offset : int63;
generation : int;
latest_gc_target_offset : int63;
suffix_dead_bytes : int63;
mapping_end_poff : int63 option;
}
[@@deriving irmin]

type status = V4.status =
type status =
| From_v1_v2_post_upgrade of V3.from_v1_v2_post_upgrade
| No_gc_yet
| Used_non_minimal_indexing_strategy
Expand Down Expand Up @@ -239,6 +240,8 @@ module Payload = struct
New fields
- [volume_num] stores the number of volumes in the lower layer.
- [mapping_end_poff] stores the mapping file size (optional if missing
or unknown after a migration from V4).
Changed fields
Expand Down
69 changes: 36 additions & 33 deletions src/irmin-pack/unix/file_manager.ml
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,13 @@ struct
let register_suffix_consumer t ~after_flush =
t.suffix_consumers <- { after_flush } :: t.suffix_consumers

let generation = function
| Payload.From_v1_v2_post_upgrade _ | Used_non_minimal_indexing_strategy
| No_gc_yet ->
0
| T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9 | T10 | T11 | T12 | T13 | T14
| T15 ->
(* Unreachable *)
assert false
| Gced x -> x.generation
let get_gced = function Payload.Gced x -> Some x | _ -> None

let generation payload =
match get_gced payload with Some x -> x.generation | None -> 0

let mapping_size payload =
match get_gced payload with Some x -> x.mapping_end_poff | None -> None

let notify_reload_consumers consumers =
List.fold_left
Expand Down Expand Up @@ -215,18 +213,24 @@ struct

module Layout = Irmin_pack.Layout.V5

let open_prefix ~root ~generation =
let open_prefix ~root ~generation ~mapping_size =
let open Result_syntax in
if generation = 0 then Ok None
else
let mapping = Layout.mapping ~generation ~root in
let data = Layout.prefix ~root ~generation in
let+ prefix = Sparse.open_ro ~mapping ~data in
let* mapping_size =
match mapping_size with
| Some size -> Ok size
| None -> Io.size_of_path mapping
in
let mapping_size = Int63.to_int mapping_size in
let+ prefix = Sparse.open_ro ~mapping_size ~mapping ~data in
Some prefix

let reopen_prefix t ~generation =
let reopen_prefix t ~generation ~mapping_size =
let open Result_syntax in
let* some_prefix = open_prefix ~root:t.root ~generation in
let* some_prefix = open_prefix ~root:t.root ~generation ~mapping_size in
match some_prefix with
| None -> Ok ()
| Some _ ->
Expand Down Expand Up @@ -310,12 +314,12 @@ struct
let use_fsync = Irmin_pack.Conf.use_fsync config in
let indexing_strategy = Conf.indexing_strategy config in
let pl : Payload.t = Control.payload control in
let generation =
let generation, mapping_size =
match pl.status with
| From_v1_v2_post_upgrade _ | No_gc_yet
| Used_non_minimal_indexing_strategy ->
0
| Gced x -> x.generation
(0, None)
| Gced x -> (x.generation, x.mapping_end_poff)
| T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9 | T10 | T11 | T12 | T13 | T14
| T15 ->
assert false
Expand Down Expand Up @@ -348,7 +352,7 @@ struct
let cb _ = suffix_requires_a_flush_exn (get_instance ()) in
make_suffix ~auto_flush_threshold ~auto_flush_procedure:(`External cb)
in
let* prefix = open_prefix ~root ~generation in
let* prefix = open_prefix ~root ~generation ~mapping_size in
let* dict =
let path = Layout.dict ~root in
let auto_flush_threshold =
Expand Down Expand Up @@ -435,7 +439,10 @@ struct
in
(* Step 3.2. Potentially reload prefix *)
let* () =
if gen0 = gen1 then Ok () else reopen_prefix t ~generation:gen1
if gen0 = gen1 then Ok ()
else
reopen_prefix t ~generation:gen1
~mapping_size:(mapping_size pl1.status)
in
(* Step 3.3. Potentially reload lower *)
if gen0 = gen1 && pl0.volume_num = pl1.volume_num then Ok ()
Expand Down Expand Up @@ -575,6 +582,7 @@ struct
generation;
latest_gc_target_offset = Int63.zero;
suffix_dead_bytes = Int63.zero;
mapping_end_poff = Some Int63.zero;
};
}
in
Expand Down Expand Up @@ -766,7 +774,9 @@ struct
Suffix.open_ro ~root ~appendable_chunk_poff ~start_idx ~chunk_num
~dead_header_size
in
let* prefix = open_prefix ~root ~generation in
let* prefix =
open_prefix ~root ~generation ~mapping_size:(mapping_size status)
in
let* dict =
let path = Layout.dict ~root in
Dict.open_ro ~path ~end_poff:dict_end_poff ~dead_header_size
Expand Down Expand Up @@ -831,8 +841,8 @@ struct
| `Unknown_major_pack_version _ ) as e ->
e)

let swap t ~generation ~suffix_start_offset ~chunk_start_idx ~chunk_num
~suffix_dead_bytes ~latest_gc_target_offset ~volume =
let swap t ~generation ~mapping_size ~suffix_start_offset ~chunk_start_idx
~chunk_num ~suffix_dead_bytes ~latest_gc_target_offset ~volume =
let open Result_syntax in
[%log.debug
"Gc in main: swap gen %d; suffix start %a; chunk start idx %d; chunk num \
Expand All @@ -843,7 +853,8 @@ struct
let pl = Control.payload t.control in

(* Step 1. Reopen files *)
let* () = reopen_prefix t ~generation in
let mapping_size = Some mapping_size in
let* () = reopen_prefix t ~generation ~mapping_size in
let* () =
reopen_suffix t ~chunk_start_idx ~chunk_num
~appendable_chunk_poff:pl.appendable_chunk_poff
Expand All @@ -868,6 +879,7 @@ struct
generation;
latest_gc_target_offset;
suffix_dead_bytes;
mapping_end_poff = mapping_size;
}
in

Expand Down Expand Up @@ -973,8 +985,7 @@ struct
let lower = t.lower in
cleanup ~root ~generation ~chunk_start_idx ~chunk_num ~lower

let create_one_commit_store t config ~generation ~latest_gc_target_offset
~suffix_start_offset commit_key =
let create_one_commit_store t config gced commit_key =
let open Result_syntax in
let src_root = t.root in
let dst_root = Irmin_pack.Conf.root config in
Expand All @@ -990,15 +1001,7 @@ struct
in
let* () = Suffix.close suffix in
(* Step 3. Create the control file and close it. *)
let status =
Payload.Gced
{
suffix_start_offset;
generation;
latest_gc_target_offset;
suffix_dead_bytes = Int63.zero;
}
in
let status = Payload.Gced gced in
let dict_end_poff = Io.size_of_path dst_dict |> Errs.raise_if_error in
let pl =
{
Expand Down
5 changes: 2 additions & 3 deletions src/irmin-pack/unix/file_manager_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ module type S = sig
val swap :
t ->
generation:int ->
mapping_size:int63 ->
suffix_start_offset:int63 ->
chunk_start_idx:int ->
chunk_num:int ->
Expand Down Expand Up @@ -292,9 +293,7 @@ module type S = sig
val create_one_commit_store :
t ->
Irmin.Backend.Conf.t ->
generation:int ->
latest_gc_target_offset:int63 ->
suffix_start_offset:int63 ->
Control_file.Payload.Upper.Latest.gced ->
Index.key Pack_key.t ->
(unit, [> open_rw_error | close_error ]) result
(** [create_one_commit_store t conf generation new_store_root key] is called
Expand Down
41 changes: 20 additions & 21 deletions src/irmin-pack/unix/gc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,16 @@ module Make (Args : Gc_args.S) = struct
latest_gc_target_offset;
}

let swap_and_purge t removable_chunk_num modified_volume suffix_params =
let swap_and_purge t (gc_results : Worker.gc_results) =
let removable_chunk_num = List.length gc_results.removable_chunk_idxs in
let { generation; latest_gc_target_offset; _ } = t in
let Worker.
{
start_offset = suffix_start_offset;
chunk_start_idx;
dead_bytes = suffix_dead_bytes;
} =
suffix_params
gc_results.suffix_params
in
(* Calculate chunk num in main process since more chunks could have been
added while GC was running. GC process only tells us how many chunks are
Expand All @@ -126,8 +127,9 @@ module Make (Args : Gc_args.S) = struct
is guaranteed by the GC process. *)
assert (chunk_num >= 1);

Fm.swap t.fm ~generation ~suffix_start_offset ~chunk_start_idx ~chunk_num
~suffix_dead_bytes ~latest_gc_target_offset ~volume:modified_volume
Fm.swap t.fm ~generation ~mapping_size:gc_results.mapping_size
~suffix_start_offset ~chunk_start_idx ~chunk_num ~suffix_dead_bytes
~latest_gc_target_offset ~volume:gc_results.modified_volume

let unlink_all { root; generation; _ } removable_chunk_idxs =
(* Unlink suffix chunks *)
Expand Down Expand Up @@ -231,33 +233,22 @@ module Make (Args : Gc_args.S) = struct
let result =
let open Result_syntax in
match (status, gc_output) with
| ( `Success,
Ok
{
suffix_params;
removable_chunk_idxs;
stats = worker_stats;
modified_volume;
} ) ->
| `Success, Ok gc_results ->
let partial_stats =
Gc_stats.Main.finish_current_step partial_stats
"swap and purge"
in
let* () =
swap_and_purge t
(List.length removable_chunk_idxs)
modified_volume suffix_params
in
let* () = swap_and_purge t gc_results in
let partial_stats =
Gc_stats.Main.finish_current_step partial_stats "unlink"
in
if t.unlink then unlink_all t removable_chunk_idxs;
if t.unlink then unlink_all t gc_results.removable_chunk_idxs;

let stats =
let after_suffix_end_offset =
Dispatcher.end_offset t.dispatcher
in
Gc_stats.Main.finalise partial_stats worker_stats
Gc_stats.Main.finalise partial_stats gc_results.stats
~after_suffix_end_offset
in
Stats.report_latest_gc stats;
Expand Down Expand Up @@ -289,8 +280,16 @@ module Make (Args : Gc_args.S) = struct
let* status = Async.await t.task in
let gc_output = read_gc_output ~root:t.root ~generation:t.generation in
match (status, gc_output) with
| `Success, Ok _ ->
Lwt.return (t.latest_gc_target_offset, t.new_suffix_start_offset)
| `Success, Ok gc_results ->
Lwt.return
{
Control_file_intf.Payload.Upper.Latest.generation =
Fm.generation t.fm + 1;
latest_gc_target_offset = t.latest_gc_target_offset;
suffix_start_offset = t.new_suffix_start_offset;
suffix_dead_bytes = Int63.zero;
mapping_end_poff = Some gc_results.mapping_size;
}
| _ ->
let r = gc_errors status gc_output |> Errs.raise_if_error in
Lwt.return r
Expand Down
7 changes: 3 additions & 4 deletions src/irmin-pack/unix/gc.mli
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,10 @@ module Make (Args : Gc_args.S) : sig

val cancel : t -> bool

val finalise_without_swap : t -> (int63 * int63) Lwt.t
val finalise_without_swap :
t -> Control_file_intf.Payload.Upper.Latest.gced Lwt.t
(** Waits for the current gc to finish and returns immediately without
swapping the files and doing the other finalisation steps from [finalise].
It returns the [latest_gc_target_offset] and the
[new_suffix_start_offset]. *)
Returns the [gced] status to create a fresh control file for the snapshot. *)
end
with module Args = Args
Loading

0 comments on commit 03ad27f

Please sign in to comment.