diff --git a/.github/workflows/consistency-ci.yml b/.github/workflows/consistency-ci.yml
index f4f629ca0..1cffaa2f9 100644
--- a/.github/workflows/consistency-ci.yml
+++ b/.github/workflows/consistency-ci.yml
@@ -118,7 +118,7 @@ jobs:
           git clone https://github.com/databricks/tpch-dbgen.git
           cd tpch-dbgen
           make
-          ./dbgen -f -s 0.001
+          ./dbgen -f -s 0.1
           mv *.tbl ../lakesoul/test_files/tpch/data
       - name: Verify that benchmark queries return expected results
         run: |
diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/ConsistencyCI.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/ConsistencyCI.scala
index aee243de6..bfe598155 100644
--- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/ConsistencyCI.scala
+++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/ConsistencyCI.scala
@@ -151,16 +151,16 @@ object ConsistencyCI {
         sparkDF.show
         println(s"${tup._1} rustDF: ")
         rustDF.show
-        // val diff1 = sparkDF.rdd.subtract(rustDF.rdd)
-        // val diff2 = rustDF.rdd.subtract(sparkDF.rdd)
-        // val result = diff1.count() == 0 && diff2.count() == 0
-        // if (!result) {
-        //   println("sparkDF: ")
-        //   println(sparkDF.collectAsList())
-        //   println("rustDF: ")
-        //   println(rustDF.collectAsList())
-        //   System.exit(1)
-        // }
+        val diff1 = sparkDF.rdd.subtract(rustDF.rdd)
+        val diff2 = rustDF.rdd.subtract(sparkDF.rdd)
+        val result = diff1.count() == 0 && diff2.count() == 0
+        if (!result) {
+          println("sparkDF: ")
+          println(sparkDF.collectAsList())
+          println("rustDF: ")
+          println(rustDF.collectAsList())
+          System.exit(1)
+        }
       })
   }
diff --git a/rust/lakesoul-datafusion/src/catalog/mod.rs b/rust/lakesoul-datafusion/src/catalog/mod.rs
index 83c8bb7db..5cb7b633a 100644
--- a/rust/lakesoul-datafusion/src/catalog/mod.rs
+++ b/rust/lakesoul-datafusion/src/catalog/mod.rs
@@ -39,8 +39,9 @@ pub(crate) async fn create_table(client: MetaDataClientRef, table_name: &str, co
         table_id: format!("table_{}", uuid::Uuid::new_v4()),
         table_name: table_name.to_string(),
         table_path: format!(
-            "file:{}default/{}",
-            env::temp_dir()
+            "file:{}/default/{}",
+            env::current_dir()
+                .unwrap()
                 .to_str()
                 .ok_or(LakeSoulError::Internal("can not get $TMPDIR".to_string()))?,
             table_name
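A note on the `table_path` hunk above: `env::temp_dir()` and `env::current_dir()` both yield paths without a trailing separator, which is why the old `"file:{}default/{}"` template glued the directory straight onto `default`. Also, `current_dir()` returns a `Result`, hence the added `.unwrap()`; the `"can not get $TMPDIR"` message is now stale since the path no longer comes from `$TMPDIR`. A minimal standalone sketch of the corrected construction (the table name `my_table` is made up; this is not LakeSoul code):

```rust
use std::env;

fn main() {
    // current_dir() typically has no trailing separator (e.g. "/home/user/lakesoul"),
    // so the template itself must supply the "/" before "default".
    let cwd = env::current_dir().expect("working directory should be accessible");
    let table_path = format!("file:{}/default/{}", cwd.to_str().unwrap(), "my_table");
    println!("{table_path}"); // e.g. file:/home/user/lakesoul/default/my_table
}
```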
diff --git a/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs b/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs
index a2a6ba575..d428a92c0 100644
--- a/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs
+++ b/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs
@@ -3,7 +3,7 @@ use std::collections::HashMap;
 use std::fmt::{self, Debug};
 use std::sync::Arc;
 
-use arrow::array::{ArrayRef, UInt64Array};
+use arrow::array::{ArrayRef, StringArray, UInt64Array};
 use arrow::record_batch::RecordBatch;
 
 use async_trait::async_trait;
@@ -12,7 +12,6 @@ use datafusion::common::{FileType, Statistics};
 use datafusion::error::DataFusionError;
 use datafusion::execution::TaskContext;
 use datafusion::physical_expr::PhysicalSortExpr;
-use datafusion::physical_plan::common::AbortOnDropSingle;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType, Distribution, Partitioning, SendableRecordBatchStream};
 use datafusion::scalar::ScalarValue;
@@ -140,7 +139,8 @@ pub struct LakeSoulHashSinkExec {
     input: Arc<dyn ExecutionPlan>,
     /// Schema describing the structure of the output data.
-    count_schema: SchemaRef,
+    sink_schema: SchemaRef,
+    /// Optional required sort order for output data.
     sort_order: Option<Vec<PhysicalSortRequirement>>,
@@ -151,7 +151,7 @@ impl Debug for LakeSoulHashSinkExec {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "LakeSoulHashSinkExec schema: {:?}", self.count_schema)
+        write!(f, "LakeSoulHashSinkExec schema: {:?}", self.sink_schema)
     }
 }
@@ -165,7 +165,7 @@ impl LakeSoulHashSinkExec {
     ) -> Result<Self> {
         Ok(Self {
             input,
-            count_schema: make_count_schema(),
+            sink_schema: make_sink_schema(),
             sort_order,
             table_info,
             metadata_client,
@@ -205,7 +205,7 @@ impl LakeSoulHashSinkExec {
         let mut partitioned_writer = HashMap::, Box>::new();
         let mut partitioned_file_path_and_row_count_locked = partitioned_file_path_and_row_count.lock().await;
         while let Some(batch) = data.next().await.transpose()? {
-            dbg!(&batch.num_rows());
+            debug!("write record_batch with {} rows", batch.num_rows());
             let columnar_value = get_columnar_value(&batch);
             let file_absolute_path = format!("{}/part-{}_{:0>4}.parquet", table_info.table_path, write_id, partition);
             if !partitioned_writer.contains_key(&columnar_value) {
@@ -293,7 +293,7 @@ impl ExecutionPlan for LakeSoulHashSinkExec {
     /// Get the schema for this execution plan
     fn schema(&self) -> SchemaRef {
-        self.count_schema.clone()
+        self.sink_schema.clone()
     }
 
     fn output_partitioning(&self) -> Partitioning {
@@ -348,7 +348,7 @@ impl ExecutionPlan for LakeSoulHashSinkExec {
     fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(Self {
             input: children[0].clone(),
-            count_schema: self.count_schema.clone(),
+            sink_schema: self.sink_schema.clone(),
             sort_order: self.sort_order.clone(),
             table_info: self.table_info.clone(),
             metadata_client: self.metadata_client.clone(),
@@ -392,18 +392,18 @@ impl ExecutionPlan for LakeSoulHashSinkExec {
             schema: self.table_info().table_namespace.clone().into(),
             table: self.table_info().table_name.clone().into(),
         };
-        let join_handle = AbortOnDropSingle::new(tokio::spawn(Self::wait_for_commit(
+        let join_handle = tokio::spawn(Self::wait_for_commit(
             join_handles,
             self.metadata_client(),
             table_ref.to_string(),
             partitioned_file_path_and_row_count,
-        )));
+        ));
 
         // });
         // let abort_helper = Arc::new(AbortOnDropMany(join_handles));
 
-        let count_schema = self.count_schema.clone();
+        let sink_schema = self.sink_schema.clone();
 
         // let count = futures::future::join_all(join_handles).await;
         // for (columnar_values, result) in partitioned_file_path_and_row_count.lock().await.iter() {
         //     match commit_data(self.metadata_client(), self.table_info().table_name.as_str(), &result.0).await {
@@ -414,32 +414,39 @@ impl ExecutionPlan for LakeSoulHashSinkExec {
         let stream = futures::stream::once(async move {
             match join_handle.await {
-                Ok(Ok(count)) => Ok(make_count_batch(count)),
-                _other => Ok(make_count_batch(u64::MAX)),
+                Ok(Ok(count)) => Ok(make_sink_batch(count, String::from(""))),
+                Ok(Err(e)) => {
+                    debug!("{}", e.to_string());
+                    Ok(make_sink_batch(u64::MAX, e.to_string()))
+                }
+                Err(e) => {
+                    debug!("{}", e.to_string());
+                    Ok(make_sink_batch(u64::MAX, e.to_string()))
+                }
             }
         })
         .boxed();
 
-        Ok(Box::pin(RecordBatchStreamAdapter::new(count_schema, stream)))
+        Ok(Box::pin(RecordBatchStreamAdapter::new(sink_schema, stream)))
     }
 }
 
-/// Create an output record batch with a count
-///
-/// ```text
-/// +-------+,
-/// | count |,
-/// +-------+,
-/// | 6     |,
-/// +-------+,
-/// ```
-fn make_count_batch(count: u64) -> RecordBatch {
-    let array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef;
-
-    RecordBatch::try_from_iter_with_nullable(vec![("count", array, false)]).unwrap()
+
+fn make_sink_batch(count: u64, msg: String) -> RecordBatch {
+    let count_array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef;
+    let msg_array = Arc::new(StringArray::from(vec![msg])) as ArrayRef;
+    RecordBatch::try_from_iter_with_nullable(vec![
+        ("count", count_array, false),
+        ("msg", msg_array, false),
+    ])
+    .unwrap()
 }
 
-fn make_count_schema() -> SchemaRef {
+fn make_sink_schema() -> SchemaRef {
     // define a schema.
-    Arc::new(Schema::new(vec![Field::new("count", DataType::UInt64, false)]))
+    Arc::new(Schema::new(vec![
+        Field::new("count", DataType::UInt64, false),
+        Field::new("msg", DataType::Utf8, false),
+    ]))
 }
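With this change the sink's output schema grows from a single `count` column to `(count, msg)`, so a failed write now surfaces its error text in-band instead of only the `u64::MAX` sentinel. A standalone sketch of how a caller might decode such a batch; `read_sink_result` is a hypothetical helper, not part of LakeSoul or DataFusion:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray, UInt64Array};
use arrow::record_batch::RecordBatch;

// Hypothetical consumer-side check, mirroring make_sink_batch's layout:
// column 0 is the row count, column 1 the error message ("" on success).
fn read_sink_result(batch: &RecordBatch) -> Result<u64, String> {
    let count = batch
        .column(0)
        .as_any()
        .downcast_ref::<UInt64Array>()
        .expect("count column is UInt64")
        .value(0);
    let msg = batch
        .column(1)
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("msg column is Utf8")
        .value(0);
    if count == u64::MAX { Err(msg.to_string()) } else { Ok(count) }
}

fn main() {
    // Build the same one-row shape make_sink_batch produces on success.
    let count: ArrayRef = Arc::new(UInt64Array::from(vec![6u64]));
    let msg: ArrayRef = Arc::new(StringArray::from(vec![String::new()]));
    let batch =
        RecordBatch::try_from_iter_with_nullable(vec![("count", count, false), ("msg", msg, false)])
            .unwrap();
    assert_eq!(read_sink_result(&batch), Ok(6));
}
```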
diff --git a/rust/lakesoul-datafusion/src/lakesoul_table/mod.rs b/rust/lakesoul-datafusion/src/lakesoul_table/mod.rs
index 88d16b8ee..66e68e486 100644
--- a/rust/lakesoul-datafusion/src/lakesoul_table/mod.rs
+++ b/rust/lakesoul-datafusion/src/lakesoul_table/mod.rs
@@ -7,7 +7,6 @@ pub mod helpers;
 use std::{ops::Deref, sync::Arc};
 
 use arrow::datatypes::{SchemaRef, Schema};
-use datafusion::dataframe;
 use datafusion::sql::TableReference;
 use datafusion::{
     dataframe::DataFrame,
diff --git a/rust/lakesoul-datafusion/src/test/catalog_tests.rs b/rust/lakesoul-datafusion/src/test/catalog_tests.rs
index 3579e1c9a..de24b6198 100644
--- a/rust/lakesoul-datafusion/src/test/catalog_tests.rs
+++ b/rust/lakesoul-datafusion/src/test/catalog_tests.rs
@@ -78,7 +78,7 @@ mod catalog_tests {
             }
             v
         };
-        let path = format!("{}{}/{}", env::temp_dir().to_str().unwrap(), &np.namespace, &table_name);
+        let path = format!("{}{}/{}", env::current_dir().unwrap_or(env::temp_dir()).to_str().unwrap(), &np.namespace, &table_name);
         let table_id = format!(
             "table_{}",
             (&mut rng)
@@ -104,7 +104,7 @@ mod catalog_tests {
     }
 
     fn table_info(table_name: &str, namespace: &str, schema: SchemaRef) -> TableInfo {
-        let path = format!("{}{}/{}", env::temp_dir().to_str().unwrap(), namespace, table_name);
+        let path = format!("{}{}/{}", env::current_dir().unwrap_or(env::temp_dir()).to_str().unwrap(), namespace, table_name);
         let schema = serde_json::to_string::(&schema.into()).unwrap();
         TableInfo {
             table_id: "table_000000001".into(),
diff --git a/rust/lakesoul-datafusion/src/test/update_tests.rs b/rust/lakesoul-datafusion/src/test/update_tests.rs
index dad7730f4..ad1612b54 100644
--- a/rust/lakesoul-datafusion/src/test/update_tests.rs
+++ b/rust/lakesoul-datafusion/src/test/update_tests.rs
@@ -37,7 +37,7 @@ mod update_tests {
     }
 
     async fn execute_append(batch: RecordBatch, table_name: &str, client: MetaDataClientRef) -> Result<()> {
-        let file = [env::temp_dir().to_str().unwrap(), table_name, format!("{}.parquet", SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis().to_string()).as_str()].iter().collect::<PathBuf>().to_str().unwrap().to_string();
+        let file = [env::current_dir().unwrap_or(env::temp_dir()).to_str().unwrap(), table_name, format!("{}.parquet", SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis().to_string()).as_str()].iter().collect::<PathBuf>().to_str().unwrap().to_string();
         let builder = create_io_config_builder(client.clone(), Some(table_name)).await?.with_file(file.clone()).with_schema(batch.schema());
         let config = builder.clone().build();
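The test changes above all swap `env::temp_dir()` for `env::current_dir()` with a temp-dir fallback, since `current_dir()` returns an `io::Result<PathBuf>`. A small standalone sketch of the same pattern (the helper name `test_data_dir` is invented for illustration):

```rust
use std::env;
use std::path::PathBuf;

// Prefer the working directory, fall back to the OS temp dir. `unwrap_or_else`
// avoids building the fallback eagerly; the diff uses `unwrap_or`, which
// behaves the same but always evaluates `temp_dir()`.
fn test_data_dir(table_name: &str) -> PathBuf {
    let base = env::current_dir().unwrap_or_else(|_| env::temp_dir());
    // Collecting &str segments into a PathBuf joins them with the platform
    // separator, as the update_tests change does.
    [base.to_str().unwrap(), table_name].iter().collect()
}

fn main() {
    println!("{}", test_data_dir("my_table").display());
}
```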
diff --git a/rust/lakesoul-datafusion/src/test/upsert_tests.rs b/rust/lakesoul-datafusion/src/test/upsert_tests.rs
index e3adfbb0c..a2ad95de1 100644
--- a/rust/lakesoul-datafusion/src/test/upsert_tests.rs
+++ b/rust/lakesoul-datafusion/src/test/upsert_tests.rs
@@ -67,7 +67,7 @@ mod upsert_with_io_config_tests {
         builder: LakeSoulIOConfigBuilder,
     ) -> LakeSoulIOConfigBuilder {
         let file = [
-            env::temp_dir().to_str().unwrap(),
+            env::current_dir().unwrap_or(env::temp_dir()).to_str().unwrap(),
             table_name,
             format!(
                 "{}.parquet",
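Finally, circling back to the `execute()` change in metadata_format.rs: dropping `AbortOnDropSingle` means the spawned commit task is no longer aborted if the returned stream is dropped early, and awaiting a bare `tokio::spawn` handle yields a nested result, which the new three-arm match handles. A standalone sketch of that shape, assuming only tokio (the `u64`/`String` types stand in for LakeSoul's own):

```rust
// JoinHandle::await yields Err on panic/abort, Ok(inner) for the task's own Result.
#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async { Ok::<u64, String>(42) });
    match handle.await {
        Ok(Ok(count)) => println!("committed {count} rows"),
        Ok(Err(e)) => println!("commit task failed: {e}"), // task's own error
        Err(e) => println!("commit task panicked or was aborted: {e}"), // JoinError
    }
}
```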