Skip to content

Commit

Permalink
Use flocking env in a few places
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Mar 3, 2024
1 parent 7f71359 commit cb8faec
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 34 deletions.
4 changes: 4 additions & 0 deletions src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ pub(crate) fn emulate_freebsd() -> bool {
}
}
}

pub(crate) fn flocking() -> bool {
emulate_freebsd()
}
39 changes: 28 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ use std::io::{ErrorKind, Read, Seek, Write};
use std::num::TryFromIntError;
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::process::abort;
use std::sync::{Arc, Mutex, MutexGuard, OnceLock};
use std::time::Duration;
use std::{fs, io, str};

use anyhow::{bail, Context, Result};
use cfg_if::cfg_if;
use chrono::NaiveDateTime;
use env::flocking;
pub use error::*;
use exclusive_file::ExclusiveFile;
use file_id::{FileId, FileIdFancy};
Expand Down Expand Up @@ -346,8 +346,12 @@ impl<'handle> BatchWriter<'handle> {
let mut handle_exclusive_files = self.handle.exclusive_files.lock().unwrap();
for mut ef in self.exclusive_files.drain(..) {
ef.committed().unwrap();
debug!("returning exclusive file {} to handle", ef.id);
assert!(handle_exclusive_files.insert(ef.id.clone(), ef).is_none());
// When we're flocking, we can't have writers and readers at the same time and still be
// able to punch values asynchronously.
if flocking() {
debug!("returning exclusive file {} to handle", ef.id);
assert!(handle_exclusive_files.insert(ef.id.clone(), ef).is_none());
}
}
}
}
Expand Down Expand Up @@ -536,9 +540,9 @@ where
.0;
let mut file_clone = self.file_clone().unwrap().lock().unwrap();
let file = &mut file_clone.file;
let file_offset = file_offset+pos;
let file_offset = file_offset + pos;
// Getting lazy: Using positioned-io's ReadAt because it works on Windows.
let res = file.read_at( file_offset, buf);
let res = file.read_at(file_offset, buf);
debug!(?file, ?file_offset, len=?buf, ?res, "snapshot value read_at");
res
}
Expand Down Expand Up @@ -628,7 +632,7 @@ where
}
}

#[derive(Ord, PartialOrd, Eq, PartialEq)]
#[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone)]
struct ReadExtent {
pub offset: u64,
pub len: u64,
Expand Down Expand Up @@ -782,17 +786,30 @@ impl<'a> Reader<'a> {
Ok(file_clone)
}

fn lock_read_extents<'b>(
file: &File,
read_extents: impl Iterator<Item = &'b ReadExtent>,
) -> io::Result<()> {
// This might require a conditional var if it's used everywhere
#[cfg(not(windows))]
if flocking() {
// Possibly we want to block if we're flocking.
assert!(file.flock(LockShared)?);
}
for extent in read_extents {
assert!(file.lock_segment(LockSharedNonblock, Some(extent.len), extent.offset)?);
}
Ok(())
}

