diff --git a/src/irmin-pack/unix/chunked_suffix.ml b/src/irmin-pack/unix/chunked_suffix.ml new file mode 100644 index 00000000000..76e0250b913 --- /dev/null +++ b/src/irmin-pack/unix/chunked_suffix.ml @@ -0,0 +1,189 @@ +(* + * Copyright (c) 2022-2022 Tarides + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +open Import +include Chunked_suffix_intf + +module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct + module Io = Io + module Errs = Errs + module Ao = Append_only_file.Make (Io) (Errs) + + type chunk = { idx : int; start_off : int63; ao : Ao.t } + + let chunk_off_to_poff c off = Int63.Syntax.(off - c.start_off) + + type create_error = Io.create_error + + type open_error = + [ Io.open_error + | `Closed + | `Invalid_argument + | `Inconsistent_store + | `Read_out_of_bounds ] + + (** A simple container for chunks. *) + module Inventory : sig + type t + + val v : int -> (int -> chunk) -> t + val iter_skip_appendable : (chunk -> unit) -> t -> unit + val appendable : t -> chunk + val find : off:int63 -> t -> chunk + + val open_ : + start_idx:int -> + chunk_num:int -> + open_chunk: + (chunk_idx:int -> is_appendable:bool -> (Ao.t, open_error) result) -> + (t, [> open_error ]) result + end = struct + type t = chunk Array.t + + exception OpenInventoryError of open_error + + let v = Array.init + + let iter_skip_appendable f t = + Array.sub t 0 (Array.length t - 1) |> Array.iter f + + let appendable t = Array.get t (Array.length t - 1) + + let find ~off t = + let find c = + let end_poff = Ao.end_poff c.ao in + let is_after_start = c.start_off <= off in + let is_before_end = chunk_off_to_poff c off < end_poff in + is_after_start && is_before_end + in + match Array.find_opt find t with + | None -> raise (Errors.Pack_error `Read_out_of_bounds) + | Some c -> c + + let open_ ~start_idx ~chunk_num ~open_chunk = + let off = ref Int63.zero in + let create_chunk i = + let start_off = !off in + let chunk_idx = start_idx + i in + let is_appendable = i = chunk_num - 1 in + let open_result = open_chunk ~chunk_idx ~is_appendable in + match open_result with + | Error err -> raise (OpenInventoryError err) + | Ok ao -> + let end_poff = Ao.end_poff ao in + (off := Int63.Syntax.(!off + end_poff)); + { idx = chunk_idx; start_off; ao } + in + try Ok (v chunk_num create_chunk) + with OpenInventoryError err -> + Error (err : open_error :> [> open_error ]) + end + + type t = { inventory : Inventory.t } + + (* A lightweight wrapper around creating/opening append only files as chunks *) + module Ao_chunk = struct + let path = Layout.V4.suffix_chunk + + let create_rw ~root ~chunk_idx ~overwrite ~auto_flush_threshold + ~auto_flush_procedure = + let path = path ~root ~chunk_idx in + Ao.create_rw ~path ~overwrite ~auto_flush_threshold ~auto_flush_procedure + + let open_rw ~root ~chunk_idx ~auto_flush_threshold ~auto_flush_procedure + ~end_poff ~dead_header_size = + let path = path ~root ~chunk_idx in + Ao.open_rw ~path ~end_poff ~dead_header_size ~auto_flush_threshold + ~auto_flush_procedure + + let open_ro ~root ~chunk_idx ~end_poff ~dead_header_size ~is_appendable = + let open Result_syntax in + let path = path ~root ~chunk_idx in + let dead_header_size = if is_appendable then dead_header_size else 0 in + (* Passing [end_poff] is required by Ao for its consistency check. + TODO: We could change its API in the future so we can not do + [Io.size_of_path], which is admittedly very silly.*) + let* end_poff = + if is_appendable then Ok end_poff else Io.size_of_path path + in + Ao.open_ro ~path ~end_poff ~dead_header_size + end + + let create_rw ~root ~start_idx ~overwrite ~auto_flush_threshold + ~auto_flush_procedure = + let open Result_syntax in + let chunk_idx = start_idx in + let+ ao = + Ao_chunk.create_rw ~root ~chunk_idx ~overwrite ~auto_flush_threshold + ~auto_flush_procedure + in + let chunk = { idx = chunk_idx; start_off = Int63.zero; ao } in + let inventory = Inventory.v 1 (Fun.const chunk) in + { inventory } + + let open_rw ~root ~end_poff ~start_idx ~chunk_num ~dead_header_size + ~auto_flush_threshold ~auto_flush_procedure = + let open Result_syntax in + let open_chunk ~chunk_idx ~is_appendable = + match is_appendable with + | true -> + Ao_chunk.open_rw ~root ~chunk_idx ~end_poff ~auto_flush_threshold + ~auto_flush_procedure ~dead_header_size + | false -> + Ao_chunk.open_ro ~root ~chunk_idx ~end_poff ~dead_header_size + ~is_appendable + in + let+ inventory = Inventory.open_ ~start_idx ~chunk_num ~open_chunk in + { inventory } + + let open_ro ~root ~end_poff ~dead_header_size ~start_idx ~chunk_num = + let open Result_syntax in + let open_chunk = Ao_chunk.open_ro ~root ~end_poff ~dead_header_size in + let+ inventory = Inventory.open_ ~start_idx ~chunk_num ~open_chunk in + { inventory } + + let appendable_ao t = (Inventory.appendable t.inventory).ao + let end_poff t = appendable_ao t |> Ao.end_poff + + let read_exn t ~off ~len buf = + let chunk = Inventory.find ~off t.inventory in + let poff = chunk_off_to_poff chunk off in + Ao.read_exn chunk.ao ~off:poff ~len buf + + let append_exn t s = Ao.append_exn (appendable_ao t) s + + let close t = + (* Close immutable chunks, ignoring errors. *) + let _ = + t.inventory + |> Inventory.iter_skip_appendable @@ fun chunk -> + let _ = Ao.close chunk.ao in + () + in + (* Close appendable chunk and keep error since this + is the one that can have a pending flush. *) + appendable_ao t |> Ao.close + + let empty_buffer t = appendable_ao t |> Ao.empty_buffer + let flush t = appendable_ao t |> Ao.flush + let fsync t = appendable_ao t |> Ao.fsync + + let refresh_end_poff t new_end_poff = + Ao.refresh_end_poff (appendable_ao t) new_end_poff + + let readonly t = appendable_ao t |> Ao.readonly + let auto_flush_threshold t = appendable_ao t |> Ao.auto_flush_threshold +end diff --git a/src/irmin-pack/unix/chunked_suffix.mli b/src/irmin-pack/unix/chunked_suffix.mli new file mode 100644 index 00000000000..36e443f191b --- /dev/null +++ b/src/irmin-pack/unix/chunked_suffix.mli @@ -0,0 +1,18 @@ +(* + * Copyright (c) 2022-2022 Tarides + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +include Chunked_suffix_intf.Sigs +(** @inline *) diff --git a/src/irmin-pack/unix/chunked_suffix_intf.ml b/src/irmin-pack/unix/chunked_suffix_intf.ml new file mode 100644 index 00000000000..54286e8e877 --- /dev/null +++ b/src/irmin-pack/unix/chunked_suffix_intf.ml @@ -0,0 +1,85 @@ +(* + * Copyright (c) 2022-2022 Tarides + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +open Import + +module type S = sig + (** Abstraction for a chunked suffix. It is functionally equivalent to + {!Append_only_file} but with a chunked implementation that is + parameterized by + + - [start_idx] for {!create_rw} to know the starting file name, and + - [start_idx] and [chunk_num] for the open functions to know the starting + file name and how many files there are. *) + + module Io : Io.S + module Errs : Io_errors.S + module Ao : Append_only_file.S + + type t + type create_error = Io.create_error + + type open_error = + [ Io.open_error + | `Closed + | `Invalid_argument + | `Inconsistent_store + | `Read_out_of_bounds ] + + val create_rw : + root:string -> + start_idx:int -> + overwrite:bool -> + auto_flush_threshold:int -> + auto_flush_procedure:Ao.auto_flush_procedure -> + (t, [> create_error ]) result + + val open_rw : + root:string -> + end_poff:int63 -> + start_idx:int -> + chunk_num:int -> + dead_header_size:int -> + auto_flush_threshold:int -> + auto_flush_procedure:Ao.auto_flush_procedure -> + (t, [> open_error ]) result + + val open_ro : + root:string -> + end_poff:int63 -> + dead_header_size:int -> + start_idx:int -> + chunk_num:int -> + (t, [> open_error ]) result + + val close : t -> (unit, [> Io.close_error | `Pending_flush ]) result + val empty_buffer : t -> bool + val flush : t -> (unit, [> Io.write_error ]) result + val fsync : t -> (unit, [> Io.write_error ]) result + val end_poff : t -> int63 + val read_exn : t -> off:int63 -> len:int -> bytes -> unit + val append_exn : t -> string -> unit + val refresh_end_poff : t -> int63 -> (unit, [> `Rw_not_allowed ]) result + val readonly : t -> bool + val auto_flush_threshold : t -> int option +end + +module type Sigs = sig + module type S = S + + module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) : + S with module Io = Io and module Errs = Errs +end