From 2f28639f8d8dcfb7244ea01e132968bbd961b207 Mon Sep 17 00:00:00 2001
From: Xu Chen
Date: Thu, 27 Jun 2024 19:44:18 +0800
Subject: [PATCH] [Flink] Fix flink select only partition column (#502)

* fix flink select only partition column

Signed-off-by: chenxu

* fix test case

Signed-off-by: chenxu

---------

Signed-off-by: chenxu
Co-authored-by: chenxu
---
 .../source/LakeSoulOneSplitRecordsReader.java | 10 +-
 .../flink/lakesoul/test/AbstractTestBase.java |  4 +
 .../lakesoul/test/flinkSource/DMLSuite.java   | 99 ++++++++++++++++---
 .../lakesoul/test/flinkSource/TestUtils.java  |  3 +-
 4 files changed, 93 insertions(+), 23 deletions(-)

diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulOneSplitRecordsReader.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulOneSplitRecordsReader.java
index 21c843bed..c0397e15a 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulOneSplitRecordsReader.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulOneSplitRecordsReader.java
@@ -118,11 +118,7 @@ private void initializeReader() throws IOException {
             reader.addFile(FlinkUtil.makeQualifiedPath(path).toString());
         }
 
-        List<String> nonPartitionColumns =
-                this.projectedRowType.getFieldNames().stream().filter(name -> !this.partitionValues.containsKey(name))
-                        .collect(Collectors.toList());
-
-        if (!nonPartitionColumns.isEmpty()) {
+        if (!projectedRowTypeWithPk.getChildren().isEmpty()) {
             ArrowUtils.setLocalTimeZone(FlinkUtil.getLocalTimeZone(conf));
             // native reader requires pk columns in schema
             Schema arrowSchema = ArrowUtils.toArrowSchema(projectedRowTypeWithPk);
@@ -147,11 +143,11 @@ private void initializeReader() throws IOException {
         }
 
         LOG.info("Initializing reader for split {}, pk={}, partitions={}," +
-                        " non partition cols={}, cdc column={}, filter={}",
+                        " actual read cols={}, cdc column={}, filter={}",
                 split,
                 pkColumns,
                 partitionValues,
-                nonPartitionColumns,
+                projectedRowTypeWithPk,
                 cdcColumn,
                 filter);
         reader.initializeReader();
diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/AbstractTestBase.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/AbstractTestBase.java
index f5a2e3636..684ba423b 100644
--- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/AbstractTestBase.java
+++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/AbstractTestBase.java
@@ -5,6 +5,8 @@
 package org.apache.flink.lakesoul.test;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
@@ -37,6 +39,7 @@ public abstract class AbstractTestBase {
 
     static {
         fsConfig = new org.apache.flink.configuration.Configuration();
+        fsConfig.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 1000000L);
         if (!LOCAL_FS) {
             fsConfig.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
             fsConfig.set(S3_ENDPOINT, "http://localhost:9002");
@@ -54,6 +57,7 @@ private static Configuration getConfig() {
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         config.set(ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER, 5);
         config.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(3));
+        config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 1000000L);
         config.setString("state.backend.type", "hashmap");
         config.setString("state.checkpoint.dir", getTempDirUri("/flinkchk"));
         return config;
diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java
index fef137379..ae1909388 100644
--- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java
+++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java
@@ -37,7 +37,7 @@ public void testInsertSQL() throws ExecutionException, InterruptedException {
     }
 
     @Test
-    public void testInsertPartitionTableSQL() throws ExecutionException, InterruptedException {
+    public void testInsertPkPartitionTableSQL() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
         createLakeSoulSourceTableUserWithRange(tEnv);
         tEnv.executeSql("INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75)").await();
@@ -51,6 +51,17 @@ public void testInsertPartitionTableSQL() throws ExecutionException, Interrupted
         List<Row> results1 = CollectionUtil.iteratorToList(flinkTable1.execute().collect());
         TestUtils.checkEqualInAnyOrder(results1,
                 new String[]{"+I[2, Alice, 80]", "+I[3, Jack, 75]", "+I[4, Mike, 70]"});
+        List<Row>
+                results2 =
+                CollectionUtil.iteratorToList(tEnv.executeSql("select order_id from user_info_1").collect());
+        TestUtils.checkEqualInAnyOrder(results2,
+                new String[]{"+I[2]", "+I[3]", "+I[4]"});
+        List<Row>
+                results3 =
+                CollectionUtil.iteratorToList(
+                        tEnv.executeSql("select order_id, sum(score) from user_info_1 group by order_id").collect());
+        TestUtils.checkEqualInAnyOrder(results3,
+                new String[]{"+I[2, 80]", "+I[3, 75]", "+I[4, 70]"});
     }
 
 
@@ -73,12 +84,32 @@ public void testUpdateNonPkAndPartitionSQL() throws ExecutionException, Interrup
                 new String[]{"+I[2, Alice, 80]", "+I[3, Amy, 100]", "+I[3, Jack, 100]", "+I[4, Mike, 70]"});
     }
 
+    @Test
+    public void testNonPkPartitionedTableSQL() throws ExecutionException, InterruptedException {
+        TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
+        createLakeSoulSourceNonPkWithPartitionTableUser(tEnv);
+        tEnv.executeSql(
+                        "INSERT INTO user_info_3 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Mike', 70)")
+                .await();
+        List<Row>
+                results1 =
+                CollectionUtil.iteratorToList(tEnv.executeSql("select order_id from user_info_3").collect());
+        TestUtils.checkEqualInAnyOrder(results1,
+                new String[]{"+I[2]", "+I[3]", "+I[3]", "+I[4]"});
+        List<Row>
+                results2 =
+                CollectionUtil.iteratorToList(
+                        tEnv.executeSql("select order_id, sum(score) from user_info_3 group by order_id").collect());
+        TestUtils.checkEqualInAnyOrder(results2,
+                new String[]{"+I[2, 80]", "+I[3, 170]", "+I[4, 70]"});
+    }
 
     @Test
     public void testUpdate() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
         createLakeSoulSourceNonPkTableUser(tEnv);
-        tEnv.executeSql("INSERT INTO user_info_2 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Mike', 70)")
+        tEnv.executeSql(
+                        "INSERT INTO user_info_2 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Mike', 70)")
                 .await();
         try {
             tEnv.executeSql("UPDATE user_info_2 set name = cast('Johny' as varchar) where order_id = 4").await();
@@ -109,14 +140,19 @@ public void testUpdatePkSQLNotSupported() throws ExecutionException, Interrupted
         TableImpl flinkTable = (TableImpl) streamEnv.sqlQuery(testSelect);
         List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
         TestUtils.checkEqualInAnyOrder(results,
-                new String[]{"+I[2, Alice, 80]", "+I[3, Amy, 95]", "+I[3, Jack, 75]", "+I[4, John, 70]", "+I[4, Mike, 70]"});
+                new String[]{"+I[2, Alice, 80]",
+                        "+I[3, Amy, 95]",
+                        "+I[3, Jack, 75]",
+                        "+I[4, John, 70]",
+                        "+I[4, Mike, 70]"});
     }
 
     @Test
     public void testUpdatePartitionSQLNotSupported() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
         createLakeSoulSourceTableUserWithRange(tEnv);
-        tEnv.executeSql("INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Mike', 70)")
+        tEnv.executeSql(
+                        "INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Mike', 70)")
                 .await();
         try {
             tEnv.executeSql("UPDATE user_info_1 set order_id = 1 where score = 75").await();
@@ -128,7 +164,11 @@ public void testUpdatePartitionSQLNotSupported() throws ExecutionException, Inte
         TableImpl flinkTable = (TableImpl) streamEnv.sqlQuery(testSelect);
         List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
         TestUtils.checkEqualInAnyOrder(results,
-                new String[]{"+I[2, Alice, 80]", "+I[1, Jack, 75]", "+I[3, Jack, 75]", "+I[3, Amy, 95]", "+I[4, Mike, 70]"});
+                new String[]{"+I[2, Alice, 80]",
+                        "+I[1, Jack, 75]",
+                        "+I[3, Jack, 75]",
+                        "+I[3, Amy, 95]",
+                        "+I[4, Mike, 70]"});
     }
 
     @Test
@@ -171,7 +211,8 @@ public void testDeleteNonPkAndPartitionSQL() throws ExecutionException, Interrup
     public void testDeletePkSQL() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
         createLakeSoulSourceTableUser(tEnv);
-        tEnv.executeSql("INSERT INTO user_info VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)").await();
+        tEnv.executeSql("INSERT INTO user_info VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)")
+                .await();
         try {
             tEnv.executeSql("DELETE FROM user_info where name = 'Jack'").await();
         } catch (Throwable e) {
@@ -187,7 +228,8 @@ public void testDeletePkSQL() throws ExecutionException, InterruptedException {
     public void testDeleteCDCPkSQL() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
         createLakeSoulCDCSourceTableUser(tEnv);
-        tEnv.executeSql("INSERT INTO user_info VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)").await();
+        tEnv.executeSql("INSERT INTO user_info VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)")
+                .await();
         try {
             tEnv.executeSql("DELETE FROM user_info where name = 'Jack'").await();
         } catch (Throwable e) {
@@ -204,7 +246,9 @@ public void testDeleteCDCPkSQL() throws ExecutionException, InterruptedException
     public void testDeletePartitionAndPkSQL() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
         createLakeSoulSourceTableUserWithRange(tEnv);
-        tEnv.executeSql("INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)").await();
+        tEnv.executeSql(
+                        "INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)")
+                .await();
         try {
             // LakeSoulTableSource::applyPartition will not be called and LakeSoulTableSource::applyFilters will be called
             tEnv.executeSql("DELETE FROM user_info_1 where order_id = 3 and name = 'Jack'").await();
@@ -222,7 +266,9 @@ public void testDeletePartitionAndPkSQL() throws ExecutionException, Interrupted
    public void testDeletePartitionOnlySQL() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
         createLakeSoulSourceTableUserWithRange(tEnv);
-        tEnv.executeSql("INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)").await();
+        tEnv.executeSql(
+                        "INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)")
+                .await();
         try {
             // LakeSoulTableSource::applyPartition will be called and LakeSoulTableSource::applyFilters will not be called
             tEnv.executeSql("DELETE FROM user_info_1 where order_id = 3").await();
@@ -240,7 +286,9 @@ public void testDeletePartitionOnlySQL() throws ExecutionException, InterruptedE
     public void testDeleteAllPartitionedDataExactlySQL() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
         createLakeSoulSourceTableUserWithRange(tEnv);
-        tEnv.executeSql("INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)").await();
+        tEnv.executeSql(
+                        "INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)")
+                .await();
         try {
             // LakeSoulTableSource::applyPartition will be called and LakeSoulTableSource::applyFilters will not be called
             tEnv.executeSql("DELETE FROM user_info_1 where order_id = 3 and score > 60").await();
@@ -258,7 +306,9 @@ public void testDeleteAllPartitionedDataExactlySQL() throws ExecutionException,
     public void testDeletePartitionedCdcTable() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
         createLakeSoulPartitionedCDCSourceTableUser(tEnv);
-        tEnv.executeSql("INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)").await();
+        tEnv.executeSql(
+                        "INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)")
+                .await();
         try {
             // LakeSoulTableSource::applyPartition will be called and LakeSoulTableSource::applyFilters will not be called
             tEnv.executeSql("DELETE FROM user_info_1 where order_id = 3").await();
@@ -272,7 +322,8 @@ public void testDeletePartitionedCdcTable() throws ExecutionException, Interrupt
         TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[2, Alice, 80]", "+I[4, Bob, 110]"});
     }
 
-    private void createLakeSoulSourceNonPkTableUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException {
+    private void createLakeSoulSourceNonPkTableUser(TableEnvironment tEnvs)
+            throws ExecutionException, InterruptedException {
         String createUserSql = "create table user_info_2 (" +
                 " order_id INT," +
                 " name varchar," +
@@ -286,6 +337,21 @@ private void createLakeSoulSourceNonPkTableUser(TableEnvironment tEnvs) throws E
         tEnvs.executeSql(createUserSql);
     }
 
+    private void createLakeSoulSourceNonPkWithPartitionTableUser(TableEnvironment tEnvs)
+            throws ExecutionException, InterruptedException {
+        String createUserSql = "create table user_info_3 (" +
+                " order_id INT," +
+                " name varchar," +
+                " score DECIMAL" +
+                ") PARTITIONED BY ( order_id )" +
+                " WITH (" +
+                " 'format'='lakesoul'," +
+                " 'path'='" + getTempDirUri("/lakeSource/user_nonpk_partitioned") +
+                "' )";
+        tEnvs.executeSql("DROP TABLE if exists user_info_3");
+        tEnvs.executeSql(createUserSql);
+    }
+
     private void createLakeSoulSourceTableUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException {
         String createUserSql = "create table user_info (" +
                 " order_id INT," +
@@ -300,7 +366,8 @@ private void createLakeSoulSourceTableUser(TableEnvironment tEnvs) throws Execut
         tEnvs.executeSql(createUserSql);
     }
 
-    private void createLakeSoulSourceTableUserWithRange(TableEnvironment tEnvs) throws ExecutionException, InterruptedException {
+    private void createLakeSoulSourceTableUserWithRange(TableEnvironment tEnvs)
+            throws ExecutionException, InterruptedException {
         String createUserSql = "create table user_info_1 (" +
                 " order_id INT," +
                 " name STRING PRIMARY KEY NOT ENFORCED," +
@@ -315,7 +382,8 @@ private void createLakeSoulSourceTableUserWithRange(TableEnvironment tEnvs) thro
         tEnvs.executeSql(createUserSql);
     }
 
-    private void createLakeSoulCDCSourceTableUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException {
+    private void createLakeSoulCDCSourceTableUser(TableEnvironment tEnvs)
+            throws ExecutionException, InterruptedException {
         String createUserSql = "create table user_info (" +
                 " order_id INT," +
                 " name STRING PRIMARY KEY NOT ENFORCED," +
@@ -330,7 +398,8 @@ private void createLakeSoulCDCSourceTableUser(TableEnvironment tEnvs) throws Exe
         tEnvs.executeSql(createUserSql);
     }
 
-    private void createLakeSoulPartitionedCDCSourceTableUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException {
+    private void createLakeSoulPartitionedCDCSourceTableUser(TableEnvironment tEnvs)
+            throws ExecutionException, InterruptedException {
         String createUserSql = "create table user_info (" +
                 " order_id INT," +
                 " name STRING PRIMARY KEY NOT ENFORCED," +
diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/TestUtils.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/TestUtils.java
index c66ad2285..3e50c0bf5 100644
--- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/TestUtils.java
+++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/TestUtils.java
@@ -39,7 +39,8 @@ public static TableEnvironment createTableEnv(String mode) {
         TableEnvironment createTableEnv;
         if (mode.equals(BATCH_TYPE)) {
             createTableEnv = TableEnvironment.create(
-                    EnvironmentSettings.newInstance().withConfiguration(fsConfig).inBatchMode().build()
+                    EnvironmentSettings.newInstance().withConfiguration(fsConfig)
+                            .inBatchMode().build()
             );
         } else {
             Configuration config = new Configuration(fsConfig);
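
Repro sketch (hypothetical; mirrors the new DMLSuite tests, with an illustrative path and data): before this change, a projection containing only the partition column left the non-partition column list empty, so the native reader was never initialized and such queries yielded no rows. The fix keys reader initialization off projectedRowTypeWithPk instead.

    // Hypothetical standalone sketch; assumes a Flink + LakeSoul classpath.
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class SelectOnlyPartitionColumnRepro {
        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inBatchMode().build());
            // Non-pk table partitioned by order_id, as in
            // createLakeSoulSourceNonPkWithPartitionTableUser above.
            tEnv.executeSql("create table user_info_3 (" +
                    " order_id INT," +
                    " name varchar," +
                    " score DECIMAL" +
                    ") PARTITIONED BY ( order_id )" +
                    " WITH (" +
                    " 'format'='lakesoul'," +
                    " 'path'='/tmp/lakeSource/user_nonpk_partitioned' )");
            tEnv.executeSql("INSERT INTO user_info_3 VALUES (2, 'Alice', 80),(3, 'Jack', 75)")
                    .await();
            // Projects only the partition column; returned no data before the fix.
            tEnv.executeSql("select order_id from user_info_3").print();
        }
    }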