From d6dfcbc44b8ea28cdb0356489623c2d058712d12 Mon Sep 17 00:00:00 2001 From: zenghua Date: Mon, 7 Aug 2023 15:47:13 +0800 Subject: [PATCH] fix after rebase main Signed-off-by: zenghua --- .github/workflows/flink-cdc-test.yml | 10 ++++ .github/workflows/maven-test.yml | 18 ++++--- .github/workflows/native-build.yml | 6 +-- .github/workflows/rust-clippy.yml | 4 ++ .../dmetasoul/lakesoul/meta/DBManager.java | 20 +++---- .../com/dmetasoul/lakesoul/meta/DBUtil.java | 24 ++++----- .../lakesoul/meta/dao/NamespaceDao.java | 7 ++- .../lakesoul/meta/jnr/JnrLoader.java | 2 - .../meta/jnr/NativeMetadataJavaClient.java | 25 +++++++-- .../lakesoul/meta/jnr/NativeUtils.java | 4 ++ .../lakesoul/meta/LakeSoulRBACTest.java | 52 ++++++++++--------- lakesoul-spark/scriptFile.scala | 1 - .../sql/lakesoul/SnapshotManagement.scala | 2 - .../lakesoul/meta/RBACOperatinSuite.scala | 19 +++---- native-metadata/Cargo.lock | 19 +++++++ .../lakesoul-metadata-c/src/lib.rs | 10 ---- native-metadata/lakesoul-metadata/src/lib.rs | 18 +++---- native-metadata/lakesoul-metadata/src/main.rs | 35 ------------- native-metadata/proto/Cargo.toml | 5 +- native-metadata/proto/build.rs | 4 ++ native-metadata/proto/src/lib.rs | 4 ++ script/benchmark/work-dir/lakesoul.properties | 1 - 22 files changed, 154 insertions(+), 136 deletions(-) delete mode 100644 lakesoul-spark/scriptFile.scala delete mode 100644 native-metadata/lakesoul-metadata/src/main.rs diff --git a/.github/workflows/flink-cdc-test.yml b/.github/workflows/flink-cdc-test.yml index fce2dfea6..7f05d69f0 100644 --- a/.github/workflows/flink-cdc-test.yml +++ b/.github/workflows/flink-cdc-test.yml @@ -61,6 +61,9 @@ jobs: - uses: Swatinem/rust-cache@v2 with: workspaces: "./native-io -> target" + - uses: Swatinem/rust-cache@v2 + with: + workspaces: "./native-metadata -> target" - name: Pull images run: | docker pull -q bitnami/spark:3.3.1 @@ -69,10 +72,17 @@ jobs: use-cross: true command: build args: '--manifest-path native-io/Cargo.toml --target x86_64-unknown-linux-gnu --release --all-features' + - uses: actions-rs/cargo@v1 + with: + use-cross: true + command: build + args: '--manifest-path native-metadata/Cargo.toml --target x86_64-unknown-linux-gnu --release --all-features' - name: Build with Maven run: | mkdir -p native-io/target/release cp native-io/target/x86_64-unknown-linux-gnu/release/liblakesoul_io_c.so native-io/target/release + mkdir -p native-metadata/target/release + cp native-metadata/target/x86_64-unknown-linux-gnu/release/liblakesoul_metadata_c.so native-metadata/target/release MAVEN_OPTS="-Xmx4000m" mvn -q -B package -f pom.xml -Pcross-build -DskipTests - name: Get jar names run: | diff --git a/.github/workflows/maven-test.yml b/.github/workflows/maven-test.yml index 41733eabc..b70089324 100644 --- a/.github/workflows/maven-test.yml +++ b/.github/workflows/maven-test.yml @@ -41,16 +41,12 @@ jobs: profile: minimal toolchain: nightly-2023-05-20 default: true - - name: Install Protoc - uses: arduino/setup-protoc@v2 - with: - version: "23.x" - uses: Swatinem/rust-cache@v2 with: workspaces: "./native-metadata -> target" - uses: actions-rs/cargo@v1 with: - # use-cross: true + use-cross: true command: build args: '--manifest-path native-metadata/Cargo.toml --target x86_64-unknown-linux-gnu --release --all-features' - uses: actions/upload-artifact@master @@ -222,7 +218,7 @@ jobs: spark-test-rbac: runs-on: ubuntu-latest - needs: [ build-linux-x86_64 ] + needs: [ build-native-io-linux-x86_64, build-native-metadata-linux-x86_64 ] services: # Label used to access the service container @@ -265,6 +261,10 @@ jobs: uses: arduino/setup-protoc@v2 with: version: "23.x" + - uses: actions/download-artifact@v3 + with: + name: lakesoul-nativemetadata-x86_64-unknown-linux-gnu-maven-test + path: ./native-metadata/target/release/ - uses: actions/download-artifact@v3 with: name: lakesoul-nativeio-x86_64-unknown-linux-gnu-maven-test @@ -355,7 +355,7 @@ jobs: flink-test-rbac: runs-on: ubuntu-latest - needs: [ build-linux-x86_64 ] + needs: [ build-native-io-linux-x86_64, build-native-metadata-linux-x86_64 ] services: # Label used to access the service container @@ -406,6 +406,10 @@ jobs: uses: arduino/setup-protoc@v2 with: version: "23.x" + - uses: actions/download-artifact@v3 + with: + name: lakesoul-nativemetadata-x86_64-unknown-linux-gnu-maven-test + path: ./native-metadata/target/release/ - uses: actions/download-artifact@v3 with: name: lakesoul-nativeio-x86_64-unknown-linux-gnu-maven-test diff --git a/.github/workflows/native-build.yml b/.github/workflows/native-build.yml index cb0376e8f..fd2f75245 100644 --- a/.github/workflows/native-build.yml +++ b/.github/workflows/native-build.yml @@ -117,10 +117,6 @@ jobs: java-version: '8' distribution: 'temurin' cache: maven - - name: Install Protoc - uses: arduino/setup-protoc@v2 - with: - version: "23.x" - uses: actions-rs/toolchain@v1 with: profile: minimal @@ -131,7 +127,7 @@ jobs: workspaces: "./native-metadata -> target" - uses: actions-rs/cargo@v1 with: - # use-cross: true + use-cross: true command: build args: '--manifest-path native-metadata/Cargo.toml --target x86_64-unknown-linux-gnu --release --all-features' - uses: actions/upload-artifact@master diff --git a/.github/workflows/rust-clippy.yml b/.github/workflows/rust-clippy.yml index 4c56dea18..9bf143754 100644 --- a/.github/workflows/rust-clippy.yml +++ b/.github/workflows/rust-clippy.yml @@ -33,6 +33,10 @@ jobs: toolchain: nightly-2023-05-20 components: clippy default: true + - name: Install Protoc + uses: arduino/setup-protoc@v2 + with: + version: "23.x" - uses: Swatinem/rust-cache@v2 with: workspaces: "./native-io -> target" diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java index 5820803d0..94e8feaf5 100644 --- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java +++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java @@ -112,7 +112,6 @@ public void createNewTable(String tableId, String namespace, String tableName, S String domain = getNameSpaceDomain(namespace); - System.out.println("[debug]createNewTable " + tableInfo); if (StringUtils.isNotBlank(tableName)) { tableNameIdDao.insert(TableNameIdDao.newTableNameId(tableName, tableId, namespace, domain)); } @@ -706,7 +705,7 @@ private PartitionInfo getOrCreateCurPartitionInfo(Map cur .setVersion(-1) .setDomain(getTableDomain(tableId)) .build(); - }else{ + } else { curPartitionInfo = curPartitionInfo.toBuilder() .setDomain(getTableDomain(tableId)) .build(); @@ -773,23 +772,23 @@ public void rollbackPartitionByVersion(String tableId, String partitionDesc, int .build()); } - private String getTableDomain(String tableId){ - if(!AuthZEnforcer.authZEnabled()){ + private String getTableDomain(String tableId) { + if (!AuthZEnforcer.authZEnabled()) { return "public"; } TableInfo tableInfo = this.getTableInfoByTableId(tableId); - if(tableInfo == null){ + if (tableInfo == null) { throw new IllegalStateException("target tableinfo does not exists"); } return getNameSpaceDomain(tableInfo.getTableNamespace()); } - private String getNameSpaceDomain(String namespace){ - if(!AuthZEnforcer.authZEnabled()){ + private String getNameSpaceDomain(String namespace) { + if (!AuthZEnforcer.authZEnabled()) { return "public"; } Namespace namespaceInfo = getNamespaceByNamespace(namespace); - if(namespaceInfo == null) { + if (namespaceInfo == null) { throw new IllegalStateException("target namespace does not exists"); } return namespaceInfo.getDomain(); @@ -875,10 +874,13 @@ public void deleteNamespace(String namespace) { public void cleanMeta() { if (NativeUtils.NATIVE_METADATA_UPDATE_ENABLED) { NativeMetadataJavaClient.cleanMeta(); + if (!AuthZEnforcer.authZEnabled()) { + namespaceDao.insert(NamespaceDao.DEFAULT_NAMESPACE); + } return; } namespaceDao.clean(); - if(!AuthZEnforcer.authZEnabled()){ + if (!AuthZEnforcer.authZEnabled()) { namespaceDao.insert(NamespaceDao.DEFAULT_NAMESPACE); } dataCommitInfoDao.clean(); diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBUtil.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBUtil.java index 0277c9a96..2098e935e 100644 --- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBUtil.java +++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBUtil.java @@ -14,6 +14,8 @@ import org.apache.commons.lang3.StringUtils; import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; import java.nio.file.Files; import java.nio.file.Paths; import java.sql.Connection; @@ -30,17 +32,11 @@ public class DBUtil { private static final String urlDefault = "jdbc:postgresql://127.0.0.1:5432/lakesoul_test?stringtype=unspecified"; private static final String usernameDefault = "lakesoul_test"; private static final String passwordDefault = "lakesoul_test"; - private static final String hostDefault = "127.0.0.1"; - private static final String portDefault = "5432"; - private static final String dbNameDefault = "lakesoul_test"; private static final String driverNameKey = "lakesoul.pg.driver"; private static final String urlKey = "lakesoul.pg.url"; - private static final String usernameKey = "lakesoul.pg.username"; - private static final String passwordKey = "lakesoul.pg.password"; - private static final String hostKey = "lakesoul.pg.host"; - private static final String portKey = "lakesoul.pg.port"; - private static final String dbNameKey = "lakesoul.pg.dbName"; + public static final String usernameKey = "lakesoul.pg.username"; + public static final String passwordKey = "lakesoul.pg.password"; private static final String driverNameEnv = "LAKESOUL_PG_DRIVER"; private static final String urlEnv = "LAKESOUL_PG_URL"; @@ -96,16 +92,20 @@ public static DataBaseProperty getDBInfo() { properties.setProperty(urlKey, getConfigValue(urlEnv, urlKey, urlDefault)); properties.setProperty(usernameKey, getConfigValue(usernameEnv, usernameKey, usernameDefault)); properties.setProperty(passwordKey, getConfigValue(passwordEnv, passwordKey, passwordDefault)); - properties.setProperty(dbNameKey, getConfigValue(dbNameKey, dbNameKey, dbNameDefault)); } DataBaseProperty dataBaseProperty = new DataBaseProperty(); dataBaseProperty.setDriver(properties.getProperty(driverNameKey, driverNameDefault)); dataBaseProperty.setUrl(properties.getProperty(urlKey, urlDefault)); dataBaseProperty.setUsername(properties.getProperty(usernameKey, usernameDefault)); dataBaseProperty.setPassword(properties.getProperty(passwordKey, passwordDefault)); - dataBaseProperty.setDbName(properties.getProperty(dbNameKey, dbNameDefault)); - dataBaseProperty.setHost(properties.getProperty(hostKey, hostDefault)); - dataBaseProperty.setPort(properties.getProperty(portKey, portDefault)); + try { + URL url = new URL(properties.getProperty(urlKey, urlDefault).replaceFirst("jdbc:postgresql", "http")); + dataBaseProperty.setDbName(url.getPath().substring(1)); + dataBaseProperty.setHost(url.getHost()); + dataBaseProperty.setPort(String.valueOf(url.getPort())); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } return dataBaseProperty; } diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/dao/NamespaceDao.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/dao/NamespaceDao.java index 96b782ac9..3ec6f399b 100644 --- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/dao/NamespaceDao.java +++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/dao/NamespaceDao.java @@ -10,6 +10,9 @@ import com.dmetasoul.lakesoul.meta.entity.Namespace; import com.dmetasoul.lakesoul.meta.jnr.NativeMetadataJavaClient; import com.dmetasoul.lakesoul.meta.jnr.NativeUtils; +import com.dmetasoul.lakesoul.meta.rbac.AuthZContext; +import com.dmetasoul.lakesoul.meta.rbac.AuthZEnforcer; +import dev.failsafe.internal.util.Lists; import java.sql.Connection; import java.sql.PreparedStatement; @@ -167,10 +170,11 @@ public void clean() { } public static Namespace namespaceFromResultSet(ResultSet rs) throws SQLException { + String comment = rs.getString("comment"); return Namespace.newBuilder() .setNamespace(rs.getString("namespace")) .setProperties(rs.getString("properties")) - .setComment(rs.getString("comment")) + .setComment(comment == null ? "" : comment) .setDomain(rs.getString("domain")) .build(); } @@ -180,6 +184,7 @@ public static Namespace namespaceFromResultSet(ResultSet rs) throws SQLException .setNamespace(DBConfig.LAKESOUL_DEFAULT_NAMESPACE) .setProperties("{}") .setComment("") + .setDomain(AuthZContext.getInstance().getDomain()) .build(); } diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/JnrLoader.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/JnrLoader.java index c000edd51..0e824dfd6 100644 --- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/JnrLoader.java +++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/JnrLoader.java @@ -25,7 +25,6 @@ public class JnrLoader { public static LibLakeSoulMetaData get() { JnrLoader.tryLoad(); - System.out.println(INSTANCE.libLakeSoulMetaData); return INSTANCE.libLakeSoulMetaData; } @@ -59,7 +58,6 @@ public synchronized static void tryLoad() { libraryOptions.put(LibraryOption.LoadNow, true); libraryOptions.put(LibraryOption.IgnoreError, true); - System.out.println(finalPath); JnrLoader.INSTANCE.libLakeSoulMetaData = LibraryLoader.loadLibrary( LibLakeSoulMetaData.class, libraryOptions, diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java index 307069b96..83581b1cb 100644 --- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java +++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java @@ -60,6 +60,12 @@ public class NativeMetadataJavaClient implements AutoCloseable { private final ReentrantReadWriteLock lock; + private static DataBaseProperty dataBaseProperty = null; + + public static void setDataBaseProperty(DataBaseProperty dataBaseProperty) { + NativeMetadataJavaClient.dataBaseProperty = dataBaseProperty; + } + public NativeMetadataJavaClient() { this(5000L, 1 << 12, 1 << 16); } @@ -85,6 +91,7 @@ public static NativeMetadataJavaClient getInstance() { return instance; } + public Pointer getTokioPostgresClient() { return tokioPostgresClient; } @@ -183,9 +190,11 @@ public void close() { } private void initialize() { - DataBaseProperty dataBaseProperty = DBUtil.getDBInfo(); + DataBaseProperty dataBaseProperty = NativeMetadataJavaClient.dataBaseProperty; + if (dataBaseProperty == null) { + dataBaseProperty = DBUtil.getDBInfo(); + } tokioRuntime = libLakeSoulMetaData.create_tokio_runtime(); - System.out.println("create_tokio_runtime success"); String config = String.format( "host=%s port=%s dbname=%s user=%s password=%s", @@ -200,14 +209,13 @@ private void initialize() { if (msg.isEmpty()) { future.complete(bool); } else { - System.out.println(msg); + System.err.println(msg); future.completeExceptionally(new IOException(msg)); } }, getbooleanCallbackObjectReferenceManager()), config, tokioRuntime ); - System.out.println("create_tokio_postgres_client success"); preparedStatement = libLakeSoulMetaData.create_prepared_statement(); try { future.get(timeout, TimeUnit.MILLISECONDS); @@ -431,7 +439,7 @@ public static int cleanMeta() { } @Override - public void close() throws Exception { + public void close() { if (tokioRuntime != null) { libLakeSoulMetaData.free_tokio_runtime(tokioRuntime); tokioRuntime = null; @@ -445,4 +453,11 @@ public void close() throws Exception { preparedStatement = null; } } + + public static void closeAll() { + if (instance != null) { + instance.close(); + instance = null; + } + } } diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeUtils.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeUtils.java index 38ce2a11a..1b63eae89 100644 --- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeUtils.java +++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeUtils.java @@ -1,3 +1,7 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + package com.dmetasoul.lakesoul.meta.jnr; public class NativeUtils { diff --git a/lakesoul-flink/src/test/java/com/dmetasoul/lakesoul/meta/LakeSoulRBACTest.java b/lakesoul-flink/src/test/java/com/dmetasoul/lakesoul/meta/LakeSoulRBACTest.java index d9bef86b6..a23d9c56e 100644 --- a/lakesoul-flink/src/test/java/com/dmetasoul/lakesoul/meta/LakeSoulRBACTest.java +++ b/lakesoul-flink/src/test/java/com/dmetasoul/lakesoul/meta/LakeSoulRBACTest.java @@ -4,6 +4,7 @@ package com.dmetasoul.lakesoul.meta; +import com.dmetasoul.lakesoul.meta.jnr.NativeMetadataJavaClient; import org.apache.flink.lakesoul.test.LakeSoulFlinkTestBase; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; @@ -14,7 +15,7 @@ import java.util.List; public class LakeSoulRBACTest extends LakeSoulFlinkTestBase { - final String ADMIN1 = "admin1"; + final String ADMIN1 = "admin1"; final String ADMIN1_PASS = "admin1"; final String ADMIN2 = "admin2"; final String ADMIN2_PASS = "admin2"; @@ -23,21 +24,22 @@ public class LakeSoulRBACTest extends LakeSoulFlinkTestBase { final String DOMAIN1 = "domain1"; final String DOMAIN2 = "domain2"; - private void login(String username , String password, String domain) { + private void login(String username, String password, String domain) { System.setProperty(DBUtil.usernameKey, username); System.setProperty(DBUtil.passwordKey, password); System.setProperty(DBUtil.domainKey, domain); DBConnector.closeAllConnections(); + NativeMetadataJavaClient.closeAll(); } @Test - public void testDifferentDomain(){ + public void testDifferentDomain() { getTableEnv().useCatalog("lakesoul"); login(ADMIN1, ADMIN1_PASS, DOMAIN1); // create sql("create database if not exists database1"); - assert(sql("show databases").size() == 2); + assert (sql("show databases").size() == 2); // drop: coming soon // spark.sql("drop database database1").collect() // val df2 = spark.sql("show databases").toDF() @@ -53,12 +55,12 @@ public void testDifferentDomain(){ + " with ('format' = 'lakesoul', 'path' = '" + getTempDirUri("/lakeSource/table2") + "')"); - assert( sql("show tables").size() == 2); + assert (sql("show tables").size() == 2); // drop table sql("drop table table1"); sql("drop table table2"); - assert(sql("show tables").size() == 0); + assert (sql("show tables").size() == 0); // write and read data sql("create table if not exists table1 ( id int, foo string, bar string )" @@ -67,7 +69,7 @@ public void testDifferentDomain(){ + "')"); sql("insert into table1 values(1, 'foo1', 'bar1')"); sql("insert into table1 values(2, 'foo2', 'bar2')"); - assert(sql("select * from table1").size() == 2); + assert (sql("select * from table1").size() == 2); // create & drop database sql("insert into table1 values(3, 'foo3', 'bar3')"); @@ -75,13 +77,13 @@ public void testDifferentDomain(){ try { sql("use database1"); throw new RuntimeException("test state was unexcepted"); - }catch (Exception e){ - assert(e instanceof CatalogException); + } catch (Exception e) { + assert (e instanceof CatalogException); } - try{ + try { sql("create database if not exists database2"); throw new RuntimeException("test state was unexcepted"); - }catch (Exception e) { + } catch (Exception e) { System.out.println(e.getMessage()); assert e.getMessage().contains("Could not execute CREATE DATABASE"); } @@ -96,14 +98,14 @@ public void testDifferentDomain(){ + getTempDirUri("/lakeSource/table3") + "')"); throw new RuntimeException("test state was unexcepted"); - }catch (Exception e){ + } catch (Exception e) { System.out.println(e.getMessage()); assert e.getCause() instanceof DatabaseNotExistException; } try { sql("drop table database1.table1"); throw new RuntimeException("test state was unexcepted"); - }catch (Exception e){ + } catch (Exception e) { System.out.println(e.getMessage()); assert e.getMessage().contains("Table with identifier 'lakesoul.database1.table1' does not exist."); } @@ -112,17 +114,17 @@ public void testDifferentDomain(){ try { sql("insert into database1.table1 values(4, 'foo4', 'bar4')"); throw new RuntimeException("test state was unexcepted"); - }catch (Exception e){ + } catch (Exception e) { System.out.println(e.getMessage()); - assert(e.getMessage().contains("Cannot find table '`lakesoul`.`database1`.`table1`' in any of the catalogs")); + assert (e.getMessage().contains("Cannot find table '`lakesoul`.`database1`.`table1`' in any of the catalogs")); } try { sql("select * from database1.table1"); throw new RuntimeException("test state was unexcepted"); - }catch (Exception e){ + } catch (Exception e) { System.out.println(e.getMessage()); - assert(e.getMessage().contains("Object 'database1' not found within 'lakesoul'")); + assert (e.getMessage().contains("Object 'database1' not found within 'lakesoul'")); } // clear test @@ -132,7 +134,7 @@ public void testDifferentDomain(){ } @Test - public void testDifferentRole(){ + public void testDifferentRole() { getTableEnv().useCatalog("lakesoul"); login(ADMIN1, ADMIN1_PASS, DOMAIN1); // create @@ -142,17 +144,17 @@ public void testDifferentRole(){ login(USER1, USER1_PASS, DOMAIN1); // create table & drop database sql("use database1"); - try{ + try { sql("create database if not exists database3"); throw new RuntimeException("test state was unexcepted"); - }catch (Exception e) { + } catch (Exception e) { System.out.println(e.getMessage()); assert e.getMessage().contains("Could not execute CREATE DATABASE"); } - try{ + try { sql("drop database database1"); throw new RuntimeException("test state was unexcepted"); - }catch (Exception e) { + } catch (Exception e) { System.out.println(e.getMessage()); assert e.getMessage().contains("Could not execute DROP DATABASE lakesoul.database1 RESTRICT"); } @@ -166,10 +168,10 @@ public void testDifferentRole(){ + " with ('format' = 'lakesoul', 'path' = '" + getTempDirUri("/lakeSource/table2") + "')"); - assert(sql("show tables").size() == 2); + assert (sql("show tables").size() == 2); sql("drop table table1"); sql("drop table table2"); - assert(sql("show tables").size() == 0); + assert (sql("show tables").size() == 0); // CRUD data sql("create table if not exists table1 ( id int, foo string, bar string )" @@ -178,7 +180,7 @@ public void testDifferentRole(){ + "')"); sql("insert into table1 values(1, 'foo1', 'bar1')"); sql("insert into table1 values(2, 'foo2', 'bar2')"); - assert(sql("select * from table1").size() == 2); + assert (sql("select * from table1").size() == 2); // clear test sql("drop table table1"); diff --git a/lakesoul-spark/scriptFile.scala b/lakesoul-spark/scriptFile.scala deleted file mode 100644 index ae37267df..000000000 --- a/lakesoul-spark/scriptFile.scala +++ /dev/null @@ -1 +0,0 @@ -lakesoul.pg.host = "" \ No newline at end of file diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/SnapshotManagement.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/SnapshotManagement.scala index a7aa90802..7de9b6ae6 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/SnapshotManagement.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/SnapshotManagement.scala @@ -56,14 +56,12 @@ class SnapshotManagement(path: String, namespace: String) extends Logging { private def getCurrentSnapshot: Snapshot = { if (LakeSoulSourceUtils.isLakeSoulTableExists(table_path)) { - logError("createSnapshot for " + table_path) createSnapshot } else { //table_name in SnapshotManagement must be a root path, and its parent path shouldn't be lakesoul table if (LakeSoulUtils.isLakeSoulTable(table_path)) { throw new AnalysisException("table_name is expected as root path in SnapshotManagement") } - logError("initSnapshot for " + table_path) initSnapshot } } diff --git a/lakesoul-spark/src/test/scala/com/dmetasoul/lakesoul/meta/RBACOperatinSuite.scala b/lakesoul-spark/src/test/scala/com/dmetasoul/lakesoul/meta/RBACOperatinSuite.scala index 3839d2261..42dcb9419 100644 --- a/lakesoul-spark/src/test/scala/com/dmetasoul/lakesoul/meta/RBACOperatinSuite.scala +++ b/lakesoul-spark/src/test/scala/com/dmetasoul/lakesoul/meta/RBACOperatinSuite.scala @@ -4,6 +4,7 @@ package com.dmetasoul.lakesoul.meta +import com.dmetasoul.lakesoul.meta.jnr.NativeMetadataJavaClient import com.dmetasoul.lakesoul.meta.rbac.AuthZEnforcer import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, UnresolvedTableOrView} @@ -42,8 +43,8 @@ class RBACOperatinSuite extends QueryTest System.setProperty(DBUtil.usernameKey, username) System.setProperty(DBUtil.passwordKey, password) System.setProperty(DBUtil.domainKey, domain) - DBConnector.closeAllConnections() - + DBConnector.closeAllConnections + NativeMetadataJavaClient.closeAll } test("testDifferentDomain") { @@ -53,10 +54,10 @@ class RBACOperatinSuite extends QueryTest val df = spark.sql("show databases").toDF() assert(df.count() == 2) // drop: coming soon -// spark.sql("drop database database1").collect() -// val df2 = spark.sql("show databases").toDF() -// assert(df2.count() == 1) -// assert(df2.collectAsList().get(0).getString(1).equals("default")) + // spark.sql("drop database database1").collect() + // val df2 = spark.sql("show databases").toDF() + // assert(df2.count() == 1) + // assert(df2.collectAsList().get(0).getString(1).equals("default")) // create tables spark.sql("use database1;") spark.sql("create table if not exists table1 ( id int, foo string, bar string ) using lakesoul ") @@ -78,7 +79,7 @@ class RBACOperatinSuite extends QueryTest assert(df5.count() == 2) // update data - spark.sql("update table1 set foo = 'foo3', bar = 'bar3' where id = 2") + spark.sql("update table1 set foo = 'foo3', bar = 'bar3' where id = 2") val df6 = spark.sql("select (id, foo, bar) from table1 where id = 2").toDF() val row = df6.collectAsList().get(0).get(0).asInstanceOf[GenericRowWithSchema]; assert(row.getString(1).equals("foo3")) @@ -91,14 +92,14 @@ class RBACOperatinSuite extends QueryTest - // create & drop database + // create & drop database spark.sql("insert into table1 values(3, 'foo3', 'bar3')") login(ADMIN2, ADMIN2_PASS, DOMAIN1) val err0 = intercept[Exception] { spark.sql("use database1;") } assert(err0.isInstanceOf[NoSuchNamespaceException]) - val err1 = intercept[Exception] { + val err1 = intercept[Exception] { spark.sql("create database if not exists database2") } println(err1.getMessage) diff --git a/native-metadata/Cargo.lock b/native-metadata/Cargo.lock index 1fbd8ff39..c9106cb2b 100644 --- a/native-metadata/Cargo.lock +++ b/native-metadata/Cargo.lock @@ -55,6 +55,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "autotools" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aef8da1805e028a172334c3b680f93e71126f2327622faef2ec3d893c0a4ad77" +dependencies = [ + "cc", +] + [[package]] name = "backtrace" version = "0.3.68" @@ -737,6 +746,16 @@ dependencies = [ "bytes", "prost", "prost-build", + "protobuf-src", +] + +[[package]] +name = "protobuf-src" +version = "1.1.0+21.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7ac8852baeb3cc6fb83b93646fb93c0ffe5d14bf138c945ceb4b9948ee0e3c1" +dependencies = [ + "autotools", ] [[package]] diff --git a/native-metadata/lakesoul-metadata-c/src/lib.rs b/native-metadata/lakesoul-metadata-c/src/lib.rs index b082d638b..cca63828d 100644 --- a/native-metadata/lakesoul-metadata-c/src/lib.rs +++ b/native-metadata/lakesoul-metadata-c/src/lib.rs @@ -87,16 +87,6 @@ fn string_from_ptr(ptr: *const c_char) -> String { pub type ResultCallback = extern "C" fn(T, *const c_char); -#[no_mangle] -pub extern "C" fn namespace( - bytes: *const c_uchar, - len: i32, -) { - println!("rust::namespace"); - let a = unsafe {std::slice::from_raw_parts(bytes, len as usize)}; - println!("{:?}", entity::Namespace::decode(prost::bytes::Bytes::from(a)).unwrap()); -} - #[no_mangle] pub extern "C" fn execute_insert( callback: extern "C" fn(i32, *const c_char), diff --git a/native-metadata/lakesoul-metadata/src/lib.rs b/native-metadata/lakesoul-metadata/src/lib.rs index 67816c0a1..0e1f0d88a 100644 --- a/native-metadata/lakesoul-metadata/src/lib.rs +++ b/native-metadata/lakesoul-metadata/src/lib.rs @@ -1,3 +1,7 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + #![feature(io_error_other)] #![feature(split_array)] use std::collections::HashMap; @@ -416,7 +420,6 @@ pub fn execute_query( return Err(std::io::Error::from(std::io::ErrorKind::InvalidInput)) } let query_type = DaoType::try_from(query_type).unwrap(); - println!("query: {:?} params={:?}", query_type, joined_string); let statement = match get_prepared_statement(runtime, client, prepared, &query_type) { Ok(statement) => statement, Err(err) => return Err(convert_to_io_error(err)) @@ -832,7 +835,6 @@ pub fn execute_insert( return Err(std::io::Error::from(std::io::ErrorKind::InvalidInput)) } let insert_type = DaoType::try_from(insert_type).unwrap(); - println!("execute {:?} {:?}", insert_type, wrapper); let statement = match get_prepared_statement(runtime, client, prepared, &insert_type) { Ok(statement) => statement, Err(err) => return Err(convert_to_io_error(err)) @@ -1109,7 +1111,6 @@ pub fn execute_update( return Err(std::io::Error::from(std::io::ErrorKind::InvalidInput)) } let update_type = DaoType::try_from(update_type).unwrap(); - println!("update: {:?} params={:?}", update_type, joined_string); let statement = match get_prepared_statement(runtime, client, prepared, &update_type) { Ok(statement) => statement, Err(err) => return Err(convert_to_io_error(err)) @@ -1182,7 +1183,6 @@ pub fn execute_update( filter_params.push(params[3].clone()); } statement += " where table_id = $1::TEXT"; - println!("UpdateTableInfoById sql={} params={:?}", statement, filter_params); runtime.block_on(async{ match idx { 3 => client.execute(&statement, &[¶ms[0], &filter_params[0]]).await, @@ -1242,7 +1242,6 @@ pub fn execute_query_scalar( return Err(std::io::Error::from(std::io::ErrorKind::InvalidInput)) } let query_type = DaoType::try_from(query_type).unwrap(); - println!("query: {:?} params={:?}", query_type, joined_string); let statement = match get_prepared_statement(runtime, client, prepared, &query_type) { Ok(statement) => statement, Err(err) => return Err(convert_to_io_error(err)) @@ -1327,16 +1326,15 @@ pub fn clean_meta_for_test( client: &Client ) ->Result { let result = runtime.block_on(async{ - let _ = client.batch_execute("delete from namespace; + client.batch_execute("delete from namespace; delete from data_commit_info; delete from table_info; delete from table_path_id; delete from table_name_id; - delete from partition_info;").await; - client.execute("insert into namespace(namespace, properties, comment) values ('default', '{}', '');", &[]).await + delete from partition_info;").await }); match result { - Ok(count) => Ok(count as i32), + Ok(_) => Ok(0i32), Err(e) => Err(convert_to_io_error(e)), } } @@ -1353,7 +1351,6 @@ pub fn create_connection( runtime: &Runtime, config: String ) -> Result { - println!("{}", config); let (client, connection) = match runtime.block_on(async { tokio_postgres::connect(config.as_str(), NoTls).await }) { @@ -1363,7 +1360,6 @@ pub fn create_connection( return Err(std::io::Error::from(std::io::ErrorKind::ConnectionRefused)) } }; - println!("create_connection done"); runtime.spawn(async move { if let Err(e) = connection.await { diff --git a/native-metadata/lakesoul-metadata/src/main.rs b/native-metadata/lakesoul-metadata/src/main.rs deleted file mode 100644 index b6828fe03..000000000 --- a/native-metadata/lakesoul-metadata/src/main.rs +++ /dev/null @@ -1,35 +0,0 @@ -// SPDX-FileCopyrightText: 2023 LakeSoul Contributors -// -// SPDX-License-Identifier: Apache-2.0 - -use tokio_postgres::{NoTls, Error}; - -#[tokio::main] // By default, tokio_postgres uses the tokio crate as its runtime. -async fn main() -> Result<(), Error> { - // Connect to the database. - let (client, connection) = - tokio_postgres::connect("host=localhost port=5433 user=yugabyte", NoTls).await?; - - // The connection object performs the actual communication with the database, - // so spawn it off to run on its own. - tokio::spawn(async move { - if let Err(e) = connection.await { - eprintln!("connection error: {}", e); - } - }); - - // Now we can execute a simple statement that just returns its parameter. - - let text_array:Vec<&str> = vec!["range=__L@KE$OUL_EMPTY_STRING__"]; - println!("{:?}", text_array); - let rows = client - .query("SELECT $1::TEXT, $2::TEXT[] where '2' in ($3::TEXT[])", &[&Some("hello world"), &text_array, &text_array]) - .await?; - - // And then check that we got back the same string we sent over. - let value: &str = rows[0].get(0); - println!("{:?} \n{:?}", rows, rows[0].get::>(1)); - assert_eq!(value, "hello world"); - - Ok(()) -} diff --git a/native-metadata/proto/Cargo.toml b/native-metadata/proto/Cargo.toml index a68119a0f..1cdb5099c 100644 --- a/native-metadata/proto/Cargo.toml +++ b/native-metadata/proto/Cargo.toml @@ -15,4 +15,7 @@ bytes = "1" prost = "0.11" [build-dependencies] -prost-build = "0.11" \ No newline at end of file +prost-build = "0.11" + +[target.'cfg(target_os = "linux")'.build-dependencies] +protobuf-src = "1.1.0" \ No newline at end of file diff --git a/native-metadata/proto/build.rs b/native-metadata/proto/build.rs index 8e256b07b..d865591df 100644 --- a/native-metadata/proto/build.rs +++ b/native-metadata/proto/build.rs @@ -5,6 +5,10 @@ extern crate prost_build; fn main() { + #[cfg(target_os = "linux")] + { + std::env::set_var("PROTOC", protobuf_src::protoc()); + } prost_build::compile_protos(&["src/entity.proto"], &["src/"]).unwrap(); } \ No newline at end of file diff --git a/native-metadata/proto/src/lib.rs b/native-metadata/proto/src/lib.rs index dcf2b24f9..800899bfc 100644 --- a/native-metadata/proto/src/lib.rs +++ b/native-metadata/proto/src/lib.rs @@ -1,3 +1,7 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + pub mod proto{ pub mod entity { include!(concat!(env!("OUT_DIR"), "/proto.entity.rs")); diff --git a/script/benchmark/work-dir/lakesoul.properties b/script/benchmark/work-dir/lakesoul.properties index ff4197f57..1f1edc7d1 100644 --- a/script/benchmark/work-dir/lakesoul.properties +++ b/script/benchmark/work-dir/lakesoul.properties @@ -1,7 +1,6 @@ # SPDX-FileCopyrightText: 2023 LakeSoul Contributors # # SPDX-License-Identifier: Apache-2.0 - lakesoul.pg.driver=com.lakesoul.shaded.org.postgresql.Driver lakesoul.pg.url=jdbc:postgresql://lakesoul-docker-compose-env-lakesoul-meta-db-1:5432/lakesoul_test?stringtype=unspecified lakesoul.pg.username=lakesoul_test