
[Native] Refine hdfs error msg (#537)
* refine hdfs error msg

Signed-off-by: chenxu <[email protected]>

* fix test case

Signed-off-by: chenxu <[email protected]>

* fix test cases

Signed-off-by: chenxu <[email protected]>

* fix test case

Signed-off-by: chenxu <[email protected]>

---------

Signed-off-by: chenxu <[email protected]>
Co-authored-by: chenxu <[email protected]>
xuchen-plus and dmetasoul01 authored Sep 9, 2024
1 parent 0ec126c commit f2cae42
Showing 9 changed files with 372 additions and 581 deletions.
@@ -308,6 +308,11 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throw new CatalogException(
"Valid integer value for hashBucketNum property must be set for table with primary key");
}
} else {
// for a non-primary key table, the hashBucketNum property should not be set
if (tableOptions.containsKey(HASH_BUCKET_NUM.key())) {
throw new CatalogException("hashBucketNum property should not be set for table without primary key");
}
}
String tableId = TABLE_ID_PREFIX + UUID.randomUUID();
String qualifiedPath = "";
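Taken together with the existing primary-key check, the catalog now validates hashBucketNum in both directions. A minimal sketch of the resulting DDL behavior, assuming a LakeSoulCatalog has already been registered and set as the current catalog (the table names and paths below are illustrative, not from the commit):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HashBucketNumValidationSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // assumed: a LakeSoulCatalog has been created and made the current catalog here

        // A table with a primary key must still set hashBucketNum to a valid integer:
        tEnv.executeSql(
                "CREATE TABLE pk_table (id INT, PRIMARY KEY (id) NOT ENFORCED) WITH ("
                        + "'connector'='lakesoul', 'hashBucketNum'='2', 'path'='/tmp/pk_table')");

        // After this change, the same option on a table without a primary key is
        // rejected with a CatalogException instead of being accepted as before:
        tEnv.executeSql(
                "CREATE TABLE non_pk_table (id INT) WITH ("
                        + "'connector'='lakesoul', 'hashBucketNum'='2', 'path'='/tmp/non_pk_table')");
    }
}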
@@ -8,8 +8,6 @@
import org.apache.flink.lakesoul.metadata.LakeSoulCatalog;
import org.apache.flink.lakesoul.test.AbstractTestBase;
import org.apache.flink.lakesoul.test.LakeSoulTestUtils;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail;
@@ -29,11 +27,9 @@
import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

import static org.apache.flink.lakesoul.LakeSoulOptions.LAKESOUL_TABLE_PATH;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.HASH_BUCKET_NUM;
import static org.apache.flink.table.planner.utils.TableTestUtil.*;
import static org.junit.Assert.assertEquals;

@@ -297,9 +293,6 @@ private void testLakeSoulTableSinkWithParallelismBase(
+ " real_col int"
+ ") WITH ("
+ "'"
+ HASH_BUCKET_NUM.key()
+ "'= '3',"
+ "'"
+ LAKESOUL_TABLE_PATH.key()
+ "'='" +
getTempDirUri("/test_table")
@@ -338,9 +331,6 @@ private void testLakeSoulTableSinkDeleteWithParallelismBase(
+ " PARTITIONED BY ( part )"
+ " WITH ("
+ "'"
+ HASH_BUCKET_NUM.key()
+ "'= '3',"
+ "'"
+ LAKESOUL_TABLE_PATH.key()
+ "'='" +
getTempDirUri("/test_table")
@@ -53,10 +53,10 @@ public class LakeSoulSinkFailTest extends AbstractTestBase {
public static Map<String, Tuple3<ResolvedSchema, String, MockTableSource.StopBehavior>> parameters;
static String dropSourceSql = "drop table if exists test_source";
static String createSourceSqlFormat = "create table if not exists test_source %s " +
"with ('connector'='lakesoul', 'path'='/', 'hashBucketNum'='2', " + "'discoveryinterval'='1000'" + ")";
"with ('connector'='lakesoul', 'path'='/', %s " + "'discoveryinterval'='1000'" + ")";
static String dropSinkSql = "drop table if exists test_sink";
static String createSinkSqlFormat = "create table if not exists test_sink %s %s" +
"with ('connector'='lakesoul', 'path'='%s', 'hashBucketNum'='%d')";
"with ('connector'='lakesoul', 'path'='%s' %s)";
private static ArrayList<Integer> indexArr;
private static StreamExecutionEnvironment streamExecEnv;
private static StreamTableEnvironment streamTableEnv;
@@ -334,11 +334,15 @@ private void testLakeSoulSink(ResolvedSchema resolvedSchema, MockTableSource.StopBehavior


streamTableEnv.executeSql(dropSourceSql);
streamTableEnv.executeSql(String.format(createSourceSqlFormat, resolvedSchema));
streamTableEnv.executeSql(String.format(createSourceSqlFormat, resolvedSchema,
resolvedSchema.getPrimaryKey().isPresent() ?
"'hashBucketNum'='2'," : ""));


streamTableEnv.executeSql(dropSinkSql);
streamTableEnv.executeSql(String.format(createSinkSqlFormat, resolvedSchema, partitionBy, path, 2));
streamTableEnv.executeSql(String.format(createSinkSqlFormat, resolvedSchema, partitionBy, path,
resolvedSchema.getPrimaryKey().isPresent() ?
", 'hashBucketNum'='2'" : ""));

streamTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
streamTableEnv.getConfig().setLocalTimeZone(TimeZone.getTimeZone("UTC").toZoneId());
@@ -369,11 +373,15 @@ public void testMockTableSource() throws IOException {

testLakeSoulCatalog.setTestFactory(testFactory);

streamTableEnv.executeSql(String.format(createSourceSqlFormat, resolvedSchema));
streamTableEnv.executeSql(String.format(createSourceSqlFormat, resolvedSchema,
resolvedSchema.getPrimaryKey().isPresent() ?
"'hashBucketNum'='2'," : ""));


streamTableEnv.executeSql(String.format(createSinkSqlFormat, resolvedSchema, "",
tempFolder.newFolder("testMockTableSource").getAbsolutePath(), 2));
tempFolder.newFolder("testMockTableSource").getAbsolutePath(),
resolvedSchema.getPrimaryKey().isPresent() ?
", 'hashBucketNum'='2'" : ""));

streamTableEnv.executeSql("DROP TABLE IF EXISTS default_catalog.default_database.test_sink");
streamTableEnv.executeSql(
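Each call site above repeats the same ternary to splice the option in, and the two format strings differ only in where the separator comma falls, which is why the ternaries emit the comma together with the option. A hypothetical helper (not part of the commit) capturing the pattern:

import org.apache.flink.table.catalog.ResolvedSchema;

final class DdlOptionSketch {
    // Emit the hashBucketNum option only when the schema declares a primary key;
    // the caller picks the comma placement that matches its format string.
    static String hashBucketNumOption(ResolvedSchema schema, boolean leadingComma) {
        if (!schema.getPrimaryKey().isPresent()) {
            return "";
        }
        return leadingComma ? ", 'hashBucketNum'='2'" : "'hashBucketNum'='2',";
    }
}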
@@ -368,7 +368,6 @@ private void createLakeSoulSourceNonPkTableUser(TableEnvironment tEnvs)
" score DECIMAL" +
") WITH (" +
" 'format'='lakesoul'," +
" 'hashBucketNum'='2'," +
" 'path'='" + getTempDirUri("/lakeSource/user2") +
"' )";
tEnvs.executeSql("DROP TABLE if exists user_info_2");
@@ -29,7 +29,6 @@ public void testTimeStampLTZ() throws ExecutionException, InterruptedException {
" modifyTime TIMESTAMP_LTZ " +
") WITH (" +
" 'connector'='lakesoul'," +
" 'hashBucketNum'='2'," +
" 'path'='" + getTempDirUri("/lakeSource/test_timestamp_ltz") +
"' )";

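The last two hunks drop hashBucketNum from non-primary-key test DDL, matching the catalog check above. A sketch of the resulting DDL shape, using the identifiers from the user_info_2 test (the path prefix is illustrative; the test builds it from a temp-dir URI):

final class NonPkDdlSketch {
    // After this commit, a non-primary-key LakeSoul table declares only
    // its format and path, with no hashBucketNum option:
    static final String CREATE_NON_PK_TABLE =
            "CREATE TABLE user_info_2 ("
                    + " name STRING,"
                    + " score DECIMAL"
                    + ") WITH ("
                    + " 'format'='lakesoul',"
                    + " 'path'='/tmp/lakeSource/user2'"
                    + ")";
}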
