From d5a2e6046bbf9153de84eecaeac75029d6631c19 Mon Sep 17 00:00:00 2001 From: Xu Chen Date: Tue, 3 Sep 2024 10:24:44 +0800 Subject: [PATCH] [Native] Augment libhdfs error with java stacktrace (#536) * print libhdfs error Signed-off-by: chenxu * fix clippy Signed-off-by: chenxu --------- Signed-off-by: chenxu Co-authored-by: chenxu --- rust/Cargo.lock | 1 + rust/lakesoul-io-c/src/lib.rs | 2 +- rust/lakesoul-io/Cargo.toml | 3 ++- rust/lakesoul-io/src/hdfs/mod.rs | 28 +++++++++++++++------- rust/lakesoul-io/src/lakesoul_io_config.rs | 1 + 5 files changed, 24 insertions(+), 11 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 720118198..86eb13656 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1908,6 +1908,7 @@ dependencies = [ "env_logger", "futures", "half", + "hdfs-sys", "hdrs", "hex", "lazy_static", diff --git a/rust/lakesoul-io-c/src/lib.rs b/rust/lakesoul-io-c/src/lib.rs index 69e06cfae..f463212a1 100644 --- a/rust/lakesoul-io-c/src/lib.rs +++ b/rust/lakesoul-io-c/src/lib.rs @@ -399,7 +399,7 @@ pub extern "C" fn create_lakesoul_reader_from_config( let runtime: Runtime = from_opaque(runtime); let result = match LakeSoulReader::new(config) { Ok(reader) => CResult::::new(SyncSendableMutableLakeSoulReader::new(reader, runtime)), - Err(e) => CResult::::error(format!("{}", e).as_str()), + Err(e) => CResult::::error(format!("{:?}", e).as_str()), }; convert_to_nonnull(result) } diff --git a/rust/lakesoul-io/Cargo.toml b/rust/lakesoul-io/Cargo.toml index 5c3dd03ca..99ace6c18 100644 --- a/rust/lakesoul-io/Cargo.toml +++ b/rust/lakesoul-io/Cargo.toml @@ -31,6 +31,7 @@ smallvec = "1.10" dary_heap = "0.3" bytes = { workspace = true } hdrs = { git = "https://github.com/lakesoul-io/hdrs.git", branch = "main", features = ["async_file"], optional = true } +hdfs-sys = { version = "0.3", default-features = false, features = ["hdfs_3_3"], optional = true } lazy_static = "1.4.0" chrono = "0.4" serde_json = { workspace = true } @@ -46,7 +47,7 @@ env_logger = 
"0.11" hex = "0.4" [features] -hdfs = ["dep:hdrs"] +hdfs = ["dep:hdrs", "dep:hdfs-sys"] default = [] [target.'cfg(target_os = "windows")'.dependencies] diff --git a/rust/lakesoul-io/src/hdfs/mod.rs b/rust/lakesoul-io/src/hdfs/mod.rs index 7c0ddf51c..a4d026a59 100644 --- a/rust/lakesoul-io/src/hdfs/mod.rs +++ b/rust/lakesoul-io/src/hdfs/mod.rs @@ -11,20 +11,21 @@ use bytes::Bytes; use datafusion::error::Result; use datafusion_common::DataFusionError; use futures::stream::BoxStream; -// use futures::TryStreamExt; +use hdfs_sys::{hdfsGetLastExceptionRootCause, hdfsGetLastExceptionStackTrace}; use hdrs::{Client, ClientBuilder}; use object_store::path::Path; use object_store::Error::Generic; use object_store::{GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore}; use parquet::data_type::AsBytes; +use std::ffi::CStr; use std::fmt::{Debug, Display, Formatter}; +use std::io; use std::io::ErrorKind::NotFound; use std::io::{Read, Seek, SeekFrom}; use std::ops::Range; use std::sync::Arc; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt}; -// use tokio_util::io::ReaderStream; pub struct Hdfs { client: Arc, @@ -38,11 +39,20 @@ impl Hdfs { None => client_builder, Some(user) => client_builder.with_user(user.as_str()), }; - let client = client_builder.connect().map_err(DataFusionError::IoError)?; - - Ok(Self { - client: Arc::new(client), - }) + let client = client_builder.connect(); + match client { + Ok(c) => Ok(Self { client: Arc::new(c) }), + Err(e) => unsafe { + let errmsg = format!( + "Open HDFS client failed: {:?}\nroot cause: {:?}\nstack trace: {:?}", + e, + CStr::from_ptr(hdfsGetLastExceptionRootCause()), + CStr::from_ptr(hdfsGetLastExceptionStackTrace()) + ); + println!("{}", errmsg); + Err(DataFusionError::IoError(io::Error::new(e.kind(), errmsg))) + }, + } } async fn is_file_exist(&self, path: &Path) -> object_store::Result { @@ -342,7 +352,7 @@ mod tests { use 
datafusion::datasource::object_store::ObjectStoreUrl; use futures::StreamExt; use object_store::path::Path; - use object_store::GetResult::Stream; + use object_store::GetResultPayload::Stream; use object_store::ObjectStore; use rand::distributions::{Alphanumeric, DistString}; use rand::thread_rng; @@ -363,7 +373,7 @@ mod tests { async fn read_file_from_hdfs(path: String, object_store: Arc) -> String { let file = object_store.get(&Path::from(path)).await.unwrap(); - match file { + match file.payload { Stream(s) => { let read_result = s .collect::>>() diff --git a/rust/lakesoul-io/src/lakesoul_io_config.rs b/rust/lakesoul-io/src/lakesoul_io_config.rs index 64c65db52..87d9c3c37 100644 --- a/rust/lakesoul-io/src/lakesoul_io_config.rs +++ b/rust/lakesoul-io/src/lakesoul_io_config.rs @@ -393,6 +393,7 @@ fn register_hdfs_object_store( { let hdfs = Hdfs::try_new(_host, _config.clone())?; _runtime.register_object_store(_url, Arc::new(hdfs)); + println!("registered hdfs object store {:?}, {:?}", _host, _url); Ok(()) } }