Skip to content

Commit

Permalink
feat: Add buffer size, replication, blocksize as file open options (#135
Browse files Browse the repository at this point in the history
) (#136)
  • Loading branch information
Banyc authored Nov 13, 2024
1 parent 98fe333 commit ce2ddc3
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 13 deletions.
71 changes: 66 additions & 5 deletions src/open_options.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::ffi::CString;
use std::io::{Error, Result};
use std::ffi::{c_int, c_short, CString};
use std::io::{Error, ErrorKind, Result};

use hdfs_sys::*;
use log::debug;
Expand Down Expand Up @@ -54,6 +54,9 @@ pub struct OpenOptions {
truncate: bool,
create: bool,
create_new: bool,
buffer_size: usize,
replication: usize,
blocksize: usize,
}

/// HDFS's client handle is thread safe.
Expand All @@ -71,9 +74,42 @@ impl OpenOptions {
truncate: false,
create: false,
create_new: false,
buffer_size: 0,
replication: 0,
blocksize: 0,
}
}

/// Sets size of buffer for read/write.
///
/// Pass `0` if you want to use the default configured values.
///
/// `0` by default.
pub fn with_buffer_size(&mut self, buffer_size: usize) -> &mut Self {
self.buffer_size = buffer_size;
self
}

/// Sets block replication.
///
/// Pass `0` if you want to use the default configured values.
///
/// `0` by default.
pub fn with_replication(&mut self, replication: usize) -> &mut Self {
self.replication = replication;
self
}

/// Sets size of block.
///
/// Pass `0` if you want to use the default configured values.
///
/// `0` by default.
pub fn with_blocksize(&mut self, blocksize: usize) -> &mut Self {
self.blocksize = blocksize;
self
}

/// Sets the option for read access.
///
/// This option, when true, will indicate that the file should be
Expand Down Expand Up @@ -304,7 +340,8 @@ impl OpenOptions {
/// * [`AlreadyExists`]: `create_new` was specified and the file already
/// exists.
/// * [`InvalidInput`]: Invalid combinations of open options (truncate
/// without write access, no access mode set, etc.).
/// without write access, no access mode set, incompatible integer values,
/// etc.).
///
/// The following errors don't match any existing [`io::ErrorKind`] at the moment:
/// * One of the directory components of the specified file path
Expand Down Expand Up @@ -337,8 +374,32 @@ impl OpenOptions {
debug!("open file {} with flags {}", path, flags);
let b = unsafe {
let p = CString::new(path)?;
// TODO: we need to support buffer size, replication and block size.
hdfsOpenFile(self.fs, p.as_ptr(), flags, 0, 0, 0)
let buffer_size: c_int = self.buffer_size.try_into().map_err(|_| {
Error::new(
ErrorKind::InvalidInput,
format!("`buffer_size` {} exceeds valid `c_int`", self.buffer_size),
)
})?;
let replication: c_short = self.replication.try_into().map_err(|_| {
Error::new(
ErrorKind::InvalidInput,
format!("`replication` {} exceeds valid `c_short`", self.replication),
)
})?;
let blocksize: i32 = self.blocksize.try_into().map_err(|_| {
Error::new(
ErrorKind::InvalidInput,
format!("`blocksize` {} exceeds valid `i32`", self.blocksize),
)
})?;
hdfsOpenFile(
self.fs,
p.as_ptr(),
flags,
buffer_size,
replication,
blocksize,
)
};

if b.is_null() {
Expand Down
27 changes: 19 additions & 8 deletions tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ fn test_file() -> Result<()> {
Ok(())
}

#[cfg(feature = "futures-io")]
#[cfg(feature = "async_file")]
#[tokio::test]
async fn test_tokio_file() -> Result<()> {
use futures::io::*;
Expand All @@ -225,7 +225,12 @@ async fn test_tokio_file() -> Result<()> {
{
// Write file
debug!("test file write");
let mut f = fs.open_file().create(true).write(true).open(&path)?;
let mut f = fs
.open_file()
.create(true)
.write(true)
.async_open(&path)
.await?;
f.write_all(&content).await?;
// Flush file
debug!("test file flush");
Expand All @@ -235,7 +240,7 @@ async fn test_tokio_file() -> Result<()> {
{
// Read file
debug!("test file read");
let mut f = fs.open_file().read(true).open(&path)?;
let mut f = fs.open_file().read(true).async_open(&path).await?;
let mut buf = Vec::new();
let n = f.read_to_end(&mut buf).await?;
assert_eq!(n, content.len());
Expand All @@ -254,7 +259,7 @@ async fn test_tokio_file() -> Result<()> {
{
// Seek file.
debug!("test file seek");
let mut f = fs.open_file().read(true).open(&path)?;
let mut f = fs.open_file().read(true).async_open(&path).await?;
let offset = content.len() / 2;
let size = content.len() - offset;
let mut buf = Vec::new();
Expand Down Expand Up @@ -282,9 +287,10 @@ async fn test_tokio_file() -> Result<()> {
Ok(())
}

#[cfg(feature = "tokio-io")]
#[cfg(feature = "async_file")]
#[tokio::test]
async fn test_futures_file() -> Result<()> {
use futures::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::io::*;

let _ = env_logger::try_init();
Expand All @@ -308,7 +314,12 @@ async fn test_futures_file() -> Result<()> {
{
// Write file
debug!("test file write");
let mut f = fs.open_file().create(true).write(true).open(&path)?;
let mut f = fs
.open_file()
.create(true)
.write(true)
.async_open(&path)
.await?;
f.write_all(&content).await?;
// Flush file
debug!("test file flush");
Expand All @@ -318,7 +329,7 @@ async fn test_futures_file() -> Result<()> {
{
// Read file
debug!("test file read");
let mut f = fs.open_file().read(true).open(&path)?;
let mut f = fs.open_file().read(true).async_open(&path).await?;
let mut buf = Vec::new();
let n = f.read_to_end(&mut buf).await?;
assert_eq!(n, content.len());
Expand All @@ -337,7 +348,7 @@ async fn test_futures_file() -> Result<()> {
{
// Seek file.
debug!("test file seek");
let mut f = fs.open_file().read(true).open(&path)?;
let mut f = fs.open_file().read(true).async_open(&path).await?;
let offset = content.len() / 2;
let size = content.len() - offset;
let mut buf = Vec::new();
Expand Down

0 comments on commit ce2ddc3

Please sign in to comment.