Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add buffer size, replication, blocksize as file open options (#135) #136

Merged
merged 3 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 66 additions & 5 deletions src/open_options.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::ffi::CString;
use std::io::{Error, Result};
use std::ffi::{c_int, c_short, CString};
use std::io::{Error, ErrorKind, Result};

use hdfs_sys::*;
use log::debug;
Expand Down Expand Up @@ -54,6 +54,9 @@ pub struct OpenOptions {
truncate: bool,
create: bool,
create_new: bool,
buffer_size: usize,
replication: usize,
blocksize: usize,
}

/// HDFS's client handle is thread safe.
Expand All @@ -71,9 +74,42 @@ impl OpenOptions {
truncate: false,
create: false,
create_new: false,
buffer_size: 0,
replication: 0,
blocksize: 0,
}
}

/// Sets size of buffer for read/write.
///
/// Pass `0` if you want to use the default configured values.
///
/// `0` by default.
pub fn with_buffer_size(&mut self, buffer_size: usize) -> &mut Self {
self.buffer_size = buffer_size;
self
}

/// Sets block replication.
///
/// Pass `0` if you want to use the default configured values.
///
/// `0` by default.
pub fn with_replication(&mut self, replication: usize) -> &mut Self {
self.replication = replication;
self
}

/// Sets size of block.
///
/// Pass `0` if you want to use the default configured values.
///
/// `0` by default.
pub fn with_blocksize(&mut self, blocksize: usize) -> &mut Self {
self.blocksize = blocksize;
self
}

/// Sets the option for read access.
///
/// This option, when true, will indicate that the file should be
Expand Down Expand Up @@ -304,7 +340,8 @@ impl OpenOptions {
/// * [`AlreadyExists`]: `create_new` was specified and the file already
/// exists.
/// * [`InvalidInput`]: Invalid combinations of open options (truncate
/// without write access, no access mode set, etc.).
/// without write access, no access mode set, incompatible integer values,
/// etc.).
///
/// The following errors don't match any existing [`io::ErrorKind`] at the moment:
/// * One of the directory components of the specified file path
Expand Down Expand Up @@ -337,8 +374,32 @@ impl OpenOptions {
debug!("open file {} with flags {}", path, flags);
let b = unsafe {
let p = CString::new(path)?;
// TODO: we need to support buffer size, replication and block size.
hdfsOpenFile(self.fs, p.as_ptr(), flags, 0, 0, 0)
let buffer_size: c_int = self.buffer_size.try_into().map_err(|_| {
Error::new(
ErrorKind::InvalidInput,
format!("`buffer_size` {} exceeds valid `c_int`", self.buffer_size),
)
})?;
let replication: c_short = self.replication.try_into().map_err(|_| {
Error::new(
ErrorKind::InvalidInput,
format!("`replication` {} exceeds valid `c_short`", self.replication),
)
})?;
let blocksize: i32 = self.blocksize.try_into().map_err(|_| {
Error::new(
ErrorKind::InvalidInput,
format!("`blocksize` {} exceeds valid `i32`", self.blocksize),
)
})?;
hdfsOpenFile(
self.fs,
p.as_ptr(),
flags,
buffer_size,
replication,
blocksize,
)
};

if b.is_null() {
Expand Down
27 changes: 19 additions & 8 deletions tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ fn test_file() -> Result<()> {
Ok(())
}

#[cfg(feature = "futures-io")]
#[cfg(feature = "async_file")]
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
#[tokio::test]
async fn test_tokio_file() -> Result<()> {
use futures::io::*;
Expand All @@ -225,7 +225,12 @@ async fn test_tokio_file() -> Result<()> {
{
// Write file
debug!("test file write");
let mut f = fs.open_file().create(true).write(true).open(&path)?;
let mut f = fs
.open_file()
.create(true)
.write(true)
.async_open(&path)
.await?;
f.write_all(&content).await?;
// Flush file
debug!("test file flush");
Expand All @@ -235,7 +240,7 @@ async fn test_tokio_file() -> Result<()> {
{
// Read file
debug!("test file read");
let mut f = fs.open_file().read(true).open(&path)?;
let mut f = fs.open_file().read(true).async_open(&path).await?;
let mut buf = Vec::new();
let n = f.read_to_end(&mut buf).await?;
assert_eq!(n, content.len());
Expand All @@ -254,7 +259,7 @@ async fn test_tokio_file() -> Result<()> {
{
// Seek file.
debug!("test file seek");
let mut f = fs.open_file().read(true).open(&path)?;
let mut f = fs.open_file().read(true).async_open(&path).await?;
let offset = content.len() / 2;
let size = content.len() - offset;
let mut buf = Vec::new();
Expand Down Expand Up @@ -282,9 +287,10 @@ async fn test_tokio_file() -> Result<()> {
Ok(())
}

#[cfg(feature = "tokio-io")]
#[cfg(feature = "async_file")]
#[tokio::test]
async fn test_futures_file() -> Result<()> {
use futures::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::io::*;

let _ = env_logger::try_init();
Expand All @@ -308,7 +314,12 @@ async fn test_futures_file() -> Result<()> {
{
// Write file
debug!("test file write");
let mut f = fs.open_file().create(true).write(true).open(&path)?;
let mut f = fs
.open_file()
.create(true)
.write(true)
.async_open(&path)
.await?;
f.write_all(&content).await?;
// Flush file
debug!("test file flush");
Expand All @@ -318,7 +329,7 @@ async fn test_futures_file() -> Result<()> {
{
// Read file
debug!("test file read");
let mut f = fs.open_file().read(true).open(&path)?;
let mut f = fs.open_file().read(true).async_open(&path).await?;
let mut buf = Vec::new();
let n = f.read_to_end(&mut buf).await?;
assert_eq!(n, content.len());
Expand All @@ -337,7 +348,7 @@ async fn test_futures_file() -> Result<()> {
{
// Seek file.
debug!("test file seek");
let mut f = fs.open_file().read(true).open(&path)?;
let mut f = fs.open_file().read(true).async_open(&path).await?;
let offset = content.len() / 2;
let size = content.len() - offset;
let mut buf = Vec::new();
Expand Down