[Flink] Fix flink select only partition column (#502)
* fix flink select only partition column

Signed-off-by: chenxu <[email protected]>

* fix test case

Signed-off-by: chenxu <[email protected]>

---------

Signed-off-by: chenxu <[email protected]>
Co-authored-by: chenxu <[email protected]>
xuchen-plus and dmetasoul01 authored Jun 27, 2024
1 parent dd4f7c2 commit 2f28639
Showing 4 changed files with 93 additions and 23 deletions.
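
Background: before this fix, a query that projected only partition columns (for example, "select order_id from user_info_1", where order_id is the table's range-partition column) returned no rows, because the split reader was only initialized when at least one non-partition column was projected. Below is a minimal sketch of the failing scenario; the table name, schema, and path are illustrative, chosen to mirror the tests in this commit rather than taken from a real deployment:

// Sketch: reproduces the partition-only projection this commit fixes.
// Assumes the LakeSoul Flink connector is on the classpath; DDL details are illustrative.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PartitionOnlySelectRepro {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
        tEnv.executeSql("create table user_info_1 (" +
                " order_id INT," +
                " name STRING PRIMARY KEY NOT ENFORCED," +
                " score DECIMAL" +
                ") PARTITIONED BY ( order_id )" +
                " WITH ('format'='lakesoul', 'path'='/tmp/lakeSource/user_range')");
        tEnv.executeSql("INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75)").await();
        // Before this commit, this partition-only projection produced an empty result.
        tEnv.executeSql("select order_id from user_info_1").print();
    }
}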
==== file 1 of 4 ====
@@ -118,11 +118,7 @@ private void initializeReader() throws IOException {
             reader.addFile(FlinkUtil.makeQualifiedPath(path).toString());
         }

-        List<String> nonPartitionColumns =
-                this.projectedRowType.getFieldNames().stream().filter(name -> !this.partitionValues.containsKey(name))
-                        .collect(Collectors.toList());
-
-        if (!nonPartitionColumns.isEmpty()) {
+        if (!projectedRowTypeWithPk.getChildren().isEmpty()) {
             ArrowUtils.setLocalTimeZone(FlinkUtil.getLocalTimeZone(conf));
             // native reader requires pk columns in schema
             Schema arrowSchema = ArrowUtils.toArrowSchema(projectedRowTypeWithPk);
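
The crux of the fix: the removed guard skipped reader setup whenever every projected field was a partition column, so such queries produced no rows at all; the new guard keys off the pk-augmented projected row type, which the native reader needs anyway. A self-contained sketch of the old predicate's failure mode (field and partition values are illustrative):

// Sketch: why the old guard failed for "select order_id from user_info_1".
// Every projected field is a partition column, so the filtered list is empty
// and reader initialization was skipped entirely.
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class OldGuardSketch {
    public static void main(String[] args) {
        List<String> projectedFields = List.of("order_id");            // pushed-down projection
        Map<String, String> partitionValues = Map.of("order_id", "3"); // this split's partition values

        List<String> nonPartitionColumns = projectedFields.stream()
                .filter(name -> !partitionValues.containsKey(name))
                .collect(Collectors.toList());

        System.out.println(nonPartitionColumns.isEmpty()); // true -> old code emitted no rows
    }
}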
@@ -147,11 +143,11 @@ private void initializeReader() throws IOException {
         }

         LOG.info("Initializing reader for split {}, pk={}, partitions={}," +
-                        " non partition cols={}, cdc column={}, filter={}",
+                        " actual read cols={}, cdc column={}, filter={}",
                 split,
                 pkColumns,
                 partitionValues,
-                nonPartitionColumns,
+                projectedRowTypeWithPk,
                 cdcColumn,
                 filter);
         reader.initializeReader();
==== file 2 of 4 ====
@@ -5,6 +5,8 @@
 package org.apache.flink.lakesoul.test;

 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
@@ -37,6 +39,7 @@ public abstract class AbstractTestBase {

     static {
         fsConfig = new org.apache.flink.configuration.Configuration();
+        fsConfig.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 1000000L);
         if (!LOCAL_FS) {
             fsConfig.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
             fsConfig.set(S3_ENDPOINT, "http://localhost:9002");
@@ -54,6 +57,7 @@ private static Configuration getConfig() {
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         config.set(ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER, 5);
         config.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(3));
+        config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 1000000L);
         config.setString("state.backend.type", "hashmap");
         config.setString("state.checkpoint.dir", getTempDirUri("/flinkchk"));
         return config;
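
Note: both hunks in this file raise HeartbeatManagerOptions.HEARTBEAT_TIMEOUT (Flink's heartbeat.timeout, in milliseconds) to 1,000,000 ms, roughly 16.7 minutes, presumably so MiniCluster TaskManagers are not declared dead while a test is paused under a debugger or delayed on slow CI. This is test hardening and is unrelated to the partition-column fix itself.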
==== file 3 of 4 ====
@@ -37,7 +37,7 @@ public void testInsertSQL() throws ExecutionException, InterruptedException {
     }

     @Test
-    public void testInsertPartitionTableSQL() throws ExecutionException, InterruptedException {
+    public void testInsertPkPartitionTableSQL() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
         createLakeSoulSourceTableUserWithRange(tEnv);
         tEnv.executeSql("INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75)").await();
@@ -51,6 +51,17 @@ public void testInsertPartitionTableSQL() throws ExecutionException, InterruptedException {
         List<Row> results1 = CollectionUtil.iteratorToList(flinkTable1.execute().collect());
         TestUtils.checkEqualInAnyOrder(results1,
                 new String[]{"+I[2, Alice, 80]", "+I[3, Jack, 75]", "+I[4, Mike, 70]"});
+        List<Row>
+                results2 =
+                CollectionUtil.iteratorToList(tEnv.executeSql("select order_id from user_info_1").collect());
+        TestUtils.checkEqualInAnyOrder(results2,
+                new String[]{"+I[2]", "+I[3]", "+I[4]"});
+        List<Row>
+                results3 =
+                CollectionUtil.iteratorToList(
+                        tEnv.executeSql("select order_id, sum(score) from user_info_1 group by order_id").collect());
+        TestUtils.checkEqualInAnyOrder(results3,
+                new String[]{"+I[2, 80]", "+I[3, 75]", "+I[4, 70]"});
     }
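
The two added assertions exercise exactly the fixed path: "select order_id from user_info_1" projects only the range-partition column, and the grouped sum then forces those partition-derived values through an aggregate.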


@@ -73,12 +84,32 @@ public void testUpdateNonPkAndPartitionSQL() throws ExecutionException, InterruptedException {
                 new String[]{"+I[2, Alice, 80]", "+I[3, Amy, 100]", "+I[3, Jack, 100]", "+I[4, Mike, 70]"});
     }

+    @Test
+    public void testNonPkPartitionedTableSQL() throws ExecutionException, InterruptedException {
+        TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
+        createLakeSoulSourceNonPkWithPartitionTableUser(tEnv);
+        tEnv.executeSql(
+                        "INSERT INTO user_info_3 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Mike', 70)")
+                .await();
+        List<Row>
+                results1 =
+                CollectionUtil.iteratorToList(tEnv.executeSql("select order_id from user_info_3").collect());
+        TestUtils.checkEqualInAnyOrder(results1,
+                new String[]{"+I[2]", "+I[3]", "+I[3]", "+I[4]"});
+        List<Row>
+                results2 =
+                CollectionUtil.iteratorToList(
+                        tEnv.executeSql("select order_id, sum(score) from user_info_3 group by order_id").collect());
+        TestUtils.checkEqualInAnyOrder(results2,
+                new String[]{"+I[2, 80]", "+I[3, 170]", "+I[4, 70]"});
+    }
+
     @Test
     public void testUpdate() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
         createLakeSoulSourceNonPkTableUser(tEnv);
-        tEnv.executeSql("INSERT INTO user_info_2 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Mike', 70)")
+        tEnv.executeSql(
+                        "INSERT INTO user_info_2 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Mike', 70)")
                 .await();
         try {
             tEnv.executeSql("UPDATE user_info_2 set name = cast('Johny' as varchar) where order_id = 4").await();
@@ -109,14 +140,19 @@ public void testUpdatePkSQLNotSupported() throws ExecutionException, InterruptedException {
         TableImpl flinkTable = (TableImpl) streamEnv.sqlQuery(testSelect);
         List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
         TestUtils.checkEqualInAnyOrder(results,
-                new String[]{"+I[2, Alice, 80]", "+I[3, Amy, 95]", "+I[3, Jack, 75]", "+I[4, John, 70]", "+I[4, Mike, 70]"});
+                new String[]{"+I[2, Alice, 80]",
+                        "+I[3, Amy, 95]",
+                        "+I[3, Jack, 75]",
+                        "+I[4, John, 70]",
+                        "+I[4, Mike, 70]"});
     }

     @Test
     public void testUpdatePartitionSQLNotSupported() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
         createLakeSoulSourceTableUserWithRange(tEnv);
-        tEnv.executeSql("INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Mike', 70)")
+        tEnv.executeSql(
+                        "INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Mike', 70)")
                 .await();
         try {
             tEnv.executeSql("UPDATE user_info_1 set order_id = 1 where score = 75").await();
@@ -128,7 +164,11 @@ public void testUpdatePartitionSQLNotSupported() throws ExecutionException, InterruptedException {
         TableImpl flinkTable = (TableImpl) streamEnv.sqlQuery(testSelect);
         List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
         TestUtils.checkEqualInAnyOrder(results,
-                new String[]{"+I[2, Alice, 80]", "+I[1, Jack, 75]", "+I[3, Jack, 75]", "+I[3, Amy, 95]", "+I[4, Mike, 70]"});
+                new String[]{"+I[2, Alice, 80]",
+                        "+I[1, Jack, 75]",
+                        "+I[3, Jack, 75]",
+                        "+I[3, Amy, 95]",
+                        "+I[4, Mike, 70]"});
     }

     @Test
@@ -171,7 +211,8 @@ public void testDeleteNonPkAndPartitionSQL() throws ExecutionException, InterruptedException {
     public void testDeletePkSQL() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
         createLakeSoulSourceTableUser(tEnv);
-        tEnv.executeSql("INSERT INTO user_info VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)").await();
+        tEnv.executeSql("INSERT INTO user_info VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)")
+                .await();
         try {
             tEnv.executeSql("DELETE FROM user_info where name = 'Jack'").await();
         } catch (Throwable e) {
@@ -187,7 +228,8 @@ public void testDeletePkSQL() throws ExecutionException, InterruptedException {
     public void testDeleteCDCPkSQL() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
         createLakeSoulCDCSourceTableUser(tEnv);
-        tEnv.executeSql("INSERT INTO user_info VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)").await();
+        tEnv.executeSql("INSERT INTO user_info VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)")
+                .await();
         try {
             tEnv.executeSql("DELETE FROM user_info where name = 'Jack'").await();
         } catch (Throwable e) {
@@ -204,7 +246,9 @@ public void testDeleteCDCPkSQL() throws ExecutionException, InterruptedException {
     public void testDeletePartitionAndPkSQL() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
         createLakeSoulSourceTableUserWithRange(tEnv);
-        tEnv.executeSql("INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)").await();
+        tEnv.executeSql(
+                        "INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)")
+                .await();
         try {
             // LakeSoulTableSource::applyPartition will not be called and LakeSoulTableSource::applyFilters will be called
             tEnv.executeSql("DELETE FROM user_info_1 where order_id = 3 and name = 'Jack'").await();
@@ -222,7 +266,9 @@ public void testDeletePartitionAndPkSQL() throws ExecutionException, InterruptedException {
     public void testDeletePartitionOnlySQL() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
         createLakeSoulSourceTableUserWithRange(tEnv);
-        tEnv.executeSql("INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)").await();
+        tEnv.executeSql(
+                        "INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)")
+                .await();
         try {
             // LakeSoulTableSource::applyPartition will be called and LakeSoulTableSource::applyFilters will not be called
             tEnv.executeSql("DELETE FROM user_info_1 where order_id = 3").await();
@@ -240,7 +286,9 @@ public void testDeletePartitionOnlySQL() throws ExecutionException, InterruptedException {
     public void testDeleteAllPartitionedDataExactlySQL() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
         createLakeSoulSourceTableUserWithRange(tEnv);
-        tEnv.executeSql("INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)").await();
+        tEnv.executeSql(
+                        "INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)")
+                .await();
         try {
             // LakeSoulTableSource::applyPartition will be called and LakeSoulTableSource::applyFilters will not be called
             tEnv.executeSql("DELETE FROM user_info_1 where order_id = 3 and score > 60").await();
@@ -258,7 +306,9 @@ public void testDeleteAllPartitionedDataExactlySQL() throws ExecutionException, InterruptedException {
     public void testDeletePartitionedCdcTable() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
         createLakeSoulPartitionedCDCSourceTableUser(tEnv);
-        tEnv.executeSql("INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)").await();
+        tEnv.executeSql(
+                        "INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)")
+                .await();
         try {
             // LakeSoulTableSource::applyPartition will be called and LakeSoulTableSource::applyFilters will not be called
             tEnv.executeSql("DELETE FROM user_info_1 where order_id = 3").await();
@@ -272,7 +322,8 @@ public void testDeletePartitionedCdcTable() throws ExecutionException, InterruptedException {
         TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[2, Alice, 80]", "+I[4, Bob, 110]"});
     }

-    private void createLakeSoulSourceNonPkTableUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException {
+    private void createLakeSoulSourceNonPkTableUser(TableEnvironment tEnvs)
+            throws ExecutionException, InterruptedException {
         String createUserSql = "create table user_info_2 (" +
                 " order_id INT," +
                 " name varchar," +
@@ -286,6 +337,21 @@ private void createLakeSoulSourceNonPkTableUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException {
         tEnvs.executeSql(createUserSql);
     }

+    private void createLakeSoulSourceNonPkWithPartitionTableUser(TableEnvironment tEnvs)
+            throws ExecutionException, InterruptedException {
+        String createUserSql = "create table user_info_3 (" +
+                " order_id INT," +
+                " name varchar," +
+                " score DECIMAL" +
+                ") PARTITIONED BY ( order_id )" +
+                " WITH (" +
+                " 'format'='lakesoul'," +
+                " 'path'='" + getTempDirUri("/lakeSource/user_nonpk_partitioned") +
+                "' )";
+        tEnvs.executeSql("DROP TABLE if exists user_info_3");
+        tEnvs.executeSql(createUserSql);
+    }
+
     private void createLakeSoulSourceTableUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException {
         String createUserSql = "create table user_info (" +
                 " order_id INT," +
@@ -300,7 +366,8 @@ private void createLakeSoulSourceTableUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException {
         tEnvs.executeSql(createUserSql);
     }

-    private void createLakeSoulSourceTableUserWithRange(TableEnvironment tEnvs) throws ExecutionException, InterruptedException {
+    private void createLakeSoulSourceTableUserWithRange(TableEnvironment tEnvs)
+            throws ExecutionException, InterruptedException {
         String createUserSql = "create table user_info_1 (" +
                 " order_id INT," +
                 " name STRING PRIMARY KEY NOT ENFORCED," +
@@ -315,7 +382,8 @@ private void createLakeSoulSourceTableUserWithRange(TableEnvironment tEnvs) throws ExecutionException, InterruptedException {
         tEnvs.executeSql(createUserSql);
     }

-    private void createLakeSoulCDCSourceTableUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException {
+    private void createLakeSoulCDCSourceTableUser(TableEnvironment tEnvs)
+            throws ExecutionException, InterruptedException {
         String createUserSql = "create table user_info (" +
                 " order_id INT," +
                 " name STRING PRIMARY KEY NOT ENFORCED," +
@@ -330,7 +398,8 @@ private void createLakeSoulCDCSourceTableUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException {
         tEnvs.executeSql(createUserSql);
     }

-    private void createLakeSoulPartitionedCDCSourceTableUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException {
+    private void createLakeSoulPartitionedCDCSourceTableUser(TableEnvironment tEnvs)
+            throws ExecutionException, InterruptedException {
         String createUserSql = "create table user_info (" +
                 " order_id INT," +
                 " name STRING PRIMARY KEY NOT ENFORCED," +
==== file 4 of 4 ====
@@ -39,7 +39,8 @@ public static TableEnvironment createTableEnv(String mode) {
         TableEnvironment createTableEnv;
         if (mode.equals(BATCH_TYPE)) {
             createTableEnv = TableEnvironment.create(
-                    EnvironmentSettings.newInstance().withConfiguration(fsConfig).inBatchMode().build()
+                    EnvironmentSettings.newInstance().withConfiguration(fsConfig)
+                            .inBatchMode().build()
             );
         } else {
             Configuration config = new Configuration(fsConfig);
