From 73d66eadf8961aa3951ef9e20874481c3c1f6c7f Mon Sep 17 00:00:00 2001
From: Radu Marias
Date: Thu, 16 May 2024 06:17:07 +0300
Subject: [PATCH] add cache for decrypting dir entry names

add cache for reading meta for dir entries
do some operations concurrently when creating a nod
on encryption use block index as AAD to make sure blocks are not reordered
add bench tests
force unmount
---
 README.md                       |  12 +-
 examples/okay_wal.rs            | 140 ++++
 src/crypto.rs                   |  97 +---
 src/crypto/reader.rs            | 485 +++++++++----------
 src/crypto/writer.rs            | 806 ++++++++++++++++----------
 src/encryptedfs.rs              | 575 ++++++++++++++---------
 src/encryptedfs/test.rs         | 353 +++++++++++++-
 src/encryptedfs/to_move_test.rs |  18 +-
 src/encryptedfs_fuse3.rs        |  64 ++-
 src/expire_value.rs             |   2 +-
 src/lib.rs                      |   4 +
 src/main.rs                     |  62 ++-
 src/test_util.rs                |  10 +
 13 files changed, 1599 insertions(+), 1029 deletions(-)
 create mode 100644 examples/okay_wal.rs
 create mode 100644 src/test_util.rs

diff --git a/README.md b/README.md
index d81439c7..9fa0bd49 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
 An encrypted file system that mounts with FUSE on Linux. It can be used to create encrypted directories.
-You can then safely backup the encrypted folder on an untrusted server without worrying about the data being exposed.\
+You can then safely backup the encrypted folder on an untrusted server without worrying about the data being exposed.
 You can also store it in any cloud storage like Google Drive, Dropbox, etc. and have it synced across multiple devices.
 [![rencfs-bin](https://img.shields.io/aur/version/rencfs-bin?color=1793d1&label=rencfs-bin&logo=arch-linux)](https://aur.archlinux.org/packages/rencfs-bin/)
@@ -14,7 +14,7 @@ You can also store it in any cloud storage like Google Drive, Dropbox, etc. and
 > ⚠️ **Warning**
 > ***This is early in development. Please do not use it with sensitive data just yet. Please wait for a
-stable release.\
+stable release.
 > It's mostly ideal for experimental and learning projects.***
 # Functionality
@@ -117,10 +117,10 @@ You can specify the encryption algorithm adding this argument to the command lin
 --cipher CIPHER
 ```
-Where `CIPHER` is the encryption algorithm.\
+Where `CIPHER` is the encryption algorithm.
 You can check the available ciphers with `rencfs --help`.
-Default values are `ChaCha20` and `600_000` respectively.
+Default value is `ChaCha20Poly1305`.
 ### Log level
@@ -235,7 +235,7 @@ cargo run -- --mount-point MOUNT_POINT --data-dir DATA_DIR
 ## Developing inside a Container
-See here how to configure for [VsCode](https://code.visualstudio.com/docs/devcontainers/containers)\
+See here how to configure for [VsCode](https://code.visualstudio.com/docs/devcontainers/containers)
 And here for [RustRover](https://www.jetbrains.com/help/rust/connect-to-devcontainer.html)
 You can use the `.devcontainer` directory from the project to start a container with all the necessary tools to build
@@ -268,7 +268,7 @@ sharing pull requests are always appreciated.
   see [here](https://pubs.opengroup.org/onlinepubs/009695399/functions/rename.html)
   `That specification requires that the action of the function be atomic.`
 - Phantom reads: reading older content from a file, this is not possible. While writing, data is kept in a buffer and tmp file and on releasing the file handle we write the new content to the file (as per above the tmp file is moved
-  into place with `mv`). After that we reset all opened readers so any reads after that will pick up the new content\
+  into place with `mv`).
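As a minimal sketch of the write-to-tmp-then-rename pattern described above (illustrative only, standard library only, not the crate's actual helpers):

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

// Atomically replace `dest` with `new_content`. The tmp file lives in the
// same directory so the rename stays on one filesystem, which is what makes
// POSIX `rename` atomic: readers see the old file or the new one, never a mix.
fn atomic_replace(dest: &Path, new_content: &[u8]) -> io::Result<()> {
    let tmp = dest.with_extension("tmp");
    let mut file = File::create(&tmp)?;
    file.write_all(new_content)?;
    file.sync_all()?; // flush data to disk before the swap
    fs::rename(&tmp, dest)?; // the atomic "mv" into place
    Ok(())
}
```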
After that we reset all opened readers so any reads after that will pick up the new content One problem that may occur is if we do a truncate we change the content of the file but the process is killed before we write the metadata with the new filesize. In this case next time we mount the system we are still seeing the old filesize but the content of the file could be bigger, and we read until the old size offset, se we would not pick up diff --git a/examples/okay_wal.rs b/examples/okay_wal.rs new file mode 100644 index 00000000..96cec3db --- /dev/null +++ b/examples/okay_wal.rs @@ -0,0 +1,140 @@ +use std::io::{self, Read}; + +use okaywal::{Entry, EntryId, LogManager, ReadChunkResult, SegmentReader, WriteAheadLog}; + +fn main() -> io::Result<()> { + // begin rustme snippet: readme-example + // Open a log using an Checkpointer that echoes the information passed into each + // function that the Checkpointer trait defines. + let log = WriteAheadLog::recover("/tmp/my-log", LoggingCheckpointer)?; + log.checkpoint_active()?; + + // Begin writing an entry to the log. + let mut writer = log.begin_entry()?; + + // Each entry is one or more chunks of data. Each chunk can be individually + // addressed using its LogPosition. + for i in 0..10 { + writer.write_chunk(format!("this is the {} entry", i).as_bytes())?; + } + // let record = writer.write_chunk("this is the first entry".as_bytes())?; + + // To fully flush all written bytes to disk and make the new entry + // resilliant to a crash, the writer must be committed. + writer.commit()?; + // end rustme snippet + + // log.checkpoint_active()?; + + // Let's reopen the log. During this process, + // LoggingCheckpointer::should_recover_segment will be invoked for each segment + // file that has not been checkpointed yet. In this example, it will be called + // once. Once the Checkpointer confirms the data should be recovered, + // LoggingCheckpointer::recover will be invoked once for each entry in the WAL + // that hasn't been previously checkpointed. + // drop(log); + // let log = WriteAheadLog::recover("/tmp/my-log", LoggingCheckpointer)?; + + // We can use the previously returned DataRecord to read the original data. + // let mut reader = log.read_at(record.position)?; + // let mut buffer = vec![0; usize::try_from(record.length).unwrap()]; + // reader.read_exact(&mut buffer)?; + // println!( + // "Data read from log: {}", + // String::from_utf8(buffer).expect("invalid utf-8") + // ); + + // Cleanup + // drop(reader); + // drop(log); + // std::fs::remove_dir_all("my-log")?; + + log.shutdown()?; + // drop(log); + + Ok(()) +} + +#[derive(Debug)] +struct LoggingCheckpointer; + +impl LogManager for LoggingCheckpointer { + fn recover(&mut self, entry: &mut Entry<'_>) -> io::Result<()> { + // This example uses read_all_chunks to load the entire entry into + // memory for simplicity. The crate also supports reading each chunk + // individually to minimize memory usage. + if let Some(all_chunks) = entry.read_all_chunks()? { + // Convert the Vec's to Strings. + let all_chunks = all_chunks + .into_iter() + .map(String::from_utf8) + .collect::, _>>() + .expect("invalid utf-8"); + println!( + "LoggingCheckpointer::recover(entry_id: {:?}, data: {:?})", + entry.id(), + all_chunks, + ); + } else { + // This entry wasn't completely written. This could happen if a + // power outage or crash occurs while writing an entry. 
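+            // Doing nothing here means the partial entry is simply skipped during recovery.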
+ } + + Ok(()) + } + + fn checkpoint_to( + &mut self, + last_checkpointed_id: EntryId, + _checkpointed_entries: &mut SegmentReader, + _wal: &WriteAheadLog, + ) -> io::Result<()> { + // checkpoint_to is called once enough data has been written to the + // WriteAheadLog. After this function returns, the log will recycle the + // file containing the entries being checkpointed. + // + // This function is where the entries must be persisted to the storage + // layer the WriteAheadLog is sitting in front of. To ensure ACID + // compliance of the combination of the WAL and the storage layer, the + // storage layer must be fully resilliant to losing any changes made by + // the checkpointed entries before this function returns. + println!("LoggingCheckpointer::checkpoint_to({last_checkpointed_id:?}"); + while let Some(mut entry) = _checkpointed_entries.read_entry()? { + println!( + "LoggingCheckpointer::checkpoint_to(entry_id: {:?})", + entry.id() + ); + while let chunk = entry.read_chunk() { + match chunk { + Ok(res) => match res { + ReadChunkResult::Chunk(chunk) => { + println!( + "LoggingCheckpointer::checkpoint_to(chunk bytes: {:?})", + chunk.bytes_remaining() + ); + } + ReadChunkResult::EndOfEntry => { + break; + } + ReadChunkResult::AbortedEntry => { + continue; + } + }, + Err(err) => {} + } + } + } + Ok(()) + } +} + +#[test] +fn test() -> io::Result<()> { + // Clean up any previous runs of this example. + // let path = std::path::Path::new("/tmp/my-log"); + // if path.exists() { + // std::fs::remove_dir_all("/tmp/my-log")?; + // } + + main() +} diff --git a/src/crypto.rs b/src/crypto.rs index 6479158d..2a7df47e 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -25,13 +25,10 @@ use thiserror::Error; use tokio::io::{AsyncRead, AsyncReadExt}; use tracing::{debug, error, instrument}; -use crate::crypto::reader::{ - ChunkedFileCryptoReader, CryptoReader, FileCryptoReader, RingCryptoReader, -}; +use crate::crypto::reader::{CryptoReader, FileCryptoReader, RingCryptoReader}; use crate::crypto::writer::{ - ChunkedTmpFileCryptoWriter, CryptoWriter, CryptoWriterSeek, FileCryptoWriter, - FileCryptoWriterCallback, FileCryptoWriterMetadataProvider, RingCryptoWriter, - SequenceLockProvider, + CryptoWriter, CryptoWriterSeek, FileCryptoWriter, FileCryptoWriterCallback, + FileCryptoWriterMetadataProvider, RingCryptoWriter, }; use crate::encryptedfs::FsResult; use crate::{fs_util, stream_util}; @@ -124,10 +121,10 @@ pub fn create_writer( /// **`callback`** is called when the file content changes. It receives the position from where the file content changed and the last write position /// -/// **`lock`** is used to write lock the file when accessing it. If not provided, it will not ensure that other instances are not writing to the file while we do\ +/// **`lock`** is used to write lock the file when accessing it. 
If not provided, it will not ensure that other instances are not writing to the file while we do /// You need to provide the same lock to all writers and readers of this file, you should obtain a new [`Holder`] that wraps the same lock /// -/// **`metadata_provider`** it's used to do some optimizations to reduce some copy operations from original file\ +/// **`metadata_provider`** it's used to do some optimizations to reduce some copy operations from original file /// If the file exists or is created before flushing, in worse case scenarios, it can reduce the overall write speed by half, so it's recommended to provide it #[allow(clippy::missing_errors_doc)] pub fn create_file_writer( @@ -148,48 +145,6 @@ pub fn create_file_writer( )?)) } -/// **`callback`** is called when the file content changes. It receives the position from where the file content changed and the last write position -/// -/// **`locks`** is used to write lock the chunks files when accessing them. This ensures that we have exclusive write to a given chunk when we need to change it's content\ -/// If not provided, it will not ensure that other instances are not accessing the chunks while we do\ -/// You need to provide the same locks to all writers and readers of this file, you should obtain a new [`Holder`] that wraps the same locks -/// -/// **`metadata_provider`** it's used to do some optimizations to reduce some copy operations from original file\ -/// If the file exists or is created before flushing, in worse case scenarios, it can reduce the overall write speed by half, so it's recommended to provide it\ -#[allow(clippy::missing_errors_doc)] -pub fn create_chunked_tmp_file_writer( - file_dir: &Path, - cipher: Cipher, - key: Arc>, - callback: Option>, - locks: Option>>, - metadata_provider: Option>, -) -> io::Result>> { - Ok(Box::new(ChunkedTmpFileCryptoWriter::new( - file_dir, - cipher, - key, - callback, - locks, - metadata_provider, - )?)) -} - -/// **`locks`** is used to read lock the chunks files when accessing them. This ensures offer multiple reads but exclusive writes to a given chunk\ -/// If not provided, it will not ensure that other instances are not writing the chunks while we read them\ -/// You need to provide the same locks to all writers and readers of this file, you should obtain a new [`Holder`] that wraps the same locks -#[allow(clippy::missing_errors_doc)] -pub fn create_chunked_file_reader( - file_dir: &Path, - cipher: Cipher, - key: Arc>, - locks: Option>>, -) -> io::Result> { - Ok(Box::new(ChunkedFileCryptoReader::new( - file_dir, cipher, key, locks, - )?)) -} - fn create_ring_writer( writer: W, cipher: Cipher, @@ -214,23 +169,6 @@ fn create_ring_reader( RingCryptoReader::new(reader, algorithm, key) } -// fn _create_cryptostream_crypto_writer(mut file: File, cipher: &Cipher, key: &SecretVec) -> impl CryptoWriter { -// let iv_len = match cipher { -// Cipher::ChaCha20 => 16, -// Cipher::Aes256Gcm => 16, -// }; -// let mut iv: Vec = vec![0; iv_len]; -// if file.metadata().unwrap().size() == 0 { -// // generate random IV -// thread_rng().fill_bytes(&mut iv); -// file.write_all(&iv).unwrap(); -// } else { -// // read IV from file -// file.read_exact(&mut iv).unwrap(); -// } -// CryptostreamCryptoWriter::new(file, get_cipher(cipher), &key.expose_secret(), &iv).unwrap() -// } - pub fn create_reader( reader: R, cipher: Cipher, @@ -239,7 +177,7 @@ pub fn create_reader( create_ring_reader(reader, cipher, key) } -/// **`lock`** is used to read lock the file when accessing it. 
If not provided, it will not ensure that other instances are not writing to the file while we read\ +/// **`lock`** is used to read lock the file when accessing it. If not provided, it will not ensure that other instances are not writing to the file while we read /// You need to provide the same lock to all writers and readers of this file, you should obtain a new [`Holder`] that wraps the same lock #[allow(clippy::missing_errors_doc)] pub fn create_file_reader( @@ -251,29 +189,6 @@ pub fn create_file_reader( Ok(Box::new(FileCryptoReader::new(file, cipher, key, lock)?)) } -// fn _create_cryptostream_crypto_reader(mut file: File, cipher: &Cipher, key: &SecretVec) -> CryptostreamCryptoReader { -// let iv_len = match cipher { -// Cipher::ChaCha20 => 16, -// Cipher::Aes256Gcm => 16, -// }; -// let mut iv: Vec = vec![0; iv_len]; -// if file.metadata().unwrap().size() == 0 { -// // generate random IV -// thread_rng().fill_bytes(&mut iv); -// file.write_all(&iv).map_err(|err| { -// error!("{err}"); -// err -// }).unwrap(); -// } else { -// // read IV from file -// file.read_exact(&mut iv).map_err(|err| { -// error!("{err}"); -// err -// }).unwrap(); -// } -// CryptostreamCryptoReader::new(file, get_cipher(cipher), &key.expose_secret(), &iv).unwrap() -// } - #[allow(clippy::missing_errors_doc)] pub fn encrypt_string(s: &SecretString, cipher: Cipher, key: Arc>) -> Result { let mut cursor = io::Cursor::new(vec![]); diff --git a/src/crypto/reader.rs b/src/crypto/reader.rs index d6c99c66..06d0d65d 100644 --- a/src/crypto/reader.rs +++ b/src/crypto/reader.rs @@ -15,7 +15,7 @@ use tracing::{debug, error, instrument, warn}; use crate::arc_hashmap::Holder; use crate::crypto::buf_mut::BufMut; -use crate::crypto::writer::{SequenceLockProvider, BUF_SIZE, CHUNK_SIZE, WHOLE_FILE_CHUNK_INDEX}; +use crate::crypto::writer::BUF_SIZE; use crate::crypto::Cipher; use crate::{crypto, stream_util}; @@ -149,6 +149,7 @@ impl Read for RingCryptoReader { return Ok(0); } let data = &mut buffer[..len]; + let aad = Aad::from((self.pos / BUF_SIZE as u64).to_le_bytes()); // extract nonce self.last_nonce .lock() @@ -157,7 +158,7 @@ impl Read for RingCryptoReader { let data = &mut data[NONCE_LEN..]; let plaintext = self .opening_key - .open_within(Aad::empty(), data, 0..) + .open_within(aad, data, 0..) .map_err(|err| { error!("error opening within: {}", err); io::Error::new(io::ErrorKind::Other, "error opening within") @@ -231,7 +232,7 @@ pub struct FileCryptoReader { } impl FileCryptoReader { - /// **`lock`** is used to read lock the file when accessing it. If not provided, it will not ensure that other instances are not writing to the file while we read\ + /// **`lock`** is used to read lock the file when accessing it. If not provided, it will not ensure that other instances are not writing to the file while we read /// You need to provide the same lock to all writers and readers of this file, you should obtain a new [`Holder`] that wraps the same lock #[allow(clippy::missing_errors_doc)] pub fn new( @@ -307,245 +308,245 @@ impl Seek for FileCryptoReader { impl CryptoReader for FileCryptoReader {} -/// Chunked reader -/// File is split into chunks files. This reader iterates over the chunks and reads them one by one. 
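The block-index AAD used above is the reordering protection named in the commit message: each block's index is mixed into the AEAD as associated data, so a ciphertext block moved to a different offset fails authentication instead of decrypting silently. A standalone sketch of the idea with `ring` (simplified key and nonce handling, not this crate's exact types):

```rust
use ring::aead::{Aad, LessSafeKey, Nonce};

// Seal a block with its index as AAD. Swapping two sealed blocks on disk
// later makes `open_block` fail, because the index baked into the AAD no
// longer matches the position being read.
fn seal_block(key: &LessSafeKey, index: u64, nonce: [u8; 12], block: &mut Vec<u8>) {
    let aad = Aad::from(index.to_le_bytes());
    key.seal_in_place_append_tag(Nonce::assume_unique_for_key(nonce), aad, block)
        .expect("seal failed");
}

fn open_block(key: &LessSafeKey, index: u64, nonce: [u8; 12], block: &mut [u8]) -> bool {
    let aad = Aad::from(index.to_le_bytes());
    // Err here means tampering, a wrong key, or a reordered block.
    key.open_in_place(Nonce::assume_unique_for_key(nonce), aad, block)
        .is_ok()
}
```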
- -#[allow(clippy::module_name_repetitions)] -pub struct ChunkedFileCryptoReader { - file_dir: PathBuf, - reader: Option>, - cipher: Cipher, - key: Arc>, - locks: Option>>, - chunk_size: u64, - chunk_index: u64, -} - -impl ChunkedFileCryptoReader { - /// **`locks`** is used to read lock the chunks files when accessing them. This ensures offer multiple reads but exclusive writes to a given chunk\ - /// If not provided, it will not ensure that other instances are not writing the chunks while we read them\ - /// You need to provide the same locks to all writers and readers of this file, you should obtain a new [`Holder`] that wraps the same locks - #[allow(clippy::missing_errors_doc)] - pub fn new( - file_dir: &Path, - cipher: Cipher, - key: Arc>, - locks: Option>>, - ) -> io::Result { - Ok(Self { - file_dir: file_dir.to_owned(), - reader: Self::try_create_reader( - 0, - CHUNK_SIZE, - file_dir.to_owned(), - cipher, - key.clone(), - &locks, - )?, - cipher, - key, - locks, - chunk_size: CHUNK_SIZE, - chunk_index: 0, - }) - } - - fn try_create_reader( - pos: u64, - chunk_size: u64, - file_dir: PathBuf, - cipher: Cipher, - key: Arc>, - locks: &Option>>, - ) -> io::Result>> { - let chunk_index = pos / chunk_size; - let chunk_file = file_dir.join(chunk_index.to_string()); - if !chunk_file.exists() { - return Ok(None); - } - Ok(Some(Self::create_reader( - pos, - chunk_size, - file_dir.to_owned(), - cipher, - key.clone(), - locks, - )?)) - } - - fn create_reader( - pos: u64, - chunk_size: u64, - file_dir: PathBuf, - cipher: Cipher, - key: Arc>, - locks: &Option>>, - ) -> io::Result> { - let chunk_index = pos / chunk_size; - let chunk_file = file_dir.join(chunk_index.to_string()); - Ok(crypto::create_file_reader( - &chunk_file, - cipher, - key.clone(), - locks.as_ref().map(|lock| lock.get(pos / chunk_size)), - )?) - } - - fn pos(&mut self) -> io::Result { - if self.reader.is_none() { - return Ok(self.chunk_index * self.chunk_size); - } - Ok(self.chunk_index * self.chunk_size + self.reader.as_mut().unwrap().stream_position()?) - } -} - -impl Read for ChunkedFileCryptoReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - if buf.is_empty() { - return Ok(0); - } - - // obtain a read lock to whole file, we ue a special value to indicate this. 
- // this helps if someone is truncating the file while we are using it, they will to a write lock - let mut _lock = None; - let _guard_all = { - if let Some(locks) = &self.locks { - _lock = Some(locks.get(WHOLE_FILE_CHUNK_INDEX)); - Some(_lock.as_ref().unwrap().read()) - } else { - None - } - }; - - if self.reader.is_none() { - // create the reader - let current_pos = self.pos()?; - self.reader = Self::try_create_reader( - current_pos, - self.chunk_size, - self.file_dir.to_owned(), - self.cipher, - self.key.clone(), - &self.locks, - )?; - } - if self.reader.is_none() { - // we don't have any more chunks - return Ok(0); - } - - debug!(len = buf.len().to_formatted_string(&Locale::en), "reading"); - let mut len = self.reader.as_mut().unwrap().read(buf)?; - - if len == 0 { - debug!("switching to next chunk"); - self.chunk_index += 1; - self.reader = Self::try_create_reader( - self.chunk_index * self.chunk_size, - self.chunk_size, - self.file_dir.to_owned(), - self.cipher, - self.key.clone(), - &self.locks, - )?; - if let Some(reader) = &mut self.reader { - debug!(len = len.to_formatted_string(&Locale::en), "reading"); - len = reader.read(buf)?; - } - } - - Ok(len) - } -} - -impl Seek for ChunkedFileCryptoReader { - fn seek(&mut self, pos: SeekFrom) -> io::Result { - let new_pos = match pos { - SeekFrom::Start(pos) => pos, - SeekFrom::End(_) => { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "can't seek from end", - )) - } - SeekFrom::Current(pos) => { - let new_pos = self.pos()? as i64 + pos; - if new_pos < 0 { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "can't seek before start", - )); - } - new_pos as u64 - } - }; - if self.pos()? != new_pos { - debug!( - pos = self.pos()?.to_formatted_string(&Locale::en), - new_pos = new_pos.to_formatted_string(&Locale::en), - "seeking" - ); - - // obtain a read lock to whole file, we ue a special value to indicate this. - // this helps if someone is truncating the file while we are using it, they will use a write lock - let mut _lock = None; - let _guard_all = { - if let Some(locks) = &self.locks { - _lock = Some(locks.get(WHOLE_FILE_CHUNK_INDEX)); - Some(_lock.as_ref().unwrap().read()) - } else { - None - } - }; - - if self.reader.is_none() { - // create the reader - let current_pos = self.pos()?; - self.reader = Some(Self::create_reader( - current_pos, - self.chunk_size, - self.file_dir.to_owned(), - self.cipher, - self.key.clone(), - &self.locks, - )?); - } - let pos = self.pos()?; - if self.chunk_index == new_pos / self.chunk_size { - // seek in current chunk as much as we can - let reader = self.reader.as_mut().unwrap(); - let new_pos_in_chunk = new_pos % self.chunk_size; - reader.seek(SeekFrom::Start(new_pos_in_chunk))?; - } else { - // we need to switch to another chunk - debug!("switching to another chunk"); - self.reader = Self::try_create_reader( - new_pos, - self.chunk_size, - self.file_dir.to_owned(), - self.cipher, - self.key.clone(), - &self.locks, - )?; - if self.reader.is_none() { - return Ok(pos); - } - let reader = self.reader.as_mut().unwrap(); - // seek in chunk - let new_pos_in_chunk = new_pos % self.chunk_size; - debug!( - new_pos_in_chunk = new_pos_in_chunk.to_formatted_string(&Locale::en), - "seeking in new chunk" - ); - reader.seek(SeekFrom::Start(new_pos_in_chunk))?; - self.chunk_index = new_pos / self.chunk_size; - } - } - Ok(self.pos()?) - } -} - -impl CryptoReader for ChunkedFileCryptoReader {} +// /// Chunked reader +// /// File is split into chunks files. 
This reader iterates over the chunks and reads them one by one. +// +// #[allow(clippy::module_name_repetitions)] +// pub struct ChunkedFileCryptoReader { +// file_dir: PathBuf, +// reader: Option>, +// cipher: Cipher, +// key: Arc>, +// locks: Option>>, +// chunk_size: u64, +// chunk_index: u64, +// } +// +// impl ChunkedFileCryptoReader { +// /// **`locks`** is used to read lock the chunks files when accessing them. This ensures offer multiple reads but exclusive writes to a given chunk +// /// If not provided, it will not ensure that other instances are not writing the chunks while we read them +// /// You need to provide the same locks to all writers and readers of this file, you should obtain a new [`Holder`] that wraps the same locks +// #[allow(clippy::missing_errors_doc)] +// pub fn new( +// file_dir: &Path, +// cipher: Cipher, +// key: Arc>, +// locks: Option>>, +// ) -> io::Result { +// Ok(Self { +// file_dir: file_dir.to_owned(), +// reader: Self::try_create_reader( +// 0, +// CHUNK_SIZE, +// file_dir.to_owned(), +// cipher, +// key.clone(), +// &locks, +// )?, +// cipher, +// key, +// locks, +// chunk_size: CHUNK_SIZE, +// chunk_index: 0, +// }) +// } +// +// fn try_create_reader( +// pos: u64, +// chunk_size: u64, +// file_dir: PathBuf, +// cipher: Cipher, +// key: Arc>, +// locks: &Option>>, +// ) -> io::Result>> { +// let chunk_index = pos / chunk_size; +// let chunk_file = file_dir.join(chunk_index.to_string()); +// if !chunk_file.exists() { +// return Ok(None); +// } +// Ok(Some(Self::create_reader( +// pos, +// chunk_size, +// file_dir.to_owned(), +// cipher, +// key.clone(), +// locks, +// )?)) +// } +// +// fn create_reader( +// pos: u64, +// chunk_size: u64, +// file_dir: PathBuf, +// cipher: Cipher, +// key: Arc>, +// locks: &Option>>, +// ) -> io::Result> { +// let chunk_index = pos / chunk_size; +// let chunk_file = file_dir.join(chunk_index.to_string()); +// Ok(crypto::create_file_reader( +// &chunk_file, +// cipher, +// key.clone(), +// locks.as_ref().map(|lock| lock.get(pos / chunk_size)), +// )?) +// } +// +// fn pos(&mut self) -> io::Result { +// if self.reader.is_none() { +// return Ok(self.chunk_index * self.chunk_size); +// } +// Ok(self.chunk_index * self.chunk_size + self.reader.as_mut().unwrap().stream_position()?) +// } +// } +// +// impl Read for ChunkedFileCryptoReader { +// fn read(&mut self, buf: &mut [u8]) -> io::Result { +// if buf.is_empty() { +// return Ok(0); +// } +// +// // obtain a read lock to whole file, we ue a special value to indicate this. 
+// // this helps if someone is truncating the file while we are using it, they will to a write lock +// let mut _lock = None; +// let _guard_all = { +// if let Some(locks) = &self.locks { +// _lock = Some(locks.get(WHOLE_FILE_CHUNK_INDEX)); +// Some(_lock.as_ref().unwrap().read()) +// } else { +// None +// } +// }; +// +// if self.reader.is_none() { +// // create the reader +// let current_pos = self.pos()?; +// self.reader = Self::try_create_reader( +// current_pos, +// self.chunk_size, +// self.file_dir.to_owned(), +// self.cipher, +// self.key.clone(), +// &self.locks, +// )?; +// } +// if self.reader.is_none() { +// // we don't have any more chunks +// return Ok(0); +// } +// +// debug!(len = buf.len().to_formatted_string(&Locale::en), "reading"); +// let mut len = self.reader.as_mut().unwrap().read(buf)?; +// +// if len == 0 { +// debug!("switching to next chunk"); +// self.chunk_index += 1; +// self.reader = Self::try_create_reader( +// self.chunk_index * self.chunk_size, +// self.chunk_size, +// self.file_dir.to_owned(), +// self.cipher, +// self.key.clone(), +// &self.locks, +// )?; +// if let Some(reader) = &mut self.reader { +// debug!(len = len.to_formatted_string(&Locale::en), "reading"); +// len = reader.read(buf)?; +// } +// } +// +// Ok(len) +// } +// } +// +// impl Seek for ChunkedFileCryptoReader { +// fn seek(&mut self, pos: SeekFrom) -> io::Result { +// let new_pos = match pos { +// SeekFrom::Start(pos) => pos, +// SeekFrom::End(_) => { +// return Err(io::Error::new( +// io::ErrorKind::InvalidInput, +// "can't seek from end", +// )) +// } +// SeekFrom::Current(pos) => { +// let new_pos = self.pos()? as i64 + pos; +// if new_pos < 0 { +// return Err(io::Error::new( +// io::ErrorKind::InvalidInput, +// "can't seek before start", +// )); +// } +// new_pos as u64 +// } +// }; +// if self.pos()? != new_pos { +// debug!( +// pos = self.pos()?.to_formatted_string(&Locale::en), +// new_pos = new_pos.to_formatted_string(&Locale::en), +// "seeking" +// ); +// +// // obtain a read lock to whole file, we ue a special value to indicate this. 
+// // this helps if someone is truncating the file while we are using it, they will use a write lock +// let mut _lock = None; +// let _guard_all = { +// if let Some(locks) = &self.locks { +// _lock = Some(locks.get(WHOLE_FILE_CHUNK_INDEX)); +// Some(_lock.as_ref().unwrap().read()) +// } else { +// None +// } +// }; +// +// if self.reader.is_none() { +// // create the reader +// let current_pos = self.pos()?; +// self.reader = Some(Self::create_reader( +// current_pos, +// self.chunk_size, +// self.file_dir.to_owned(), +// self.cipher, +// self.key.clone(), +// &self.locks, +// )?); +// } +// let pos = self.pos()?; +// if self.chunk_index == new_pos / self.chunk_size { +// // seek in current chunk as much as we can +// let reader = self.reader.as_mut().unwrap(); +// let new_pos_in_chunk = new_pos % self.chunk_size; +// reader.seek(SeekFrom::Start(new_pos_in_chunk))?; +// } else { +// // we need to switch to another chunk +// debug!("switching to another chunk"); +// self.reader = Self::try_create_reader( +// new_pos, +// self.chunk_size, +// self.file_dir.to_owned(), +// self.cipher, +// self.key.clone(), +// &self.locks, +// )?; +// if self.reader.is_none() { +// return Ok(pos); +// } +// let reader = self.reader.as_mut().unwrap(); +// // seek in chunk +// let new_pos_in_chunk = new_pos % self.chunk_size; +// debug!( +// new_pos_in_chunk = new_pos_in_chunk.to_formatted_string(&Locale::en), +// "seeking in new chunk" +// ); +// reader.seek(SeekFrom::Start(new_pos_in_chunk))?; +// self.chunk_index = new_pos / self.chunk_size; +// } +// } +// Ok(self.pos()?) +// } +// } +// +// impl CryptoReader for ChunkedFileCryptoReader {} #[cfg(test)] mod test { diff --git a/src/crypto/writer.rs b/src/crypto/writer.rs index 998b7087..3fb0d9b3 100644 --- a/src/crypto/writer.rs +++ b/src/crypto/writer.rs @@ -2,10 +2,10 @@ use atomic_write_file::AtomicWriteFile; use num_format::{Locale, ToFormattedString}; use parking_lot::RwLock; use std::fs::File; +use std::io; use std::io::{BufWriter, Error, Seek, SeekFrom, Write}; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; -use std::{fs, io}; use crate::arc_hashmap::Holder; use crate::{crypto, fs_util, stream_util}; @@ -70,6 +70,7 @@ pub struct RingCryptoWriter { sealing_key: SealingKey, buf: BufMut, nonce_sequence: Arc>, + block_index: u64, } impl RingCryptoWriter { @@ -85,6 +86,7 @@ impl RingCryptoWriter { sealing_key, buf, nonce_sequence, + block_index: 0, } } } @@ -126,9 +128,10 @@ impl Write for RingCryptoWriter { impl RingCryptoWriter { fn encrypt_and_write(&mut self) -> io::Result<()> { let data = self.buf.as_mut(); + let aad = Aad::from(self.block_index.to_le_bytes()); let tag = self .sealing_key - .seal_in_place_separate_tag(Aad::empty(), data) + .seal_in_place_separate_tag(aad, data) .map_err(|err| { error!("error sealing in place: {}", err); io::Error::from(io::ErrorKind::Other) @@ -143,6 +146,7 @@ impl RingCryptoWriter { out.write_all(data)?; self.buf.clear(); out.write_all(tag.as_ref())?; + self.block_index += 1; Ok(()) } } @@ -241,10 +245,10 @@ pub struct FileCryptoWriter { impl FileCryptoWriter { /// **`callback`** is called when the file content changes. It receives the position from where the file content changed and the last write position /// - /// **`lock`** is used to write lock the file when accessing it. If not provided, it will not ensure that other instances are not writing to the file while we do\ + /// **`lock`** is used to write lock the file when accessing it. 
If not provided, it will not ensure that other instances are not writing to the file while we do /// You need to provide the same lock to all writers and readers of this file, you should obtain a new [`Holder`] that wraps the same lock /// - /// **`metadata_provider`** it's used to do some optimizations to reduce some copy operations from original file\ + /// **`metadata_provider`** it's used to do some optimizations to reduce some copy operations from original file /// If the file exists or is created before flushing, in worse case scenarios, it can reduce the overall write speed by half, so it's recommended to provide it #[allow(clippy::missing_errors_doc)] pub fn new( @@ -453,400 +457,400 @@ impl Seek for FileCryptoWriter { impl CryptoWriterSeek for FileCryptoWriter {} -// todo: expose as param -#[cfg(test)] -pub(crate) const CHUNK_SIZE: u64 = 1024; // 1K for tests -#[cfg(not(test))] -// pub(crate) const CHUNK_SIZE: u64 = 16 * 1024 * 1024; // 64M -pub(crate) const CHUNK_SIZE: u64 = 512 * 1024; - -// use this when we want to lock the whole file -pub const WHOLE_FILE_CHUNK_INDEX: u64 = u64::MAX - 42_u64; - -pub trait SequenceLockProvider: Send + Sync { - fn get(&self, index: u64) -> Holder>; -} - -#[allow(clippy::module_name_repetitions)] -pub struct ChunkedTmpFileCryptoWriter { - file_dir: PathBuf, - cipher: Cipher, - key: Arc>, - callback: Option>>, - chunk_size: u64, - chunk_index: u64, - writer: Option>>, - locks: Option>>, - metadata_provider: Option>>, -} - -struct CallbackWrapper(Arc>, u64); -impl FileCryptoWriterCallback for CallbackWrapper { - fn on_file_content_changed( - &self, - changed_from_pos: i64, - last_write_pos: u64, - ) -> io::Result<()> { - self.0 - .on_file_content_changed(self.1 as i64 + changed_from_pos, self.1 + last_write_pos) - } -} - -struct FileCryptoWriterMetadataProviderImpl { - chunk_index: u64, - chunk_size: u64, - file_dir: PathBuf, - provider: Arc>, -} -impl FileCryptoWriterMetadataProvider for FileCryptoWriterMetadataProviderImpl { - fn size(&self) -> io::Result { - let mut size = self.provider.size()?; - // check if we are in the last chunk - let path = Path::new(&self.file_dir).join((self.chunk_index + 1).to_string()); - if !path.exists() { - // we are in the last chunk, size is remaining after multiple of chunk size - size %= self.chunk_size; - } else { - // we are NOT in the last chunk, size is a full chunk size - size = self.chunk_size; - } - Ok(size) - } -} - -// todo: create traits for lock and metadata provider and don't use [`Guard`] -impl ChunkedTmpFileCryptoWriter { - /// **`callback`** is called when the file content changes. It receives the position from where the file content changed and the last write position - /// - /// **`lock`** is used to write lock the file when accessing it. 
If not provided, it will not ensure that other instances are not writing to the file while we do\ - /// You need to provide the same lock to all writers and readers of this file, you should obtain a new [`Holder`] that wraps the same lock - /// - /// **`metadata_provider`** it's used to do some optimizations to reduce some copy operations from original file\ - /// If the file exists or is created before flushing, in worse case scenarios, it can reduce the overall write speed by half, so it's recommended to provide it - pub fn new( - file_dir: &Path, - cipher: Cipher, - key: Arc>, - callback: Option>, - locks: Option>>, - metadata_provider: Option>, - ) -> io::Result { - if !file_dir.exists() { - fs::create_dir_all(file_dir)?; - } - Ok(Self { - file_dir: file_dir.to_owned(), - cipher, - key: key.clone(), - callback: callback.map(|c| Arc::new(c)), - chunk_size: CHUNK_SIZE, - chunk_index: 0, - writer: None, - locks, - metadata_provider: metadata_provider.map(|m| Arc::new(m)), - }) - } - - fn create_new_writer(&mut self, pos: u64) -> io::Result>> { - Self::create_writer( - pos, - &self.file_dir, - self.cipher, - self.key.clone(), - self.chunk_size, - &self.locks, - self.callback.clone(), - self.metadata_provider.as_ref().map(|m| { - Box::new(FileCryptoWriterMetadataProviderImpl { - chunk_size: self.chunk_size, - chunk_index: pos / self.chunk_size, - file_dir: self.file_dir.clone(), - provider: m.clone(), - }) as Box - }), - ) - } - - fn create_writer( - pos: u64, - file_dir: &Path, - cipher: Cipher, - key: Arc>, - chunk_size: u64, - locks: &Option>>, - callback: Option>>, - metadata_provider: Option>, - ) -> io::Result>> { - let chunk_index = pos / chunk_size; - debug!( - chunk_index = chunk_index.to_formatted_string(&Locale::en), - "creating new writer" - ); - let chunk_file = file_dir.join(chunk_index.to_string()); - { - let mut _lock = None; - let mut _lock2 = None; - let (_g1, _g2) = if let Some(locks) = locks { - _lock = Some(locks.get(chunk_index)); - let guard = _lock.as_ref().unwrap().write(); - // obtain a write lock to whole file, we ue a special value to indicate this. - _lock2 = Some(locks.get(WHOLE_FILE_CHUNK_INDEX)); - let guard_all = _lock2.as_ref().unwrap().read(); - (Some(guard), Some(guard_all)) - } else { - (None, None) - }; - - if !chunk_file.exists() { - File::create(&chunk_file)?; - } - } - crypto::create_file_writer( - chunk_file.as_path(), - cipher, - key.clone(), - callback.as_ref().map(|c| { - Box::new(CallbackWrapper(c.clone(), pos / chunk_size)) - as Box - }), - locks.as_ref().map(|lock| lock.get(chunk_index)), - metadata_provider, - ) - } - - fn seek_from_start(&mut self, pos: u64) -> io::Result { - if pos == self.pos()? { - return Ok(pos); - } - debug!(pos = pos.to_formatted_string(&Locale::en), "seeking"); - - // obtain a read lock to whole file, we ue a special value to indicate this. 
- // this helps if someone is truncating the file while we are using it, they will to a write lock - let mut _lock = None; - let _guard_all = { - if let Some(locks) = &self.locks { - _lock = Some(locks.get(WHOLE_FILE_CHUNK_INDEX)); - Some(_lock.as_ref().unwrap().read()) - } else { - None - } - }; - - let new_chunk_index = pos / self.chunk_size; - if pos == 0 { - // reset the writer if we seek at the beginning to pick up any filesize changes - if let Some(mut writer) = self.writer.take() { - writer.flush()?; - writer.finish()?; - } - self.writer = Some(self.create_new_writer(pos)?); - } else { - if self.chunk_index != new_chunk_index { - // we need to switch to a new chunk - debug!( - chunk_index = new_chunk_index.to_formatted_string(&Locale::en), - "switching to new chunk" - ); - if self.chunk_index < new_chunk_index { - // we need to seek forward, maybe we don't yet have chunks created until new chunk - // in that case create them and fill up with zeros - if self.writer.is_none() { - let current_pos = self.pos()?; - self.writer = Some(self.create_new_writer(current_pos)?); - } - // first seek in current chunk to the end to fill up with zeros as needed - self.writer - .as_mut() - .unwrap() - .seek(SeekFrom::Start(self.chunk_size))?; - // iterate through all chunks until new chunk and create missing ones - for i in self.chunk_index + 1..new_chunk_index { - let current_pos = i * self.chunk_size; - if !self.chunk_exists(i) { - let mut writer = self.create_new_writer(current_pos)?; - writer.seek(SeekFrom::Start(self.chunk_size))?; // fill up with zeros - writer.flush()?; - writer.finish()?; - } - } - } - // finish any existing writer - if let Some(mut writer) = self.writer.take() { - writer.flush()?; - writer.finish()?; - } - } - // seeking in current chunk - let offset_in_chunk = pos % self.chunk_size; - debug!( - offset_in_chunk = offset_in_chunk.to_formatted_string(&Locale::en), - "seeking in chunk" - ); - if self.writer.is_none() { - self.writer = Some(self.create_new_writer(pos)?); - } - self.writer - .as_mut() - .unwrap() - .seek(SeekFrom::Start(offset_in_chunk))?; - self.chunk_index = pos / self.chunk_size; - } - Ok(pos) - } - - fn chunk_exists(&self, chunk_index: u64) -> bool { - let path = self.file_dir.join(chunk_index.to_string()); - path.exists() - } - - fn pos(&mut self) -> io::Result { - if self.writer.is_none() { - self.writer = Some(self.create_new_writer(self.chunk_index * self.chunk_size)?); - } - Ok(self.chunk_index * self.chunk_size + self.writer.as_mut().unwrap().stream_position()?) - } -} - -impl Write for ChunkedTmpFileCryptoWriter { - fn write(&mut self, buf: &[u8]) -> io::Result { - debug!( - pos = self.pos()?.to_formatted_string(&Locale::en), - chunk_index = self.chunk_index.to_formatted_string(&Locale::en), - "writing {} bytes", - buf.len().to_formatted_string(&Locale::en) - ); - if buf.is_empty() { - return Ok(0); - } - - // obtain a read lock to whole file, we ue a special value to indicate this. 
- // this helps if someone is truncating the file while we are using it, they will to a write lock - let mut _lock = None; - let _guard_all = if let Some(locks) = &self.locks { - _lock = Some(locks.get(WHOLE_FILE_CHUNK_INDEX)); - Some(_lock.as_ref().unwrap().read()) - } else { - None - }; - - let mut buf = &buf[..]; - let mut written = 0_u64; - loop { - let current_pos = self.pos()?; - if self.writer.is_none() { - let pos = current_pos; - self.writer = Some(self.create_new_writer(pos)?); - } - - let remaining = self.chunk_size - self.writer.as_mut().unwrap().stream_position()?; - let (current_buf, next_buf) = if buf.len() > remaining as usize { - // buf expands to next chunk, split it - debug!( - at = remaining.to_formatted_string(&Locale::en), - pos = self.pos()?.to_formatted_string(&Locale::en), - chunk_index = self.chunk_index.to_formatted_string(&Locale::en), - "splitting buf" - ); - let (buf1, buf2) = buf.split_at(remaining as usize); - (buf1, Some(buf2)) - } else { - (buf, None) - }; - - // write current buf - match self.writer.as_mut().unwrap().write(current_buf) { - Ok(len) => { - written += len as u64; - if len < current_buf.len() && next_buf.is_some() { - // we didn't write all the current buf, but we have a next buf also, return early - return Ok(written as usize + len); - } - } - Err(err) => { - error!("error writing to chunk: {}", err); - return Err(err); - } - } - - let remaining = self.chunk_size - self.writer.as_mut().unwrap().stream_position()?; - if remaining == 0 { - // flush and finish current chunk - if let Err(err) = self.writer.as_mut().unwrap().flush() { - error!("error flushing chunk: {}", err); - return Err(err); - } - if let Err(err) = self.writer.as_mut().unwrap().finish() { - error!("error finishing chunk: {}", err); - return Err(err); - } - self.writer.take(); - self.chunk_index += 1; - } - - if next_buf.is_none() { - // we're done writing - return Ok(written as usize); - } else { - // prepare writing to next chunk - debug!( - pos = self.pos()?.to_formatted_string(&Locale::en), - len = next_buf - .as_ref() - .unwrap() - .len() - .to_formatted_string(&Locale::en), - chunk_index = self.chunk_index.to_formatted_string(&Locale::en), - "writing to next chunk" - ); - buf = next_buf.unwrap(); - } - } - } - - fn flush(&mut self) -> io::Result<()> { - if let Some(writer) = self.writer.as_mut() { - writer.flush()?; - } - Ok(()) - } -} - -impl CryptoWriter for ChunkedTmpFileCryptoWriter { - fn finish(&mut self) -> io::Result { - if let Some(mut writer) = self.writer.take() { - let _ = writer.flush(); - let _ = writer.finish(); - } - - let path = self.file_dir.join(0.to_string()); - if !path.exists() { - File::create(&path)?; - } - Ok(File::open(path)?) - } -} - -impl Seek for ChunkedTmpFileCryptoWriter { - fn seek(&mut self, pos: SeekFrom) -> io::Result { - let new_pos = match pos { - SeekFrom::Start(pos) => pos as i64, - SeekFrom::End(_) => { - return Err(Error::new( - io::ErrorKind::Other, - "seek from end not supported", - )) - } - SeekFrom::Current(pos) => self.pos()? as i64 + pos, - }; - if new_pos < 0 { - return Err(Error::new( - io::ErrorKind::InvalidInput, - "can't seek before start", - )); - } - self.seek_from_start(new_pos as u64)?; - Ok(self.pos()?) 
- } -} - -impl CryptoWriterSeek for ChunkedTmpFileCryptoWriter {} +// // todo: expose as param +// #[cfg(test)] +// pub(crate) const CHUNK_SIZE: u64 = 1024; // 1K for tests +// #[cfg(not(test))] +// // pub(crate) const CHUNK_SIZE: u64 = 16 * 1024 * 1024; // 64M +// pub(crate) const CHUNK_SIZE: u64 = 512 * 1024; +// +// // use this when we want to lock the whole file +// pub const WHOLE_FILE_CHUNK_INDEX: u64 = u64::MAX - 42_u64; +// +// pub trait SequenceLockProvider: Send + Sync { +// fn get(&self, index: u64) -> Holder>; +// } +// +// #[allow(clippy::module_name_repetitions)] +// pub struct ChunkedTmpFileCryptoWriter { +// file_dir: PathBuf, +// cipher: Cipher, +// key: Arc>, +// callback: Option>>, +// chunk_size: u64, +// chunk_index: u64, +// writer: Option>>, +// locks: Option>>, +// metadata_provider: Option>>, +// } +// +// struct CallbackWrapper(Arc>, u64); +// impl FileCryptoWriterCallback for CallbackWrapper { +// fn on_file_content_changed( +// &self, +// changed_from_pos: i64, +// last_write_pos: u64, +// ) -> io::Result<()> { +// self.0 +// .on_file_content_changed(self.1 as i64 + changed_from_pos, self.1 + last_write_pos) +// } +// } +// +// struct FileCryptoWriterMetadataProviderImpl { +// chunk_index: u64, +// chunk_size: u64, +// file_dir: PathBuf, +// provider: Arc>, +// } +// impl FileCryptoWriterMetadataProvider for FileCryptoWriterMetadataProviderImpl { +// fn size(&self) -> io::Result { +// let mut size = self.provider.size()?; +// // check if we are in the last chunk +// let path = Path::new(&self.file_dir).join((self.chunk_index + 1).to_string()); +// if !path.exists() { +// // we are in the last chunk, size is remaining after multiple of chunk size +// size %= self.chunk_size; +// } else { +// // we are NOT in the last chunk, size is a full chunk size +// size = self.chunk_size; +// } +// Ok(size) +// } +// } +// +// // todo: create traits for lock and metadata provider and don't use [`Guard`] +// impl ChunkedTmpFileCryptoWriter { +// /// **`callback`** is called when the file content changes. It receives the position from where the file content changed and the last write position +// /// +// /// **`lock`** is used to write lock the file when accessing it. 
If not provided, it will not ensure that other instances are not writing to the file while we do +// /// You need to provide the same lock to all writers and readers of this file, you should obtain a new [`Holder`] that wraps the same lock +// /// +// /// **`metadata_provider`** it's used to do some optimizations to reduce some copy operations from original file +// /// If the file exists or is created before flushing, in worse case scenarios, it can reduce the overall write speed by half, so it's recommended to provide it +// pub fn new( +// file_dir: &Path, +// cipher: Cipher, +// key: Arc>, +// callback: Option>, +// locks: Option>>, +// metadata_provider: Option>, +// ) -> io::Result { +// if !file_dir.exists() { +// fs::create_dir_all(file_dir)?; +// } +// Ok(Self { +// file_dir: file_dir.to_owned(), +// cipher, +// key: key.clone(), +// callback: callback.map(|c| Arc::new(c)), +// chunk_size: CHUNK_SIZE, +// chunk_index: 0, +// writer: None, +// locks, +// metadata_provider: metadata_provider.map(|m| Arc::new(m)), +// }) +// } +// +// fn create_new_writer(&mut self, pos: u64) -> io::Result>> { +// Self::create_writer( +// pos, +// &self.file_dir, +// self.cipher, +// self.key.clone(), +// self.chunk_size, +// &self.locks, +// self.callback.clone(), +// self.metadata_provider.as_ref().map(|m| { +// Box::new(FileCryptoWriterMetadataProviderImpl { +// chunk_size: self.chunk_size, +// chunk_index: pos / self.chunk_size, +// file_dir: self.file_dir.clone(), +// provider: m.clone(), +// }) as Box +// }), +// ) +// } +// +// fn create_writer( +// pos: u64, +// file_dir: &Path, +// cipher: Cipher, +// key: Arc>, +// chunk_size: u64, +// locks: &Option>>, +// callback: Option>>, +// metadata_provider: Option>, +// ) -> io::Result>> { +// let chunk_index = pos / chunk_size; +// debug!( +// chunk_index = chunk_index.to_formatted_string(&Locale::en), +// "creating new writer" +// ); +// let chunk_file = file_dir.join(chunk_index.to_string()); +// { +// let mut _lock = None; +// let mut _lock2 = None; +// let (_g1, _g2) = if let Some(locks) = locks { +// _lock = Some(locks.get(chunk_index)); +// let guard = _lock.as_ref().unwrap().write(); +// // obtain a write lock to whole file, we ue a special value to indicate this. +// _lock2 = Some(locks.get(WHOLE_FILE_CHUNK_INDEX)); +// let guard_all = _lock2.as_ref().unwrap().read(); +// (Some(guard), Some(guard_all)) +// } else { +// (None, None) +// }; +// +// if !chunk_file.exists() { +// File::create(&chunk_file)?; +// } +// } +// crypto::create_file_writer( +// chunk_file.as_path(), +// cipher, +// key.clone(), +// callback.as_ref().map(|c| { +// Box::new(CallbackWrapper(c.clone(), pos / chunk_size)) +// as Box +// }), +// locks.as_ref().map(|lock| lock.get(chunk_index)), +// metadata_provider, +// ) +// } +// +// fn seek_from_start(&mut self, pos: u64) -> io::Result { +// if pos == self.pos()? { +// return Ok(pos); +// } +// debug!(pos = pos.to_formatted_string(&Locale::en), "seeking"); +// +// // obtain a read lock to whole file, we ue a special value to indicate this. 
+// // this helps if someone is truncating the file while we are using it, they will to a write lock +// let mut _lock = None; +// let _guard_all = { +// if let Some(locks) = &self.locks { +// _lock = Some(locks.get(WHOLE_FILE_CHUNK_INDEX)); +// Some(_lock.as_ref().unwrap().read()) +// } else { +// None +// } +// }; +// +// let new_chunk_index = pos / self.chunk_size; +// if pos == 0 { +// // reset the writer if we seek at the beginning to pick up any filesize changes +// if let Some(mut writer) = self.writer.take() { +// writer.flush()?; +// writer.finish()?; +// } +// self.writer = Some(self.create_new_writer(pos)?); +// } else { +// if self.chunk_index != new_chunk_index { +// // we need to switch to a new chunk +// debug!( +// chunk_index = new_chunk_index.to_formatted_string(&Locale::en), +// "switching to new chunk" +// ); +// if self.chunk_index < new_chunk_index { +// // we need to seek forward, maybe we don't yet have chunks created until new chunk +// // in that case create them and fill up with zeros +// if self.writer.is_none() { +// let current_pos = self.pos()?; +// self.writer = Some(self.create_new_writer(current_pos)?); +// } +// // first seek in current chunk to the end to fill up with zeros as needed +// self.writer +// .as_mut() +// .unwrap() +// .seek(SeekFrom::Start(self.chunk_size))?; +// // iterate through all chunks until new chunk and create missing ones +// for i in self.chunk_index + 1..new_chunk_index { +// let current_pos = i * self.chunk_size; +// if !self.chunk_exists(i) { +// let mut writer = self.create_new_writer(current_pos)?; +// writer.seek(SeekFrom::Start(self.chunk_size))?; // fill up with zeros +// writer.flush()?; +// writer.finish()?; +// } +// } +// } +// // finish any existing writer +// if let Some(mut writer) = self.writer.take() { +// writer.flush()?; +// writer.finish()?; +// } +// } +// // seeking in current chunk +// let offset_in_chunk = pos % self.chunk_size; +// debug!( +// offset_in_chunk = offset_in_chunk.to_formatted_string(&Locale::en), +// "seeking in chunk" +// ); +// if self.writer.is_none() { +// self.writer = Some(self.create_new_writer(pos)?); +// } +// self.writer +// .as_mut() +// .unwrap() +// .seek(SeekFrom::Start(offset_in_chunk))?; +// self.chunk_index = pos / self.chunk_size; +// } +// Ok(pos) +// } +// +// fn chunk_exists(&self, chunk_index: u64) -> bool { +// let path = self.file_dir.join(chunk_index.to_string()); +// path.exists() +// } +// +// fn pos(&mut self) -> io::Result { +// if self.writer.is_none() { +// self.writer = Some(self.create_new_writer(self.chunk_index * self.chunk_size)?); +// } +// Ok(self.chunk_index * self.chunk_size + self.writer.as_mut().unwrap().stream_position()?) +// } +// } +// +// impl Write for ChunkedTmpFileCryptoWriter { +// fn write(&mut self, buf: &[u8]) -> io::Result { +// debug!( +// pos = self.pos()?.to_formatted_string(&Locale::en), +// chunk_index = self.chunk_index.to_formatted_string(&Locale::en), +// "writing {} bytes", +// buf.len().to_formatted_string(&Locale::en) +// ); +// if buf.is_empty() { +// return Ok(0); +// } +// +// // obtain a read lock to whole file, we ue a special value to indicate this. 
+// // this helps if someone is truncating the file while we are using it, they will to a write lock +// let mut _lock = None; +// let _guard_all = if let Some(locks) = &self.locks { +// _lock = Some(locks.get(WHOLE_FILE_CHUNK_INDEX)); +// Some(_lock.as_ref().unwrap().read()) +// } else { +// None +// }; +// +// let mut buf = &buf[..]; +// let mut written = 0_u64; +// loop { +// let current_pos = self.pos()?; +// if self.writer.is_none() { +// let pos = current_pos; +// self.writer = Some(self.create_new_writer(pos)?); +// } +// +// let remaining = self.chunk_size - self.writer.as_mut().unwrap().stream_position()?; +// let (current_buf, next_buf) = if buf.len() > remaining as usize { +// // buf expands to next chunk, split it +// debug!( +// at = remaining.to_formatted_string(&Locale::en), +// pos = self.pos()?.to_formatted_string(&Locale::en), +// chunk_index = self.chunk_index.to_formatted_string(&Locale::en), +// "splitting buf" +// ); +// let (buf1, buf2) = buf.split_at(remaining as usize); +// (buf1, Some(buf2)) +// } else { +// (buf, None) +// }; +// +// // write current buf +// match self.writer.as_mut().unwrap().write(current_buf) { +// Ok(len) => { +// written += len as u64; +// if len < current_buf.len() && next_buf.is_some() { +// // we didn't write all the current buf, but we have a next buf also, return early +// return Ok(written as usize + len); +// } +// } +// Err(err) => { +// error!("error writing to chunk: {}", err); +// return Err(err); +// } +// } +// +// let remaining = self.chunk_size - self.writer.as_mut().unwrap().stream_position()?; +// if remaining == 0 { +// // flush and finish current chunk +// if let Err(err) = self.writer.as_mut().unwrap().flush() { +// error!("error flushing chunk: {}", err); +// return Err(err); +// } +// if let Err(err) = self.writer.as_mut().unwrap().finish() { +// error!("error finishing chunk: {}", err); +// return Err(err); +// } +// self.writer.take(); +// self.chunk_index += 1; +// } +// +// if next_buf.is_none() { +// // we're done writing +// return Ok(written as usize); +// } else { +// // prepare writing to next chunk +// debug!( +// pos = self.pos()?.to_formatted_string(&Locale::en), +// len = next_buf +// .as_ref() +// .unwrap() +// .len() +// .to_formatted_string(&Locale::en), +// chunk_index = self.chunk_index.to_formatted_string(&Locale::en), +// "writing to next chunk" +// ); +// buf = next_buf.unwrap(); +// } +// } +// } +// +// fn flush(&mut self) -> io::Result<()> { +// if let Some(writer) = self.writer.as_mut() { +// writer.flush()?; +// } +// Ok(()) +// } +// } +// +// impl CryptoWriter for ChunkedTmpFileCryptoWriter { +// fn finish(&mut self) -> io::Result { +// if let Some(mut writer) = self.writer.take() { +// let _ = writer.flush(); +// let _ = writer.finish(); +// } +// +// let path = self.file_dir.join(0.to_string()); +// if !path.exists() { +// File::create(&path)?; +// } +// Ok(File::open(path)?) +// } +// } +// +// impl Seek for ChunkedTmpFileCryptoWriter { +// fn seek(&mut self, pos: SeekFrom) -> io::Result { +// let new_pos = match pos { +// SeekFrom::Start(pos) => pos as i64, +// SeekFrom::End(_) => { +// return Err(Error::new( +// io::ErrorKind::Other, +// "seek from end not supported", +// )) +// } +// SeekFrom::Current(pos) => self.pos()? as i64 + pos, +// }; +// if new_pos < 0 { +// return Err(Error::new( +// io::ErrorKind::InvalidInput, +// "can't seek before start", +// )); +// } +// self.seek_from_start(new_pos as u64)?; +// Ok(self.pos()?) 
+// } +// } +// +// impl CryptoWriterSeek for ChunkedTmpFileCryptoWriter {} diff --git a/src/encryptedfs.rs b/src/encryptedfs.rs index 834c2957..913f8561 100644 --- a/src/encryptedfs.rs +++ b/src/encryptedfs.rs @@ -1,5 +1,6 @@ use std::cmp::max; use std::collections::{HashMap, HashSet, VecDeque}; +use std::convert::Infallible; use std::fmt::Debug; use std::fs::{DirEntry, File, OpenOptions, ReadDir}; use std::io::ErrorKind::Other; @@ -22,22 +23,22 @@ use secrecy::{ExposeSecret, SecretString, SecretVec}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use thiserror::Error; use tokio::sync::{Mutex, RwLock}; +use tokio::task::JoinError; use tokio_stream::wrappers::ReadDirStream; -use tracing::{debug, error, instrument, warn}; +use tracing::{debug, error, info, instrument, warn}; use crate::arc_hashmap::{ArcHashMap, Holder}; use crate::async_util::call_async; use crate::crypto::reader::CryptoReader; use crate::crypto::writer::{ CryptoWriter, CryptoWriterSeek, FileCryptoWriterCallback, FileCryptoWriterMetadataProvider, - SequenceLockProvider, }; use crate::crypto::Cipher; use crate::expire_value::{ExpireValue, Provider}; use crate::{crypto, fs_util, stream_util}; #[cfg(test)] -mod test; +pub(crate) mod test; pub(crate) const INODES_DIR: &str = "inodes"; pub(crate) const CONTENTS_DIR: &str = "contents"; @@ -62,7 +63,7 @@ async fn reset_handles( // if we wrote pass the filesize we need to update the filesize if last_write_pos > attr.size { attr.size = last_write_pos; - fs.write_inode_to_storage(&attr, fs.key.get().await?)?; + fs.write_inode_to_storage(&attr).await?; } } fs.reset_handles(ino, changed_from_pos, Some(fh)).await?; @@ -70,7 +71,12 @@ async fn reset_handles( } async fn get_metadata(fs: Arc, ino: u64) -> FsResult { - let mut guard = fs.attr_cache.lock(); + let lock = fs + .attr_cache + .get() + .await + .map_err(|_| FsError::Other("should not happen"))?; + let mut guard = lock.lock(); let attr = guard.get(&ino); if let Some(attr) = attr { Ok(*attr) @@ -334,6 +340,12 @@ pub enum FsError { source: ParseIntError, // backtrace: Backtrace, }, + #[error("tokio join error: {source}")] + JoinError { + #[from] + source: JoinError, + // backtrace: Backtrace, + }, #[error("max filesize exceeded, max allowed {0}")] MaxFilesizeExceeded(usize), } @@ -389,7 +401,7 @@ impl From for SetFileAttr { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct DirectoryEntry { pub ino: u64, pub name: SecretString, @@ -531,6 +543,26 @@ pub trait PasswordProvider: Send + Sync + 'static { fn get_password(&self) -> Option; } +struct DirEntriesCacheProvider {} +impl Provider>, Infallible> + for DirEntriesCacheProvider +{ + fn provide(&self) -> Result>, Infallible> { + Ok(parking_lot::Mutex::new(LruCache::new( + NonZeroUsize::new(1000).unwrap(), + ))) + } +} + +struct AttrCacheProvider {} +impl Provider>, Infallible> for AttrCacheProvider { + fn provide(&self) -> Result>, Infallible> { + Ok(parking_lot::Mutex::new(LruCache::new( + NonZeroUsize::new(1000).unwrap(), + ))) + } +} + /// Encrypted FS that stores encrypted files in a dedicated directory with a specific structure based on `inode`. 
 pub struct EncryptedFs {
     pub(crate) data_dir: PathBuf,
@@ -550,12 +582,22 @@ pub struct EncryptedFs {
     serialize_update_inode_locks: Mutex<ArcHashMap<u64, Mutex<bool>>>,
     // use std::sync::RwLock instead of tokio::sync::RwLock because we need to use it also in sync code in `DirectoryEntryIterator` and `DirectoryEntryPlusIterator`
     // todo: remove external std::sync::RwLock, ArcHashMap is thread safe
-    serialize_dir_entries_locks:
+    serialize_dir_entries_ls_locks:
+        Arc<parking_lot::RwLock<ArcHashMap<String, parking_lot::RwLock<bool>>>>,
+    // todo: remove external std::sync::RwLock, ArcHashMap is thread safe
+    serialize_dir_entries_hash_locks:
         Arc<parking_lot::RwLock<ArcHashMap<String, parking_lot::RwLock<bool>>>>,
     read_write_locks: ArcHashMap<u64, RwLock<bool>>,
     key: ExpireValue<SecretVec<u8>, FsError, KeyProvider>,
     self_weak: std::sync::Mutex<Option<Weak<EncryptedFs>>>,
-    attr_cache: parking_lot::Mutex<LruCache<u64, FileAttr>>,
+    attr_cache:
+        ExpireValue<parking_lot::Mutex<LruCache<u64, FileAttr>>, Infallible, AttrCacheProvider>,
+    dir_entries_name_cache: ExpireValue<
+        parking_lot::Mutex<LruCache<String, SecretString>>,
+        Infallible,
+        DirEntriesCacheProvider,
+    >,
+    dir_entries_meta_cache: parking_lot::Mutex<LruCache<String, (u64, FileType)>>,
 }
 
 impl EncryptedFs {
@@ -584,21 +626,35 @@ impl EncryptedFs {
             opened_files_for_write: RwLock::new(HashMap::new()),
             serialize_inode_locks: Arc::new(parking_lot::RwLock::new(ArcHashMap::default())),
             serialize_update_inode_locks: Mutex::new(ArcHashMap::default()),
-            serialize_dir_entries_locks: Arc::new(parking_lot::RwLock::new(ArcHashMap::default())),
+            serialize_dir_entries_ls_locks: Arc::new(parking_lot::RwLock::new(
+                ArcHashMap::default(),
+            )),
+            serialize_dir_entries_hash_locks: Arc::new(parking_lot::RwLock::new(
+                ArcHashMap::default(),
+            )),
             // todo: take duration from param
             key: ExpireValue::new(key_provider, Duration::from_secs(10 * 60)),
             self_weak: std::sync::Mutex::new(None),
             read_write_locks: ArcHashMap::default(),
-            attr_cache: parking_lot::Mutex::new(LruCache::new(NonZeroUsize::new(100).unwrap())),
+            // todo: take duration from param
+            attr_cache: ExpireValue::new(AttrCacheProvider {}, Duration::from_secs(10 * 60)),
+            dir_entries_name_cache: ExpireValue::new(
+                DirEntriesCacheProvider {},
+                Duration::from_secs(10 * 60),
+            ),
+            dir_entries_meta_cache: parking_lot::Mutex::new(LruCache::new(
+                NonZeroUsize::new(1000).unwrap(),
+            )),
         };
-        fs.ensure_root_exists().await?;
-
         let arc = Arc::new(fs);
         arc.self_weak
             .lock()
             .expect("cannot obtain lock")
             .replace(Arc::downgrade(&arc));
+
+        arc.ensure_root_exists().await?;
+
         Ok(arc)
     }
 
@@ -631,70 +687,119 @@ impl EncryptedFs {
         if !self.node_exists(parent) {
             return Err(FsError::InodeNotFound);
         }
-        if self.find_by_name(parent, name).await?.is_some() {
+        if self.exists_by_name(parent, name).await? {
             return Err(FsError::AlreadyExists);
         }
 
         let mut attr: FileAttr = create_attr.into();
         attr.ino = self.generate_next_inode();
+        let fs = self
+            .self_weak
+            .lock()
+            .unwrap()
+            .as_mut()
+            .unwrap()
+            .upgrade()
+            .unwrap();
+        let mut handles = vec![];
+
         // write inode
-        self.write_inode_to_storage(&attr, self.key.get().await?)?;
+        let self_clone = fs.clone();
+        let attr_clone = attr;
+        // handles.push(tokio::spawn(async move {
+        self_clone.write_inode_to_storage(&attr_clone).await?;
+        // Ok::<(), FsError>(())
+        // }));
 
         match attr.kind {
             FileType::RegularFile => {
-                // create in contents directory
-                let file = tokio::fs::File::create(self.contents_path(attr.ino)).await?;
-                file.sync_all().await?;
+                let self_clone = fs.clone();
+                handles.push(tokio::spawn(async move {
+                    // create in contents directory
+                    let file = File::create(self_clone.contents_path(attr.ino))?;
+                    // sync_all file and parent
+                    // these operations are a bit slow, but are needed to make sure the file is correctly created
+                    // i.e.
creating 100 files takes 0.965 sec with sync_all and 0.130 sec without + file.sync_all()?; + File::open( + self_clone + .contents_path(attr.ino) + .parent() + .expect("we don't have parent"), + )? + .sync_all()?; + Ok::<(), FsError>(()) + })); } FileType::Directory => { - // create in contents directory - let contents_dir = self.contents_path(attr.ino); - tokio::fs::create_dir(contents_dir.clone()).await?; - // used to keep encrypted file names used by [`read_dir`] and [`read_dir_plus`] - tokio::fs::create_dir(contents_dir.join(LS_DIR)).await?; - // used to keep hashes of encrypted file names used by [`exists_by_name`] and [`find_by_name`] - // this optimizes the search process as we don't need to decrypt all file names and search - tokio::fs::create_dir(contents_dir.join(HASH_DIR)).await?; - // add "." and ".." entries - self.insert_directory_entry( - attr.ino, - &DirectoryEntry { - ino: attr.ino, - name: SecretString::from_str("$.").expect("cannot parse"), - kind: FileType::Directory, - }, - self.key.get().await?, - )?; - self.insert_directory_entry( - attr.ino, - &DirectoryEntry { - ino: parent, - name: SecretString::from_str("$..").expect("cannot parse"), - kind: FileType::Directory, - }, - self.key.get().await?, - )?; + let self_clone = fs.clone(); + handles.push(tokio::spawn(async move { + // create in contents directory + let contents_dir = self_clone.contents_path(attr.ino); + fs::create_dir(contents_dir.clone())?; + // used to keep encrypted file names used by [`read_dir`] and [`read_dir_plus`] + fs::create_dir(contents_dir.join(LS_DIR))?; + // used to keep hashes of encrypted file names used by [`exists_by_name`] and [`find_by_name`] + // this optimizes the search process as we don't need to decrypt all file names and search + fs::create_dir(contents_dir.join(HASH_DIR))?; + + // add "." and ".." 
entries + self_clone + .insert_directory_entry( + attr_clone.ino, + &DirectoryEntry { + ino: attr_clone.ino, + name: SecretString::from_str("$.").expect("cannot parse"), + kind: FileType::Directory, + }, + ) + .await?; + self_clone + .insert_directory_entry( + attr_clone.ino, + &DirectoryEntry { + ino: parent, + name: SecretString::from_str("$..").expect("cannot parse"), + kind: FileType::Directory, + }, + ) + .await?; + Ok::<(), FsError>(()) + })); } } // edd entry in parent directory, used for listing - self.insert_directory_entry( - parent, - &DirectoryEntry { - ino: attr.ino, - name: SecretString::new(name.expose_secret().to_owned()), - kind: attr.kind, - }, - self.key.get().await?, - )?; - self.update_inode( - parent, - SetFileAttr::default() - .with_mtime(SystemTime::now()) - .with_ctime(SystemTime::now()), - ) - .await?; + let self_clone = fs.clone(); + let attr_clone = attr; + let name_clone = name.clone(); + handles.push(tokio::spawn(async move { + self_clone + .insert_directory_entry( + parent, + &DirectoryEntry { + ino: attr_clone.ino, + name: name_clone, + kind: attr_clone.kind, + }, + ) + .await?; + Ok::<(), FsError>(()) + })); + + let self_clone = fs.clone(); + handles.push(tokio::spawn(async move { + self_clone + .update_inode( + parent, + SetFileAttr::default() + .with_mtime(SystemTime::now()) + .with_ctime(SystemTime::now()), + ) + .await?; + Ok::<(), FsError>(()) + })); let handle = if attr.kind == FileType::RegularFile { if read || write { @@ -708,6 +813,11 @@ impl EncryptedFs { 0 }; + // wait for all tasks to finish + for h in handles { + h.await??; + } + Ok((handle, attr)) } @@ -730,26 +840,17 @@ impl EncryptedFs { return Ok(None); } let lock = self - .serialize_dir_entries_locks + .serialize_dir_entries_hash_locks .read() .get_or_insert_with(hash_path.to_str().unwrap().to_string(), || { parking_lot::RwLock::new(false) }); - let _guard = lock.read(); - let hash_file = File::open(hash_path)?; - let entry: (u64, FileType) = bincode::deserialize_from(crypto::create_reader( - hash_file, - self.cipher, - self.key.get().await?, - ))?; - let buf = self.ino_file(entry.0); - let path = buf.as_path(); - let attr_file = File::open(path)?; - Ok(Some(bincode::deserialize_from(crypto::create_reader( - attr_file, - self.cipher, - self.key.get().await?, - ))?)) + let guard = lock.read(); + let (ino, _, _): (u64, FileType, String) = bincode::deserialize_from( + crypto::create_reader(File::open(hash_path)?, self.cipher, self.key.get().await?), + )?; + drop(guard); + self.get_inode_from_cache_or_storage(ino).await.map(Some) } /// Count children of a directory. This **EXCLUDES** "." and "..". @@ -758,7 +859,15 @@ impl EncryptedFs { if !self.is_dir(ino).await? { return Err(FsError::InvalidInodeType); } - Ok(fs::read_dir(self.contents_path(ino).join(LS_DIR))?.count()) + let mut count = fs::read_dir(self.contents_path(ino).join(LS_DIR))?.count(); + if ino == ROOT_INODE { + // we don't count "." + count -= 1; + } else { + // we don't count "." and ".." + count -= 2; + } + Ok(count) } #[allow(clippy::missing_panics_doc)] @@ -862,10 +971,9 @@ impl EncryptedFs { if !self.is_dir(parent).await? 
 {
             return Err(FsError::InvalidInodeType);
         }
-        Ok(self.read_dir(parent, 0).await?.any(|entry| {
-            let entry = entry.expect("cannot read entry");
-            entry.name.expose_secret() == name.expose_secret()
-        }))
+        let hash = hex::encode(crypto::hash_secret_string(name));
+        let hash_path = self.contents_path(parent).join(HASH_DIR).join(hash);
+        Ok(hash_path.is_file())
     }
 
     #[allow(clippy::missing_errors_doc)]
@@ -901,30 +1009,13 @@ impl EncryptedFs {
         entry: io::Result<DirEntry>,
     ) -> FsResult<DirectoryEntryPlus> {
         let entry = self.create_directory_entry(entry).await?;
-
         let lock_ino = self
             .serialize_inode_locks
             .clone()
             .read()
             .get_or_insert_with(entry.ino, || parking_lot::RwLock::new(false));
         let _guard_ino = lock_ino.read();
-
-        let file = File::open(self.ino_file(entry.ino));
-        if let Err(e) = file {
-            error!(err = %e, "opening file");
-            return Err(e.into());
-        }
-        let file = file.unwrap();
-        let attr = bincode::deserialize_from(crypto::create_reader(
-            file,
-            self.cipher,
-            self.key.get().await?,
-        ));
-        if let Err(e) = attr {
-            error!(err = %e, "deserializing file attr");
-            return Err(e.into());
-        }
-        let attr = attr.unwrap();
+        let attr = self.get_inode_from_cache_or_storage(entry.ino).await?;
         Ok(DirectoryEntryPlus {
             ino: entry.ino,
             name: entry.name,
@@ -952,7 +1043,7 @@
             };
             tokio::spawn(async move { fs.create_directory_entry_plus(entry).await })
         })
-        .skip(offset)
+        // .skip(offset)
         .collect();
 
     // do these futures in parallel and return them
@@ -975,19 +1066,12 @@
             return Err(e.into());
         }
         let entry = entry.unwrap();
+        let file_path = entry.path().to_str().unwrap().to_string();
         let lock = self
-            .serialize_dir_entries_locks
+            .serialize_dir_entries_ls_locks
             .read()
-            .get_or_insert_with(entry.path().to_str().unwrap().to_string(), || {
-                parking_lot::RwLock::new(false)
-            });
+            .get_or_insert_with(file_path.clone(), || parking_lot::RwLock::new(false));
         let _guard = lock.read();
-        let file = File::open(entry.path());
-        if let Err(e) = file {
-            error!(err = %e, "opening file");
-            return Err(e.into());
-        }
-        let file = file.unwrap();
 
         let name = entry.file_name().to_string_lossy().to_string();
         let name = {
@@ -996,28 +1080,64 @@
             } else if name == "$.." {
                 SecretString::from_str("..").unwrap()
             } else {
-                let name = crypto::decrypt_file_name(&name, self.cipher, self.key.get().await?)
-                    .map_err(|err| {
-                        error!(err = %err, "decrypting file name");
-                        err
-                    });
-                if name.is_err() {
-                    return Err(FsError::InvalidInput("invalid file name"));
+                // try from cache
+                let lock = self.get_dir_entries_name_cache().await?;
+                let mut cache = lock.lock();
+                if let Some(name_cached) = cache.get(&name).cloned() {
+                    name_cached
+                } else {
+                    drop(cache);
+                    if let Ok(decrypted_name) =
+                        crypto::decrypt_file_name(&name, self.cipher, self.key.get().await?)
+                            .map_err(|err| {
+                                error!(err = %err, "decrypting file name");
+                                err
+                            })
+                    {
+                        lock.lock().put(name.clone(), decrypted_name.clone());
+                        decrypted_name
+                    } else {
+                        return Err(FsError::InvalidInput("invalid file name"));
+                    }
                 }
-                name.unwrap()
             }
         };
-        let res: bincode::Result<(u64, FileType)> = bincode::deserialize_from(
-            crypto::create_reader(file, self.cipher, self.key.get().await?),
-        );
-        if let Err(e) = res {
-            error!(err = %e, "deserializing directory entry");
-            return Err(e.into());
+        // try from cache
+        let mut cache = self.dir_entries_meta_cache.lock();
+        if let Some(meta) = cache.get(&file_path) {
+            return Ok(DirectoryEntry {
+                ino: meta.0,
+                name,
+                kind: meta.1,
+            });
+        } else {
+            drop(cache);
+            let file = File::open(entry.path())?;
+            let res: bincode::Result<(u64, FileType)> = bincode::deserialize_from(
+                crypto::create_reader(file, self.cipher, self.key.get().await?),
+            );
+            if let Err(e) = res {
+                error!(err = %e, "deserializing directory entry");
+                return Err(e.into());
+            }
+            let (ino, kind): (u64, FileType) = res.unwrap();
+            // add to cache
+            self.dir_entries_meta_cache
+                .lock()
+                .put(file_path, (ino, kind));
+            Ok(DirectoryEntry { ino, name, kind })
        }
-        let (ino, kind): (u64, FileType) = res.unwrap();
+    }
 
-        Ok(DirectoryEntry { ino, name, kind })
+    async fn get_dir_entries_name_cache(
+        &self,
+    ) -> FsResult<Arc<parking_lot::Mutex<LruCache<String, SecretString>>>> {
+        Ok(self
+            .dir_entries_name_cache
+            .get()
+            .await
+            .map_err(|_| FsError::Other("should not happen"))?)
     }
 
     async fn create_directory_entry_iterator(
@@ -1039,7 +1159,7 @@
         };
         tokio::spawn(async move { fs.create_directory_entry(entry).await })
     })
-    .skip(offset)
+    // .skip(offset)
     .collect();
 
     // do these futures in parallel and return them
@@ -1074,7 +1194,12 @@
     }
 
     async fn get_inode_from_cache_or_storage(&self, ino: u64) -> FsResult<FileAttr> {
-        let mut guard = self.attr_cache.lock();
+        let lock = self
+            .attr_cache
+            .get()
+            .await
+            .map_err(|_| FsError::Other("should not happen"))?;
+        let mut guard = lock.lock();
         let attr = guard.get(&ino);
         if let Some(attr) = attr {
             Ok(*attr)
@@ -1131,24 +1256,31 @@
         let mut attr = self.get_inode(ino).await?;
         merge_attr(&mut attr, &set_attr);
 
-        self.write_inode_to_storage(&attr, self.key.get().await?)?;
+        self.write_inode_to_storage(&attr).await?;
         Ok(())
     }
 
-    fn write_inode_to_storage(
-        &self,
-        attr: &FileAttr,
-        key: Arc<SecretVec<u8>>,
-    ) -> Result<(), FsError> {
+    async fn write_inode_to_storage(&self, attr: &FileAttr) -> Result<(), FsError> {
         let lock = self
             .serialize_inode_locks
             .write()
             .get_or_insert_with(attr.ino, || parking_lot::RwLock::new(false));
-        let _guard = lock.write();
-        crypto::atomic_serialize_encrypt_into(&self.ino_file(attr.ino), attr, self.cipher, key)?;
+        let guard = lock.write();
+        crypto::atomic_serialize_encrypt_into(
+            &self.ino_file(attr.ino),
+            attr,
+            self.cipher,
+            self.key.get().await?,
+        )?;
+        drop(guard);
         // update cache also
         {
-            let mut guard = self.attr_cache.lock();
+            let lock = self
+                .attr_cache
+                .get()
+                .await
+                .map_err(|_| FsError::Other("should not happen"))?;
+            let mut guard = lock.lock();
             guard.put(attr.ino, attr.clone());
         }
         Ok(())
@@ -1296,12 +1428,12 @@
         self.write_handles.read().await.contains_key(&fh)
     }
 
-    /// Writes the contents of `buf` to the file at `ino` starting at `offset`.\
+    /// Writes the contents of `buf` to the file at `ino` starting at `offset`.
/// Depending on the encryption type we might need to re-write bytes until the 'offset', in some case even /// from the beginning of the file to the desired `offset`. This will slow down the write operation if we - /// write to very distanced offsets.\ - /// The most speed is obtained when we write sequentially from the beginning of the file.\ - /// If we write outside of file size, we fill up with zeros until offset.\ + /// write to very distanced offsets. + /// The most speed is obtained when we write sequentially from the beginning of the file. + /// If we write outside of file size, we fill up with zeros until offset. /// If the file is not opened for write, it will return an error of type ['FsError::InvalidFileHandle']. #[instrument(skip(self, buf))] pub async fn write(&self, ino: u64, offset: u64, buf: &[u8], handle: u64) -> FsResult { @@ -1612,11 +1744,11 @@ impl EncryptedFs { new_parent, &DirectoryEntry { ino: attr.ino, - name: SecretString::new(new_name.expose_secret().to_owned()), + name: new_name.clone(), kind: attr.kind, }, - self.key.get().await?, - )?; + ) + .await?; let mut parent_attr = self.get_inode(parent).await?; parent_attr.mtime = SystemTime::now(); @@ -1637,8 +1769,8 @@ impl EncryptedFs { name: SecretString::from_str("$..").expect("cannot parse"), kind: FileType::Directory, }, - self.key.get().await?, - )?; + ) + .await?; } Ok(()) @@ -1660,10 +1792,10 @@ impl EncryptedFs { /// /// **`callback`** is called when the file content changes. It receives the position from where the file content changed and the last write position /// - /// **`lock`** is used to write lock the file when accessing it. If not provided, it will not ensure that other instances are not writing to the file while we do\ + /// **`lock`** is used to write lock the file when accessing it. If not provided, it will not ensure that other instances are not writing to the file while we do /// You need to provide the same lock to all writers and readers of this file, you should obtain a new [`Holder`] that wraps the same lock /// - /// **`metadata_provider`** it's used to do some optimizations to reduce some copy operations from original file\ + /// **`metadata_provider`** it's used to do some optimizations to reduce some copy operations from original file /// If the file exists or is created before flushing, in worse case scenarios, it can reduce the overall write speed by half, so it's recommended to provide it pub async fn create_file_writer( &self, @@ -1682,50 +1814,6 @@ impl EncryptedFs { )?) } - /// Create a crypto writer that writes to a file in chunks using internal encryption info. - /// - /// **`callback`** is called when the file content changes. It receives the position from where the file content changed and the last write position - /// - /// **`locks`** is used to write lock the chunks files when accessing them. 
This ensures that we have exclusive write to a given chunk when we need to change it's content\ - /// If not provided, it will not ensure that other instances are not accessing the chunks while we do\ - /// You need to provide the same locks to all writers and readers of this file, you should obtain a new [`Holder`] that wraps the same locks - /// - /// **`metadata_provider`** it's used to do some optimizations to reduce some copy operations from original file\ - /// If the file exists or is created before flushing, in worse case scenarios, it can reduce the overall write speed by half, so it's recommended to provide it - pub async fn create_chunked_tmp_file_writer( - &self, - file_dir: &Path, - callback: Option>, - locks: Option>>, - metadata_provider: Option>, - ) -> FsResult>> { - Ok(crypto::create_chunked_tmp_file_writer( - file_dir, - self.cipher, - self.key.get().await?, - callback, - locks, - metadata_provider, - )?) - } - - /// Create a crypto reader that reads from a file in chunks using internal encryption info.\ - /// **`locks`** is used to read lock the chunks files when accessing them. This ensures offer multiple reads but exclusive writes to a given chunk\ - /// If not provided, it will not ensure that other instances are not writing the chunks while we read them\ - /// You need to provide the same locks to all writers and readers of this file, you should obtain a new [`Holder`] that wraps the same locks - pub async fn create_chunked_file_reader( - &self, - file_dir: &Path, - locks: Option>>, - ) -> FsResult> { - Ok(crypto::create_chunked_file_reader( - file_dir, - self.cipher, - self.key.get().await?, - locks, - )?) - } - /// Create a crypto reader from file using internal encryption info. /// **`lock`** is used to read lock the file when accessing it. 
If not provided, it will not ensure that other instances are not writing to the file while we read\ /// You need to provide the same lock to any writers to this file, you should obtain a new [`Holder`] that wraps the same lock, @@ -1939,7 +2027,7 @@ impl EncryptedFs { attr.gid = libc::getgid(); } - self.write_inode_to_storage(&attr, self.key.get().await?)?; + self.write_inode_to_storage(&attr).await?; // create in contents directory tokio::fs::create_dir(self.contents_path(attr.ino)).await?; @@ -1954,46 +2042,84 @@ impl EncryptedFs { name: SecretString::from_str("$.").unwrap(), kind: FileType::Directory, }, - self.key.get().await?, - )?; + ) + .await?; } Ok(()) } - fn insert_directory_entry( + async fn insert_directory_entry( &self, ino_contents_dir: u64, entry: &DirectoryEntry, - key: Arc>, ) -> FsResult<()> { let parent_path = self.contents_path(ino_contents_dir); + let encrypted_name = + crypto::encrypt_file_name(&entry.name, self.cipher, self.key.get().await?)?; // add to LS directory - { - let name = crypto::encrypt_file_name(&entry.name, self.cipher, key.clone())?; - let file_path = parent_path.join(LS_DIR).join(name.clone()); - let map = self.serialize_dir_entries_locks.write(); + let self_clone = self + .self_weak + .lock() + .unwrap() + .as_ref() + .unwrap() + .upgrade() + .unwrap(); + let parent_path_clone = parent_path.clone(); + let encrypted_name_clone = encrypted_name.clone(); + let entry_clone = entry.clone(); + // spawn a task to do concurrently with adding to HASH directory + let h = tokio::spawn(async move { + let file_path = parent_path_clone + .join(LS_DIR) + .join(encrypted_name_clone.clone()); + let map = self_clone.serialize_dir_entries_ls_locks.write(); let lock = map.get_or_insert_with(file_path.to_str().unwrap().to_string(), || { parking_lot::RwLock::new(false) }); let _guard = lock.write(); // write inode and file type - let entry = (entry.ino, entry.kind); - crypto::atomic_serialize_encrypt_into(&file_path, &entry, self.cipher, key.clone())?; - } + let entry = (entry_clone.ino, entry_clone.kind); + crypto::atomic_serialize_encrypt_into( + &file_path, + &entry, + self_clone.cipher, + self_clone.key.get().await?, + )?; + Ok::<(), FsError>(()) + }); // add to HASH directory - { - let name = crypto::hash_file_name(&entry.name)?; + let self_clone = self + .self_weak + .lock() + .unwrap() + .as_ref() + .unwrap() + .upgrade() + .unwrap(); + let entry_hash = entry.clone(); + tokio::spawn(async move { + let name = crypto::hash_file_name(&entry_hash.name)?; let file_path = parent_path.join(HASH_DIR).join(name); - let map = self.serialize_dir_entries_locks.write(); + let map = self_clone.serialize_dir_entries_hash_locks.write(); let lock = map.get_or_insert_with(file_path.to_str().unwrap().to_string(), || { parking_lot::RwLock::new(false) }); let _guard = lock.write(); // write inode and file type - let entry = (entry.ino, entry.kind); - crypto::atomic_serialize_encrypt_into(&file_path, &entry, self.cipher, key)?; - } + // we save the encrypted name also because we need it to remove the entry on [`remove_directory_entry`] + let entry = (entry_hash.ino, entry_hash.kind, encrypted_name); + crypto::atomic_serialize_encrypt_into( + &file_path, + &entry, + self_clone.cipher, + self_clone.key.get().await?, + )?; + Ok::<(), FsError>(()) + }) + .await??; + h.await??; Ok(()) } @@ -2007,16 +2133,31 @@ impl EncryptedFs { async fn remove_directory_entry(&self, parent: u64, name: &SecretString) -> FsResult<()> { let parent_path = self.contents_path(parent); - let name = 
crypto::encrypt_file_name(name, self.cipher, self.key.get().await?)?; - let ls_file_path = parent_path.join(LS_DIR).join(name); - - let map = self.serialize_dir_entries_locks.write(); - let lock = map.get_or_insert_with(ls_file_path.to_str().unwrap().to_string(), || { + // remove from HASH + let name = crypto::hash_file_name(name)?; + let path = parent_path.join(HASH_DIR).join(name); + let map = self.serialize_dir_entries_hash_locks.write(); + let lock = map.get_or_insert_with(path.to_str().unwrap().to_string(), || { + parking_lot::RwLock::new(false) + }); + let guard = lock.write(); + let (_, _, name): (u64, FileType, String) = + bincode::deserialize_from(crypto::create_reader( + File::open(path.clone())?, + self.cipher, + self.key.get().await?, + ))?; + fs::remove_file(path)?; + drop(guard); + drop(map); + // remove from LS + let path = parent_path.join(LS_DIR).join(name); + let map = self.serialize_dir_entries_ls_locks.write(); + let lock = map.get_or_insert_with(path.to_str().unwrap().to_string(), || { parking_lot::RwLock::new(false) }); let _guard = lock.write(); - - fs::remove_file(ls_file_path)?; + fs::remove_file(path)?; Ok(()) } diff --git a/src/encryptedfs/test.rs b/src/encryptedfs/test.rs index 0c2deabb..b7a39485 100644 --- a/src/encryptedfs/test.rs +++ b/src/encryptedfs/test.rs @@ -1,3 +1,4 @@ +use std::future::Future; use std::ops::DerefMut; use std::path::{Path, PathBuf}; use std::str::FromStr; @@ -8,13 +9,18 @@ use secrecy::{ExposeSecret, SecretString}; use tokio::sync::Mutex; use tracing_test::traced_test; +use crate::{async_util, test_util}; +use rand::Rng; +use test::{black_box, Bencher}; + use crate::encryptedfs::write_all_bytes_to_fs; use crate::encryptedfs::{ Cipher, CreateFileAttr, DirectoryEntry, DirectoryEntryPlus, EncryptedFs, FileType, FsError, FsResult, PasswordProvider, CONTENTS_DIR, ROOT_INODE, }; +use crate::test_util::block_on; -const TESTS_DATA_DIR: &str = "/tmp/rencfs-test-data/"; +pub(crate) const TESTS_DATA_DIR: &str = "/tmp/rencfs-test-data"; #[derive(Debug, Clone)] struct TestSetup { @@ -81,6 +87,21 @@ where // assert!(res.is_ok()); } +pub(crate) fn test(key: &str, worker_threads: usize, f: F) { + block_on( + async { + run_test( + TestSetup { + data_path: format!("{TESTS_DATA_DIR}/{key}"), + }, + f, + ) + .await; + }, + worker_threads, + ); +} + thread_local!(static SETUP_RESULT: Arc>> = Arc::new(Mutex::new(None))); fn create_attr_from_type(kind: FileType) -> CreateFileAttr { @@ -154,7 +175,7 @@ async fn read_exact(fs: &EncryptedFs, ino: u64, offset: u64, buf: &mut [u8], han async fn test_write() { run_test( TestSetup { - data_path: format!("{TESTS_DATA_DIR}test_write"), + data_path: format!("{TESTS_DATA_DIR}/test_write"), }, async { let fs = SETUP_RESULT.with(|s| Arc::clone(s)); @@ -325,7 +346,7 @@ async fn test_write() { async fn test_read() { run_test( TestSetup { - data_path: format!("{TESTS_DATA_DIR}test_read"), + data_path: format!("{TESTS_DATA_DIR}/test_read"), }, async { let fs = SETUP_RESULT.with(|s| Arc::clone(s)); @@ -505,7 +526,7 @@ async fn test_read() { async fn test_truncate() { run_test( TestSetup { - data_path: format!("{TESTS_DATA_DIR}test_truncate"), + data_path: format!("{TESTS_DATA_DIR}/test_truncate"), }, async { let fs = SETUP_RESULT.with(|s| Arc::clone(s)); @@ -599,7 +620,7 @@ async fn test_truncate() { async fn test_copy_file_range() { run_test( TestSetup { - data_path: format!("{TESTS_DATA_DIR}test_copy_file_range"), + data_path: format!("{TESTS_DATA_DIR}/test_copy_file_range"), }, async { let fs = SETUP_RESULT.with(|s| 
Arc::clone(s)); @@ -686,7 +707,7 @@ async fn test_copy_file_range() { async fn test_read_dir() { run_test( TestSetup { - data_path: format!("{TESTS_DATA_DIR}test_read_dir"), + data_path: format!("{TESTS_DATA_DIR}/test_read_dir"), }, async { let fs = SETUP_RESULT.with(|s| Arc::clone(s)); @@ -762,12 +783,12 @@ async fn test_read_dir() { }, DirectoryEntry { ino: file_attr.ino, - name: SecretString::new(test_file.expose_secret().to_owned()), + name: test_file.clone(), kind: FileType::RegularFile, }, DirectoryEntry { ino: dir_attr.ino, - name: SecretString::new(test_dir.expose_secret().to_owned()), + name: test_dir.clone(), kind: FileType::Directory, }, ]; @@ -843,12 +864,12 @@ async fn test_read_dir() { }, DirectoryEntry { ino: file_attr.ino, - name: SecretString::new(test_file_2.expose_secret().to_owned()), + name: test_file_2.clone(), kind: FileType::RegularFile, }, DirectoryEntry { ino: dir_attr.ino, - name: SecretString::new(test_dir_2.expose_secret().to_owned()), + name: test_dir_2.clone(), kind: FileType::Directory, }, ]; @@ -865,7 +886,7 @@ async fn test_read_dir() { async fn test_read_dir_plus() { run_test( TestSetup { - data_path: format!("{TESTS_DATA_DIR}test_read_dir_plus"), + data_path: format!("{TESTS_DATA_DIR}/test_read_dir_plus"), }, async { let fs = SETUP_RESULT.with(|s| Arc::clone(s)); @@ -947,13 +968,13 @@ async fn test_read_dir_plus() { }, DirectoryEntryPlus { ino: file_attr.ino, - name: SecretString::new(test_file.expose_secret().to_owned()), + name: test_file.clone(), kind: FileType::RegularFile, attr: file_attr, }, DirectoryEntryPlus { ino: dir_attr.ino, - name: SecretString::new(test_dir.expose_secret().to_owned()), + name: test_dir.clone(), kind: FileType::Directory, attr: dir_attr, }, @@ -1039,13 +1060,13 @@ async fn test_read_dir_plus() { }, DirectoryEntryPlus { ino: file_attr.ino, - name: SecretString::new(test_file_2.expose_secret().to_owned()), + name: test_file_2.clone(), kind: FileType::RegularFile, attr: file_attr, }, DirectoryEntryPlus { ino: dir_attr.ino, - name: SecretString::new(test_dir_2.expose_secret().to_owned()), + name: test_dir_2.clone(), kind: FileType::Directory, attr: dir_attr, }, @@ -1063,7 +1084,7 @@ async fn test_read_dir_plus() { async fn test_find_by_name() { run_test( TestSetup { - data_path: format!("{TESTS_DATA_DIR}test_find_by_name"), + data_path: format!("{TESTS_DATA_DIR}/test_find_by_name"), }, async { let fs = SETUP_RESULT.with(|s| Arc::clone(s)); @@ -1099,10 +1120,10 @@ async fn test_find_by_name() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[traced_test] -async fn test_read_exists_by_name() { +async fn test_exists_by_name() { run_test( TestSetup { - data_path: format!("{TESTS_DATA_DIR}test_read_exists_by_name"), + data_path: format!("{TESTS_DATA_DIR}/test_exists_by_name"), }, async { let fs = SETUP_RESULT.with(|s| Arc::clone(s)); @@ -1132,3 +1153,299 @@ async fn test_read_exists_by_name() { ) .await } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[traced_test] +async fn test_remove_dir() { + run_test( + TestSetup { + data_path: format!("{TESTS_DATA_DIR}/test_remove_dir"), + }, + async { + let fs = SETUP_RESULT.with(|s| Arc::clone(s)); + let mut fs = fs.lock().await; + let fs = fs.as_mut().unwrap().fs.as_ref().unwrap(); + + let test_dir = SecretString::from_str("test-dir").unwrap(); + let _ = fs + .create_nod( + ROOT_INODE, + &test_dir, + create_attr_from_type(FileType::Directory), + false, + false, + ) + .await + .unwrap(); + + assert!(fs.exists_by_name(ROOT_INODE, &test_dir).await.unwrap()); + 
fs.remove_dir(ROOT_INODE, &test_dir).await.unwrap(); + assert_eq!( + false, + fs.exists_by_name(ROOT_INODE, &test_dir).await.unwrap() + ); + assert_eq!(None, fs.find_by_name(ROOT_INODE, &test_dir).await.unwrap()); + assert_eq!( + 0, + fs.read_dir(ROOT_INODE, 0) + .await + .unwrap() + .filter(|entry| { + entry.as_ref().unwrap().name.expose_secret() == test_dir.expose_secret() + }) + .count() + ) + }, + ) + .await +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[traced_test] +async fn test_remove_file() { + run_test( + TestSetup { + data_path: format!("{TESTS_DATA_DIR}/test_remove_file"), + }, + async { + let fs = SETUP_RESULT.with(|s| Arc::clone(s)); + let mut fs = fs.lock().await; + let fs = fs.as_mut().unwrap().fs.as_ref().unwrap(); + + let test_file = SecretString::from_str("test-file").unwrap(); + let _ = fs + .create_nod( + ROOT_INODE, + &test_file, + create_attr_from_type(FileType::RegularFile), + false, + false, + ) + .await + .unwrap(); + + assert!(fs.exists_by_name(ROOT_INODE, &test_file).await.unwrap()); + fs.remove_file(ROOT_INODE, &test_file).await.unwrap(); + assert_eq!( + false, + fs.exists_by_name(ROOT_INODE, &test_file).await.unwrap() + ); + assert_eq!(None, fs.find_by_name(ROOT_INODE, &test_file).await.unwrap()); + assert_eq!( + 0, + fs.read_dir(ROOT_INODE, 0) + .await + .unwrap() + .filter(|entry| { + entry.as_ref().unwrap().name.expose_secret() == test_file.expose_secret() + }) + .count() + ) + }, + ) + .await +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[traced_test] +async fn test_find_by_name_exists_by_name_many_files() { + run_test( + TestSetup { + data_path: format!("{TESTS_DATA_DIR}/test_find_by_name_exists_by_name_many_files"), + }, + async { + let fs = SETUP_RESULT.with(|s| Arc::clone(s)); + let mut fs = fs.lock().await; + let fs = fs.as_mut().unwrap().fs.as_ref().unwrap(); + + for i in 0..100 { + let test_file = SecretString::from_str(&format!("test-file-{i}")).unwrap(); + let _ = fs + .create_nod( + ROOT_INODE, + &test_file, + create_attr_from_type(FileType::RegularFile), + false, + false, + ) + .await + .unwrap(); + } + + let test_file = SecretString::from_str("test-file-42").unwrap(); + assert!(fs.exists_by_name(ROOT_INODE, &test_file).await.unwrap()); + assert!(matches!( + fs.find_by_name(ROOT_INODE, &test_file).await.unwrap(), + Some(_) + )); + }, + ) + .await +} + +#[bench] +fn bench_create_nod(b: &mut Bencher) { + test("bench_create_nod", 1, async { + let fs = SETUP_RESULT.with(|s| Arc::clone(s)); + let mut fs = fs.lock().await; + let fs = fs.as_mut().unwrap().fs.as_ref().unwrap(); + + let mut i = 1; + let i = &mut i; + b.iter(|| { + black_box({ + async_util::call_async(async { + let test_file = SecretString::from_str(&format!("test-file-{i}")).unwrap(); + let _ = fs + .create_nod( + ROOT_INODE, + &test_file, + create_attr_from_type(FileType::RegularFile), + false, + false, + ) + .await + .unwrap(); + }); + *i += 1; + i.clone() + }) + }); + println!("i: {}", i); + }); +} + +#[bench] +fn bench_exists_by_name(b: &mut Bencher) { + test("exists_by_name", 1, async { + let fs = SETUP_RESULT.with(|s| Arc::clone(s)); + let mut fs = fs.lock().await; + let fs = fs.as_mut().unwrap().fs.as_ref().unwrap(); + + let mut rnd = rand::thread_rng(); + b.iter(|| { + black_box({ + async_util::call_async(async { + let _ = fs + .exists_by_name( + ROOT_INODE, + &SecretString::from_str(&format!( + "test-file-{}", + rnd.gen_range(1..100) + )) + .unwrap(), + ) + .await + .unwrap(); + }); + }) + }); + }); +} + +#[bench] +fn 
bench_find_by_name(b: &mut Bencher) { + test("bench_find_by_name", 1, async { + let fs = SETUP_RESULT.with(|s| Arc::clone(s)); + let mut fs = fs.lock().await; + let fs = fs.as_mut().unwrap().fs.as_ref().unwrap(); + + for i in 0..100 { + let test_file = SecretString::from_str(&format!("test-file-{i}")).unwrap(); + let _ = fs + .create_nod( + ROOT_INODE, + &test_file, + create_attr_from_type(FileType::RegularFile), + false, + false, + ) + .await + .unwrap(); + } + + let mut rnd = rand::thread_rng(); + b.iter(|| { + black_box({ + async_util::call_async(async { + let _ = fs.get_inode(ROOT_INODE).await.unwrap(); + let _ = fs + .find_by_name( + ROOT_INODE, + &SecretString::from_str(&format!( + "test-file-{}", + rnd.gen_range(1..100) + )) + .unwrap(), + ) + .await + .unwrap(); + }); + }) + }); + }); +} + +#[bench] +fn bench_read_dir(b: &mut Bencher) { + test("bench_read_dir", 1, async { + let fs = SETUP_RESULT.with(|s| Arc::clone(s)); + let mut fs = fs.lock().await; + let fs = fs.as_mut().unwrap().fs.as_ref().unwrap(); + + for i in 0..100 { + let test_file = SecretString::from_str(&format!("test-file-{i}")).unwrap(); + let _ = fs + .create_nod( + ROOT_INODE, + &test_file, + create_attr_from_type(FileType::RegularFile), + false, + false, + ) + .await + .unwrap(); + } + + b.iter(|| { + black_box({ + async_util::call_async(async { + let iter = fs.read_dir(ROOT_INODE, 0).await.unwrap(); + let v: Vec = iter.map(|e| e.unwrap()).collect(); + }); + }) + }); + }); +} + +#[bench] +fn bench_read_dir_plus(b: &mut Bencher) { + test("bench_read_dir_plus", 1, async { + let fs = SETUP_RESULT.with(|s| Arc::clone(s)); + let mut fs = fs.lock().await; + let fs = fs.as_mut().unwrap().fs.as_ref().unwrap(); + + for i in 0..100 { + let test_file = SecretString::from_str(&format!("test-file-{i}")).unwrap(); + let _ = fs + .create_nod( + ROOT_INODE, + &test_file, + create_attr_from_type(FileType::RegularFile), + false, + false, + ) + .await + .unwrap(); + } + + b.iter(|| { + black_box({ + async_util::call_async(async { + let iter = fs.read_dir_plus(ROOT_INODE, 0).await.unwrap(); + let v: Vec = iter.map(|e| e.unwrap()).collect(); + }); + }) + }); + }); +} diff --git a/src/encryptedfs/to_move_test.rs b/src/encryptedfs/to_move_test.rs index aac0ff66..88ed1877 100644 --- a/src/encryptedfs/to_move_test.rs +++ b/src/encryptedfs/to_move_test.rs @@ -77,7 +77,7 @@ // // #[test] // fn test_write_and_get_inode() { -// run_test(TestSetup { data_path: format!("{TESTS_DATA_DIR}test_write_and_get_inode") }, |setup| { +// run_test(TestSetup { data_path: format!("{TESTS_DATA_DIR}/test_write_and_get_inode") }, |setup| { // let fs = setup.fs.as_mut().unwrap(); // // let attr = create_attr(42, FileType::RegularFile); @@ -113,7 +113,7 @@ // // #[test] // fn test_create_structure_and_root() { -// run_test(TestSetup { data_path: format!("{TESTS_DATA_DIR}test_create_structure_and_root") }, |setup| { +// run_test(TestSetup { data_path: format!("{TESTS_DATA_DIR}/test_create_structure_and_root") }, |setup| { // let fs = setup.fs.as_mut().unwrap(); // // assert!(fs.node_exists(ROOT_INODE)); @@ -130,7 +130,7 @@ // // #[test] // fn test_create_nod() { -// run_test(TestSetup { data_path: format!("{TESTS_DATA_DIR}test_create_nod") }, |setup| { +// run_test(TestSetup { data_path: format!("{TESTS_DATA_DIR}/test_create_nod") }, |setup| { // let mut fs = setup.fs.as_mut().unwrap(); // // // file in root @@ -201,7 +201,7 @@ // // #[test] // fn test_find_by_name() { -// run_test(TestSetup { data_path: format!("{TESTS_DATA_DIR}test_find_by_name") }, |setup| { 
+// run_test(TestSetup { data_path: format!("{TESTS_DATA_DIR}/test_find_by_name") }, |setup| { // let fs = setup.fs.as_mut().unwrap(); // // let test_file = SecretString::from_str("test-file").unwrap(); @@ -213,7 +213,7 @@ // // #[test] // fn test_remove_dir() { -// run_test(TestSetup { data_path: format!("{TESTS_DATA_DIR}test_remove_dir") }, |setup| { +// run_test(TestSetup { data_path: format!("{TESTS_DATA_DIR}/test_remove_dir") }, |setup| { // let fs = setup.fs.as_mut().unwrap(); // // let test_dir = SecretString::from_str("test-dir").unwrap(); @@ -237,7 +237,7 @@ // // #[test] // fn test_remove_file() { -// run_test(TestSetup { data_path: format!("{TESTS_DATA_DIR}test_remove_file") }, |setup| { +// run_test(TestSetup { data_path: format!("{TESTS_DATA_DIR}/test_remove_file") }, |setup| { // let fs = setup.fs.as_mut().unwrap(); // // let test_file = SecretString::from_str("test-file").unwrap(); @@ -258,7 +258,7 @@ // // #[test] // fn test_rename() { -// run_test(TestSetup { data_path: format!("{TESTS_DATA_DIR}test_rename") }, |setup| { +// run_test(TestSetup { data_path: format!("{TESTS_DATA_DIR}/test_rename") }, |setup| { // let fs = setup.fs.as_mut().unwrap(); // // // new file in same directory @@ -499,7 +499,7 @@ // // #[test] // fn test_open() { -// run_test(TestSetup { data_path: format!("{TESTS_DATA_DIR}test_open") }, |setup| { +// run_test(TestSetup { data_path: format!("{TESTS_DATA_DIR}/test_open") }, |setup| { // let fs = setup.fs.as_mut().unwrap(); // // let test_file = SecretString::from_str("test-file").unwrap(); @@ -520,7 +520,7 @@ // #[allow(dead_code)] // // #[test] // fn test_sample() { -// run_test(TestSetup { data_path: format!("{TESTS_DATA_DIR}test_sample") }, |setup| { +// run_test(TestSetup { data_path: format!("{TESTS_DATA_DIR}/test_sample") }, |setup| { // let _fs = setup.fs.as_mut().unwrap(); // }); // } diff --git a/src/encryptedfs_fuse3.rs b/src/encryptedfs_fuse3.rs index 85b3d658..e68712a5 100644 --- a/src/encryptedfs_fuse3.rs +++ b/src/encryptedfs_fuse3.rs @@ -1,3 +1,4 @@ +use rand::RngCore; use std::ffi::{OsStr, OsString}; use std::fs::File; use std::io::{BufRead, BufReader}; @@ -20,9 +21,10 @@ use futures_util::stream; use futures_util::stream::Iter; use libc::{EACCES, EEXIST, EFBIG, EIO, EISDIR, ENAMETOOLONG, ENOENT, ENOTDIR, ENOTEMPTY, EPERM}; use secrecy::{ExposeSecret, SecretString}; -use tracing::Level; use tracing::{debug, error, instrument, trace, warn}; +use tracing::{info, Level}; +use crate::crypto; use crate::crypto::Cipher; use crate::encryptedfs::{ CreateFileAttr, EncryptedFs, FileAttr, FileType, FsError, FsResult, PasswordProvider, @@ -168,7 +170,7 @@ impl EncryptedFsFuse3 { } } - #[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::INFO), ret(level = Level::DEBUG))] + #[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::INFO), ret(level = Level::INFO))] async fn create_nod( &self, parent: u64, @@ -273,7 +275,7 @@ impl From for fuse3::raw::prelude::FileAttr { } impl Filesystem for EncryptedFsFuse3 { - #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::DEBUG))] + #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::INFO))] async fn init(&self, req: Request) -> Result { trace!(""); @@ -287,7 +289,7 @@ impl Filesystem for EncryptedFsFuse3 { trace!(""); } - #[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::INFO), ret(level = Level::DEBUG))] + #[instrument(skip(self, name), fields(name = 
name.to_str().unwrap()), err(level = Level::INFO), ret(level = Level::INFO))] async fn lookup(&self, req: Request, parent: u64, name: &OsStr) -> Result { trace!(""); @@ -345,7 +347,7 @@ impl Filesystem for EncryptedFsFuse3 { trace!(""); } - #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::DEBUG))] + #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::INFO))] async fn getattr( &self, req: Request, @@ -367,7 +369,7 @@ impl Filesystem for EncryptedFsFuse3 { } } - #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::DEBUG))] + #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::INFO))] #[allow(clippy::cast_possible_truncation)] async fn setattr( &self, @@ -537,7 +539,7 @@ impl Filesystem for EncryptedFsFuse3 { }) } - #[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::INFO), ret(level = Level::DEBUG))] + #[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::INFO), ret(level = Level::INFO))] async fn mknod( &self, req: Request, @@ -560,26 +562,22 @@ impl Filesystem for EncryptedFsFuse3 { return Err(libc::ENOSYS.into()); } - match self - .create_nod(parent, mode, &req, name, false, false) + self.create_nod(parent, mode, &req, name, false, false) .await - { - Ok((_, attr)) => { - // TODO: implement flags + .map_err(|err| { + error!(err = %err); + Errno::from(err) + }) + .map(|(_, attr)| { Ok(ReplyEntry { ttl: TTL, attr: attr.into(), generation: 0, }) - } - Err(err) => { - error!(err = %err); - Err(err.into()) - } - } + })? } - #[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::INFO), ret(level = Level::DEBUG))] + #[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::INFO), ret(level = Level::INFO))] async fn mkdir( &self, req: Request, @@ -646,7 +644,7 @@ impl Filesystem for EncryptedFsFuse3 { }) } - #[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::INFO), ret(level = Level::DEBUG))] + #[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::INFO), ret(level = Level::INFO))] async fn unlink(&self, req: Request, parent: Inode, name: &OsStr) -> Result<()> { trace!(""); @@ -711,7 +709,7 @@ impl Filesystem for EncryptedFsFuse3 { Ok(()) } - #[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::INFO), ret(level = Level::DEBUG))] + #[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::INFO), ret(level = Level::INFO))] async fn rmdir(&self, req: Request, parent: Inode, name: &OsStr) -> Result<()> { trace!(""); @@ -776,7 +774,7 @@ impl Filesystem for EncryptedFsFuse3 { Ok(()) } - #[instrument(skip(self, name, new_name), fields(name = name.to_str().unwrap(), new_name = new_name.to_str().unwrap()), err(level = Level::INFO), ret(level = Level::DEBUG))] + #[instrument(skip(self, name, new_name), fields(name = name.to_str().unwrap(), new_name = new_name.to_str().unwrap()), err(level = Level::INFO), ret(level = Level::INFO))] async fn rename( &self, req: Request, @@ -887,7 +885,7 @@ impl Filesystem for EncryptedFsFuse3 { } } - #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::DEBUG))] + #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::INFO))] async fn open(&self, req: Request, inode: Inode, flags: u32) -> Result { trace!(""); @@ -1002,14 +1000,14 @@ impl Filesystem for EncryptedFsFuse3 { }) } - 
#[instrument(skip(self), err(level = Level::INFO), ret(level = Level::DEBUG))] + #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::INFO))] async fn statfs(&self, req: Request, inode: u64) -> Result { trace!(""); warn!("implementation is a stub"); Ok(STATFS) } - #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::DEBUG))] + #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::INFO))] async fn release( &self, req: Request, @@ -1056,7 +1054,7 @@ impl Filesystem for EncryptedFsFuse3 { Ok(()) } - #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::DEBUG))] + #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::INFO))] async fn flush(&self, req: Request, inode: Inode, fh: u64, lock_owner: u64) -> Result<()> { trace!(""); @@ -1068,7 +1066,7 @@ impl Filesystem for EncryptedFsFuse3 { Ok(()) } - #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::DEBUG))] + #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::INFO))] #[allow(clippy::cast_possible_wrap)] async fn opendir(&self, req: Request, inode: Inode, flags: u32) -> Result { trace!(""); @@ -1118,7 +1116,7 @@ impl Filesystem for EncryptedFsFuse3 { fh: u64, offset: i64, ) -> Result>> { - trace!(""); + info!(""); let iter = match self.get_fs().read_dir(inode, offset as usize).await { Err(err) => { @@ -1136,14 +1134,14 @@ impl Filesystem for EncryptedFsFuse3 { }) } - #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::DEBUG))] + #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::INFO))] async fn releasedir(&self, req: Request, inode: Inode, fh: u64, flags: u32) -> Result<()> { trace!(""); Ok(()) } - #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::DEBUG))] + #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::INFO))] async fn access(&self, req: Request, inode: u64, mask: u32) -> Result<()> { trace!(""); @@ -1160,7 +1158,7 @@ impl Filesystem for EncryptedFsFuse3 { ) } - #[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::INFO), ret(level = Level::DEBUG))] + #[instrument(skip(self, name), fields(name = name.to_str().unwrap()), err(level = Level::INFO), ret(level = Level::INFO))] async fn create( &self, req: Request, @@ -1209,7 +1207,7 @@ impl Filesystem for EncryptedFsFuse3 { offset: u64, lock_owner: u64, ) -> Result>> { - trace!(""); + info!(""); let iter = match self.get_fs().read_dir_plus(parent, offset as usize).await { Err(err) => { @@ -1226,7 +1224,7 @@ impl Filesystem for EncryptedFsFuse3 { }) } - #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::DEBUG))] + #[instrument(skip(self), err(level = Level::INFO), ret(level = Level::INFO))] async fn copy_file_range( &self, req: Request, diff --git a/src/expire_value.rs b/src/expire_value.rs index 8dc5b0b4..12b638cd 100644 --- a/src/expire_value.rs +++ b/src/expire_value.rs @@ -24,7 +24,7 @@ pub struct ExpireValue< monitor: Option>, provider: P, duration: Duration, - _marker: std::marker::PhantomData, + _marker: PhantomData, } impl> diff --git a/src/lib.rs b/src/lib.rs index e88c2ba8..6b8cc1ff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +#![feature(test)] // #![feature(error_generic_member_access)] #![deny(clippy::all)] #![deny(clippy::pedantic)] @@ -225,6 +226,8 @@ use crate::crypto::Cipher; use crate::encryptedfs::PasswordProvider; use crate::encryptedfs_fuse3::EncryptedFsFuse3; +extern crate test; + pub mod arc_hashmap; pub mod 
 async_util;
 pub mod crypto;
@@ -233,6 +236,7 @@ pub mod encryptedfs_fuse3;
 pub mod expire_value;
 pub mod fs_util;
 pub mod stream_util;
+pub(crate) mod test_util;
 
 #[allow(unreachable_code)]
 #[must_use]
diff --git a/src/main.rs b/src/main.rs
index d5af6a09..a1850a2a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -65,19 +65,25 @@ async fn main() -> Result<()> {
         Ok(Ok(Err(err))) => {
             let err2 = err.downcast_ref::<ExitStatusError>();
             if let Some(ExitStatusError::Failure(code)) = err2 {
+                drop(guard);
                 process::exit(*code);
             }
             error!("{err}");
             if let Some(mount_point) = mount_point {
-                umount(mount_point)?;
+                let _ = umount(mount_point).map_err(|err| {
+                    warn!("Cannot umount, maybe it was not mounted: {err}");
+                    err
+                });
             }
-            drop(guard);
             Err(err)
         }
         Ok(Err(err)) => {
             error!("{err:#?}");
             if let Some(mount_point) = mount_point {
-                umount(mount_point)?;
+                let _ = umount(mount_point).map_err(|err| {
+                    warn!("Cannot umount, maybe it was not mounted: {err}");
+                    err
+                });
             }
             drop(guard);
             panic!("{err:#?}");
@@ -85,7 +91,10 @@
         Err(err) => {
             error!("{err}");
             if let Some(mount_point) = mount_point {
-                umount(mount_point)?;
+                let _ = umount(mount_point).map_err(|err| {
+                    warn!("Cannot umount, maybe it was not mounted: {err}");
+                    err
+                });
             }
             drop(guard);
             panic!("{err}");
@@ -329,7 +338,10 @@ async fn run_mount(matches: &ArgMatches) -> Result<()> {
     })?;
 
     if matches.get_flag("umount-on-start") {
-        umount(mountpoint.as_str())?;
+        let _ = umount(mountpoint.as_str()).map_err(|err| {
+            warn!("Cannot umount, maybe it was not mounted: {err}");
+            err
+        });
     }
 
     let auto_unmount = matches.get_flag("auto_unmount");
@@ -391,13 +403,41 @@ async fn run_mount(matches: &ArgMatches) -> Result<()> {
 }
 
 fn umount(mountpoint: &str) -> Result<()> {
-    let output = process::Command::new("umount").arg(mountpoint).output()?;
-
-    if !output.status.success() {
-        warn!("Cannot umount, maybe it was not mounted");
+    // try normal umount
+    if process::Command::new("umount")
+        .arg(mountpoint)
+        .output()?
+        .status
+        .success()
+    {
+        return Ok(());
+    }
+    // force umount
+    if process::Command::new("umount")
+        .arg("-f")
+        .arg(mountpoint)
+        .output()?
+        .status
+        .success()
+    {
+        return Ok(());
     }
-
-    Ok(())
+    // lazy umount
+    if process::Command::new("umount")
+        .arg("-l")
+        .arg(mountpoint)
+        .output()?
+        .status
+        .success()
+    {
+        return Ok(());
+    }
+    Err(io::Error::new(
+        io::ErrorKind::Other,
+        format!("cannot umount {}", mountpoint),
+    )
+    .into())
 }
 
 #[allow(clippy::missing_panics_doc)]
diff --git a/src/test_util.rs b/src/test_util.rs
new file mode 100644
index 00000000..6532e526
--- /dev/null
+++ b/src/test_util.rs
@@ -0,0 +1,10 @@
+use std::future::Future;
+
+pub fn block_on<F: Future>(future: F, worker_threads: usize) -> F::Output {
+    tokio::runtime::Builder::new_multi_thread()
+        .worker_threads(worker_threads)
+        .enable_all()
+        .build()
+        .unwrap()
+        .block_on(future)
+}
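
The `create_nod` path in this patch fans the independent steps (inode write, contents-directory setup, parent-directory updates) out with `tokio::spawn` and later drains the handles with `h.await??`. The double `?` works because awaiting a `JoinHandle<Result<(), FsError>>` yields `Result<Result<(), FsError>, JoinError>`: the first `?` maps a panicked or cancelled task through the new `FsError::JoinError` `#[from]` variant, and the second propagates the task's own error. A minimal self-contained sketch of the same pattern, with a hypothetical `MyError` standing in for `FsError`:

```rust
use tokio::task::{JoinError, JoinHandle};

#[derive(Debug, thiserror::Error)]
enum MyError {
    #[error("tokio join error: {source}")]
    Join {
        #[from]
        source: JoinError,
    },
}

async fn run_concurrently() -> Result<(), MyError> {
    let mut handles: Vec<JoinHandle<Result<(), MyError>>> = vec![];
    for i in 0..3 {
        handles.push(tokio::spawn(async move {
            // each task does its own fallible work here
            println!("task {i} done");
            Ok::<(), MyError>(())
        }));
    }
    // wait for all tasks to finish, surfacing both join failures and task errors
    for h in handles {
        h.await??;
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), MyError> {
    run_concurrently().await
}
```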
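The new `AttrCacheProvider` and `DirEntriesCacheProvider` plug into `ExpireValue` from `src/expire_value.rs`: the cache itself is the expiring value, lazily rebuilt through `Provider::provide` after the 10-minute TTL, the same mechanism that already holds the derived key. A rough, simplified sketch of that pattern (synchronous and without a background expiry task, so only the shape matches the real `ExpireValue`; `CounterProvider` is illustrative):

```rust
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

trait Provider<V, E> {
    fn provide(&self) -> Result<V, E>;
}

struct ExpireValue<V, E, P: Provider<V, E>> {
    value: Mutex<Option<(Instant, Arc<V>)>>,
    provider: P,
    duration: Duration,
    _marker: PhantomData<E>,
}

impl<V, E, P: Provider<V, E>> ExpireValue<V, E, P> {
    fn new(provider: P, duration: Duration) -> Self {
        Self {
            value: Mutex::new(None),
            provider,
            duration,
            _marker: PhantomData,
        }
    }

    /// Return the cached value, rebuilding it through the provider if it expired.
    fn get(&self) -> Result<Arc<V>, E> {
        let mut guard = self.value.lock().unwrap();
        if let Some((created, v)) = guard.as_ref() {
            if created.elapsed() < self.duration {
                return Ok(v.clone());
            }
        }
        let v = Arc::new(self.provider.provide()?);
        *guard = Some((Instant::now(), v.clone()));
        Ok(v)
    }
}

struct CounterProvider;
impl Provider<u64, std::convert::Infallible> for CounterProvider {
    fn provide(&self) -> Result<u64, std::convert::Infallible> {
        Ok(42) // stands in for building an LruCache or deriving a key
    }
}

fn main() {
    let v = ExpireValue::new(CounterProvider, Duration::from_secs(10 * 60));
    assert_eq!(*v.get().unwrap(), 42);
}
```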
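The directory-listing path now decrypts each file name at most once: `create_directory_entry` checks an `LruCache` under a short-lived `parking_lot` lock, drops the guard before the expensive decryption, and re-locks only to publish the result. A stripped-down sketch of that check/drop/recompute/insert sequence with the `lru` crate, where the `decrypt` closure is only a stand-in for `crypto::decrypt_file_name`:

```rust
use std::num::NonZeroUsize;

use lru::LruCache;
use parking_lot::Mutex;

fn cached_decrypt(
    cache: &Mutex<LruCache<String, String>>,
    encrypted: &str,
    decrypt: impl Fn(&str) -> String,
) -> String {
    // fast path: hit under the lock, clone out, release immediately
    if let Some(hit) = cache.lock().get(encrypted).cloned() {
        return hit;
    }
    // slow path: decrypt WITHOUT holding the lock, then publish the result;
    // two racing callers may both decrypt, the second `put` is harmless
    let plain = decrypt(encrypted);
    cache.lock().put(encrypted.to_string(), plain.clone());
    plain
}

fn main() {
    let cache = Mutex::new(LruCache::new(NonZeroUsize::new(1000).unwrap()));
    let name = cached_decrypt(&cache, "3fa9...", |_| "file.txt".to_string());
    assert_eq!(name, "file.txt");
    // the second lookup is served from the cache, the closure never runs
    let again = cached_decrypt(&cache, "3fa9...", |_| unreachable!());
    assert_eq!(again, "file.txt");
}
```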
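Finally, the commit message notes that the block index is now fed to the cipher as AAD so encrypted blocks cannot be silently reordered on disk. To illustrate the idea only (this uses the `aes-gcm` crate purely as a stand-in, not this crate's own `crypto` module): the AEAD tag authenticates the index together with the ciphertext, so a block moved to another position fails to decrypt.

```rust
use aes_gcm::aead::{Aead, AeadCore, KeyInit, OsRng, Payload};
use aes_gcm::Aes256Gcm;

fn main() {
    let key = Aes256Gcm::generate_key(OsRng);
    let cipher = Aes256Gcm::new(&key);
    // a real implementation derives a fresh nonce per block; random here for brevity
    let nonce = Aes256Gcm::generate_nonce(&mut OsRng);

    // encrypt block 7, binding its index into the authentication tag as AAD
    let block_index = 7u64;
    let ciphertext = cipher
        .encrypt(
            &nonce,
            Payload {
                msg: b"block payload",
                aad: &block_index.to_le_bytes(),
            },
        )
        .unwrap();

    // decrypting at the original index succeeds
    assert!(cipher
        .decrypt(
            &nonce,
            Payload {
                msg: &ciphertext,
                aad: &block_index.to_le_bytes(),
            },
        )
        .is_ok());
    // the same ciphertext presented at another index fails authentication
    assert!(cipher
        .decrypt(
            &nonce,
            Payload {
                msg: &ciphertext,
                aad: &8u64.to_le_bytes(),
            },
        )
        .is_err());
}
```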