[Native] Refine hdfs error msg #537

Merged 4 commits on Sep 9, 2024
@@ -308,6 +308,11 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
                 throw new CatalogException(
                         "Valid integer value for hashBucketNum property must be set for table with primary key");
             }
+        } else {
+            // for non-primary key table, hashBucketNum properties should not be set
+            if (tableOptions.containsKey(HASH_BUCKET_NUM.key())) {
+                throw new CatalogException("hashBucketNum property should not be set for table without primary key");
+            }
         }
         String tableId = TABLE_ID_PREFIX + UUID.randomUUID();
         String qualifiedPath = "";
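With this check in place, a CREATE TABLE that declares no primary key but still sets hashBucketNum now fails fast at the catalog level. A minimal sketch of the caller-facing behavior (the table name, path, and the tEnvs variable are illustrative, and Flink may surface the CatalogException wrapped in another runtime exception):

    // Hypothetical DDL: no PRIMARY KEY clause, yet hashBucketNum is set.
    String badDdl = "CREATE TABLE no_pk_table (" +
            " id INT," +
            " score DECIMAL" +
            ") WITH (" +
            " 'connector'='lakesoul'," +
            " 'hashBucketNum'='2'," +  // now rejected for tables without a primary key
            " 'path'='/tmp/lakeSource/no_pk_table' )";
    try {
        tEnvs.executeSql(badDdl);
    } catch (RuntimeException e) {
        // Expected message: "hashBucketNum property should not be set for table without primary key"
    }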
@@ -8,8 +8,6 @@
 import org.apache.flink.lakesoul.metadata.LakeSoulCatalog;
 import org.apache.flink.lakesoul.test.AbstractTestBase;
 import org.apache.flink.lakesoul.test.LakeSoulTestUtils;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.ExplainDetail;
@@ -29,11 +27,9 @@
 import java.io.IOException;
 import java.net.URI;
 import java.util.*;
-import java.util.concurrent.ExecutionException;
 import java.util.function.Consumer;

 import static org.apache.flink.lakesoul.LakeSoulOptions.LAKESOUL_TABLE_PATH;
-import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.HASH_BUCKET_NUM;
 import static org.apache.flink.table.planner.utils.TableTestUtil.*;
 import static org.junit.Assert.assertEquals;

@@ -297,9 +293,6 @@ private void testLakeSoulTableSinkWithParallelismBase(
                 + " real_col int"
                 + ") WITH ("
                 + "'"
-                + HASH_BUCKET_NUM.key()
-                + "'= '3',"
-                + "'"
                 + LAKESOUL_TABLE_PATH.key()
                 + "'='" +
                 getTempDirUri("/test_table")
@@ -338,9 +331,6 @@ private void testLakeSoulTableSinkDeleteWithParallelismBase(
                 + " PARTITIONED BY ( part )"
                 + " WITH ("
                 + "'"
-                + HASH_BUCKET_NUM.key()
-                + "'= '3',"
-                + "'"
                 + LAKESOUL_TABLE_PATH.key()
                 + "'='" +
                 getTempDirUri("/test_table")
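Both sink tables in these tests declare no primary key, so under the new catalog check their DDL must omit hashBucketNum entirely. After the removal, the concatenation reduces to roughly this shape (a sketch within the test's scope, where LAKESOUL_TABLE_PATH and getTempDirUri are available; the surrounding clauses are elided):

    // Non-PK test table: only the table path option remains mandatory here.
    String createSql = "CREATE TABLE test_table ("
            + " real_col int"
            + ") WITH ("
            + "'" + LAKESOUL_TABLE_PATH.key() + "'='"
            + getTempDirUri("/test_table") + "')";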
@@ -53,10 +53,10 @@ public class LakeSoulSinkFailTest extends AbstractTestBase {
     public static Map<String, Tuple3<ResolvedSchema, String, MockTableSource.StopBehavior>> parameters;
     static String dropSourceSql = "drop table if exists test_source";
     static String createSourceSqlFormat = "create table if not exists test_source %s " +
-            "with ('connector'='lakesoul', 'path'='/', 'hashBucketNum'='2', " + "'discoveryinterval'='1000'" + ")";
+            "with ('connector'='lakesoul', 'path'='/', %s " + "'discoveryinterval'='1000'" + ")";
     static String dropSinkSql = "drop table if exists test_sink";
     static String createSinkSqlFormat = "create table if not exists test_sink %s %s" +
-            "with ('connector'='lakesoul', 'path'='%s', 'hashBucketNum'='%d')";
+            "with ('connector'='lakesoul', 'path'='%s' %s)";
     private static ArrayList<Integer> indexArr;
     private static StreamExecutionEnvironment streamExecEnv;
     private static StreamTableEnvironment streamTableEnv;
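The %s placeholders now receive the bucket option only when the schema has a primary key, and the comma placement differs on purpose: the source fragment carries a trailing comma because 'discoveryinterval' follows it, while the sink fragment carries a leading comma because 'path' precedes it and nothing follows. A sketch of how the two formats expand (schema strings and the sink path are illustrative):

    // The two format strings as defined above, repeated so the sketch stands alone.
    String createSourceSqlFormat = "create table if not exists test_source %s " +
            "with ('connector'='lakesoul', 'path'='/', %s " + "'discoveryinterval'='1000'" + ")";
    String createSinkSqlFormat = "create table if not exists test_sink %s %s" +
            "with ('connector'='lakesoul', 'path'='%s' %s)";

    // Primary-key schema: the fragment supplies the option plus its trailing comma.
    String sourceWithPk = String.format(createSourceSqlFormat,
            "(id INT PRIMARY KEY NOT ENFORCED, v STRING)", "'hashBucketNum'='2',");
    // -> ... with ('connector'='lakesoul', 'path'='/', 'hashBucketNum'='2', 'discoveryinterval'='1000')

    // Non-PK schema: an empty fragment leaves a well-formed option list.
    String sinkWithoutPk = String.format(createSinkSqlFormat,
            "(id INT, v STRING)", "", "/tmp/test_sink", "");
    // -> ... with ('connector'='lakesoul', 'path'='/tmp/test_sink')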
@@ -334,11 +334,15 @@ private void testLakeSoulSink(ResolvedSchema resolvedSchema, MockTableSource.Sto


         streamTableEnv.executeSql(dropSourceSql);
-        streamTableEnv.executeSql(String.format(createSourceSqlFormat, resolvedSchema));
+        streamTableEnv.executeSql(String.format(createSourceSqlFormat, resolvedSchema,
+                resolvedSchema.getPrimaryKey().isPresent() ?
+                        "'hashBucketNum'='2'," : ""));


         streamTableEnv.executeSql(dropSinkSql);
-        streamTableEnv.executeSql(String.format(createSinkSqlFormat, resolvedSchema, partitionBy, path, 2));
+        streamTableEnv.executeSql(String.format(createSinkSqlFormat, resolvedSchema, partitionBy, path,
+                resolvedSchema.getPrimaryKey().isPresent() ?
+                        ", 'hashBucketNum'='2'" : ""));

         streamTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
         streamTableEnv.getConfig().setLocalTimeZone(TimeZone.getTimeZone("UTC").toZoneId());
@@ -369,11 +373,15 @@ public void testMockTableSource() throws IOException {

     testLakeSoulCatalog.setTestFactory(testFactory);

-        streamTableEnv.executeSql(String.format(createSourceSqlFormat, resolvedSchema));
+        streamTableEnv.executeSql(String.format(createSourceSqlFormat, resolvedSchema,
+                resolvedSchema.getPrimaryKey().isPresent() ?
+                        "'hashBucketNum'='2'," : ""));


         streamTableEnv.executeSql(String.format(createSinkSqlFormat, resolvedSchema, "",
-                tempFolder.newFolder("testMockTableSource").getAbsolutePath(), 2));
+                tempFolder.newFolder("testMockTableSource").getAbsolutePath(),
+                resolvedSchema.getPrimaryKey().isPresent() ?
+                        ", 'hashBucketNum'='2'" : ""));

         streamTableEnv.executeSql("DROP TABLE IF EXISTS default_catalog.default_database.test_sink");
         streamTableEnv.executeSql(
@@ -368,7 +368,6 @@ private void createLakeSoulSourceNonPkTableUser(TableEnvironment tEnvs)
                 " score DECIMAL" +
                 ") WITH (" +
                 " 'format'='lakesoul'," +
-                " 'hashBucketNum'='2'," +
                 " 'path'='" + getTempDirUri("/lakeSource/user2") +
                 "' )";
         tEnvs.executeSql("DROP TABLE if exists user_info_2");
@@ -29,7 +29,6 @@ public void testTimeStampLTZ() throws ExecutionException, InterruptedException {
                 " modifyTime TIMESTAMP_LTZ " +
                 ") WITH (" +
                 " 'connector'='lakesoul'," +
-                " 'hashBucketNum'='2'," +
                 " 'path'='" + getTempDirUri("/lakeSource/test_timestamp_ltz") +
                 "' )";
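For contrast with these non-PK tables, a table that does declare a primary key must still set hashBucketNum, per the check retained in createTable ("Valid integer value for hashBucketNum property must be set for table with primary key"). A hypothetical counterpart in the same style (table name and path invented for illustration):

    String pkDdl = "create table if not exists test_pk_table (" +
            " id INT," +
            " modifyTime TIMESTAMP_LTZ," +
            " PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            " 'connector'='lakesoul'," +
            " 'hashBucketNum'='2'," +  // required: omitting it for a PK table raises a CatalogException
            " 'path'='" + getTempDirUri("/lakeSource/test_pk_table") + "' )";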