fn get_file_for_read_by_segment_locking(
&self,
file_id: &FileId,
read_extents: &BTreeSet<ReadExtent>,
) -> PubResult<Arc<Mutex<FileClone>>> {
let mut file = open_file_id(OpenOptions::new().read(true), self.handle.dir(), file_id)?;
for extent in read_extents {
if !file.lock_segment(LockSharedNonblock, Some(extent.len), extent.offset)? {
abort();
}
}

Self::lock_read_extents(&file, read_extents.iter())?;
let len = file.seek(std::io::SeekFrom::End(0))?;
let file_clone = FileClone {
file,
Expand Down
21 changes: 21 additions & 0 deletions src/sys/flock/flock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
//! Implement file locking for systems that lack open file description segment locking but have flock.
pub use nix::fcntl::FlockArg;
pub use nix::fcntl::FlockArg::*;

use super::*;

pub trait Flock {
fn flock(&self, arg: FlockArg) -> io::Result<bool>;
}

impl Flock for File {
/// Locks a segment that spans the maximum possible range of offsets.
fn flock(&self, arg: FlockArg) -> io::Result<bool> {
match nix::fcntl::flock(self.as_raw_fd(), arg) {
Ok(()) => Ok(true),
Err(errno) if errno == nix::Error::EWOULDBLOCK => Ok(false),
Err(errno) => Err(std::io::Error::from_raw_os_error(errno as i32)),
}
}
}
20 changes: 20 additions & 0 deletions src/sys/flock/freebsd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use std::fs::File;
use std::io;

use nix::fcntl::FlockArg;

use crate::sys::{FileLocking, Flock};

impl FileLocking for File {
fn trim_exclusive_lock_left(&self, _old_left: u64, _new_left: u64) -> io::Result<bool> {
Ok(true)
}

fn lock_segment(&self, arg: FlockArg, _len: Option<u64>, _offset: u64) -> io::Result<bool> {
self.lock_max_segment(arg)
}

fn lock_max_segment(&self, arg: FlockArg) -> io::Result<bool> {
self.flock(arg)
}
}
16 changes: 12 additions & 4 deletions src/sys/flock/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
use super::*;

cfg_if! {
if #[cfg(unix)] {
mod unix;
pub use self::unix::*;
} else if #[cfg(windows)] {
if #[cfg(windows)] {
mod windows;
pub use self::windows::*;
} else if #[cfg(target_os = "freebsd")] {
mod freebsd;
} else {
mod ofd;
}
}

cfg_if! {
if #[cfg(unix)] {
mod flock;
pub use self::flock::*;
}
}

Expand Down
25 changes: 8 additions & 17 deletions src/sys/flock/unix.rs → src/sys/flock/ofd.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use std::io::SeekFrom;

pub use nix::fcntl::FlockArg;
pub use nix::fcntl::FlockArg::*;

use super::*;

fn seek_from_offset(seek_from: SeekFrom) -> i64 {
Expand Down Expand Up @@ -60,19 +57,7 @@ pub(super) fn lock_file_segment(
let l_type = l_type.try_into().unwrap();
flock_arg.l_type = l_type;
flock_arg.l_whence = seek_from_whence(whence);
cfg_if! {
if #[cfg(target_os = "freebsd")] {
use libc::{F_SETLK as SetLock, F_SETLKW as SetLockWait};
} else {
use libc::*;
#[allow(non_snake_case)]
let (SetLock, SetLockWait) = if emulate_freebsd() {
(F_SETLK, F_SETLKW)
} else {
(F_OFD_SETLK, F_OFD_SETLKW)
};
}
}
use libc::{F_OFD_SETLK as SetLock, F_OFD_SETLKW as SetLockWait};
#[allow(deprecated)]
let arg = match arg {
LockShared | LockExclusive => SetLockWait,
Expand All @@ -97,11 +82,17 @@ pub(super) fn lock_file_segment(

impl FileLocking for File {
fn trim_exclusive_lock_left(&self, old_left: u64, new_left: u64) -> io::Result<bool> {
#[allow(deprecated)]
if flocking() {
return Ok(true);
}
// #[allow(deprecated)]
self.lock_segment(UnlockNonblock, Some(new_left - old_left), old_left)
}

fn lock_segment(&self, arg: FlockArg, len: Option<u64>, offset: u64) -> io::Result<bool> {
if flocking() {
return self.flock(arg);
}
Ok(lock_file_segment(
self,
arg,
Expand Down
4 changes: 2 additions & 2 deletions src/sys/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub use flock::*;
pub(crate) use pathconf::*;
pub use punchfile::*;

use crate::env::emulate_freebsd;
use crate::env::flocking;

cfg_if! {
if #[cfg(windows)] {
Expand Down Expand Up @@ -88,7 +88,7 @@ impl FileSystemFlags for UnixFilesystemFlags {
// AFAIK there's no way to check if a filesystem supports block cloning on non-Windows
// platforms, and even then it depends on where you're copying to/from, sometimes even on
// the same filesystem.
if emulate_freebsd() {
if flocking() {
Some(false)
} else {
None
Expand Down

0 comments on commit cb8faec

Please sign in to comment.