Skip to content

Commit

Permalink
fix(driver): return buffer back if op cancelled
Browse files Browse the repository at this point in the history
  • Loading branch information
Berrysoft committed Nov 30, 2024
1 parent 2e0e0d5 commit 6dbbdda
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 19 deletions.
94 changes: 79 additions & 15 deletions compio-driver/src/buffer_pool/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,28 @@ use std::{
io,
mem::ManuallyDrop,
ops::{Deref, DerefMut},
rc::Rc,
};

use compio_buf::{IntoInner, IoBuf, SetBufInit, Slice};
use compio_buf::{IntoInner, IoBuf, IoBufMut, SetBufInit, Slice};

struct BufferPoolInner {
buffers: RefCell<VecDeque<Vec<u8>>>,
}

impl BufferPoolInner {
pub(crate) fn add_buffer(&self, mut buffer: Vec<u8>) {
buffer.clear();
self.buffers.borrow_mut().push_back(buffer)
}
}

/// Buffer pool
///
/// A buffer pool to allow user no need to specify a specific buffer to do the
/// IO operation
pub struct BufferPool {
buffers: RefCell<VecDeque<Vec<u8>>>,
inner: Rc<BufferPoolInner>,
}

impl Debug for BufferPool {
Expand All @@ -31,37 +43,89 @@ impl BufferPool {
.collect();

Self {
buffers: RefCell::new(buffers),
inner: Rc::new(BufferPoolInner {
buffers: RefCell::new(buffers),
}),
}
}

pub(crate) fn get_buffer(&self, len: usize) -> io::Result<Slice<Vec<u8>>> {
let buffer = self.buffers.borrow_mut().pop_front().ok_or_else(|| {
pub(crate) fn get_buffer(&self, len: usize) -> io::Result<OwnedBuffer> {
let buffer = self.inner.buffers.borrow_mut().pop_front().ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, "buffer ring has no available buffer")
})?;
let len = if len == 0 {
buffer.capacity()
} else {
buffer.capacity().min(len)
};
Ok(buffer.slice(..len))
Ok(OwnedBuffer::new(buffer.slice(..len), self.inner.clone()))
}

pub(crate) fn add_buffer(&self, mut buffer: Vec<u8>) {
buffer.clear();
self.buffers.borrow_mut().push_back(buffer)
pub(crate) fn add_buffer(&self, buffer: Vec<u8>) {
self.inner.add_buffer(buffer);
}

/// Safety: `len` should be valid
pub(crate) unsafe fn create_proxy(
&self,
mut slice: Slice<Vec<u8>>,
len: usize,
) -> BorrowedBuffer {
pub(crate) unsafe fn create_proxy(&self, mut slice: OwnedBuffer, len: usize) -> BorrowedBuffer {
unsafe {
slice.set_buf_init(len);
}
BorrowedBuffer::new(slice, self)
BorrowedBuffer::new(slice.into_inner(), self)
}
}

pub(crate) struct OwnedBuffer {
buffer: ManuallyDrop<Slice<Vec<u8>>>,
pool: Rc<BufferPoolInner>,
}

impl OwnedBuffer {
fn new(buffer: Slice<Vec<u8>>, pool: Rc<BufferPoolInner>) -> Self {
Self {
buffer: ManuallyDrop::new(buffer),
pool,
}
}
}

unsafe impl IoBuf for OwnedBuffer {
fn as_buf_ptr(&self) -> *const u8 {
self.buffer.as_buf_ptr()
}

fn buf_len(&self) -> usize {
self.buffer.buf_len()
}

fn buf_capacity(&self) -> usize {
self.buffer.buf_capacity()
}
}

unsafe impl IoBufMut for OwnedBuffer {
fn as_buf_mut_ptr(&mut self) -> *mut u8 {
self.buffer.as_buf_mut_ptr()
}
}

impl SetBufInit for OwnedBuffer {
unsafe fn set_buf_init(&mut self, len: usize) {
self.buffer.set_buf_init(len);
}
}

impl Drop for OwnedBuffer {
fn drop(&mut self) {
self.pool
.add_buffer(unsafe { ManuallyDrop::take(&mut self.buffer) }.into_inner());
}
}

impl IntoInner for OwnedBuffer {
type Inner = Slice<Vec<u8>>;

fn into_inner(mut self) -> Self::Inner {
unsafe { ManuallyDrop::take(&mut self.buffer) }
}
}

Expand Down
8 changes: 4 additions & 4 deletions compio-driver/src/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,14 @@ impl<S> Connect<S> {
pub(crate) mod managed {
use std::io;

use compio_buf::{IntoInner, Slice};
use compio_buf::IntoInner;

use super::{ReadAt, Recv};
use crate::{BorrowedBuffer, BufferPool, SharedFd, TakeBuffer};
use crate::{BorrowedBuffer, BufferPool, OwnedBuffer, SharedFd, TakeBuffer};

/// Read a file at specified position into managed buffer.
pub struct ReadManagedAt<S> {
pub(crate) op: ReadAt<Slice<Vec<u8>>, S>,
pub(crate) op: ReadAt<OwnedBuffer, S>,
}

impl<S> ReadManagedAt<S> {
Expand Down Expand Up @@ -322,7 +322,7 @@ pub(crate) mod managed {

/// Receive data from remote into managed buffer.
pub struct RecvManaged<S> {
pub(crate) op: Recv<Slice<Vec<u8>>, S>,
pub(crate) op: Recv<OwnedBuffer, S>,
}

impl<S> RecvManaged<S> {
Expand Down

0 comments on commit 6dbbdda

Please sign in to comment.