Skip to content

Commit

Permalink
reduce AbortHelper
Browse files Browse the repository at this point in the history
Signed-off-by: zenghua <[email protected]>
  • Loading branch information
zenghua committed Feb 18, 2024
1 parent 00e4528 commit f86c420
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,16 @@ object ConsistencyCI {
sparkDF.show
println(s"${tup._1} rustDF: ")
rustDF.show
// val diff1 = sparkDF.rdd.subtract(rustDF.rdd)
// val diff2 = rustDF.rdd.subtract(sparkDF.rdd)
// val result = diff1.count() == 0 && diff2.count() == 0
// if (!result) {
// println("sparkDF: ")
// println(sparkDF.collectAsList())
// println("rustDF: ")
// println(rustDF.collectAsList())
// System.exit(1)
// }
val diff1 = sparkDF.rdd.subtract(rustDF.rdd)
val diff2 = rustDF.rdd.subtract(sparkDF.rdd)
val result = diff1.count() == 0 && diff2.count() == 0
if (!result) {
println("sparkDF: ")
println(sparkDF.collectAsList())
println("rustDF: ")
println(rustDF.collectAsList())
System.exit(1)
}
})

}
Expand Down
3 changes: 2 additions & 1 deletion rust/lakesoul-datafusion/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ pub(crate) async fn create_table(client: MetaDataClientRef, table_name: &str, co
table_name: table_name.to_string(),
table_path: format!(
"file:{}default/{}",
env::temp_dir()
env::current_dir()
.unwrap()
.to_str()
.ok_or(LakeSoulError::Internal("can not get $TMPDIR".to_string()))?,
table_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::HashMap;
use std::fmt::{self, Debug};
use std::sync::Arc;

use arrow::array::{ArrayRef, UInt64Array};
use arrow::array::{ArrayRef, StringArray, UInt64Array};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;

Expand All @@ -12,7 +12,6 @@ use datafusion::common::{FileType, Statistics};
use datafusion::error::DataFusionError;
use datafusion::execution::TaskContext;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::common::AbortOnDropSingle;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, Distribution, Partitioning, SendableRecordBatchStream};
use datafusion::scalar::ScalarValue;
Expand Down Expand Up @@ -140,7 +139,8 @@ pub struct LakeSoulHashSinkExec {
input: Arc<dyn ExecutionPlan>,

/// Schema describing the structure of the output data.
count_schema: SchemaRef,
sink_schema: SchemaRef,

/// Optional required sort order for output data.
sort_order: Option<Vec<PhysicalSortRequirement>>,

Expand All @@ -151,7 +151,7 @@ pub struct LakeSoulHashSinkExec {

impl Debug for LakeSoulHashSinkExec {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "LakeSoulHashSinkExec schema: {:?}", self.count_schema)
write!(f, "LakeSoulHashSinkExec schema: {:?}", self.sink_schema)
}
}

Expand All @@ -165,7 +165,7 @@ impl LakeSoulHashSinkExec {
) -> Result<Self> {
Ok(Self {
input,
count_schema: make_count_schema(),
sink_schema: make_sink_schema(),
sort_order,
table_info,
metadata_client,
Expand Down Expand Up @@ -205,7 +205,7 @@ impl LakeSoulHashSinkExec {
let mut partitioned_writer = HashMap::<Vec<(String, ScalarValue)>, Box<MultiPartAsyncWriter>>::new();
let mut partitioned_file_path_and_row_count_locked = partitioned_file_path_and_row_count.lock().await;
while let Some(batch) = data.next().await.transpose()? {
dbg!(&batch.num_rows());
debug!("write record_batch with {} rows", batch.num_rows());
let columnar_value = get_columnar_value(&batch);
let file_absolute_path = format!("{}/part-{}_{:0>4}.parquet", table_info.table_path, write_id, partition);
if !partitioned_writer.contains_key(&columnar_value) {
Expand Down Expand Up @@ -293,7 +293,7 @@ impl ExecutionPlan for LakeSoulHashSinkExec {

/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef {
self.count_schema.clone()
self.sink_schema.clone()
}

fn output_partitioning(&self) -> Partitioning {
Expand Down Expand Up @@ -348,7 +348,7 @@ impl ExecutionPlan for LakeSoulHashSinkExec {
fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(Self {
input: children[0].clone(),
count_schema: self.count_schema.clone(),
sink_schema: self.sink_schema.clone(),
sort_order: self.sort_order.clone(),
table_info: self.table_info.clone(),
metadata_client: self.metadata_client.clone(),
Expand Down Expand Up @@ -392,18 +392,18 @@ impl ExecutionPlan for LakeSoulHashSinkExec {
schema: self.table_info().table_namespace.clone().into(),
table: self.table_info().table_name.clone().into(),
};
let join_handle = AbortOnDropSingle::new(tokio::spawn(Self::wait_for_commit(
let join_handle = tokio::spawn(Self::wait_for_commit(
join_handles,
self.metadata_client(),
table_ref.to_string(),
partitioned_file_path_and_row_count,
)));
));

// });

// let abort_helper = Arc::new(AbortOnDropMany(join_handles));

let count_schema = self.count_schema.clone();
let sink_schema = self.sink_schema.clone();
// let count = futures::future::join_all(join_handles).await;
// for (columnar_values, result) in partitioned_file_path_and_row_count.lock().await.iter() {
// match commit_data(self.metadata_client(), self.table_info().table_name.as_str(), &result.0).await {
Expand All @@ -414,32 +414,39 @@ impl ExecutionPlan for LakeSoulHashSinkExec {

let stream = futures::stream::once(async move {
match join_handle.await {
Ok(Ok(count)) => Ok(make_count_batch(count)),
_other => Ok(make_count_batch(u64::MAX)),
Ok(Ok(count)) => Ok(make_sink_batch(count, String::from(""))),
Ok(Err(e)) => {
debug!("{}", e.to_string());
Ok(make_sink_batch(u64::MAX, e.to_string()))
}
Err(e) => {
debug!("{}", e.to_string());
Ok(make_sink_batch(u64::MAX, e.to_string()))
}
}
})
.boxed();

Ok(Box::pin(RecordBatchStreamAdapter::new(count_schema, stream)))
Ok(Box::pin(RecordBatchStreamAdapter::new(sink_schema, stream)))
}
}

/// Create an output record batch with a count
///
/// ```text
/// +-------+,
/// | count |,
/// +-------+,
/// | 6 |,
/// +-------+,
/// ```
fn make_count_batch(count: u64) -> RecordBatch {
let array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef;

RecordBatch::try_from_iter_with_nullable(vec![("count", array, false)]).unwrap()

fn make_sink_batch(count: u64, msg: String) -> RecordBatch {
let count_array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef;
let msg_array = Arc::new(StringArray::from(vec![msg])) as ArrayRef;
RecordBatch::try_from_iter_with_nullable(
vec![
("count", count_array, false),
("msg", msg_array, false)
]).unwrap()
}

fn make_count_schema() -> SchemaRef {
fn make_sink_schema() -> SchemaRef {
// define a schema.
Arc::new(Schema::new(vec![Field::new("count", DataType::UInt64, false)]))
Arc::new(Schema::new(vec![
Field::new("count", DataType::UInt64, false),
Field::new("msg", DataType::Utf8, false),
])
)
}
1 change: 0 additions & 1 deletion rust/lakesoul-datafusion/src/lakesoul_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ pub mod helpers;
use std::{ops::Deref, sync::Arc};

use arrow::datatypes::{SchemaRef, Schema};
use datafusion::dataframe;
use datafusion::sql::TableReference;
use datafusion::{
dataframe::DataFrame,
Expand Down
4 changes: 2 additions & 2 deletions rust/lakesoul-datafusion/src/test/catalog_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ mod catalog_tests {
}
v
};
let path = format!("{}{}/{}", env::temp_dir().to_str().unwrap(), &np.namespace, &table_name);
let path = format!("{}{}/{}", env::current_dir().unwrap_or(env::temp_dir()).to_str().unwrap(), &np.namespace, &table_name);
let table_id = format!(
"table_{}",
(&mut rng)
Expand All @@ -104,7 +104,7 @@ mod catalog_tests {
}

fn table_info(table_name: &str, namespace: &str, schema: SchemaRef) -> TableInfo {
let path = format!("{}{}/{}", env::temp_dir().to_str().unwrap(), namespace, table_name);
let path = format!("{}{}/{}", env::current_dir().unwrap_or(env::temp_dir()).to_str().unwrap(), namespace, table_name);
let schema = serde_json::to_string::<ArrowJavaSchema>(&schema.into()).unwrap();
TableInfo {
table_id: "table_000000001".into(),
Expand Down
2 changes: 1 addition & 1 deletion rust/lakesoul-datafusion/src/test/update_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ mod update_tests {
}

async fn execute_append(batch: RecordBatch, table_name: &str, client: MetaDataClientRef) -> Result<()> {
let file = [env::temp_dir().to_str().unwrap(), table_name, format!("{}.parquet", SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis().to_string()).as_str()].iter().collect::<PathBuf>().to_str().unwrap().to_string();
let file = [env::current_dir().unwrap_or(env::temp_dir()).to_str().unwrap(), table_name, format!("{}.parquet", SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis().to_string()).as_str()].iter().collect::<PathBuf>().to_str().unwrap().to_string();
let builder = create_io_config_builder(client.clone(), Some(table_name)).await?.with_file(file.clone()).with_schema(batch.schema());
let config = builder.clone().build();

Expand Down
2 changes: 1 addition & 1 deletion rust/lakesoul-datafusion/src/test/upsert_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ mod upsert_with_io_config_tests {
builder: LakeSoulIOConfigBuilder,
) -> LakeSoulIOConfigBuilder {
let file = [
env::temp_dir().to_str().unwrap(),
env::current_dir().unwrap_or(env::temp_dir()).to_str().unwrap(),
table_name,
format!(
"{}.parquet",
Expand Down

0 comments on commit f86c420

Please sign in to comment.