Skip to content

Commit

Permalink
WithHandle for Writer
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Jul 11, 2024
1 parent aef9471 commit 6888985
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 45 deletions.
2 changes: 1 addition & 1 deletion src/c_api/ext_fns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub extern "C" fn possum_value_writer_fd(value: *mut PossumValueWriter) -> RawFi

#[no_mangle]
pub extern "C" fn possum_writer_rename(
writer: *mut BatchWriter,
writer: *mut PossumWriter,
value: *const PossumValue,
new_key: PossumBuf,
) -> PossumError {
Expand Down
2 changes: 1 addition & 1 deletion src/c_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ fn items_list_to_c(
}

use PossumError::*;
use crate::handle::StartTransaction;

use crate::handle::StartTransaction;
use crate::item::Item;

impl From<Error> for PossumError {
Expand Down
4 changes: 2 additions & 2 deletions src/c_api/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{BatchWriter, Handle, ValueWriter};
pub(crate) type PossumOffset = u64;

pub type RawFileHandle = libc::intptr_t;
pub type PossumWriter = BatchWriter<'static>;
pub type PossumWriter = BatchWriter<&'static Handle>;

#[repr(C)]
#[derive(Debug)]
Expand Down Expand Up @@ -59,4 +59,4 @@ pub(crate) struct PossumLimits {

pub(crate) type PossumValueWriter = ValueWriter;

pub(crate) type PossumHandleRc = Rc<RwLock<Handle>>;
pub(crate) type PossumHandleRc = Rc<RwLock<Handle>>;
12 changes: 11 additions & 1 deletion src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ impl Handle {
self.dir.block_size()
}

pub fn new_writer(&self) -> Result<BatchWriter> {
pub fn new_writer(&self) -> Result<BatchWriter<&Handle>> {
Ok(BatchWriter {
handle: self,
exclusive_files: Default::default(),
Expand Down Expand Up @@ -555,3 +555,13 @@ impl<'h, T> StartTransaction<'h, T> for Rc<RwLock<Handle>> {
})
}
}

pub trait WithHandle {
fn with_handle<R>(&self, f: impl FnOnce(&Handle) -> R) -> R;
}

impl WithHandle for &Handle {
fn with_handle<R>(&self, f: impl FnOnce(&Handle) -> R) -> R {
f(self)
}
}
89 changes: 56 additions & 33 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ pub mod env;
mod reader;
use reader::Reader;

use crate::handle::WithHandle;

/// Type to be exposed eventually from the lib instead of anyhow. Should be useful for the C API.
pub type PubResult<T> = Result<T, Error>;

Expand Down Expand Up @@ -112,11 +114,17 @@ fn init_manifest_schema(conn: &rusqlite::Connection) -> rusqlite::Result<()> {

/// The start of a write, before an exclusive file has been allocated. This allows for bringing your
/// own file, such as by rename, or file clone.
pub struct BeginWriteValue<'writer, 'handle> {
batch: &'writer mut BatchWriter<'handle>,
pub struct BeginWriteValue<'writer, H>
where
H: WithHandle,
{
batch: &'writer mut BatchWriter<H>,
}

impl BeginWriteValue<'_, '_> {
impl<H> BeginWriteValue<'_, H>
where
H: WithHandle,
{
// TODO: On Linux and Windows, this should be possible without creating a new file. I'm not sure
// if it's worth it however, since cloned blocks have to be a of a minimum size and alignment.
// See also
Expand All @@ -125,16 +133,18 @@ impl BeginWriteValue<'_, '_> {
/// Clone an entire file in. If cloning fails, this will fall back to copying the provided file.
/// Its file position may be altered.
pub fn clone_file(self, file: &mut File) -> PubResult<ValueWriter> {
if !self.batch.handle.dir_supports_file_cloning() {
if !self
.batch
.handle
.with_handle(Handle::dir_supports_file_cloning)
{
return self.copy_file(file);
}
let dst_path = loop {
let dst_path = self
.batch
.handle
.dir
.path()
.join(FileId::random().values_file_path());
.with_handle(|handle| handle.dir.path().join(FileId::random().values_file_path()));
match fclonefile_noflags(file, &dst_path) {
Err(err) if CloneFileError::is_unsupported(&err) => {
return self.copy_file(file);
Expand Down Expand Up @@ -225,8 +235,11 @@ struct ValueRename {

/// Manages uncommitted writes
#[derive(Debug)]
pub struct BatchWriter<'a> {
handle: &'a Handle,
pub struct BatchWriter<H>
where
H: WithHandle,
{
handle: H,
exclusive_files: Vec<ExclusiveFile>,
pending_writes: Vec<PendingWrite>,
value_renames: Vec<ValueRename>,
Expand Down Expand Up @@ -275,13 +288,16 @@ fn value_columns_sql() -> &'static str {
ONCE.get_or_init(|| VALUE_COLUMN_NAMES.join(", ")).as_str()
}

impl<'handle> BatchWriter<'handle> {
impl<H> BatchWriter<H>
where
H: WithHandle,
{
fn get_exclusive_file(&mut self) -> Result<ExclusiveFile> {
if let Some(ef) = self.exclusive_files.pop() {
debug!("reusing exclusive file from writer");
return Ok(ef);
}
self.handle.get_exclusive_file()
self.handle.with_handle(Handle::get_exclusive_file)
}

pub fn stage_write(&mut self, key: Vec<u8>, mut value: ValueWriter) -> anyhow::Result<()> {
Expand Down Expand Up @@ -311,7 +327,7 @@ impl<'handle> BatchWriter<'handle> {
Ok(())
}

pub fn new_value<'writer>(&'writer mut self) -> BeginWriteValue<'writer, 'handle> {
pub fn new_value(&mut self) -> BeginWriteValue<H> {
BeginWriteValue { batch: self }
}

Expand All @@ -332,22 +348,24 @@ impl<'handle> BatchWriter<'handle> {
assert!(ef.downgrade_lock()?);
}
}
let mut transaction: OwnedTx = self.handle.start_immediate_transaction()?;
let mut write_commit_res = WriteCommitResult { count: 0 };
for pw in self.pending_writes.drain(..) {
before_write();
transaction.delete_key(&pw.key)?;
transaction.insert_key(pw)?;
write_commit_res.count += 1;
}
for vr in self.value_renames.drain(..) {
transaction.rename_value(&vr.value, vr.new_key)?;
}
// TODO: On error here, rewind the exclusive to undo any writes that just occurred.
let work = transaction.commit().context("commit transaction")?;

let write_commit_res = self.handle.with_handle(|handle| {
let mut transaction: OwnedTx = handle.start_immediate_transaction()?;
let mut write_commit_res = WriteCommitResult { count: 0 };
for pw in self.pending_writes.drain(..) {
before_write();
transaction.delete_key(&pw.key)?;
transaction.insert_key(pw)?;
write_commit_res.count += 1;
}
for vr in self.value_renames.drain(..) {
transaction.rename_value(&vr.value, vr.new_key)?;
}
// TODO: On error here, rewind the exclusive to undo any writes that just occurred.
let work = transaction.commit().context("commit transaction")?;
work.complete();
anyhow::Ok(write_commit_res)
})?;
self.flush_exclusive_files();
work.complete();
Ok(write_commit_res)
}

Expand All @@ -365,15 +383,20 @@ impl<'handle> BatchWriter<'handle> {
if flocking() {
return;
}
let mut handle_exclusive_files = self.handle.exclusive_files.lock().unwrap();
for ef in self.exclusive_files.drain(..) {
debug!("returning exclusive file {} to handle", ef.id);
assert!(handle_exclusive_files.insert(ef.id, ef).is_none());
}
self.handle.with_handle(|handle| {
let mut handle_exclusive_files = handle.exclusive_files.lock().unwrap();
for ef in self.exclusive_files.drain(..) {
debug!("returning exclusive file {} to handle", ef.id);
assert!(handle_exclusive_files.insert(ef.id, ef).is_none());
}
})
}
}

impl Drop for BatchWriter<'_> {
impl<H> Drop for BatchWriter<H>
where
H: WithHandle,
{
fn drop(&mut self) {
self.return_exclusive_files_to_handle()
}
Expand Down
6 changes: 3 additions & 3 deletions src/ownedtx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl<'h> Deref for OwnedReadTx<'h> {

pub(crate) trait OwnedTxTrait {
type Tx;
fn end_tx<R>(self, take: impl FnOnce(Self::Tx)->R) -> R;
fn end_tx<R>(self, take: impl FnOnce(Self::Tx) -> R) -> R;
fn as_handle(&self) -> &Handle;
fn mut_transaction(&mut self) -> &mut Self::Tx;
fn transaction(&self) -> &Self::Tx;
Expand All @@ -84,7 +84,7 @@ pub(crate) trait OwnedTxTrait {
impl<'h> OwnedTxTrait for OwnedTx<'h> {
type Tx = Transaction<'h>;

fn end_tx<R>(self, take: impl FnOnce(Self::Tx)->R)->R {
fn end_tx<R>(self, take: impl FnOnce(Self::Tx) -> R) -> R {
todo!()
}

Expand All @@ -99,4 +99,4 @@ impl<'h> OwnedTxTrait for OwnedTx<'h> {
fn transaction(&self) -> &Self::Tx {
self.cell.deref()
}
}
}
16 changes: 12 additions & 4 deletions src/reader.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::ownedtx::OwnedTxTrait;
use super::*;
use crate::ownedtx::OwnedTxTrait;

// BTree possibly so we can merge extents in the future.
type Reads = HashMap<FileId, BTreeSet<ReadExtent>>;
Expand All @@ -9,7 +9,10 @@ pub struct Reader<T> {
pub(crate) reads: Reads,
}

impl<'a, T> Reader<T> where T: OwnedTxTrait<Tx=Transaction<'a>> {
impl<'a, T> Reader<T>
where
T: OwnedTxTrait<Tx = Transaction<'a>>,
{
pub fn add(&mut self, key: &[u8]) -> rusqlite::Result<Option<Value>> {
let res = self.owned_tx.mut_transaction().touch_for_read(key);
match res {
Expand All @@ -36,7 +39,8 @@ impl<'a, T> Reader<T> where T: OwnedTxTrait<Tx=Transaction<'a>> {
/// Takes a snapshot and commits the read transaction.
pub fn begin(self) -> Result<Snapshot> {
let file_clones = self.clone_files().context("cloning files")?;
self.owned_tx.end_tx(|tx|tx.commit())
self.owned_tx
.end_tx(|tx| tx.commit())
.context("committing transaction")?
.complete();
Ok(Snapshot { file_clones })
Expand Down Expand Up @@ -169,7 +173,11 @@ impl<'a, T> Reader<T> where T: OwnedTxTrait<Tx=Transaction<'a>> {
file_id: &FileId,
read_extents: &BTreeSet<ReadExtent>,
) -> PubResult<Arc<Mutex<FileClone>>> {
let mut file = open_file_id(OpenOptions::new().read(true), self.owned_tx.as_handle().dir(), file_id)?;
let mut file = open_file_id(
OpenOptions::new().read(true),
self.owned_tx.as_handle().dir(),
file_id,
)?;

Self::lock_read_extents(&file, read_extents.iter())?;
let len = file.seek(std::io::SeekFrom::End(0))?;
Expand Down

0 comments on commit 6888985

Please sign in to comment.