[Flink]fix partition filter (#535)
* fix partition filter

Signed-off-by: zenghua <[email protected]>

* fix clippy

Signed-off-by: zenghua <[email protected]>

* cleanup code

Signed-off-by: zenghua <[email protected]>

---------

Signed-off-by: zenghua <[email protected]>
Co-authored-by: zenghua <[email protected]>
Ceng23333 and zenghua authored Sep 2, 2024
1 parent a824ec1 commit 0d2b823
Showing 9 changed files with 77 additions and 26 deletions.
@@ -79,6 +79,22 @@ public void logicTest() throws ExecutionException, InterruptedException {
"[+I[1, Bob, 1995-10-01, true, 10.01, A, 1.85, 3, 1, 89, 100.11, [1, -81], [18, 67, 112, -105], 1990-01-07T10:10, 1995-10-01T07:10:00Z]]");
}

@Test
public void partitionTest() throws ExecutionException, InterruptedException {
TableEnvironment createTableEnv = TestUtils.createTableEnv(BATCH_TYPE);
createLakeSoulSourceTableWithDateType(createTableEnv);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 65536; i++) {
sb.append("0");
}
String testSql = String.format("select * from type_info where zone='%s' or zone='A'", sb.toString());
StreamTableEnvironment tEnvs = TestUtils.createStreamTableEnv(BATCH_TYPE);
List<Row> rows = CollectionUtil.iteratorToList(tEnvs.executeSql(testSql).collect());
rows.sort(Comparator.comparing(Row::toString));
assertThat(rows.toString()).isEqualTo(
"[+I[1, Bob, 1995-10-01, true, 10.01, A, 1.85, 3, 1, 89, 100.11, [1, -81], [18, 67, 112, -105], 1990-01-07T10:10, 1995-10-01T07:10:00Z]]");
}

@Test
public void cmpTest() throws ExecutionException, InterruptedException {
TableEnvironment createTableEnv = TestUtils.createTableEnv(BATCH_TYPE);
@@ -110,7 +126,10 @@ private void createLakeSoulSourceTableWithDateType(TableEnvironment tEnvs)
" country VARBINARY, " +
" createTime TIMESTAMP, " +
" modifyTime TIMESTAMP_LTZ " +
") WITH (" +
") " +
"PARTITIONED BY (`zone`,`country`,`money`)" +

"WITH (" +
" 'connector'='lakesoul'," +
" 'hashBucketNum'='2'," +
" 'path'='" + getTempDirUri("/lakeSource/type") +
@@ -69,8 +69,8 @@ public class SubstraitUtil {

private static final LibLakeSoulIO LIB;

private static final Pointer BUFFER1;
private static final Pointer BUFFER2;
private static Pointer BUFFER1;
private static Pointer BUFFER2;

private static final NativeIOBase NATIVE_IO_BASE;

@@ -204,12 +204,9 @@ public static List<PartitionInfo> applyPartitionFilters(List<PartitionInfo> allP
JniWrapper jniWrapper = JniWrapper.newBuilder().addAllPartitionInfo(allPartitionInfo).build();

byte[] jniBytes = jniWrapper.toByteArray();
BUFFER1.put(0, jniBytes, 0, jniBytes.length);
BUFFER1.putByte(jniBytes.length, (byte) 0);

byte[] filterBytes = partitionFilter.toByteArray();
BUFFER2.put(0, filterBytes, 0, filterBytes.length);
BUFFER2.putByte(filterBytes.length, (byte) 0);
tryPutBuffer1(jniBytes);
tryPutBuffer2(filterBytes);

try {
final CompletableFuture<Integer> filterFuture = new CompletableFuture<>();
@@ -267,6 +264,22 @@ public static List<PartitionInfo> applyPartitionFilters(List<PartitionInfo> allP
return resultPartitionInfo;
}

private static void tryPutBuffer1(byte[] bytes) {
while (BUFFER1.size() < bytes.length + 1) {
BUFFER1 = Runtime.getRuntime(LIB).getMemoryManager().allocateDirect(BUFFER1.size() * 2);
}
BUFFER1.put(0, bytes, 0, bytes.length);
BUFFER1.putByte(bytes.length, (byte) 0);
}

private static void tryPutBuffer2(byte[] bytes) {
while (BUFFER2.size() < bytes.length + 1) {
BUFFER2 = Runtime.getRuntime(LIB).getMemoryManager().allocateDirect(BUFFER2.size() * 2);
}
BUFFER2.put(0, bytes, 0, bytes.length);
BUFFER2.putByte(bytes.length, (byte) 0);
}


public static FieldReference arrowFieldToSubstraitField(Field field) {
return FieldReference
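The new partitionTest drives a 65,536-character partition value through this path, which would overflow a fixed-size native buffer; tryPutBuffer1/tryPutBuffer2 therefore double the allocation until the payload plus a trailing NUL byte fits. A minimal Rust sketch of the same grow-to-fit policy (the buffer name and initial capacity are illustrative, not LakeSoul's actual values):

```rust
/// Grow-to-fit sketch mirroring tryPutBuffer1/tryPutBuffer2:
/// double the capacity until `payload` plus a terminating NUL fits.
fn put_with_nul(buf: &mut Vec<u8>, payload: &[u8]) {
    let mut cap = buf.capacity().max(1);
    while cap < payload.len() + 1 {
        cap *= 2; // same doubling policy as the Java fix
    }
    buf.clear();
    buf.reserve(cap);
    buf.extend_from_slice(payload);
    buf.push(0); // C-style terminator read by the native side
}

fn main() {
    let mut buf = Vec::with_capacity(64 * 1024); // illustrative initial size
    let huge = vec![b'0'; 65_536]; // same length partitionTest uses
    put_with_nul(&mut buf, &huge);
    assert_eq!(buf.len(), 65_537);
    assert_eq!(buf.last(), Some(&0));
}
```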
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default.

4 changes: 1 addition & 3 deletions rust/lakesoul-io/Cargo.toml
@@ -43,7 +43,7 @@ log = "0.4.20"
anyhow = { workspace = true, features = [] }
prost = { workspace = true }
env_logger = "0.11"

hex = "0.4"

[features]
hdfs = ["dep:hdrs"]
@@ -56,8 +56,6 @@ datafusion-substrait = { workspace = true }
datafusion-substrait = { workspace = true, features = ["protoc"] }




[dev-dependencies]
tempfile = "3.3.0"
comfy-table = "6.0"
2 changes: 2 additions & 0 deletions rust/lakesoul-io/src/constant.rs
@@ -14,6 +14,8 @@ use lazy_static::lazy_static;
pub static LAKESOUL_TIMEZONE: &str = "UTC";
pub static LAKESOUL_NULL_STRING: &str = "__L@KE$OUL_NULL__";
pub static LAKESOUL_EMPTY_STRING: &str = "__L@KE$OUL_EMPTY_STRING__";
pub static LAKESOUL_EQ: &str = "__L@KE$OUL_EQ__";
pub static LAKESOUL_COMMA: &str = "__L@KE$OUL_COMMA__";

pub static DATE32_FORMAT: &str = "%Y-%m-%d";
pub static FLINK_TIMESTAMP_FORMAT: &str = "%Y-%m-%d %H:%M:%S%.9f";
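These sentinels exist because LakeSoul appears to serialize partition values into a single `key=value,key=value` description string, so a literal '=' or ',' inside a value would corrupt the encoding. A round-trip sketch under that assumption (the two constants are copied from the diff; the escape/unescape helper names are hypothetical):

```rust
const LAKESOUL_EQ: &str = "__L@KE$OUL_EQ__";
const LAKESOUL_COMMA: &str = "__L@KE$OUL_COMMA__";

// The sentinel strings contain neither '=' nor ',', so the two
// replacement passes cannot interfere with each other.
fn escape(v: &str) -> String {
    v.replace('=', LAKESOUL_EQ).replace(',', LAKESOUL_COMMA)
}

fn unescape(v: &str) -> String {
    v.replace(LAKESOUL_EQ, "=").replace(LAKESOUL_COMMA, ",")
}

fn main() {
    let raw = "a=1,b=2";
    let desc = escape(raw);
    assert!(!desc.contains('=') && !desc.contains(','));
    assert_eq!(unescape(&desc), raw);
}
```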
2 changes: 1 addition & 1 deletion rust/lakesoul-io/src/datasource/file_format.rs
@@ -183,7 +183,7 @@ pub async fn flatten_file_scan_config(
let mut builder = SchemaBuilder::new();
// O(nm), n = number of fields, m = number of partition columns
for field in file_schema.fields() {
if !partition_schema.field_with_name(field.name()).is_ok() {
if partition_schema.field_with_name(field.name()).is_err() {
builder.push(field.clone());
}
}
4 changes: 2 additions & 2 deletions rust/lakesoul-io/src/datasource/listing.rs
@@ -74,12 +74,12 @@ impl LakeSoulListingTable {
let mut builder = SchemaBuilder::from(target_schema.fields());
// O(n^2), n is the number of fields in file_schema and config.partition_schema
for field in file_schema.fields() {
if !target_schema.field_with_name(field.name()).is_ok() {
if target_schema.field_with_name(field.name()).is_err() {
builder.try_merge(field)?;
}
}
for field in config.partition_schema().fields() {
if !target_schema.field_with_name(field.name()).is_ok() {
if target_schema.field_with_name(field.name()).is_err() {
builder.try_merge(field)?;
}
}
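Both loops above (and the one in file_format.rs) implement the same dedup-merge: a field is added only when `field_with_name` returns `Err`, i.e. the name is not yet in the schema — which is also why clippy prefers `is_err()` over `!is_ok()`. A standalone sketch of the pattern with the same arrow-schema types (field names invented for illustration):

```rust
use arrow_schema::{DataType, Field, Schema, SchemaBuilder};

// Merge fields from `extra` into `target`, skipping names already present.
fn merge_missing(target: &Schema, extra: &Schema) -> Schema {
    let mut builder = SchemaBuilder::from(target.fields());
    for field in extra.fields() {
        // Err means the field is absent — exactly the "not yet merged" case.
        if target.field_with_name(field.name()).is_err() {
            builder.push(field.clone());
        }
    }
    builder.finish()
}

fn main() {
    let target = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    let extra = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("zone", DataType::Utf8, true),
    ]);
    assert_eq!(merge_missing(&target, &extra).fields().len(), 2);
}
```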
22 changes: 17 additions & 5 deletions rust/lakesoul-io/src/helpers.rs
@@ -6,6 +6,7 @@ use std::{collections::HashMap, sync::Arc};

use arrow::datatypes::UInt32Type;
use arrow_array::{RecordBatch, UInt32Array};
use arrow_buffer::i256;
use arrow_schema::{DataType, Field, Schema, SchemaBuilder, SchemaRef, TimeUnit};
use chrono::{DateTime, Duration};
use datafusion::{
@@ -31,9 +32,7 @@ use url::Url;

use crate::{
constant::{
DATE32_FORMAT, FLINK_TIMESTAMP_FORMAT, LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING,
TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT,
TIMESTAMP_SECOND_FORMAT,
DATE32_FORMAT, FLINK_TIMESTAMP_FORMAT, LAKESOUL_COMMA, LAKESOUL_EMPTY_STRING, LAKESOUL_EQ, LAKESOUL_NULL_STRING, TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT, TIMESTAMP_SECOND_FORMAT
},
filter::parser::Parser,
lakesoul_io_config::LakeSoulIOConfig,
@@ -123,7 +122,7 @@ pub fn format_scalar_value(v: &ScalarValue) -> String {
if s.is_empty() {
LAKESOUL_EMPTY_STRING.to_string()
} else {
s.clone()
s.replace('=', LAKESOUL_EQ).replace(',', LAKESOUL_COMMA)
}
}
ScalarValue::TimestampSecond(Some(s), _) => {
@@ -166,6 +165,14 @@ pub fn format_scalar_value(v: &ScalarValue) -> String {
.format(TIMESTAMP_NANOSECOND_FORMAT)
)
}
ScalarValue::Decimal128(Some(s), _, _) => format!("{}", s),
ScalarValue::Decimal256(Some(s), _, _) => format!("{}", s),
ScalarValue::Binary(e)
| ScalarValue::FixedSizeBinary(_, e)
| ScalarValue::LargeBinary(e) => match e {
Some(bytes) => hex::encode(bytes),
None => LAKESOUL_NULL_STRING.to_string(),
}
other => other.to_string(),
}
}
@@ -180,7 +187,7 @@ pub fn into_scalar_value(val: &str, data_type: &DataType) -> Result<ScalarValue>
if val.eq(LAKESOUL_EMPTY_STRING) {
Ok(ScalarValue::Utf8(Some("".to_string())))
} else {
Ok(ScalarValue::Utf8(Some(val.to_string())))
Ok(ScalarValue::Utf8(Some(val.replace(LAKESOUL_EQ, "=").replace(LAKESOUL_COMMA, ","))))
}
}
DataType::Timestamp(unit, timezone) => match unit {
Expand Down Expand Up @@ -238,6 +245,11 @@ pub fn into_scalar_value(val: &str, data_type: &DataType) -> Result<ScalarValue>
Ok(ScalarValue::TimestampNanosecond(Some(nanosecs), timezone.clone()))
}
},
DataType::Decimal128(p, s) => Ok(ScalarValue::Decimal128(Some(val.parse::<i128>().unwrap()), *p, *s)),
DataType::Decimal256(p, s) => Ok(ScalarValue::Decimal256(Some(i256::from_string(val).unwrap()), *p, *s)),
DataType::Binary => Ok(ScalarValue::Binary(Some(hex::decode(val).unwrap()))),
DataType::FixedSizeBinary(size) => Ok(ScalarValue::FixedSizeBinary(*size, Some(hex::decode(val).unwrap()))),
DataType::LargeBinary => Ok(ScalarValue::LargeBinary(Some(hex::decode(val).unwrap()))),
_ => ScalarValue::try_from_string(val.to_string(), data_type),
}
}
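The changes to format_scalar_value and into_scalar_value keep the pair inverse for the newly supported partition types: binary values travel as hex text (via the `hex` crate added above), decimals as their unscaled integer text, with precision and scale re-attached from the column type when parsing back. A hedged round-trip sketch of the binary case (helper names are illustrative; the repo's real entry points are format_scalar_value/into_scalar_value):

```rust
use datafusion::scalar::ScalarValue;

// Binary arm of the round trip: bytes -> hex string -> bytes.
fn to_partition_string(bytes: &[u8]) -> String {
    hex::encode(bytes) // matches the new Binary arm in format_scalar_value
}

fn from_partition_string(s: &str) -> ScalarValue {
    ScalarValue::Binary(Some(hex::decode(s).expect("valid hex")))
}

fn main() {
    let original = vec![18u8, 67, 112, 151];
    let encoded = to_partition_string(&original);
    assert_eq!(
        from_partition_string(&encoded),
        ScalarValue::Binary(Some(original))
    );

    // Decimals travel as unscaled integer text: "10011" with
    // precision 10 / scale 2 represents 100.11.
    let unscaled: i128 = "10011".parse().expect("integer text");
    let _dec = ScalarValue::Decimal128(Some(unscaled), 10, 2);
}
```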
20 changes: 13 additions & 7 deletions rust/lakesoul-io/src/transform.rs
@@ -19,7 +19,7 @@ use crate::constant::{
ARROW_CAST_OPTIONS, FLINK_TIMESTAMP_FORMAT, LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING,
TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT, TIMESTAMP_SECOND_FORMAT,
};
use crate::helpers::{date_str_to_epoch_days, timestamp_str_to_unix_time};
use crate::helpers::{date_str_to_epoch_days, into_scalar_value, timestamp_str_to_unix_time};

/// adjust time zone to UTC
pub fn uniform_field(orig_field: &FieldRef) -> FieldRef {
@@ -315,12 +315,18 @@ pub fn make_default_array(datatype: &DataType, value: &String, num_rows: usize)
.map_err(|e| External(Box::new(e)))?;
num_rows
])),
_ => {
println!(
"make_default_array() datatype not match, datatype={:?}, value={:?}",
datatype, value
);
new_null_array(datatype, num_rows)
data_type => {
match into_scalar_value(value, data_type) {
Ok(scalar) => scalar.to_array_of_size(num_rows)?,
Err(_) => {
println!(
"make_default_array() datatype not match, datatype={:?}, value={:?}",
datatype, value
);
new_null_array(datatype, num_rows)
}
}
}
})
}
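Instead of unconditionally emitting a null column, make_default_array now tries to parse the partition string into a scalar and materialize it with to_array_of_size, falling back to nulls only when parsing fails. A standalone sketch of that fallback shape, using DataFusion's generic ScalarValue::try_from_string where the repo calls its own into_scalar_value:

```rust
use arrow_array::{new_null_array, ArrayRef};
use arrow_schema::DataType;
use datafusion::scalar::ScalarValue;

// Build a constant column of `num_rows` from a partition string,
// degrading to a null column when the value cannot be parsed.
fn default_column(value: &str, data_type: &DataType, num_rows: usize) -> ArrayRef {
    ScalarValue::try_from_string(value.to_string(), data_type)
        .and_then(|scalar| scalar.to_array_of_size(num_rows))
        .unwrap_or_else(|_| new_null_array(data_type, num_rows))
}

fn main() {
    let col = default_column("42", &DataType::Int64, 3);
    assert_eq!(col.len(), 3);
    // Unparseable input degrades to nulls rather than erroring out.
    let bad = default_column("not-a-number", &DataType::Int64, 3);
    assert_eq!(bad.null_count(), 3);
}
```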
