Skip to content

Commit

Permalink
Merge pull request #2105 from Ngoguey42/split-gc-file
Browse files Browse the repository at this point in the history
irmin-pack: Cleanup GC code prior to file split
  • Loading branch information
Ngoguey42 authored Oct 7, 2022
2 parents 73b5dd3 + d1b74ab commit f83f1b4
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 80 deletions.
116 changes: 81 additions & 35 deletions src/irmin-pack/unix/dispatcher.ml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
type t = { fm : Fm.t }
type location = Prefix | Suffix [@@deriving irmin]

type accessor = { poff : int63; len : int; location : location }
type accessor = { poff : int63; len : int63; location : location }
[@@deriving irmin]
(** [poff] is a physical offset in a file. It is meant to be passed to [Io] or
[Append_only]
Expand Down Expand Up @@ -140,15 +140,14 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
let chunk, shift_in_chunk, max_entry_len = chunk_of_off_exn mapping off in

(* Case 3: The entry ends after the chunk *)
let open Int63 in
let open Int63.Syntax in
(if of_int len > max_entry_len then
(if len > max_entry_len then
let s =
Fmt.str
"entry (off=%a, len=%d) is supposed to be contained in chunk \
"entry (off=%a, len=%a) is supposed to be contained in chunk \
(poff=%a,len=%d) and starting at %a but is larger than it can be\n\
\ contained in chunk" Int63.pp off len Int63.pp chunk.poff chunk.len
Int63.pp shift_in_chunk
\ contained in chunk" Int63.pp off Int63.pp len Int63.pp chunk.poff
chunk.len Int63.pp shift_in_chunk
in
raise (Errors.Pack_error (`Invalid_prefix_read s)));

Expand All @@ -159,7 +158,7 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
module Accessor = struct
let v_in_suffix_exn t ~off ~len =
let open Int63.Syntax in
let entry_end_offset = off + Int63.of_int len in
let entry_end_offset = off + len in
if entry_end_offset > end_offset t then
raise (Errors.Pack_error `Read_out_of_bounds)
else
Expand All @@ -177,8 +176,6 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
else v_in_prefix_exn (get_mapping t) ~off ~len

let v_range_in_suffix_exn t ~off ~min_len ~max_len =
let min_len = Int63.of_int min_len in
let max_len = Int63.of_int max_len in
let len =
let open Int63.Syntax in
let bytes_after_off = end_offset t - off in
Expand All @@ -188,25 +185,21 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
else bytes_after_off
in
let poff = Suffix_arithmetic.poff_of_off t off in
{ poff; len = Int63.to_int len; location = Suffix }
{ poff; len; location = Suffix }

let v_range_in_prefix_exn t ~off ~min_len ~max_len =
let mapping = get_mapping t in
let chunk, shift_in_chunk, max_entry_len =
Prefix_arithmetic.chunk_of_off_exn mapping off
in
let open Int63 in
let open Int63.Syntax in
let len =
let min_len = of_int min_len in
let max_len = of_int max_len in
if max_entry_len < min_len then
raise (Errors.Pack_error `Read_out_of_bounds)
else if max_entry_len > max_len then max_len
else max_entry_len
in
let poff = chunk.poff + shift_in_chunk in
let len = Int63.to_int len in
{ poff; len; location = Prefix }

let v_range_exn t ~off ~min_len ~max_len =
Expand All @@ -218,39 +211,92 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
end

let read_exn t { poff; len; location } buf =
assert (len <= Int63.of_int Stdlib.max_int);
(* This assetion cannot be triggered because:
- The user of Dispatcher's API is only able to construct accessors from
[int].
- The internals of this file may construct very large accessors but they
will be chopped before being passed to [read_exn]. *)
let len = Int63.to_int len in
match location with
| Prefix -> Io.read_exn (get_prefix t) ~off:poff ~len buf
| Suffix -> Suffix.read_exn (Fm.suffix t.fm) ~off:poff ~len buf

let read_in_prefix_and_suffix_exn t ~off ~len buf =
let ( -- ) a b = a - b in
let read_bytes_exn t ~f ~off ~len =
let open Int63.Syntax in
let suffix_start_offset = suffix_start_offset t in
if off < suffix_start_offset && off + Int63.of_int len > suffix_start_offset
then (
let read_in_prefix = suffix_start_offset - off |> Int63.to_int in
let accessor = Accessor.v_exn t ~off ~len:read_in_prefix in
read_exn t accessor buf;
let read_in_suffix = len -- read_in_prefix in
let buf_suffix = Bytes.create read_in_suffix in
let accessor =
Accessor.v_exn t ~off:suffix_start_offset ~len:read_in_suffix
in
read_exn t accessor buf_suffix;
Bytes.blit buf_suffix 0 buf read_in_prefix read_in_suffix)
else read_exn t (Accessor.v_exn t ~off ~len) buf
let bytes_in_prefix =
let prefix_bytes_after_off = suffix_start_offset t - off in
if prefix_bytes_after_off <= Int63.zero then Int63.zero
else min len prefix_bytes_after_off
in
let bytes_in_suffix =
if bytes_in_prefix < len then len - bytes_in_prefix else Int63.zero
in
assert (bytes_in_prefix + bytes_in_suffix = len);
let prefix_accessor_opt =
if bytes_in_prefix > Int63.zero then
Some (Accessor.v_exn t ~off ~len:bytes_in_prefix)
else None
in
let suffix_accessor_opt =
if bytes_in_suffix > Int63.zero then
let off = off + bytes_in_prefix in
Some (Accessor.v_exn t ~off ~len:bytes_in_suffix)
else None
in

(* Now that we have the accessor(s), we're sure the range is valid:
- it doesn't include dead data from the prefix,
- it doesn't go after the end of the suffix.
Go for read. *)
let max_read_size = 8192 in
let buffer = Bytes.create max_read_size in
let max_read_size = Int63.of_int max_read_size in
let rec aux accessor =
if accessor.len = Int63.zero then ()
else if accessor.len < max_read_size then (
read_exn t accessor buffer;
f (Bytes.sub_string buffer 0 (Int63.to_int accessor.len)))
else
let left, right =
( { accessor with len = max_read_size },
{
accessor with
poff = accessor.poff + max_read_size;
len = accessor.len - max_read_size;
} )
in
read_exn t left buffer;
f (Bytes.to_string buffer);
aux right
in
Option.iter aux prefix_accessor_opt;
Option.iter aux suffix_accessor_opt

let create_accessor_exn t ~off ~len =
let len = Int63.of_int len in
Accessor.v_exn t ~off ~len

let create_accessor_exn = Accessor.v_exn
let create_accessor_from_range_exn = Accessor.v_range_exn
let create_accessor_to_prefix_exn = Accessor.v_in_prefix_exn
let create_accessor_from_range_exn t ~off ~min_len ~max_len =
let min_len = Int63.of_int min_len in
let max_len = Int63.of_int max_len in
Accessor.v_range_exn t ~off ~min_len ~max_len

let create_accessor_to_prefix_exn t ~off ~len =
let len = Int63.of_int len in
Accessor.v_in_prefix_exn t ~off ~len

let shrink_accessor_exn a ~new_len =
let open Int63.Syntax in
let new_len = Int63.of_int new_len in
if new_len > a.len then failwith "shrink_accessor_exn to larger accessor";
{ a with len = new_len }

let create_sequential_accessor_exn location rem_len ~poff ~len =
if len > rem_len then raise (Errors.Pack_error `Read_out_of_bounds)
else { poff; len; location }
else { poff; len = Int63.of_int len; location }

let create_sequential_accessor_from_range_exn location rem_len ~poff ~min_len
~max_len =
Expand All @@ -259,7 +305,7 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
else if rem_len > max_len then max_len
else rem_len
in
{ poff; len; location }
{ poff; len = Int63.of_int len; location }

let create_sequential_accessor_seq t ~min_header_len ~max_header_len ~read_len
=
Expand Down
31 changes: 23 additions & 8 deletions src/irmin-pack/unix/dispatcher_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ module type S = sig
type t
type location = private Prefix | Suffix [@@deriving irmin]

type accessor = private { poff : int63; len : int; location : location }
type accessor = private { poff : int63; len : int63; location : location }
[@@deriving irmin]
(** An [accessor] designates a valid readable area in one of the pack files.
Expand All @@ -48,6 +48,11 @@ module type S = sig
except that the precise length of the span will be decided during the
call. *)

val create_accessor_to_prefix_exn :
Mapping_file.t -> off:int63 -> len:int -> accessor
(** [create_accessor_to_prefix_exn mapping ~off ~len] returns an accessor for
the prefix file associated with [mapping]. *)

val shrink_accessor_exn : accessor -> new_len:int -> accessor
(** [shrink_accessor_exn a ~new_len] is [a] where the length is smaller than
in [a].*)
Expand Down Expand Up @@ -83,14 +88,24 @@ module type S = sig
(** [offset_of_suffix_poff t suffix_off] converts a suffix offset into a
(global) offset. *)

val read_in_prefix_and_suffix_exn : t -> off:int63 -> len:int -> bytes -> unit
(** Simlar to [read_exn] but if [off + len] is greater than the end of the
prefix, it will read the remaining in the prefix. *)
val read_bytes_exn : t -> f:(string -> unit) -> off:int63 -> len:int63 -> unit
(** [read_bytes_exn] reads a slice of the global offset space defined by [off]
and [len].
val create_accessor_to_prefix_exn :
Mapping_file.t -> off:int63 -> len:int -> accessor
(** [create_accessor_to_prefix_exn mapping ~off ~len] returns an accessor for
the prefix file associated with [mapping]. *)
The calls to [f] ignore the objects boundaries (i.e. the string passed to
[f] will most of the time not be the beginning of an object).
The strings passed to [f] are safe. They can be kept around, they are not
the result of an [unsafe_to_string] conversion.
The call will fail if the [(off, len)] range is invalid. It will succeed
in these cases:
- If the range designates a slice of the suffix.
- If the range designates a slice of contiguous live bytes in the prefix
- If the range designates a slice of contiguous live bytes that starts in
the prefix and ends in the suffix. This implies that the last chunk of
the prefix is contiguous to the start of the suffix. *)
end

module type Sigs = sig
Expand Down
43 changes: 6 additions & 37 deletions src/irmin-pack/unix/gc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -323,23 +323,13 @@ end
module Worker = struct
module Payload = Control_file.Latest_payload

let buffer_size = 8192

exception Pack_error = Errors.Pack_error

module type S = sig
module Args : Args

val run_and_output_result : generation:int -> string -> Args.key -> unit

val transfer_append_exn :
dispatcher:Args.Dispatcher.t ->
append_exn:(string -> unit) ->
off:int63 ->
len:int63 ->
bytes ->
unit

type gc_output = (Stats.Latest_gc.worker, Args.Errs.t) result
[@@deriving irmin]
end
Expand Down Expand Up @@ -370,25 +360,6 @@ module Worker = struct

let string_of_key = Irmin.Type.to_string key_t

let transfer_append_exn ~dispatcher ~append_exn ~(off : int63)
~(len : int63) buffer =
let read_exn = Dispatcher.read_in_prefix_and_suffix_exn dispatcher in
let buffer_size = Bytes.length buffer |> Int63.of_int in
let rec aux off len_remaining =
let open Int63.Syntax in
let min a b = if a < b then a else b in
let len = min buffer_size len_remaining in
let len' = Int63.to_int len in
read_exn ~off ~len:len' buffer;
let () =
if len = buffer_size then append_exn (Bytes.to_string buffer)
else append_exn (Bytes.sub_string buffer 0 len')
in
let len_remaining = len_remaining - len in
if len_remaining > Int63.zero then aux (off + len) len_remaining
in
aux off len

(** [iter_from_node_key node_key _ _ ~f] calls [f] with the key of the node
and iterates over its children.
Expand Down Expand Up @@ -567,12 +538,11 @@ module Worker = struct
(* Step 5. Transfer to the new prefix, flush and close. *)
[%log.debug "GC: transfering to the new prefix"];
stats := Worker_stats.finish_current_step !stats "prefix: transfer";
let buffer = Bytes.create buffer_size in
(* Step 5.1. Transfer all. *)
let append_exn = Ao.append_exn prefix in
let f ~off ~len =
let len = Int63.of_int len in
transfer_append_exn ~dispatcher ~append_exn ~off ~len buffer
Dispatcher.read_bytes_exn dispatcher ~f:append_exn ~off ~len
in
let () = Mapping_file.iter_exn mapping f in
Ao.flush prefix |> Errs.raise_if_error
Expand Down Expand Up @@ -606,7 +576,6 @@ module Worker = struct
(* Step 6. Create the new suffix and prepare 2 functions for read and write
operations. *)
stats := Worker_stats.finish_current_step !stats "suffix: start";
let buffer = Bytes.create buffer_size in
[%log.debug "GC: creating new suffix"];
let suffix = create_new_suffix ~root ~generation in
Errors.finalise_exn (fun _outcome ->
Expand All @@ -615,7 +584,6 @@ module Worker = struct
|> Errs.log_if_error "GC: Close suffix")
@@ fun () ->
let append_exn = Ao.append_exn suffix in
let transfer_exn = transfer_append_exn ~dispatcher ~append_exn buffer in

(* Step 7. Transfer to the next suffix. *)
[%log.debug "GC: transfering to the new suffix"];
Expand All @@ -637,7 +605,9 @@ module Worker = struct
(num_iterations - i + 1)
Int63.pp off Int63.pp len];
stats := Worker_stats.add_suffix_transfer !stats len;
let () = transfer_exn ~off ~len in
let () =
Dispatcher.read_bytes_exn dispatcher ~f:append_exn ~off ~len
in
(* Check how many bytes are left, [4096*5] is selected because it is roughly the
number of bytes that requires a read from the block device on ext4 *)
if Int63.to_int len < 4096 * 5 then end_offset
Expand Down Expand Up @@ -788,13 +758,12 @@ module Make (Args : Args) = struct
Ao.close new_suffix
|> Errs.log_if_error "GC: Close suffix after copy latest newies")
@@ fun () ->
let buffer = Bytes.create 8192 in
let append_exn = Ao.append_exn new_suffix in
let flush_and_raise () = Ao.flush new_suffix |> Errs.raise_if_error in
let* () =
Errs.catch (fun () ->
Worker.transfer_append_exn ~dispatcher:t.dispatcher ~append_exn
~off:new_suffix_end_offset ~len:remaining buffer;
Dispatcher.read_bytes_exn t.dispatcher ~f:append_exn
~off:new_suffix_end_offset ~len:remaining;
flush_and_raise ())
in
Ok old_suffix_end_offset
Expand Down

0 comments on commit f83f1b4

Please sign in to comment.