[Rust] fix panic
Signed-off-by: mag1c1an1 <[email protected]>

fix proto

Signed-off-by: mag1c1an1 <[email protected]>

fix lakesoul-io

Signed-off-by: mag1c1an1 <[email protected]>

fix lakesoul-io-c

Signed-off-by: mag1c1an1 <[email protected]>

fix lakesoul-datafusion

Signed-off-by: mag1c1an1 <[email protected]>

fix lakesoul-datafusion

Signed-off-by: mag1c1an1 <[email protected]>

remove meaningless expect

Signed-off-by: mag1c1an1 <[email protected]>

cargo clippy && cargo fmt

Signed-off-by: mag1c1an1 <[email protected]>
mag1c1an1 committed Feb 4, 2024
1 parent 4e4b415 commit 1ae0f3a
Showing 37 changed files with 497 additions and 481 deletions.
25 changes: 25 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default.

4 changes: 3 additions & 1 deletion rust/Cargo.toml
@@ -41,4 +41,6 @@ serde = { version = "1.0", features = ["derive", "std", "rc"] }
rand = "^0.8"
bytes = "1.4.0"
half = "^2.1"
tracing = "0.1.40"
tracing = "0.1.40"
thiserror = "1.0"
anyhow = "1.0.79"
2 changes: 2 additions & 0 deletions rust/lakesoul-datafusion/Cargo.toml
@@ -29,6 +29,8 @@ tokio = { workspace = true }
rand = { workspace = true }
bytes = { workspace = true }
tracing = "0.1.40"
thiserror = { workspace = true }
anyhow = { workspace = true }

[dev-dependencies]
ctor = "0.2"
3 changes: 2 additions & 1 deletion rust/lakesoul-datafusion/src/catalog/lakesoul_catalog.rs
@@ -7,6 +7,7 @@ use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::CatalogProvider;
use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::SessionContext;
use lakesoul_metadata::error::LakeSoulMetaDataError;
use lakesoul_metadata::MetaDataClientRef;
use proto::proto::entity::Namespace;
use std::any::Any;
@@ -107,7 +108,7 @@ impl CatalogProvider for LakeSoulCatalog {
Handle::current()
.spawn(async move { client.create_namespace(np).await })
.await
.expect("tokio join error in register schema")
.map_err(|e| LakeSoulMetaDataError::Other(Box::new(e)))?
});
Ok(schema)
}
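The change above swaps `.expect("tokio join error in register schema")` for a `map_err` into `LakeSoulMetaDataError::Other`, so a panicked or cancelled task becomes a recoverable error. A minimal, self-contained sketch of the same bridge from a synchronous trait method to an async metadata call — with a hypothetical `MetaError` standing in for the real error enum, and assuming a multi-threaded tokio runtime:

```rust
use std::error::Error;
use tokio::runtime::Handle;

// Hypothetical stand-in for LakeSoulMetaDataError; only the Other variant matters here.
#[derive(Debug)]
enum MetaError {
    Other(Box<dyn Error + Send + Sync>),
}

async fn create_namespace(_name: String) -> Result<(), MetaError> {
    Ok(()) // pretend this talks to the metadata service
}

// Synchronous trait methods (like CatalogProvider::register_schema) cannot
// `.await`, so the async call is spawned onto the current runtime and blocked
// on. A panicked or cancelled task surfaces as a JoinError, which is wrapped
// into the domain error instead of `.expect()`ed.
fn register_sync(name: &str) -> Result<(), MetaError> {
    let name = name.to_string();
    tokio::task::block_in_place(|| {
        Handle::current().block_on(async move {
            Handle::current()
                .spawn(async move { create_namespace(name).await })
                .await
                .map_err(|e| MetaError::Other(Box::new(e)))? // JoinError -> MetaError
        })
    })
}

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    register_sync("default").unwrap();
}
```

The final `?` propagates either the join failure or the metadata error through the same `Result`, which is what lets the surrounding code stay panic-free.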
5 changes: 3 additions & 2 deletions rust/lakesoul-datafusion/src/catalog/lakesoul_namespace.rs
@@ -100,10 +100,11 @@ impl SchemaProvider for LakeSoulNamespace {
/// return LakeSoulListing table
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
let _guard = self.namespace_lock.read().await;
if let Ok(_) = self
if self
.metadata_client
.get_table_info_by_table_name(name, &self.namespace)
.await
.is_ok()
{
debug!("call table() on table: {}.{}", &self.namespace, name);
let config;
@@ -192,7 +193,7 @@ impl SchemaProvider for LakeSoulNamespace {
return Ok(Some(Arc::new(table_provider) as Arc<dyn TableProvider>));
}
debug!("get table provider fail");
return Err(DataFusionError::External("get table provider failed".into()));
Err(DataFusionError::External("get table provider failed".into()))
}
Err(e) => match e {
LakeSoulMetaDataError::NotFound(_) => Ok(None),
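This hunk also tightens the control flow: `if let Ok(_) = …` becomes `.is_ok()` (the clippy-preferred form), and a failed lookup is split into `NotFound` → `Ok(None)` versus every other error propagating. A small sketch of that split, with hypothetical stand-ins for the metadata error and lookup:

```rust
// Hypothetical stand-ins for the metadata error and table lookup.
#[derive(Debug)]
enum MetaError {
    NotFound(String),
    Io(std::io::Error),
}

#[derive(Debug)]
struct TableInfo;

fn get_table_info(name: &str) -> Result<TableInfo, MetaError> {
    match name {
        "known" => Ok(TableInfo),
        "flaky" => Err(MetaError::Io(std::io::Error::new(
            std::io::ErrorKind::Other,
            "connection reset",
        ))),
        other => Err(MetaError::NotFound(other.to_string())),
    }
}

// A missing table is an expected outcome (Ok(None)); anything else is a real error.
fn table(name: &str) -> Result<Option<TableInfo>, MetaError> {
    match get_table_info(name) {
        Ok(info) => Ok(Some(info)),
        Err(MetaError::NotFound(_)) => Ok(None),
        Err(e) => Err(e),
    }
}

fn main() {
    assert!(matches!(table("known"), Ok(Some(_))));
    assert!(matches!(table("missing"), Ok(None)));
    assert!(table("flaky").is_err());
}
```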
33 changes: 20 additions & 13 deletions rust/lakesoul-datafusion/src/catalog/mod.rs
@@ -15,12 +15,12 @@ use proto::proto::entity::{CommitOp, DataCommitInfo, DataFileOp, FileOp, TableIn
use crate::lakesoul_table::helpers::create_io_config_builder_from_table_info;
use crate::serialize::arrow_java::ArrowJavaSchema;
// use crate::transaction::TransactionMetaInfo;
use crate::error::Result;
use crate::error::{LakeSoulError, Result};

// pub mod lakesoul_sink;
// pub mod lakesoul_source;
mod lakesoul_catalog;
// used in catalog_test, but still say unused_imports, i think it is a bug about rust-lint.
// used in catalog_test, but it still reports unused_imports; I think it is a bug in rust-lint.
// this is a workaround
#[cfg(test)]
pub use lakesoul_catalog::*;
@@ -37,8 +37,14 @@ pub(crate) async fn create_table(client: MetaDataClientRef, table_name: &str, co
.create_table(TableInfo {
table_id: format!("table_{}", uuid::Uuid::new_v4()),
table_name: table_name.to_string(),
table_path: format!("file://{}default/{}", env::temp_dir().to_str().unwrap(), table_name),
table_schema: serde_json::to_string::<ArrowJavaSchema>(&config.schema().into()).unwrap(),
table_path: format!(
"file://{}default/{}",
env::temp_dir()
.to_str()
.ok_or(LakeSoulError::Internal("can not get $TMPDIR".to_string()))?,
table_name
),
table_schema: serde_json::to_string::<ArrowJavaSchema>(&config.schema().into())?,
table_namespace: "default".to_string(),
properties: serde_json::to_string(&LakeSoulTableProperty {
hash_bucket_num: Some(4),
@@ -74,16 +80,20 @@ pub(crate) async fn create_io_config_builder(
} else {
vec![]
};
Ok(create_io_config_builder_from_table_info(Arc::new(table_info)).with_files(data_files))
create_io_config_builder_from_table_info(Arc::new(table_info)).map(|builder| builder.with_files(data_files))
} else {
Ok(LakeSoulIOConfigBuilder::new())
}
}

pub(crate) fn parse_table_info_partitions(partitions: String) -> (Vec<String>, Vec<String>) {
let (range_keys, hash_keys) = partitions.split_at(partitions.find(';').unwrap());
pub(crate) fn parse_table_info_partitions(partitions: String) -> Result<(Vec<String>, Vec<String>)> {
let (range_keys, hash_keys) = partitions.split_at(
partitions
.find(';')
.ok_or(LakeSoulError::Internal("wrong partition format".to_string()))?,
);
let hash_keys = &hash_keys[1..];
(
Ok((
range_keys
.split(',')
.collect::<Vec<&str>>()
@@ -96,7 +106,7 @@ pub(crate) fn parse_table_info_partitions(partitions: String) -> (Vec<String>, V
.iter()
.filter_map(|str| if str.is_empty() { None } else { Some(str.to_string()) })
.collect::<Vec<String>>(),
)
))
}

pub(crate) async fn commit_data(
Expand Down Expand Up @@ -130,10 +140,7 @@ pub(crate) async fn commit_data(
})
.collect(),
commit_op: CommitOp::AppendCommit as i32,
timestamp: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
timestamp: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_secs() as i64,
commit_id: {
let (high, low) = uuid::Uuid::new_v4().as_u64_pair();
Some(Uuid { high, low })
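Two panics disappear in this file: `env::temp_dir().to_str().unwrap()` becomes an `ok_or(LakeSoulError::Internal(...))?`, and `partitions.find(';').unwrap()` gets the same treatment in `parse_table_info_partitions`, whose signature now returns a `Result`. A runnable sketch of the new parsing behavior (names are stand-ins; the `"range_keys;hash_keys"` format with comma-separated keys on each side mirrors the function above):

```rust
// Stand-in for LakeSoulError::Internal.
#[derive(Debug)]
enum ParseError {
    Internal(String),
}

fn parse_partitions(partitions: &str) -> Result<(Vec<String>, Vec<String>), ParseError> {
    // Previously `partitions.find(';').unwrap()` — a malformed string panicked.
    let split_at = partitions
        .find(';')
        .ok_or(ParseError::Internal("wrong partition format".to_string()))?;
    let (range_keys, hash_keys) = partitions.split_at(split_at);
    let hash_keys = &hash_keys[1..]; // drop the leading ';'
    let to_keys = |s: &str| -> Vec<String> {
        s.split(',')
            .filter(|k| !k.is_empty())
            .map(|k| k.to_string())
            .collect()
    };
    Ok((to_keys(range_keys), to_keys(hash_keys)))
}

fn main() {
    let (range, hash) = parse_partitions("date;id,name").unwrap();
    assert_eq!(range, vec!["date"]);
    assert_eq!(hash, vec!["id", "name"]);
    assert!(parse_partitions("no-separator").is_err()); // used to be a panic
}
```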
@@ -210,6 +210,7 @@ impl LakeSoulHashSinkExec {
let file_absolute_path = format!("{}/part-{}_{:0>4}.parquet", table_info.table_path, write_id, partition);
if !partitioned_writer.contains_key(&columnar_value) {
let mut config = create_io_config_builder_from_table_info(table_info.clone())
.map_err(|e| DataFusionError::External(Box::new(e)))?
.with_files(vec![file_absolute_path.clone()])
.with_schema(batch.schema())
.build();
@@ -268,7 +269,11 @@ impl LakeSoulHashSinkExec {
commit_data(client.clone(), &table_name, partition_desc, files)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
debug!("table: {} insert success at {:?}",&table_name,std::time::SystemTime::now())
debug!(
"table: {} insert success at {:?}",
&table_name,
std::time::SystemTime::now()
)
}
Ok(count)
}
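Both hunks in this file use the same boundary idiom: trait methods must return DataFusion's `Result`, so crate-local errors are boxed into `DataFusionError::External` via `map_err`. A condensed sketch — the `CommitError` type is hypothetical; the real code wraps its own error types the same way:

```rust
use datafusion::error::{DataFusionError, Result};

// Hypothetical crate-local error standing in for the LakeSoul error types.
#[derive(Debug, thiserror::Error)]
#[error("commit failed: {0}")]
struct CommitError(String);

fn commit_data() -> std::result::Result<(), CommitError> {
    Err(CommitError("metadata service unreachable".to_string()))
}

// ExecutionPlan / DataSink methods must return DataFusion's Result, so
// crate-local errors are boxed into DataFusionError::External at the boundary.
fn write_all() -> Result<()> {
    commit_data().map_err(|e| DataFusionError::External(Box::new(e)))?;
    Ok(())
}

fn main() {
    // The wrapped CommitError rides along inside DataFusionError::External.
    println!("{}", write_all().unwrap_err());
}
```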
4 changes: 2 additions & 2 deletions rust/lakesoul-datafusion/src/datasource/table_provider.rs
@@ -37,7 +37,7 @@ use super::file_format::LakeSoulMetaDataParquetFormat;
/// 2. Hive-style partitioning support, where a path such as
/// `/files/date=1/1/2022/data.parquet` is injected as a `date` column.
///
/// 3. Projection pushdown for formats that support it such as such as
/// 3. Projection pushdown for formats that support it such as
/// Parquet
///
/// ```
@@ -56,7 +56,7 @@ impl LakeSoulTableProvider {
as_sink: bool,
) -> crate::error::Result<Self> {
let schema = schema_from_metadata_str(&table_info.table_schema);
let (_, hash_partitions) = parse_table_info_partitions(table_info.partitions.clone());
let (_, hash_partitions) = parse_table_info_partitions(table_info.partitions.clone())?;

let file_format: Arc<dyn FileFormat> = match as_sink {
true => {
90 changes: 19 additions & 71 deletions rust/lakesoul-datafusion/src/error.rs
@@ -2,8 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0

use std::{error::Error, fmt::Display, result, sync::Arc};
use tokio::task::JoinError;
use std::{result, sync::Arc};

use lakesoul_io::lakesoul_reader::{ArrowError, DataFusionError};
use lakesoul_metadata::error::LakeSoulMetaDataError;
@@ -15,76 +14,25 @@ pub type Result<T, E = LakeSoulError> = result::Result<T, E>;
pub type SharedResult<T> = result::Result<T, Arc<LakeSoulError>>;

/// Error type for generic operations that could result in LakeSoulMetaDataError::External
pub type GenericError = Box<dyn Error + Send + Sync>;
pub type GenericError = Box<dyn std::error::Error + Send + Sync>;

#[derive(Debug)]
#[derive(Debug, thiserror::Error)]
pub enum LakeSoulError {
MetaDataError(LakeSoulMetaDataError),
DataFusionError(DataFusionError),
ArrowError(ArrowError),
SerdeJsonError(serde_json::Error),
TokioJoinError(tokio::task::JoinError),
#[error("metadata error: {0}")]
MetaDataError(#[from] LakeSoulMetaDataError),
#[error("Datafusion error: {0}")]
DataFusionError(#[from] DataFusionError),
#[error("arrow error: {0}")]
ArrowError(#[from] ArrowError),
#[error("serde_json error: {0}")]
SerdeJsonError(#[from] serde_json::Error),
#[error("tokio error: {0}")]
TokioJoinError(#[from] tokio::task::JoinError),
#[error("sys time error: {0}")]
SysTimeError(#[from] std::time::SystemTimeError),
#[error(
"Internal error: {0}.\nThis was likely caused by a bug in LakeSoul's \
code and we would welcome that you file a bug report in our issue tracker"
)]
Internal(String),
}

impl From<LakeSoulMetaDataError> for LakeSoulError {
fn from(err: LakeSoulMetaDataError) -> Self {
Self::MetaDataError(err)
}
}

impl From<DataFusionError> for LakeSoulError {
fn from(err: DataFusionError) -> Self {
Self::DataFusionError(err)
}
}

impl From<ArrowError> for LakeSoulError {
fn from(err: ArrowError) -> Self {
Self::ArrowError(err)
}
}

impl From<serde_json::Error> for LakeSoulError {
fn from(err: serde_json::Error) -> Self {
Self::SerdeJsonError(err)
}
}

impl From<tokio::task::JoinError> for LakeSoulError {
fn from(err: JoinError) -> Self {
Self::TokioJoinError(err)
}
}

impl Display for LakeSoulError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match *self {
LakeSoulError::MetaDataError(ref desc) => write!(f, "metadata error: {desc}"),
LakeSoulError::DataFusionError(ref desc) => write!(f, "DataFusion error: {desc}"),
LakeSoulError::SerdeJsonError(ref desc) => write!(f, "serde_json error: {desc}"),
LakeSoulError::ArrowError(ref desc) => write!(f, "arrow error: {desc}"),
LakeSoulError::TokioJoinError(ref desc) => write!(f, "tokio error: {desc}"),
LakeSoulError::Internal(ref desc) => {
write!(
f,
"Internal error: {desc}.\nThis was likely caused by a bug in LakeSoul's \
code and we would welcome that you file an bug report in our issue tracker"
)
}
}
}
}

impl Error for LakeSoulError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
LakeSoulError::MetaDataError(e) => Some(e),
LakeSoulError::DataFusionError(e) => Some(e),
LakeSoulError::SerdeJsonError(e) => Some(e),
LakeSoulError::ArrowError(e) => Some(e),
LakeSoulError::TokioJoinError(e) => Some(e),
LakeSoulError::Internal(_) => None,
}
}
}
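The rewrite above is the heart of the commit: the hand-written `From`, `Display`, and `Error` impls collapse into `#[derive(thiserror::Error)]`, where each `#[error(...)]` attribute generates the `Display` arm and each `#[from]` field generates both the `From` impl (which is what makes the bare `?` conversions elsewhere in the diff work) and the `Error::source` wiring. A compact sketch with two representative variants (the `DemoError` name is a stand-in):

```rust
use std::time::{SystemTime, SystemTimeError};

// One variant wrapping a source error via #[from], one carrying only a message.
#[derive(Debug, thiserror::Error)]
enum DemoError {
    #[error("sys time error: {0}")]
    SysTime(#[from] SystemTimeError),
    #[error("internal error: {0}")]
    Internal(String),
}

fn unix_secs() -> Result<u64, DemoError> {
    // SystemTimeError -> DemoError via the derived From impl, so bare `?` works.
    Ok(SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)?
        .as_secs())
}

fn main() {
    match unix_secs() {
        Ok(secs) => println!("since epoch: {secs}s"),
        Err(e) => {
            eprintln!("error: {e}"); // Display generated from #[error(...)]
            if let Some(src) = std::error::Error::source(&e) {
                eprintln!("caused by: {src}"); // source() generated from #[from]
            }
        }
    }
    // The message-only variant is constructed explicitly, as in LakeSoulError::Internal.
    eprintln!("{}", DemoError::Internal("plain message variant".to_string()));
}
```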
11 changes: 6 additions & 5 deletions rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs
@@ -8,6 +8,7 @@ use arrow::record_batch::RecordBatch;

use datafusion::scalar::ScalarValue;

use crate::error::Result;
use lakesoul_io::lakesoul_io_config::LakeSoulIOConfigBuilder;
use proto::proto::entity::TableInfo;

@@ -16,15 +17,15 @@ use crate::{
serialize::arrow_java::schema_from_metadata_str,
};

pub(crate) fn create_io_config_builder_from_table_info(table_info: Arc<TableInfo>) -> LakeSoulIOConfigBuilder {
let (range_partitions, hash_partitions) = parse_table_info_partitions(table_info.partitions.clone());
let properties = serde_json::from_str::<LakeSoulTableProperty>(&table_info.properties).unwrap();
LakeSoulIOConfigBuilder::new()
pub(crate) fn create_io_config_builder_from_table_info(table_info: Arc<TableInfo>) -> Result<LakeSoulIOConfigBuilder> {
let (range_partitions, hash_partitions) = parse_table_info_partitions(table_info.partitions.clone())?;
let properties = serde_json::from_str::<LakeSoulTableProperty>(&table_info.properties)?;
Ok(LakeSoulIOConfigBuilder::new()
.with_schema(schema_from_metadata_str(&table_info.table_schema))
.with_prefix(table_info.table_path.clone())
.with_primary_keys(hash_partitions)
.with_range_partitions(range_partitions)
.with_hash_bucket_num(properties.hash_bucket_num.unwrap_or(1))
.with_hash_bucket_num(properties.hash_bucket_num.unwrap_or(1)))
}

pub fn get_columnar_value(_batch: &RecordBatch) -> Vec<(String, ScalarValue)> {
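Since `create_io_config_builder_from_table_info` now returns a `Result`, its callers change shape too — the earlier hunk in `catalog/mod.rs` uses `.map(|builder| builder.with_files(data_files))` to stay on the happy path. A small sketch of that caller-side pattern (builder and error types are hypothetical stand-ins):

```rust
// Minimal stand-in for LakeSoulIOConfigBuilder.
#[derive(Debug)]
struct IoConfigBuilder {
    files: Vec<String>,
}

impl IoConfigBuilder {
    fn with_files(mut self, files: Vec<String>) -> Self {
        self.files = files;
        self
    }
}

#[derive(Debug)]
struct ConfigError(String);

fn builder_from_table_info(valid: bool) -> Result<IoConfigBuilder, ConfigError> {
    if valid {
        Ok(IoConfigBuilder { files: vec![] })
    } else {
        Err(ConfigError("bad table properties".to_string()))
    }
}

fn create_io_config_builder(
    valid: bool,
    files: Vec<String>,
) -> Result<IoConfigBuilder, ConfigError> {
    // Before: Ok(builder_from_table_info(..).with_files(files)) — which no longer
    // compiles once the inner call returns Result. `.map` keeps the happy path flat.
    builder_from_table_info(valid).map(|b| b.with_files(files))
}

fn main() {
    assert!(create_io_config_builder(true, vec!["part-0.parquet".to_string()]).is_ok());
    assert!(create_io_config_builder(false, vec![]).is_err());
}
```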