
Commit 9c670fd

Merge pull request #450 from Ceng23333/test/consistency_ci
[Rust/BugFix] fix escape path error
Ceng23333 authored Mar 5, 2024
2 parents 0d377b8 + 391d880 commit 9c670fd
Showing 16 changed files with 148 additions and 66 deletions.
@@ -45,10 +45,11 @@ object MergePartitionedFileUtil {
requestPartitionFields: Array[String]): MergePartitionedFile = {
val hosts = getBlockHosts(getBlockLocations(file), 0, file.getLen)

- val filePathStr = filePath
+ val fs = filePath
.getFileSystem(sparkSession.sessionState.newHadoopConf())
+ val filePathStr = fs
.makeQualified(filePath).toString
- val touchedFileInfo = fileInfo.find(f => filePathStr.equals(f.path))
+ val touchedFileInfo = fileInfo.find(f => filePathStr.equals(fs.makeQualified(new Path(f.path)).toString))
.getOrElse(throw LakeSoulErrors.filePathNotFoundException(filePathStr, fileInfo.mkString(",")))

val touchedFileSchema = requestFilesSchemaMap(touchedFileInfo.range_version).fieldNames
@@ -150,7 +150,7 @@ case class BatchDataSoulFileIndexV2(override val spark: SparkSession,

object LakeSoulFileIndexUtils {
def absolutePath(child: String, tableName: String): Path = {
- val p = new Path(new URI(child))
+ val p = new Path(child)
if (p.isAbsolute) {
p
} else {
@@ -20,7 +20,7 @@ object ConsistencyCI {
StructField("c_mktsegment", StringType, nullable = false),
StructField("c_comment", StringType, nullable = false),
)),
- "c_custkey, c_name"),
+ "c_custkey, c_name", Some("c_nationkey")),
("part",
StructType(Array(
StructField("p_partkey", LongType, nullable = false),
@@ -33,7 +33,7 @@
StructField("p_retailprice", DecimalType(15, 2), nullable = false),
StructField("p_comment", StringType, nullable = false),
)),
- "p_partkey, p_name"),
+ "p_partkey, p_name", Option.empty),
("supplier",
StructType(Array(
StructField("s_suppkey", LongType, nullable = false),
@@ -44,7 +44,7 @@
StructField("s_acctbal", DecimalType(15, 2), nullable = false),
StructField("s_comment", StringType, nullable = false),
)),
- "s_suppkey, s_name"),
+ "s_suppkey, s_name", Some("s_nationkey")),
("partsupp",
StructType(Array(
StructField("ps_partkey", LongType, nullable = false),
@@ -53,7 +53,7 @@
StructField("ps_supplycost", DecimalType(15, 2), nullable = false),
StructField("ps_comment", StringType, nullable = false),
)),
- "ps_partkey, ps_suppkey"),
+ "ps_partkey, ps_suppkey", Option.empty),
("orders",
StructType(Array(
StructField("o_orderkey", LongType, nullable = false),
@@ -66,7 +66,7 @@
StructField("o_shippriority", IntegerType, nullable = false),
StructField("o_comment", StringType, nullable = false),
)),
- "o_orderkey, o_custkey"),
+ "o_orderkey, o_custkey", Some("o_orderpriority")),

("nation",
StructType(Array(
@@ -75,14 +75,14 @@
StructField("n_regionkey", LongType, nullable = false),
StructField("n_comment", StringType, nullable = false),
)),
- "n_nationkey, n_name"),
+ "n_nationkey, n_name", Some("n_regionkey")),
("region",
StructType(Array(
StructField("r_regionkey", LongType, nullable = false),
StructField("r_name", StringType, nullable = false),
StructField("r_comment", StringType, nullable = false),
)),
- "r_regionkey, r_name"),
+ "r_regionkey, r_name", Option.empty),
("lineitem",
StructType(Array(
StructField("l_orderkey", LongType, nullable = false),
@@ -102,25 +102,37 @@
StructField("l_shipmode", StringType, nullable = false),
StructField("l_comment", StringType, nullable = false),
)),
- "l_orderkey, l_partkey"),
+ "l_orderkey, l_partkey", Option.empty),
)

def load_data(spark: SparkSession): Unit = {

val tpchPath = System.getenv("TPCH_DATA")
val lakeSoulPath = "/tmp/lakesoul/tpch"
tpchTable.foreach(tup => {
- val (name, schema, hashPartitions) = tup
+ val (name, schema, hashPartitions, rangePartitions) = tup
val df = spark.read.option("delimiter", "|")
.schema(schema)
.csv(s"$tpchPath/$name.tbl")
// df.show
- df.write.format("lakesoul")
-   .option("shortTableName", name)
-   .option("hashPartitions", hashPartitions)
-   .option("hashBucketNum", 5)
-   .mode("Overwrite")
-   .save(s"$lakeSoulPath/$name")
+ rangePartitions match {
+   case Some(value) =>
+     df.write.format("lakesoul")
+       .option("shortTableName", name)
+       .option("hashPartitions", hashPartitions)
+       .option("rangePartitions", value)
+       .option("hashBucketNum", 5)
+       .mode("Overwrite")
+       .save(s"$lakeSoulPath/$name")
+   case None =>
+     df.write.format("lakesoul")
+       .option("shortTableName", name)
+       .option("hashPartitions", hashPartitions)
+       .option("hashBucketNum", 5)
+       .mode("Overwrite")
+       .save(s"$lakeSoulPath/$name")
+ }

})

}
1 change: 1 addition & 0 deletions rust/Cargo.lock


2 changes: 2 additions & 0 deletions rust/lakesoul-datafusion/Cargo.toml
@@ -33,6 +33,8 @@ bytes = { workspace = true }
tracing = "0.1.40"
thiserror = { workspace = true }
anyhow = { workspace = true }
+ url = { workspace = true }


[dev-dependencies]
ctor = "0.2"
@@ -334,12 +334,12 @@ impl LakeSoulHashSinkExec {
let columnar_values = get_columnar_values(&batch, range_partitions.clone())?;
let partition_desc = columnar_values_to_partition_desc(&columnar_values);
let batch_excluding_range = batch.project(&schema_projection_excluding_range)?;
- let file_absolute_path = format!("{}{}part-{}_{:0>4}.parquet", table_info.table_path, columnar_values_to_sub_path(&columnar_values), write_id, partition);
+ let file_absolute_path = format!("{}{}part-{}_{:0>4}.parquet", table_info.table_path, columnar_values_to_sub_path(&columnar_values), write_id, partition);

if !partitioned_writer.contains_key(&partition_desc) {
let mut config = create_io_config_builder_from_table_info(table_info.clone())
.map_err(|e| DataFusionError::External(Box::new(e)))?
- .with_files(vec![file_absolute_path.clone()])
+ .with_files(vec![file_absolute_path])
.with_schema(batch_excluding_range.schema())
.build();

@@ -392,7 +392,6 @@ impl LakeSoulHashSinkExec {
let partitioned_file_path_and_row_count = partitioned_file_path_and_row_count.lock().await;

for (partition_desc, (files, _)) in partitioned_file_path_and_row_count.iter() {
- // let partition_desc = columnar_values_to_partition_desc(columnar_values);
commit_data(client.clone(), &table_name, partition_desc.clone(), files)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
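For reference, a minimal standalone sketch of the file-name construction in the first hunk above; the table path, sub-path, write id, and partition index are hypothetical examples, and only the format string is taken from the diff:

fn main() {
    // Hypothetical stand-ins for table_info.table_path,
    // columnar_values_to_sub_path(..), write_id, and the partition index.
    let table_path = "file:///tmp/lakesoul/tpch/orders/";
    let sub_path = "o_orderpriority=1-URGENT/";
    let write_id = "a1b2c3";
    let partition = 7usize;

    // `{:0>4}` zero-pads the partition index to four digits.
    let file_absolute_path = format!(
        "{}{}part-{}_{:0>4}.parquet",
        table_path, sub_path, write_id, partition
    );
    assert_eq!(
        file_absolute_path,
        "file:///tmp/lakesoul/tpch/orders/o_orderpriority=1-URGENT/part-a1b2c3_0007.parquet"
    );
}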
7 changes: 4 additions & 3 deletions rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs
@@ -8,10 +8,11 @@ use arrow::{array::{Array, ArrayRef, AsArray, StringBuilder}, compute::prep_null
use arrow_cast::cast;
use arrow_arith::boolean::and;

- use datafusion::{common::{DFField, DFSchema}, datasource::listing::ListingTableUrl, error::DataFusionError, execution::context::ExecutionProps, logical_expr::Expr, physical_expr::create_physical_expr, scalar::ScalarValue};
+ use datafusion::{common::{DFField, DFSchema}, error::DataFusionError, execution::context::ExecutionProps, logical_expr::Expr, physical_expr::create_physical_expr, scalar::ScalarValue};
use lakesoul_metadata::MetaDataClientRef;
- use object_store::{ObjectMeta, ObjectStore};
+ use object_store::{path::Path, ObjectMeta, ObjectStore};
use tracing::{debug, trace};
+ use url::Url;

use crate::error::Result;
use lakesoul_io::lakesoul_io_config::LakeSoulIOConfigBuilder;
@@ -200,7 +201,7 @@ pub async fn listing_partition_info(partition_info: PartitionInfo, store: &dyn O
.get_data_files_of_single_partition(&partition_info).await.map_err(|_| DataFusionError::External("listing partition info failed".into()))?;
let mut files = Vec::new();
for path in paths {
- let result = store.head(ListingTableUrl::parse(path.clone())?.prefix()).await?;
+ let result = store.head(&Path::from_url_path(Url::parse(path.as_str()).map_err(|e| DataFusionError::External(Box::new(e)))?.path())?).await?;
files.push(result);
}
Ok((partition_info, files))
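The escape-path fix shown here amounts to percent-decoding the path component of the stored URL before handing it to the object store. A minimal standalone sketch of that conversion, assuming the metadata holds fully qualified file: URLs (the example URL is made up and error handling is simplified):

use object_store::path::Path;
use url::Url;

// Turn a stored file URL into an object_store Path, percent-decoding any
// escaped characters in its segments, mirroring the replacement line above.
fn object_store_path(stored: &str) -> Result<Path, Box<dyn std::error::Error>> {
    let url = Url::parse(stored)?;
    let path = Path::from_url_path(url.path())?;
    Ok(path)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical data file under a range-partition directory.
    let stored = "file:///tmp/lakesoul/tpch/orders/o_orderpriority=1-URGENT/part-a1b2c3_0007.parquet";
    let path = object_store_path(stored)?;
    // The decoded key can then be passed to ObjectStore::head(&path).
    println!("{path}");
    Ok(())
}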
1 change: 0 additions & 1 deletion rust/lakesoul-datafusion/src/lib.rs
@@ -6,7 +6,6 @@
// after finished. remove above attr
extern crate core;

- mod benchmarks;
mod catalog;
mod datasource;
mod error;
@@ -3,35 +3,96 @@
// SPDX-License-Identifier: Apache-2.0

mod run;
- use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder};
+ use arrow::datatypes::{Schema, SchemaBuilder, Field, DataType};

pub const TPCH_TABLES: &[&str] = &[
- "part", "supplier", "partsupp", "customer", "orders", "lineitem", "nation", "region",
+ "part", "supplier", "partsupp", "customer",
+ "orders",
+ "lineitem",
+ "nation", "region",
];

/// The `.tbl` file contains a trailing column
pub fn get_tbl_tpch_table_primary_keys(table: &str) -> Vec<String> {
match table {
- "part" => vec![String::from("p_partkey"), String::from("p_name")],
+ "part" => vec![
+ String::from("p_partkey"),
+ String::from("p_name"),
+ ],

+ "supplier" => vec![
+ String::from("s_suppkey"),
+ String::from("s_name"),
+ ],

+ "partsupp" => vec![
+ String::from("ps_partkey"),
+ String::from("ps_suppkey"),
+ ],

+ "customer" => vec![
+ String::from("c_custkey"),
+ String::from("c_name"),
+ ],

+ "orders" => vec![
+ String::from("o_orderkey"),
+ String::from("o_custkey"),
+ ],

+ "lineitem" => vec![
+ String::from("l_orderkey"),
+ String::from("l_partkey"),
+ ],

+ "nation" => vec![
+ String::from("n_nationkey"),
+ String::from("n_name"),
+ ],

+ "region" => vec![
+ String::from("r_regionkey"),
+ String::from("r_name"),
+ ],

+ _ => unimplemented!(),
+ }
+ }

- "supplier" => vec![String::from("s_suppkey"), String::from("s_name")],
+ pub fn get_tbl_tpch_table_range_partitions(table: &str) -> Vec<String> {
+ match table {
+ "part" => vec![],

- "partsupp" => vec![String::from("ps_partkey"), String::from("ps_suppkey")],
+ "supplier" => vec![
+ String::from("s_nationkey"),
+ ],

- "customer" => vec![String::from("c_custkey"), String::from("c_name")],
+ "partsupp" => vec![],

- "orders" => vec![String::from("o_orderkey"), String::from("o_custkey")],
+ "customer" => vec![
+ String::from("c_nationkey"),
+ ],

- "lineitem" => vec![String::from("l_orderkey"), String::from("l_partkey")],
+ "orders" => vec![
+ // String::from("o_orderdate"),
+ String::from("o_orderpriority"),
+ ],

- "nation" => vec![String::from("n_nationkey"), String::from("n_name")],
+ "lineitem" => vec![
+ ],

- "region" => vec![String::from("r_regionkey"), String::from("r_name")],
+ "nation" => vec![
+ String::from("n_regionkey"),
+ ],

+ "region" => vec![
+ ],

_ => unimplemented!(),
}
}



/// The `.tbl` file contains a trailing column
pub fn get_tbl_tpch_table_schema(table: &str) -> Schema {
let mut schema = SchemaBuilder::from(get_tpch_table_schema(table).fields);
@@ -134,3 +195,5 @@ pub fn get_tpch_table_schema(table: &str) -> Schema {
_ => unimplemented!(),
}
}
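A small usage sketch for the two lookup helpers above, assumed to live in the same module; the loop and printout are illustrative and not part of the change:

// Illustrative only: list each TPC-H table with its hash (primary-key) columns
// and its range-partition columns, as a benchmark setup might consume them.
fn print_partitioning() {
    for table in TPCH_TABLES {
        let hash_cols = get_tbl_tpch_table_primary_keys(table);
        let range_cols = get_tbl_tpch_table_range_partitions(table);
        println!("{table}: hash = {hash_cols:?}, range = {range_cols:?}");
    }
}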

