Skip to content

Commit

Permalink
[Native] Augment libhdfs error with java stacktrace (#536)
Browse files Browse the repository at this point in the history
* print libhdfs error

Signed-off-by: chenxu <[email protected]>

* fix clippy

Signed-off-by: chenxu <[email protected]>

---------

Signed-off-by: chenxu <[email protected]>
Co-authored-by: chenxu <[email protected]>
  • Loading branch information
xuchen-plus and dmetasoul01 authored Sep 3, 2024
1 parent 0d2b823 commit d5a2e60
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 11 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/lakesoul-io-c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ pub extern "C" fn create_lakesoul_reader_from_config(
let runtime: Runtime = from_opaque(runtime);
let result = match LakeSoulReader::new(config) {
Ok(reader) => CResult::<Reader>::new(SyncSendableMutableLakeSoulReader::new(reader, runtime)),
Err(e) => CResult::<Reader>::error(format!("{}", e).as_str()),
Err(e) => CResult::<Reader>::error(format!("{:?}", e).as_str()),
};
convert_to_nonnull(result)
}
Expand Down
3 changes: 2 additions & 1 deletion rust/lakesoul-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ smallvec = "1.10"
dary_heap = "0.3"
bytes = { workspace = true }
hdrs = { git = "https://github.com/lakesoul-io/hdrs.git", branch = "main", features = ["async_file"], optional = true }
hdfs-sys = { version = "0.3", default-features = false, features = ["hdfs_3_3"], optional = true }
lazy_static = "1.4.0"
chrono = "0.4"
serde_json = { workspace = true }
Expand All @@ -46,7 +47,7 @@ env_logger = "0.11"
hex = "0.4"

[features]
hdfs = ["dep:hdrs"]
hdfs = ["dep:hdrs", "dep:hdfs-sys"]
default = []

[target.'cfg(target_os = "windows")'.dependencies]
Expand Down
28 changes: 19 additions & 9 deletions rust/lakesoul-io/src/hdfs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ use bytes::Bytes;
use datafusion::error::Result;
use datafusion_common::DataFusionError;
use futures::stream::BoxStream;
// use futures::TryStreamExt;
use hdfs_sys::{hdfsGetLastExceptionRootCause, hdfsGetLastExceptionStackTrace};
use hdrs::{Client, ClientBuilder};
use object_store::path::Path;
use object_store::Error::Generic;
use object_store::{GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore};
use parquet::data_type::AsBytes;
use std::ffi::CStr;
use std::fmt::{Debug, Display, Formatter};
use std::io;
use std::io::ErrorKind::NotFound;
use std::io::{Read, Seek, SeekFrom};
use std::ops::Range;
use std::sync::Arc;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
// use tokio_util::io::ReaderStream;

pub struct Hdfs {
client: Arc<Client>,
Expand All @@ -38,11 +39,20 @@ impl Hdfs {
None => client_builder,
Some(user) => client_builder.with_user(user.as_str()),
};
let client = client_builder.connect().map_err(DataFusionError::IoError)?;

Ok(Self {
client: Arc::new(client),
})
let client = client_builder.connect();
match client {
Ok(c) => Ok(Self { client: Arc::new(c) }),
Err(e) => unsafe {
let errmsg = format!(
"Open HDFS client failed: {:?}\nroot cause: {:?}\nstack trace: {:?}",
e,
CStr::from_ptr(hdfsGetLastExceptionRootCause()),
CStr::from_ptr(hdfsGetLastExceptionStackTrace())
);
println!("{}", errmsg);
Err(DataFusionError::IoError(io::Error::new(e.kind(), errmsg)))
},
}
}

async fn is_file_exist(&self, path: &Path) -> object_store::Result<bool> {
Expand Down Expand Up @@ -342,7 +352,7 @@ mod tests {
use datafusion::datasource::object_store::ObjectStoreUrl;
use futures::StreamExt;
use object_store::path::Path;
use object_store::GetResult::Stream;
use object_store::GetResultPayload::Stream;
use object_store::ObjectStore;
use rand::distributions::{Alphanumeric, DistString};
use rand::thread_rng;
Expand All @@ -363,7 +373,7 @@ mod tests {

async fn read_file_from_hdfs(path: String, object_store: Arc<dyn ObjectStore>) -> String {
let file = object_store.get(&Path::from(path)).await.unwrap();
match file {
match file.payload {
Stream(s) => {
let read_result = s
.collect::<Vec<object_store::Result<Bytes>>>()
Expand Down
1 change: 1 addition & 0 deletions rust/lakesoul-io/src/lakesoul_io_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ fn register_hdfs_object_store(
{
let hdfs = Hdfs::try_new(_host, _config.clone())?;
_runtime.register_object_store(_url, Arc::new(hdfs));
println!("registered hdfs objec store {:?}, {:?}", _host, _url);
Ok(())
}
}
Expand Down

0 comments on commit d5a2e60

Please sign in to comment.