diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/SubstraitTest.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/SubstraitTest.java index caa2c41d3..cf0fe0849 100644 --- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/SubstraitTest.java +++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/SubstraitTest.java @@ -79,6 +79,22 @@ public void logicTest() throws ExecutionException, InterruptedException { "[+I[1, Bob, 1995-10-01, true, 10.01, A, 1.85, 3, 1, 89, 100.11, [1, -81], [18, 67, 112, -105], 1990-01-07T10:10, 1995-10-01T07:10:00Z]]"); } + @Test + public void partitionTest() throws ExecutionException, InterruptedException { + TableEnvironment createTableEnv = TestUtils.createTableEnv(BATCH_TYPE); + createLakeSoulSourceTableWithDateType(createTableEnv); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + sb.append("0"); + } + String testSql = String.format("select * from type_info where zone='%s' or zone='A'", sb.toString()); + StreamTableEnvironment tEnvs = TestUtils.createStreamTableEnv(BATCH_TYPE); + List<Row> rows = CollectionUtil.iteratorToList(tEnvs.executeSql(testSql).collect()); + rows.sort(Comparator.comparing(Row::toString)); + assertThat(rows.toString()).isEqualTo( + "[+I[1, Bob, 1995-10-01, true, 10.01, A, 1.85, 3, 1, 89, 100.11, [1, -81], [18, 67, 112, -105], 1990-01-07T10:10, 1995-10-01T07:10:00Z]]"); + } + @Test public void cmpTest() throws ExecutionException, InterruptedException { TableEnvironment createTableEnv = TestUtils.createTableEnv(BATCH_TYPE); @@ -110,7 +126,10 @@ private void createLakeSoulSourceTableWithDateType(TableEnvironment tEnvs) " country VARBINARY, " + " createTime TIMESTAMP, " + " modifyTime TIMESTAMP_LTZ " + - ") WITH (" + + ") " + + "PARTITIONED BY (`zone`,`country`,`money`)" + + + "WITH (" + " 'connector'='lakesoul'," + " 'hashBucketNum'='2'," + " 'path'='" + getTempDirUri("/lakeSource/type") + diff --git 
a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/substrait/SubstraitUtil.java b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/substrait/SubstraitUtil.java index b7ca656af..e93cb1236 100644 --- a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/substrait/SubstraitUtil.java +++ b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/substrait/SubstraitUtil.java @@ -69,8 +69,8 @@ public class SubstraitUtil { private static final LibLakeSoulIO LIB; - private static final Pointer BUFFER1; - private static final Pointer BUFFER2; + private static Pointer BUFFER1; + private static Pointer BUFFER2; private static final NativeIOBase NATIVE_IO_BASE; @@ -204,12 +204,9 @@ public static List applyPartitionFilters(List allP JniWrapper jniWrapper = JniWrapper.newBuilder().addAllPartitionInfo(allPartitionInfo).build(); byte[] jniBytes = jniWrapper.toByteArray(); - BUFFER1.put(0, jniBytes, 0, jniBytes.length); - BUFFER1.putByte(jniBytes.length, (byte) 0); - byte[] filterBytes = partitionFilter.toByteArray(); - BUFFER2.put(0, filterBytes, 0, filterBytes.length); - BUFFER2.putByte(filterBytes.length, (byte) 0); + tryPutBuffer1(jniBytes); + tryPutBuffer2(filterBytes); try { final CompletableFuture filterFuture = new CompletableFuture<>(); @@ -267,6 +264,22 @@ public static List applyPartitionFilters(List allP return resultPartitionInfo; } + private static void tryPutBuffer1(byte[] bytes) { + while (BUFFER1.size() < bytes.length + 1) { + BUFFER1 = Runtime.getRuntime(LIB).getMemoryManager().allocateDirect(BUFFER1.size() * 2); + } + BUFFER1.put(0, bytes, 0, bytes.length); + BUFFER1.putByte(bytes.length, (byte) 0); + } + + private static void tryPutBuffer2(byte[] bytes) { + while (BUFFER2.size() < bytes.length + 1) { + BUFFER2 = Runtime.getRuntime(LIB).getMemoryManager().allocateDirect(BUFFER2.size() * 2); + } + BUFFER2.put(0, bytes, 0, bytes.length); + BUFFER2.putByte(bytes.length, 
(byte) 0); + } + public static FieldReference arrowFieldToSubstraitField(Field field) { return FieldReference diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 8ae8cef40..720118198 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1909,6 +1909,7 @@ dependencies = [ "futures", "half", "hdrs", + "hex", "lazy_static", "log", "object_store", diff --git a/rust/lakesoul-io/Cargo.toml b/rust/lakesoul-io/Cargo.toml index 33e35ad90..5c3dd03ca 100644 --- a/rust/lakesoul-io/Cargo.toml +++ b/rust/lakesoul-io/Cargo.toml @@ -43,7 +43,7 @@ log = "0.4.20" anyhow = { workspace = true, features = [] } prost = { workspace = true } env_logger = "0.11" - +hex = "0.4" [features] hdfs = ["dep:hdrs"] @@ -56,8 +56,6 @@ datafusion-substrait = { workspace = true } datafusion-substrait = { workspace = true, features = ["protoc"] } - - [dev-dependencies] tempfile = "3.3.0" comfy-table = "6.0" diff --git a/rust/lakesoul-io/src/constant.rs b/rust/lakesoul-io/src/constant.rs index 34a5ded94..67c8c6f96 100644 --- a/rust/lakesoul-io/src/constant.rs +++ b/rust/lakesoul-io/src/constant.rs @@ -14,6 +14,8 @@ use lazy_static::lazy_static; pub static LAKESOUL_TIMEZONE: &str = "UTC"; pub static LAKESOUL_NULL_STRING: &str = "__L@KE$OUL_NULL__"; pub static LAKESOUL_EMPTY_STRING: &str = "__L@KE$OUL_EMPTY_STRING__"; +pub static LAKESOUL_EQ: &str = "__L@KE$OUL_EQ__"; +pub static LAKESOUL_COMMA: &str = "__L@KE$OUL_COMMA__"; pub static DATE32_FORMAT: &str = "%Y-%m-%d"; pub static FLINK_TIMESTAMP_FORMAT: &str = "%Y-%m-%d %H:%M:%S%.9f"; diff --git a/rust/lakesoul-io/src/datasource/file_format.rs b/rust/lakesoul-io/src/datasource/file_format.rs index db2cf41ef..0df03323d 100644 --- a/rust/lakesoul-io/src/datasource/file_format.rs +++ b/rust/lakesoul-io/src/datasource/file_format.rs @@ -183,7 +183,7 @@ pub async fn flatten_file_scan_config( let mut builder = SchemaBuilder::new(); // O(nm), n = number of fields, m = number of partition columns for field in file_schema.fields() { - if 
!partition_schema.field_with_name(field.name()).is_ok() { + if partition_schema.field_with_name(field.name()).is_err() { builder.push(field.clone()); } } diff --git a/rust/lakesoul-io/src/datasource/listing.rs b/rust/lakesoul-io/src/datasource/listing.rs index 3366868d8..d89904b99 100644 --- a/rust/lakesoul-io/src/datasource/listing.rs +++ b/rust/lakesoul-io/src/datasource/listing.rs @@ -74,12 +74,12 @@ impl LakeSoulListingTable { let mut builder = SchemaBuilder::from(target_schema.fields()); // O(n^2), n is the number of fields in file_schema and config.partition_schema for field in file_schema.fields() { - if !target_schema.field_with_name(field.name()).is_ok() { + if target_schema.field_with_name(field.name()).is_err() { builder.try_merge(field)?; } } for field in config.partition_schema().fields() { - if !target_schema.field_with_name(field.name()).is_ok() { + if target_schema.field_with_name(field.name()).is_err() { builder.try_merge(field)?; } } diff --git a/rust/lakesoul-io/src/helpers.rs b/rust/lakesoul-io/src/helpers.rs index b5325f8cd..f4a70baae 100644 --- a/rust/lakesoul-io/src/helpers.rs +++ b/rust/lakesoul-io/src/helpers.rs @@ -6,6 +6,7 @@ use std::{collections::HashMap, sync::Arc}; use arrow::datatypes::UInt32Type; use arrow_array::{RecordBatch, UInt32Array}; +use arrow_buffer::i256; use arrow_schema::{DataType, Field, Schema, SchemaBuilder, SchemaRef, TimeUnit}; use chrono::{DateTime, Duration}; use datafusion::{ @@ -31,9 +32,7 @@ use url::Url; use crate::{ constant::{ - DATE32_FORMAT, FLINK_TIMESTAMP_FORMAT, LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING, - TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT, - TIMESTAMP_SECOND_FORMAT, + DATE32_FORMAT, FLINK_TIMESTAMP_FORMAT, LAKESOUL_COMMA, LAKESOUL_EMPTY_STRING, LAKESOUL_EQ, LAKESOUL_NULL_STRING, TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT, TIMESTAMP_SECOND_FORMAT }, filter::parser::Parser, 
lakesoul_io_config::LakeSoulIOConfig, @@ -123,7 +122,7 @@ pub fn format_scalar_value(v: &ScalarValue) -> String { if s.is_empty() { LAKESOUL_EMPTY_STRING.to_string() } else { - s.clone() + s.replace('=', LAKESOUL_EQ).replace(',', LAKESOUL_COMMA) } } ScalarValue::TimestampSecond(Some(s), _) => { @@ -166,6 +165,14 @@ .format(TIMESTAMP_NANOSECOND_FORMAT) ) } + ScalarValue::Decimal128(Some(s), _, _) => format!("{}", s), + ScalarValue::Decimal256(Some(s), _, _) => format!("{}", s), + ScalarValue::Binary(e) + | ScalarValue::FixedSizeBinary(_, e) + | ScalarValue::LargeBinary(e) => match e { + Some(bytes) => hex::encode(bytes), + None => LAKESOUL_NULL_STRING.to_string(), + } other => other.to_string(), } } @@ -180,7 +187,7 @@ pub fn into_scalar_value(val: &str, data_type: &DataType) -> Result if val.eq(LAKESOUL_EMPTY_STRING) { Ok(ScalarValue::Utf8(Some("".to_string()))) } else { - Ok(ScalarValue::Utf8(Some(val.to_string()))) + Ok(ScalarValue::Utf8(Some(val.replace(LAKESOUL_EQ, "=").replace(LAKESOUL_COMMA, ",")))) } } DataType::Timestamp(unit, timezone) => match unit { @@ -238,6 +245,11 @@ Ok(ScalarValue::TimestampNanosecond(Some(nanosecs), timezone.clone())) } }, + DataType::Decimal128(p, s) => Ok(ScalarValue::Decimal128(Some(val.parse::<i128>().unwrap()), *p, *s)), + DataType::Decimal256(p, s) => Ok(ScalarValue::Decimal256(Some(i256::from_string(val).unwrap()), *p, *s)), + DataType::Binary=> Ok(ScalarValue::Binary(Some(hex::decode(val).unwrap()))), + DataType::FixedSizeBinary(size) => Ok(ScalarValue::FixedSizeBinary(*size, Some(hex::decode(val).unwrap()))), + DataType::LargeBinary => Ok(ScalarValue::LargeBinary(Some(hex::decode(val).unwrap()))), _ => ScalarValue::try_from_string(val.to_string(), data_type), } } diff --git a/rust/lakesoul-io/src/transform.rs b/rust/lakesoul-io/src/transform.rs index ae153ca67..60bc8d526 100644 --- 
a/rust/lakesoul-io/src/transform.rs +++ b/rust/lakesoul-io/src/transform.rs @@ -19,7 +19,7 @@ use crate::constant::{ ARROW_CAST_OPTIONS, FLINK_TIMESTAMP_FORMAT, LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING, TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT, TIMESTAMP_SECOND_FORMAT, }; -use crate::helpers::{date_str_to_epoch_days, timestamp_str_to_unix_time}; +use crate::helpers::{date_str_to_epoch_days, into_scalar_value, timestamp_str_to_unix_time}; /// adjust time zone to UTC pub fn uniform_field(orig_field: &FieldRef) -> FieldRef { @@ -315,12 +315,18 @@ pub fn make_default_array(datatype: &DataType, value: &String, num_rows: usize) .map_err(|e| External(Box::new(e)))?; num_rows ])), - _ => { - println!( - "make_default_array() datatype not match, datatype={:?}, value={:?}", - datatype, value - ); - new_null_array(datatype, num_rows) + data_type => { + match into_scalar_value(value, data_type) { + Ok(scalar) => scalar.to_array_of_size(num_rows)?, + Err(_) => { + println!( + "make_default_array() datatype not match, datatype={:?}, value={:?}", + datatype, value + ); + new_null_array(datatype, num_rows) + } + } + } }) }