Skip to content

Commit

Permalink
piecrust: chunking for writing merkle position file
Browse files Browse the repository at this point in the history
  • Loading branch information
miloszm committed Dec 3, 2024
1 parent d7f8597 commit 2749b8e
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 62 deletions.
35 changes: 6 additions & 29 deletions piecrust/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::fs::{create_dir_all, OpenOptions};
use std::io::{BufReader, BufWriter};
use std::path::{Path, PathBuf};
use std::sync::{mpsc, Arc, Mutex};
use std::time::SystemTime;
use std::{fs, io, thread};

use dusk_wasmtime::Engine;
Expand Down Expand Up @@ -370,11 +369,11 @@ fn base_path_main<P: AsRef<Path>, S: AsRef<str>>(
fn tree_pos_path_main<P: AsRef<Path>, S: AsRef<str>>(
main_dir: P,
commit_id: S,
) -> io::Result<(PathBuf, PathBuf)> {
) -> io::Result<PathBuf> {
let commit_id = commit_id.as_ref();
let dir = main_dir.as_ref().join(commit_id);
fs::create_dir_all(&dir)?;
Ok((dir.join(TREE_POS_FILE), dir.join(TREE_POS_OPT_FILE)))
Ok(dir.join(TREE_POS_OPT_FILE))
}

fn commit_id_to_hash<S: AsRef<str>>(commit_id: S) -> Hash {
Expand Down Expand Up @@ -1120,37 +1119,15 @@ fn write_commit_inner<P: AsRef<Path>, S: AsRef<str>>(
})?;
fs::write(base_main_path, base_info_bytes)?;

let (tree_pos_main_path, tree_pos_opt_path) =
let tree_pos_opt_path =
tree_pos_path_main(&directories.main_dir, commit_id.as_ref())?;
let start = SystemTime::now();
let tree_pos_bytes = rkyv::to_bytes::<_, 128>(
commit.contracts_merkle.tree_pos(),
)
.map_err(|err| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Failed serializing tree positions file: {err}"),
)
})?;
fs::write(tree_pos_main_path.clone(), tree_pos_bytes)?;
let stop = SystemTime::now();
println!(
"WRITE TREE POS RKYV FINISHED, ELAPSED TIME={:?}",
stop.duration_since(start).expect("duration should work")
);

let start = SystemTime::now();

let f = OpenOptions::new()
.append(true)
.create(true)
.open(tree_pos_opt_path)?;
let mut buf_f = BufWriter::new(f);
commit.contracts_merkle.tree_pos().marshall(&mut buf_f)?;
let stop = SystemTime::now();
println!(
"WRITE TREE POS BINARY FINISHED, ELAPSED TIME={:?}",
stop.duration_since(start).expect("duration should work")
);

Ok(())
}
Expand Down Expand Up @@ -1223,8 +1200,8 @@ fn finalize_commit<P: AsRef<Path>>(
}

fs::remove_file(base_info_path)?;
fs::remove_file(tree_pos_path)?;
fs::remove_file(tree_pos_opt_path)?;
let _ = fs::remove_file(tree_pos_path);
let _ = fs::remove_file(tree_pos_opt_path);
fs::remove_dir(commit_path)?;

Ok(())
Expand Down
85 changes: 52 additions & 33 deletions piecrust/src/store/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,46 +178,62 @@ impl TreePos {
}

pub fn marshall<W: Write>(&self, w: &mut W) -> io::Result<()> {
const CHUNK_SIZE: usize = 8192;
const ELEM_SIZE: usize = 4 + 32 + 4;
let mut b = [0u8; ELEM_SIZE * CHUNK_SIZE];
let mut chk = 0;
for (k, (h, p)) in self.tree_pos.iter() {
w.write_all(&(*k).to_le_bytes())?;
w.write_all(h.as_bytes())?;
w.write_all(&(*p as u32).to_le_bytes())?;
let offset = chk * ELEM_SIZE;
b[offset..(offset + 4)].copy_from_slice(&(*k).to_le_bytes());
b[(offset + 4)..(offset + 36)].copy_from_slice(h.as_bytes());
b[(offset + 36)..(offset + 40)]
.copy_from_slice(&(*p as u32).to_le_bytes());
chk = (chk + 1) % CHUNK_SIZE;
if chk == 0 {
w.write_all(b.as_slice())?;
}
}
if chk != 0 {
w.write_all(&b[..(chk * ELEM_SIZE)])?;
}
Ok(())
}

fn read_bytes<R: Read, const N: usize>(r: &mut R) -> io::Result<[u8; N]> {
let mut buffer = [0u8; N];
r.read_exact(&mut buffer)?;
Ok(buffer)
}

fn is_eof<T>(r: &io::Result<T>) -> bool {
if let Err(ref e) = r {
if e.kind() == ErrorKind::UnexpectedEof {
return true;
}
}
false
}

pub fn unmarshall<R: Read>(r: &mut R) -> io::Result<Self> {
let mut slf = Self::default();
loop {
let mut buf_k = [0u8; 4];
let e = r.read_exact(&mut buf_k);
if e.as_ref()
.is_err_and(|e| e.kind() == ErrorKind::UnexpectedEof)
{
let res = Self::read_bytes(r);
if Self::is_eof(&res) {
break;
}
e?;
let k = u32::from_le_bytes(buf_k);

let mut buf_h = [0u8; 32];
let e = r.read_exact(&mut buf_h);
if e.as_ref()
.is_err_and(|e| e.kind() == ErrorKind::UnexpectedEof)
{
let k = u32::from_le_bytes(res?);

let res = Self::read_bytes(r);
if Self::is_eof(&res) {
break;
}
e?;
let hash = Hash::from(buf_h);

let mut buf_p = [0u8; 4];
let e = r.read_exact(&mut buf_p);
if e.as_ref()
.is_err_and(|e| e.kind() == ErrorKind::UnexpectedEof)
{
let hash = Hash::from(res?);

let res = Self::read_bytes(r);
if Self::is_eof(&res) {
break;
}
e?;
let p = u32::from_le_bytes(buf_p);
let p = u32::from_le_bytes(res?);
slf.tree_pos.insert(k, (hash, p as u64));
}
Ok(slf)
Expand Down Expand Up @@ -543,18 +559,21 @@ mod tests {

#[test]
fn merkle_position_serialization() -> Result<(), io::Error> {
const TEST_SIZE: u32 = 262144;
const ELEM_SIZE: usize = 4 + 32 + 4;
let mut marshalled = TreePos::default();
let h1 = Hash::from([1u8; 32]);
let h2 = Hash::from([2u8; 32]);
marshalled.insert(2, (h1, 3));
marshalled.insert(4, (h2, 5));
let h = Hash::from([1u8; 32]);
for i in 0..TEST_SIZE {
marshalled.insert(i, (h, i as u64));
}
let v: Vec<u8> = Vec::new();
let mut w = BufWriter::new(v);
let mut w = BufWriter::with_capacity(TEST_SIZE as usize * ELEM_SIZE, v);
marshalled.marshall(&mut w)?;
let mut r = BufReader::new(w.buffer());
let unmarshalled = TreePos::unmarshall(&mut r)?;
assert_eq!(unmarshalled.tree_pos.get(&2), Some(&(h1, 3)));
assert_eq!(unmarshalled.tree_pos.get(&4), Some(&(h2, 5)));
for i in 0..TEST_SIZE {
assert_eq!(unmarshalled.tree_pos.get(&i), Some(&(h, i as u64)));
}
Ok(())
}
}

0 comments on commit 2749b8e

Please sign in to comment.