From 3fa3a9af64aa630db5f8fa159cf8c3b95fe21e0a Mon Sep 17 00:00:00 2001 From: Ceng <441651826@qq.com> Date: Tue, 10 Oct 2023 18:26:02 +0800 Subject: [PATCH] cleanup redundant DataOperation (#346) Signed-off-by: zenghua Co-authored-by: zenghua --- lakesoul-common/pom.xml | 34 +++ .../lakesoul/meta/DataOperation.scala | 48 ++-- .../lakesoul/meta/LakeSoulOptions.scala | 0 .../dmetasoul/lakesoul/meta/MetaVersion.scala | 22 +- .../apache/flink/lakesoul/tool/FlinkUtil.java | 8 +- .../lakesoul/meta/DataOperation.scala | 208 -------------- .../dmetasoul/lakesoul/meta/MetaVersion.scala | 264 ------------------ .../lakesoul/meta/DataOperation.scala | 244 ---------------- .../dmetasoul/lakesoul/meta/MetaVersion.scala | 24 +- .../v2/merge/MergeParquetScan.scala | 4 +- .../v2/merge/MergePartitionedFileUtil.scala | 3 +- .../sql/lakesoul/DelayedCommitProtocol.scala | 6 +- .../sql/lakesoul/LakeSoulFileIndex.scala | 15 +- .../sql/lakesoul/LakeSoulPartFileMerge.scala | 4 +- .../spark/sql/lakesoul/LakeSoulUtils.scala | 3 +- .../spark/sql/lakesoul/PartitionFilter.scala | 6 +- .../apache/spark/sql/lakesoul/Snapshot.scala | 5 +- .../sql/lakesoul/SnapshotManagement.scala | 6 +- .../sql/lakesoul/TransactionCommit.scala | 12 +- .../sql/lakesoul/TransactionalWrite.scala | 18 +- .../catalog/LakeSoulScanBuilder.scala | 3 +- .../spark/sql/lakesoul/commands/Command.scala | 2 +- .../lakesoul/commands/CompactionCommand.scala | 7 +- .../commands/CreateTableCommand.scala | 4 +- .../sql/lakesoul/commands/DeleteCommand.scala | 2 +- .../lakesoul/commands/DropTableCommand.scala | 1 - .../sql/lakesoul/commands/UpdateCommand.scala | 2 +- .../sql/lakesoul/commands/UpsertCommand.scala | 3 +- .../lakesoul/commands/WriteIntoTable.scala | 12 +- .../commands/alterTableCommands.scala | 2 +- .../lakesoul/exception/MetaRerunErrors.scala | 7 +- .../sources/LakeSoulSourceUtils.scala | 4 +- .../spark/sql/lakesoul/utils/MetaData.scala | 40 +-- .../spark/sql/lakesoul/utils/SparkUtil.scala | 2 +- .../sql/lakesoul/TableCreationTests.scala | 3 +- .../lakesoul/commands/AlterTableTests.scala | 7 +- .../schema/CaseSensitivitySuite.scala | 5 +- .../schema/InvariantEnforcementSuite.scala | 3 +- 38 files changed, 174 insertions(+), 869 deletions(-) rename {lakesoul-presto => lakesoul-common}/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala (82%) rename {lakesoul-presto => lakesoul-common}/src/main/scala/com/dmetasoul/lakesoul/meta/LakeSoulOptions.scala (100%) rename {lakesoul-flink => lakesoul-common}/src/main/scala/com/dmetasoul/lakesoul/meta/MetaVersion.scala (94%) delete mode 100644 lakesoul-flink/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala delete mode 100644 lakesoul-presto/src/main/scala/com/dmetasoul/lakesoul/meta/MetaVersion.scala delete mode 100644 lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala diff --git a/lakesoul-common/pom.xml b/lakesoul-common/pom.xml index 2ac6da470..1ae690d62 100644 --- a/lakesoul-common/pom.xml +++ b/lakesoul-common/pom.xml @@ -90,6 +90,28 @@ SPDX-License-Identifier: Apache-2.0 + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>4.6.3</version> + <executions> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>add-source</goal> + <goal>compile</goal> + </goals> + </execution> + <execution> + <id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + </plugin> @@ -212,6 +234,18 @@ SPDX-License-Identifier: Apache-2.0 native + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>32.0.0-jre</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client-api</artifactId> + <version>3.3.2</version> + <scope>${local.scope}</scope> + </dependency> + diff --git a/lakesoul-presto/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala 
b/lakesoul-common/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala similarity index 82% rename from lakesoul-presto/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala rename to lakesoul-common/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala index 64f4b121d..04c0250a3 100644 --- a/lakesoul-presto/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala +++ b/lakesoul-common/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala @@ -5,10 +5,10 @@ package com.dmetasoul.lakesoul.meta import com.dmetasoul.lakesoul.meta.entity.DataCommitInfo -import com.facebook.presto.lakesoul.pojo.Path +import com.google.common.collect.Lists +import org.apache.hadoop.fs.Path import java.util.{Objects, UUID} -import com.google.common.collect.Lists import scala.collection.JavaConverters.{asJavaIterableConverter, asScalaBufferConverter} import scala.collection.mutable.ArrayBuffer import scala.collection.{JavaConverters, mutable} @@ -42,10 +42,15 @@ case class DataFileInfo(range_partitions: String, path: String, file_op: String, override def hashCode(): Int = { Objects.hash(range_partitions, path, file_op) } + + lazy val range_version: String = range_partitions + "-" + file_exist_cols + + //trans to files which need to delete + def expire(deleteTime: Long): DataFileInfo = this.copy(modification_time = deleteTime) } -case class PartitionInfo(table_id: String, range_value: String, version: Int = -1, - read_files: Array[UUID] = Array.empty[UUID], expression: String = "", commit_op: String = "") { +case class PartitionInfoScala(table_id: String, range_value: String, version: Int = -1, + read_files: Array[UUID] = Array.empty[UUID], expression: String = "", commit_op: String = "") { override def toString: String = { s"partition info: {\ntable_name: $table_id,\nrange_value: $range_value}" } @@ -59,7 +64,7 @@ object DataOperation { getTableDataInfo(MetaVersion.getAllPartitionInfo(tableId)) } - def getTableDataInfo(partition_info_arr: Array[PartitionInfo]): Array[DataFileInfo] = { + def getTableDataInfo(partition_info_arr: Array[PartitionInfoScala]): Array[DataFileInfo] = { val file_info_buf = new ArrayBuffer[DataFileInfo]() @@ -73,7 +78,7 @@ object DataOperation { def getTableDataInfo(tableId: String, partitions: List[String]): Array[DataFileInfo] = { val Pars = MetaVersion.getAllPartitionInfo(tableId) - val partitionInfos = new ArrayBuffer[PartitionInfo]() + val partitionInfos = new ArrayBuffer[PartitionInfoScala]() for (partition_info <- Pars) { var contained = true; for (item <- partitions) { @@ -120,14 +125,14 @@ object DataOperation { } //get fies info in this partition that match the current read version - private def getSinglePartitionDataInfo(partition_info: PartitionInfo): ArrayBuffer[DataFileInfo] = { + def getSinglePartitionDataInfo(partition_info: PartitionInfoScala): ArrayBuffer[DataFileInfo] = { val file_arr_buf = new ArrayBuffer[DataFileInfo]() - val metaPartitionInfo = entity.PartitionInfo.newBuilder - metaPartitionInfo.setTableId(partition_info.table_id) - metaPartitionInfo.setPartitionDesc(partition_info.range_value) - metaPartitionInfo.addAllSnapshot(JavaConverters.bufferAsJavaList(partition_info.read_files.map(DBUtil.toProtoUuid).toBuffer)) - val dataCommitInfoList = dbManager.getTableSinglePartitionDataInfo(metaPartitionInfo.build).asScala.toArray + val metaPartitionInfoScala = entity.PartitionInfo.newBuilder + metaPartitionInfoScala.setTableId(partition_info.table_id) + metaPartitionInfoScala.setPartitionDesc(partition_info.range_value) + 
metaPartitionInfoScala.addAllSnapshot(JavaConverters.bufferAsJavaList(partition_info.read_files.map(DBUtil.toProtoUuid).toBuffer)) + val dataCommitInfoList = dbManager.getTableSinglePartitionDataInfo(metaPartitionInfoScala.build).asScala.toArray for (metaDataCommitInfo <- dataCommitInfoList) { val fileOps = metaDataCommitInfo.getFileOpsList.asScala.toArray for (file <- fileOps) { @@ -151,8 +156,8 @@ object DataOperation { getSinglePartitionDataInfo(table_id, partition_desc, startTimestamp, endTime, readType).toArray } - private def getSinglePartitionDataInfo(table_id: String, partition_desc: String, startTimestamp: Long, - endTimestamp: Long, readType: String): ArrayBuffer[DataFileInfo] = { + def getSinglePartitionDataInfo(table_id: String, partition_desc: String, startTimestamp: Long, + endTimestamp: Long, readType: String): ArrayBuffer[DataFileInfo] = { if (readType.equals(LakeSoulOptions.ReadType.INCREMENTAL_READ) || readType .equals(LakeSoulOptions.ReadType.SNAPSHOT_READ)) { if (null == partition_desc || "".equals(partition_desc)) { @@ -175,9 +180,9 @@ object DataOperation { } } - private def getSinglePartitionIncrementalDataInfos(table_id: String, partition_desc: String, - startVersionTimestamp: Long, - endVersionTimestamp: Long): ArrayBuffer[DataFileInfo] = { + def getSinglePartitionIncrementalDataInfos(table_id: String, partition_desc: String, + startVersionTimestamp: Long, + endVersionTimestamp: Long): ArrayBuffer[DataFileInfo] = { val preVersionUUIDs = new mutable.LinkedHashSet[UUID]() val compactionUUIDs = new mutable.LinkedHashSet[UUID]() val incrementalAllUUIDs = new mutable.LinkedHashSet[UUID]() @@ -221,4 +226,13 @@ object DataOperation { fillFiles(file_arr_buf, dataCommitInfoList) } } + + + def dropDataInfoData(table_id: String, range: String, commit_id: UUID): Unit = { + MetaVersion.dbManager.deleteDataCommitInfo(table_id, range, commit_id) + } + + def dropDataInfoData(table_id: String): Unit = { + MetaVersion.dbManager.deleteDataCommitInfo(table_id) + } } diff --git a/lakesoul-presto/src/main/scala/com/dmetasoul/lakesoul/meta/LakeSoulOptions.scala b/lakesoul-common/src/main/scala/com/dmetasoul/lakesoul/meta/LakeSoulOptions.scala similarity index 100% rename from lakesoul-presto/src/main/scala/com/dmetasoul/lakesoul/meta/LakeSoulOptions.scala rename to lakesoul-common/src/main/scala/com/dmetasoul/lakesoul/meta/LakeSoulOptions.scala diff --git a/lakesoul-flink/src/main/scala/com/dmetasoul/lakesoul/meta/MetaVersion.scala b/lakesoul-common/src/main/scala/com/dmetasoul/lakesoul/meta/MetaVersion.scala similarity index 94% rename from lakesoul-flink/src/main/scala/com/dmetasoul/lakesoul/meta/MetaVersion.scala rename to lakesoul-common/src/main/scala/com/dmetasoul/lakesoul/meta/MetaVersion.scala index eb80327c1..9954b641c 100644 --- a/lakesoul-flink/src/main/scala/com/dmetasoul/lakesoul/meta/MetaVersion.scala +++ b/lakesoul-common/src/main/scala/com/dmetasoul/lakesoul/meta/MetaVersion.scala @@ -120,9 +120,9 @@ object MetaVersion { // ) // } - def getSinglePartitionInfo(table_id: String, range_value: String, range_id: String): PartitionInfo = { + def getSinglePartitionInfo(table_id: String, range_value: String, range_id: String): PartitionInfoScala = { val info = dbManager.getSinglePartitionInfo(table_id, range_value) - PartitionInfo( + PartitionInfoScala( table_id = info.getTableId, range_value = range_value, version = info.getVersion, @@ -132,10 +132,10 @@ object MetaVersion { ) } - def getSinglePartitionInfoForVersion(table_id: String, range_value: String, version: Int): 
Array[PartitionInfo] = { - val partitionVersionBuffer = new ArrayBuffer[PartitionInfo]() + def getSinglePartitionInfoForVersion(table_id: String, range_value: String, version: Int): Array[PartitionInfoScala] = { + val partitionVersionBuffer = new ArrayBuffer[PartitionInfoScala]() val info = dbManager.getSinglePartitionInfo(table_id, range_value, version) - partitionVersionBuffer += PartitionInfo( + partitionVersionBuffer += PartitionInfoScala( table_id = info.getTableId, range_value = range_value, version = info.getVersion, @@ -147,12 +147,12 @@ object MetaVersion { } - def getOnePartitionVersions(table_id: String, range_value: String): Array[PartitionInfo] = { - val partitionVersionBuffer = new ArrayBuffer[PartitionInfo]() + def getOnePartitionVersions(table_id: String, range_value: String): Array[PartitionInfoScala] = { + val partitionVersionBuffer = new ArrayBuffer[PartitionInfoScala]() val res_itr = dbManager.getOnePartitionVersions(table_id, range_value).iterator() while (res_itr.hasNext) { val res = res_itr.next() - partitionVersionBuffer += PartitionInfo( + partitionVersionBuffer += PartitionInfoScala( table_id = res.getTableId, range_value = res.getPartitionDesc, version = res.getVersion, @@ -184,12 +184,12 @@ object MetaVersion { (false, "") } - def getAllPartitionInfo(table_id: String): Array[PartitionInfo] = { - val partitionVersionBuffer = new ArrayBuffer[PartitionInfo]() + def getAllPartitionInfo(table_id: String): Array[PartitionInfoScala] = { + val partitionVersionBuffer = new ArrayBuffer[PartitionInfoScala]() val res_itr = dbManager.getAllPartitionInfo(table_id).iterator() while (res_itr.hasNext) { val res = res_itr.next() - partitionVersionBuffer += PartitionInfo( + partitionVersionBuffer += PartitionInfoScala( table_id = res.getTableId, range_value = res.getPartitionDesc, version = res.getVersion, diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java index 36d927bae..392ae92d6 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java @@ -405,11 +405,11 @@ public static DataFileInfo[] getTargetDataFileInfo(TableInfo tif, List<String> partitionDescs = remainingPartitions.stream() .map(DBUtil::formatPartitionDesc) .collect(Collectors.toList()); - List<PartitionInfo> partitionInfos = new ArrayList<>(); + List<PartitionInfoScala> partitionInfos = new ArrayList<>(); for (String partitionDesc : partitionDescs) { partitionInfos.add(MetaVersion.getSinglePartitionInfo(tif.getTableId(), partitionDesc, "")); } - PartitionInfo[] ptinfos = partitionInfos.toArray(new PartitionInfo[0]); + PartitionInfoScala[] ptinfos = partitionInfos.toArray(new PartitionInfoScala[0]); return DataOperation.getTableDataInfo(ptinfos); } } @@ -433,8 +433,8 @@ public static Map>> splitDataInfosToRangeAndHash } public static DataFileInfo[] getSinglePartitionDataFileInfo(TableInfo tif, String partitionDesc) { - PartitionInfo partitionInfo = MetaVersion.getSinglePartitionInfo(tif.getTableId(), partitionDesc, ""); - return DataOperation.getTableDataInfo(new PartitionInfo[]{partitionInfo}); + PartitionInfoScala partitionInfo = MetaVersion.getSinglePartitionInfo(tif.getTableId(), partitionDesc, ""); + return DataOperation.getTableDataInfo(new PartitionInfoScala[]{partitionInfo}); } public static int[] getFieldPositions(String[] fields, List<String> allFields) { diff --git 
a/lakesoul-flink/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala b/lakesoul-flink/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala deleted file mode 100644 index 225e16362..000000000 --- a/lakesoul-flink/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala +++ /dev/null @@ -1,208 +0,0 @@ -// SPDX-FileCopyrightText: 2023 LakeSoul Contributors -// -// SPDX-License-Identifier: Apache-2.0 - -package com.dmetasoul.lakesoul.meta - -import com.dmetasoul.lakesoul.meta.entity.DataCommitInfo -import org.apache.flink.core.fs.Path -import org.apache.flink.shaded.guava30.com.google.common.collect.Lists - -import java.util.{Objects, UUID} -import scala.collection.JavaConverters.{asJavaIterableConverter, asScalaBufferConverter} -import scala.collection.mutable.ArrayBuffer -import scala.collection.{JavaConverters, mutable} -import scala.util.control.Breaks - -object BucketingUtils { - // The file name of bucketed data should have 3 parts: - // 1. some other information in the head of file name - // 2. bucket id part, some numbers, starts with "_" - // * The other-information part may use `-` as separator and may have numbers at the end, - // e.g. a normal parquet file without bucketing may have name: - // part-r-00000-2dd664f9-d2c4-4ffe-878f-431234567891.gz.parquet, and we will mistakenly - // treat `431234567891` as bucket id. So here we pick `_` as separator. - // 3. optional file extension part, in the tail of file name, starts with `.` - // An example of bucketed parquet file name with bucket id 3: - // part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet - private val bucketedFileName = """.*_(\d+)(?:\..*)?$""".r - - def getBucketId(fileName: String): Option[Int] = fileName match { - case bucketedFileName(bucketId) => Some(bucketId.toInt) - case _ => None - } -} - -case class DataFileInfo(range_partitions: String, path: String, file_op: String, size: Long, - modification_time: Long = -1L, file_exist_cols: String = "") { - - lazy val file_bucket_id: Int = BucketingUtils.getBucketId(new Path(path).getName) - .getOrElse(sys.error(s"Invalid bucket file $path")) - - override def hashCode(): Int = { - Objects.hash(range_partitions, path, file_op) - } -} - - -case class PartitionInfo(table_id: String, range_value: String, version: Int = -1, - read_files: Array[UUID] = Array.empty[UUID], expression: String = "", commit_op: String = "") { - override def toString: String = { - s"partition info: {\ntable_name: $table_id,\nrange_value: $range_value}" - } -} - -object DataOperation { - - val dbManager = new DBManager - - def getTableDataInfo(tableId: String): Array[DataFileInfo] = { - getTableDataInfo(MetaVersion.getAllPartitionInfo(tableId)) - } - - def getTableDataInfo(partition_info_arr: Array[PartitionInfo]): Array[DataFileInfo] = { - - val file_info_buf = new ArrayBuffer[DataFileInfo]() - - for (partition_info <- partition_info_arr) { - file_info_buf ++= getSinglePartitionDataInfo(partition_info) - } - - file_info_buf.toArray - } - - - private def filterFiles(file_arr_buf: ArrayBuffer[DataFileInfo]): ArrayBuffer[DataFileInfo] = { - val dupCheck = new mutable.HashSet[String]() - val file_res_arr_buf = new ArrayBuffer[DataFileInfo]() - if (file_arr_buf.length > 1) { - for (i <- Range(file_arr_buf.size - 1, -1, -1)) { - if (file_arr_buf(i).file_op.equals("del")) { - dupCheck.add(file_arr_buf(i).path) - } else { - if (dupCheck.isEmpty || !dupCheck.contains(file_arr_buf(i).path)) { - file_res_arr_buf += file_arr_buf(i) - } - } - } - file_res_arr_buf.reverse - } 
else { - file_arr_buf.filter(_.file_op.equals("add")) - } - } - - private def fillFiles(file_arr_buf: ArrayBuffer[DataFileInfo], - dataCommitInfoList: Array[DataCommitInfo]): ArrayBuffer[DataFileInfo] = { - dataCommitInfoList.foreach(data_commit_info => { - val fileOps = data_commit_info.getFileOpsList.asScala.toArray - fileOps.foreach(file => { - file_arr_buf += DataFileInfo(data_commit_info.getPartitionDesc, file.getPath, file.getFileOp.name, file.getSize, - data_commit_info.getTimestamp, file.getFileExistCols) - }) - }) - filterFiles(file_arr_buf) - } - - //get fies info in this partition that match the current read version - private def getSinglePartitionDataInfo(partition_info: PartitionInfo): ArrayBuffer[DataFileInfo] = { - val file_arr_buf = new ArrayBuffer[DataFileInfo]() - - val metaPartitionInfo = entity.PartitionInfo.newBuilder - metaPartitionInfo.setTableId(partition_info.table_id) - metaPartitionInfo.setPartitionDesc(partition_info.range_value) - metaPartitionInfo.addAllSnapshot(JavaConverters.bufferAsJavaList(partition_info.read_files.map(DBUtil.toProtoUuid).toBuffer)) - val dataCommitInfoList = dbManager.getTableSinglePartitionDataInfo(metaPartitionInfo.build).asScala.toArray - for (metaDataCommitInfo <- dataCommitInfoList) { - val fileOps = metaDataCommitInfo.getFileOpsList.asScala.toArray - for (file <- fileOps) { - file_arr_buf += DataFileInfo(partition_info.range_value, file.getPath, file.getFileOp.name, file.getSize, - metaDataCommitInfo.getTimestamp, file.getFileExistCols) - } - } - filterFiles(file_arr_buf) - } - - private def getSinglePartitionDataInfo(table_id: String, partition_desc: String, - version: Int): ArrayBuffer[DataFileInfo] = { - val file_arr_buf = new ArrayBuffer[DataFileInfo]() - val dataCommitInfoList = dbManager.getPartitionSnapshot(table_id, partition_desc, version).asScala.toArray - fillFiles(file_arr_buf, dataCommitInfoList) - } - - def getIncrementalPartitionDataInfo(table_id: String, partition_desc: String, startTimestamp: Long, - endTimestamp: Long, readType: String): Array[DataFileInfo] = { - val endTime = if (endTimestamp == 0) Long.MaxValue else endTimestamp - getSinglePartitionDataInfo(table_id, partition_desc, startTimestamp, endTime, readType).toArray - } - - private def getSinglePartitionDataInfo(table_id: String, partition_desc: String, startTimestamp: Long, - endTimestamp: Long, readType: String): ArrayBuffer[DataFileInfo] = { - if (readType.equals(LakeSoulOptions.ReadType.INCREMENTAL_READ) || readType - .equals(LakeSoulOptions.ReadType.SNAPSHOT_READ)) { - if (null == partition_desc || "".equals(partition_desc)) { - val partitions = dbManager.getAllPartitionInfo(table_id) - val files_all_partitions_buf = new ArrayBuffer[DataFileInfo]() - partitions.forEach(partition => { - val preVersionTimestamp = dbManager - .getLastedVersionTimestampUptoTime(table_id, partition.getPartitionDesc, startTimestamp) - files_all_partitions_buf ++= getSinglePartitionIncrementalDataInfos(table_id, partition.getPartitionDesc, - preVersionTimestamp, endTimestamp) - }) - files_all_partitions_buf - } else { - val preVersionTimestamp = dbManager.getLastedVersionTimestampUptoTime(table_id, partition_desc, startTimestamp) - getSinglePartitionIncrementalDataInfos(table_id, partition_desc, preVersionTimestamp, endTimestamp) - } - } else { - val version = dbManager.getLastedVersionUptoTime(table_id, partition_desc, endTimestamp) - getSinglePartitionDataInfo(table_id, partition_desc, version) - } - } - - private def getSinglePartitionIncrementalDataInfos(table_id: 
String, partition_desc: String, - startVersionTimestamp: Long, - endVersionTimestamp: Long): ArrayBuffer[DataFileInfo] = { - val preVersionUUIDs = new mutable.LinkedHashSet[UUID]() - val compactionUUIDs = new mutable.LinkedHashSet[UUID]() - val incrementalAllUUIDs = new mutable.LinkedHashSet[UUID]() - var updated: Boolean = false - val dataCommitInfoList = dbManager - .getIncrementalPartitionsFromTimestamp(table_id, partition_desc, startVersionTimestamp, endVersionTimestamp) - .asScala.toArray - var count: Int = 0 - val loop = new Breaks() - loop.breakable { - for (dataItem <- dataCommitInfoList) { - count += 1 - if ("UpdateCommit".equals(dataItem.getCommitOp) && startVersionTimestamp != dataItem - .getTimestamp && count != 1) { - updated = true - loop.break() - } - if (startVersionTimestamp == dataItem.getTimestamp) { - preVersionUUIDs ++= dataItem.getSnapshotList.asScala.map(DBUtil.toJavaUUID) - } else { - if ("CompactionCommit".equals(dataItem.getCommitOp)) { - val compactShotList = dataItem.getSnapshotList.asScala.map(DBUtil.toJavaUUID).toArray - compactionUUIDs += compactShotList(0) - if (compactShotList.length > 1) { - incrementalAllUUIDs ++= compactShotList.slice(1, compactShotList.length) - } - } else { - incrementalAllUUIDs ++= dataItem.getSnapshotList.asScala.map(DBUtil.toJavaUUID) - } - } - } - } - if (updated) { - new ArrayBuffer[DataFileInfo]() - } else { - val tmpUUIDs = incrementalAllUUIDs -- preVersionUUIDs - val resultUUID = tmpUUIDs -- compactionUUIDs - val file_arr_buf = new ArrayBuffer[DataFileInfo]() - val dataCommitInfoList = dbManager - .getDataCommitInfosFromUUIDs(table_id, partition_desc, Lists.newArrayList(resultUUID.map(DBUtil.toProtoUuid).asJava)).asScala.toArray - fillFiles(file_arr_buf, dataCommitInfoList) - } - } -} \ No newline at end of file diff --git a/lakesoul-presto/src/main/scala/com/dmetasoul/lakesoul/meta/MetaVersion.scala b/lakesoul-presto/src/main/scala/com/dmetasoul/lakesoul/meta/MetaVersion.scala deleted file mode 100644 index eb80327c1..000000000 --- a/lakesoul-presto/src/main/scala/com/dmetasoul/lakesoul/meta/MetaVersion.scala +++ /dev/null @@ -1,264 +0,0 @@ -// SPDX-FileCopyrightText: 2023 LakeSoul Contributors -// -// SPDX-License-Identifier: Apache-2.0 - -package com.dmetasoul.lakesoul.meta - -import com.alibaba.fastjson.JSONObject - -import java.util -import java.util.UUID -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer - -object MetaVersion { - - val dbManager = new DBManager() - - def createNamespace(namespace: String): Unit = { - dbManager.createNewNamespace(namespace, new JSONObject().toJSONString, "") - } - - def listNamespaces(): Array[String] = { - dbManager.listNamespaces.asScala.toArray - } - - def isTableExists(table_name: String): Boolean = { - dbManager.isTableExists(table_name) - } - - def isTableIdExists(table_name: String, table_id: String): Boolean = { - dbManager.isTableIdExists(table_name, table_id) - } - - def isNamespaceExists(table_namespace: String): Boolean = { - dbManager.isNamespaceExists(table_namespace) - } - - // //check whether short_table_name exists, and return table path if exists - // def isShortTableNameExists(short_table_name: String): (Boolean, String) = - // isShortTableNameExists(short_table_name, LakeSoulCatalog.showCurrentNamespace().mkString(".")) - - //check whether short_table_name exists, and return table path if exists - def isShortTableNameExists(short_table_name: String, table_namespace: String): (Boolean, String) = { - val path = 
dbManager.getTablePathFromShortTableName(short_table_name, table_namespace) - if (path == null) (false, null) else (true, path) - } - - // def getTablePathFromShortTableName(short_table_name: String): String = - // getTablePathFromShortTableName(short_table_name, LakeSoulCatalog.showCurrentNamespace().mkString(".")) - - //get table path, if not exists, return "not found" - def getTablePathFromShortTableName(short_table_name: String, table_namespace: String): String = { - dbManager.getTablePathFromShortTableName(short_table_name, table_namespace) - } - - def createNewTable(table_namespace: String, - table_path: String, - short_table_name: String, - table_id: String, - table_schema: String, - range_column: String, - hash_column: String, - configuration: Map[String, String], - bucket_num: Int): Unit = { - - val partitions = DBUtil.formatTableInfoPartitionsField(hash_column, range_column) - val json = new JSONObject() - configuration.foreach(x => json.put(x._1, x._2)) - json.put("hashBucketNum", String.valueOf(bucket_num)) - dbManager.createNewTable(table_id, table_namespace, short_table_name, table_path, table_schema, json, partitions) - } - - // def listTables(): util.List[String] = { - // listTables(LakeSoulCatalog.showCurrentNamespace()) - // } - - def listTables(namespace: Array[String]): util.List[String] = { - dbManager.listTablePathsByNamespace(namespace.mkString(".")) - } - - // def getTableInfo(table_path: String): TableInfo = { - // getTableInfo(LakeSoulCatalog.showCurrentNamespace().mkString("."), table_path) - // } - - // def getTableInfo(namespace: String, table_path: String): TableInfo = { - // val info = dbManager.getTableInfoByPath(table_path) - // if (info == null) { - // return null - // } - // val short_table_name = info.getTableName - // val partitions = info.getPartitions - // val properties = info.getProperties.toString() - // - // import scala.util.parsing.json.JSON - // val configuration = JSON.parseFull(properties) - // val configurationMap = configuration match { - // case Some(map: collection.immutable.Map[String, String]) => map - // } - // - // // table may have no partition at all or only have range or hash partition - // val partitionCols = Splitter.on(';').split(partitions).asScala.toArray - // val (range_column, hash_column) = partitionCols match { - // case Array(range, hash) => (range, hash) - // case _ => ("", "") - // } - // val bucket_num = configurationMap.get("hashBucketNum") match { - // case Some(value) => value.toInt - // case _ => -1 - // } - // TableInfo( - // namespace, - // Some(table_path), - // info.getTableId, - // info.getTableSchema, - // range_column, - // hash_column, - // bucket_num, - // configurationMap, - // if (short_table_name.equals("")) None else Some(short_table_name) - // ) - // } - - def getSinglePartitionInfo(table_id: String, range_value: String, range_id: String): PartitionInfo = { - val info = dbManager.getSinglePartitionInfo(table_id, range_value) - PartitionInfo( - table_id = info.getTableId, - range_value = range_value, - version = info.getVersion, - read_files = info.getSnapshotList.asScala.map(DBUtil.toJavaUUID).toArray, - expression = info.getExpression, - commit_op = info.getCommitOp.name - ) - } - - def getSinglePartitionInfoForVersion(table_id: String, range_value: String, version: Int): Array[PartitionInfo] = { - val partitionVersionBuffer = new ArrayBuffer[PartitionInfo]() - val info = dbManager.getSinglePartitionInfo(table_id, range_value, version) - partitionVersionBuffer += PartitionInfo( - table_id = 
info.getTableId, - range_value = range_value, - version = info.getVersion, - read_files = info.getSnapshotList.asScala.map(DBUtil.toJavaUUID).toArray, - expression = info.getExpression, - commit_op = info.getCommitOp.name - ) - partitionVersionBuffer.toArray - - } - - def getOnePartitionVersions(table_id: String, range_value: String): Array[PartitionInfo] = { - val partitionVersionBuffer = new ArrayBuffer[PartitionInfo]() - val res_itr = dbManager.getOnePartitionVersions(table_id, range_value).iterator() - while (res_itr.hasNext) { - val res = res_itr.next() - partitionVersionBuffer += PartitionInfo( - table_id = res.getTableId, - range_value = res.getPartitionDesc, - version = res.getVersion, - expression = res.getExpression, - commit_op = res.getCommitOp.name - ) - } - partitionVersionBuffer.toArray - - } - - def getLastedTimestamp(table_id: String, range_value: String): Long = { - dbManager.getLastedTimestamp(table_id, range_value) - } - - def getLastedVersionUptoTime(table_id: String, range_value: String, utcMills: Long): Int = { - dbManager.getLastedVersionUptoTime(table_id, range_value, utcMills) - } - - /* - if range_value is "", clean up all patitions; - if not "" , just one partition - */ - def cleanMetaUptoTime(table_id: String, range_value: String, utcMills: Long): List[String] = { - dbManager.getDeleteFilePath(table_id, range_value, utcMills).asScala.toList - } - - def getPartitionId(table_id: String, range_value: String): (Boolean, String) = { - (false, "") - } - - def getAllPartitionInfo(table_id: String): Array[PartitionInfo] = { - val partitionVersionBuffer = new ArrayBuffer[PartitionInfo]() - val res_itr = dbManager.getAllPartitionInfo(table_id).iterator() - while (res_itr.hasNext) { - val res = res_itr.next() - partitionVersionBuffer += PartitionInfo( - table_id = res.getTableId, - range_value = res.getPartitionDesc, - version = res.getVersion, - read_files = res.getSnapshotList.asScala.map(DBUtil.toJavaUUID).toArray, - expression = res.getExpression, - commit_op = res.getCommitOp.name - ) - } - partitionVersionBuffer.toArray - } - - def rollbackPartitionInfoByVersion(table_id: String, range_value: String, toVersion: Int): Unit = { - dbManager.rollbackPartitionByVersion(table_id, range_value, toVersion); - } - - def updateTableSchema(table_name: String, - table_id: String, - table_schema: String, - config: Map[String, String], - new_read_version: Int): Unit = { - dbManager.updateTableSchema(table_id, table_schema) - } - - // def deleteTableInfo(table_name: String, table_id: String): Unit = { - // deleteTableInfo(table_name, table_id, LakeSoulCatalog.showCurrentNamespace().mkString(".")) - // } - - def deleteTableInfo(table_name: String, table_id: String, table_namespace: String): Unit = { - dbManager.deleteTableInfo(table_name, table_id, table_namespace) - } - - def deletePartitionInfoByTableId(table_id: String): Unit = { - dbManager.logicDeletePartitionInfoByTableId(table_id) - } - - def deletePartitionInfoByRangeId(table_id: String, range_value: String, range_id: String): Unit = { - dbManager.logicDeletePartitionInfoByRangeId(table_id, range_value) - } - - def dropNamespaceByNamespace(namespace: String): Unit = { - dbManager.deleteNamespace(namespace) - } - - def dropPartitionInfoByTableId(table_id: String): Unit = { - dbManager.deletePartitionInfoByTableId(table_id) - } - - def dropPartitionInfoByRangeId(table_id: String, range_value: String): Unit = { - dbManager.deletePartitionInfoByTableAndPartition(table_id, range_value) - } - - // def 
deleteShortTableName(short_table_name: String, table_name: String): Unit = { - // deleteShortTableName(short_table_name, table_name, LakeSoulCatalog.showCurrentNamespace().mkString(".")) - // } - - def deleteShortTableName(short_table_name: String, table_name: String, table_namespace: String): Unit = { - dbManager.deleteShortTableName(short_table_name, table_name, table_namespace) - } - - - def updateTableShortName(table_name: String, - table_id: String, - short_table_name: String, - table_namespace: String): Unit = { - dbManager.updateTableShortName(table_name, table_id, short_table_name, table_namespace) - } - - def cleanMeta(): Unit = { - dbManager.cleanMeta() - } - -} diff --git a/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala b/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala deleted file mode 100644 index 940bcaa81..000000000 --- a/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala +++ /dev/null @@ -1,244 +0,0 @@ -// SPDX-FileCopyrightText: 2023 LakeSoul Contributors -// -// SPDX-License-Identifier: Apache-2.0 - -package com.dmetasoul.lakesoul.meta - -import com.dmetasoul.lakesoul.meta.entity.{DataFileOp, FileOp, Uuid} -import com.google.common.collect.Lists -import org.apache.spark.internal.Logging -import org.apache.spark.sql.lakesoul.LakeSoulOptions -import org.apache.spark.sql.lakesoul.utils.{DataFileInfo, PartitionInfo} - -import java.util -import java.util.UUID -import scala.collection.JavaConverters.{asJavaIterableConverter, asScalaBufferConverter} -import scala.collection.mutable.ArrayBuffer -import scala.collection.{JavaConverters, mutable} -import scala.util.control.Breaks - -object DataOperation extends Logging { - - def getTableDataInfo(partition_info_arr: Array[PartitionInfo]): Array[DataFileInfo] = { - - val file_info_buf = new ArrayBuffer[DataFileInfo]() - - for (partition_info <- partition_info_arr) { - - file_info_buf ++= getSinglePartitionDataInfo(partition_info) - } - - file_info_buf.toArray - } - - //get fies info in this partition that match the current read version - def getSinglePartitionDataInfo(partition_info: PartitionInfo): ArrayBuffer[DataFileInfo] = { - val file_arr_buf = new ArrayBuffer[DataFileInfo]() - val file_res_arr_buf = new ArrayBuffer[DataFileInfo]() - - val dupCheck = new mutable.HashSet[String]() - val metaPartitionInfo = entity.PartitionInfo.newBuilder - metaPartitionInfo.setTableId(partition_info.table_id) - metaPartitionInfo.setPartitionDesc(partition_info.range_value) - metaPartitionInfo.addAllSnapshot(JavaConverters.bufferAsJavaList(partition_info.read_files.map(uuid => DBUtil.toProtoUuid(uuid)).toBuffer)) - val dataCommitInfoList = MetaVersion.dbManager.getTableSinglePartitionDataInfo(metaPartitionInfo.build).asScala.toArray - for (metaDataCommitInfo <- dataCommitInfoList) { - val fileOps = metaDataCommitInfo.getFileOpsList.asScala.toArray - for (file <- fileOps) { - file_arr_buf += DataFileInfo( - partition_info.range_value, - file.getPath, - file.getFileOp.name, - file.getSize, - metaDataCommitInfo.getTimestamp, - file.getFileExistCols - ) - } - } - if (file_arr_buf.length > 1) { - for (i <- Range(file_arr_buf.size - 1, -1, -1)) { - if (file_arr_buf(i).file_op.equals("del")) { - dupCheck.add(file_arr_buf(i).path) - } else { - if (dupCheck.isEmpty || !dupCheck.contains(file_arr_buf(i).path)) { - file_res_arr_buf += file_arr_buf(i) - } - } - } - file_res_arr_buf.reverse - } else { - file_arr_buf.filter(_.file_op.equals("add")) - } - } - - def 
getSinglePartitionDataInfo(table_id: String, partition_desc: String, version: Int): ArrayBuffer[DataFileInfo] = { - val file_arr_buf = new ArrayBuffer[DataFileInfo]() - val file_res_arr_buf = new ArrayBuffer[DataFileInfo]() - val dupCheck = new mutable.HashSet[String]() - val dataCommitInfoList = MetaVersion.dbManager.getPartitionSnapshot(table_id, partition_desc, version).asScala.toArray - dataCommitInfoList.foreach(data_commit_info => { - val fileOps = data_commit_info.getFileOpsList.asScala.toArray - fileOps.foreach(file => { - file_arr_buf += DataFileInfo( - data_commit_info.getPartitionDesc, - file.getPath, - file.getFileOp.name, - file.getSize, - data_commit_info.getTimestamp, - file.getFileExistCols - ) - }) - }) - - if (file_arr_buf.length > 1) { - for (i <- Range(file_arr_buf.size - 1, -1, -1)) { - if (file_arr_buf(i).file_op.equals("del")) { - dupCheck.add(file_arr_buf(i).path) - } else { - if (dupCheck.isEmpty || !dupCheck.contains(file_arr_buf(i).path)) { - file_res_arr_buf += file_arr_buf(i) - } - } - } - file_res_arr_buf.reverse - } else { - file_arr_buf.filter(_.file_op.equals("add")) - } - } - - def getSinglePartitionDataInfo(table_id: String, partition_desc: String, startTimestamp: Long, endTimestamp: Long, readType: String): ArrayBuffer[DataFileInfo] = { - if (readType.equals(LakeSoulOptions.ReadType.INCREMENTAL_READ) || readType.equals(LakeSoulOptions.ReadType.SNAPSHOT_READ)) { - if (null == partition_desc || "".equals(partition_desc)) { - val partitions = MetaVersion.dbManager.getAllPartitionInfo(table_id) - val files_all_partitions_buf = new ArrayBuffer[DataFileInfo]() - partitions.forEach(partition => { - val preVersionTimestamp = MetaVersion.dbManager.getLastedVersionTimestampUptoTime(table_id, partition.getPartitionDesc, startTimestamp) - files_all_partitions_buf ++= getSinglePartitionIncrementalDataInfos(table_id, partition.getPartitionDesc, preVersionTimestamp, endTimestamp) - }) - files_all_partitions_buf - } else { - val preVersionTimestamp = MetaVersion.dbManager.getLastedVersionTimestampUptoTime(table_id, partition_desc, startTimestamp) - getSinglePartitionIncrementalDataInfos(table_id, partition_desc, preVersionTimestamp, endTimestamp) - } - } else { - val version = MetaVersion.dbManager.getLastedVersionUptoTime(table_id, partition_desc, endTimestamp) - getSinglePartitionDataInfo(table_id, partition_desc, version) - } - } - - def getSinglePartitionIncrementalDataInfos(table_id: String, partition_desc: String, startVersionTimestamp: Long, endVersionTimestamp: Long): ArrayBuffer[DataFileInfo] = { - var preVersionUUIDs = new mutable.LinkedHashSet[UUID]() - var compactionUUIDs = new mutable.LinkedHashSet[UUID]() - var incrementalAllUUIDs = new mutable.LinkedHashSet[UUID]() - var updated: Boolean = false - val dataCommitInfoList = MetaVersion.dbManager.getIncrementalPartitionsFromTimestamp(table_id, partition_desc, startVersionTimestamp, endVersionTimestamp).asScala.toArray - // if (dataCommitInfoList.size < 2) { - // println("It is the latest version") - // return new ArrayBuffer[DataFileInfo]() - // } - var count: Int = 0 - val loop = new Breaks() - loop.breakable { - for (dataItem <- dataCommitInfoList) { - count += 1 - if ("UpdateCommit".equals(dataItem.getCommitOp) && startVersionTimestamp != dataItem.getTimestamp && count != 1) { - updated = true - loop.break() - } - if (startVersionTimestamp == dataItem.getTimestamp) { - preVersionUUIDs ++= dataItem.getSnapshotList.asScala.map(id => DBUtil.toJavaUUID(id)) - } else { - if 
("CompactionCommit".equals(dataItem.getCommitOp)) { - val compactShotList = dataItem.getSnapshotList.asScala.map(id => DBUtil.toJavaUUID(id)).toArray - compactionUUIDs += compactShotList(0) - if (compactShotList.length > 1) { - incrementalAllUUIDs ++= compactShotList.slice(1, compactShotList.length) - } - } else { - incrementalAllUUIDs ++= dataItem.getSnapshotList.asScala.map(id => DBUtil.toJavaUUID(id)) - } - } - } - } - if (updated) { - println("Incremental query could not have update just for compaction merge and append") - new ArrayBuffer[DataFileInfo]() - } else { - val tmpUUIDs = incrementalAllUUIDs -- preVersionUUIDs - val resultUUID = tmpUUIDs -- compactionUUIDs - val file_arr_buf = new ArrayBuffer[DataFileInfo]() - val file_res_arr_buf = new ArrayBuffer[DataFileInfo]() - val dupCheck = new mutable.HashSet[String]() - val dataCommitInfoList = MetaVersion.dbManager.getDataCommitInfosFromUUIDs(table_id, partition_desc, Lists.newArrayList(resultUUID.map(DBUtil.toProtoUuid).asJava)).asScala.toArray - dataCommitInfoList.foreach(data_commit_info => { - val fileOps = data_commit_info.getFileOpsList.asScala.toArray - fileOps.foreach(file => { - file_arr_buf += DataFileInfo( - data_commit_info.getPartitionDesc, - file.getPath, - file.getFileOp.name, - file.getSize, - data_commit_info.getTimestamp, - file.getFileExistCols - ) - }) - }) - - if (file_arr_buf.length > 1) { - for (i <- Range(file_arr_buf.size - 1, -1, -1)) { - if (file_arr_buf(i).file_op.equals("del")) { - dupCheck.add(file_arr_buf(i).path) - } else { - if (dupCheck.isEmpty || !dupCheck.contains(file_arr_buf(i).path)) { - file_res_arr_buf += file_arr_buf(i) - } - } - } - file_res_arr_buf.reverse - } else { - file_arr_buf.filter(_.file_op.equals("add")) - } - } - } - - //add new data info to table data_info - def addNewDataFile(table_id: String, - range_value: String, - file_path: String, - commit_id: UUID, - file_op: String, - commit_type: String, - size: Long, - file_exist_cols: String, - modification_time: Long): Unit = { - val dataFileInfo = DataFileOp.newBuilder - dataFileInfo.setPath(file_path) - dataFileInfo.setFileOp(FileOp.valueOf(file_op)) - dataFileInfo.setSize(size) - dataFileInfo.setFileExistCols(file_exist_cols) - val file_arr_buf = new ArrayBuffer[DataFileOp]() - file_arr_buf += dataFileInfo.build - - val metaDataCommitInfoList = new util.ArrayList[entity.DataCommitInfo]() - val metaDataCommitInfo = entity.DataCommitInfo.newBuilder - metaDataCommitInfo.setTableId(table_id) - metaDataCommitInfo.setPartitionDesc(range_value) - metaDataCommitInfo.setCommitOp(entity.CommitOp.valueOf(commit_type)) - metaDataCommitInfo.setCommitId(DBUtil.toProtoUuid(commit_id)) - metaDataCommitInfo.addAllFileOps(JavaConverters.bufferAsJavaList(file_arr_buf)) - metaDataCommitInfo.setTimestamp(modification_time) - metaDataCommitInfoList.add(metaDataCommitInfo.build) - MetaVersion.dbManager.batchCommitDataCommitInfo(metaDataCommitInfoList) - - } - - def dropDataInfoData(table_id: String, range: String, commit_id: UUID): Unit = { - MetaVersion.dbManager.deleteDataCommitInfo(table_id, range, commit_id) - } - - def dropDataInfoData(table_id: String): Unit = { - MetaVersion.dbManager.deleteDataCommitInfo(table_id) - } - -} - diff --git a/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/MetaVersion.scala b/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/MetaVersion.scala index 431e79f7f..16a4b1601 100644 --- a/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/MetaVersion.scala +++ 
b/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/meta/MetaVersion.scala @@ -6,7 +6,7 @@ package com.dmetasoul.lakesoul.meta import com.alibaba.fastjson.JSONObject import org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog -import org.apache.spark.sql.lakesoul.utils.{PartitionInfo, SparkUtil, TableInfo} +import org.apache.spark.sql.lakesoul.utils.{SparkUtil, TableInfo} import java.util import java.util.UUID @@ -119,9 +119,9 @@ object MetaVersion { ) } - def getSinglePartitionInfo(table_id: String, range_value: String, range_id: String): PartitionInfo = { + def getSinglePartitionInfo(table_id: String, range_value: String, range_id: String): PartitionInfoScala = { val info = dbManager.getSinglePartitionInfo(table_id, range_value) - PartitionInfo( + PartitionInfoScala( table_id = info.getTableId, range_value = range_value, version = info.getVersion, @@ -131,10 +131,10 @@ object MetaVersion { ) } - def getSinglePartitionInfoForVersion(table_id: String, range_value: String, version: Int): Array[PartitionInfo] = { - val partitionVersionBuffer = new ArrayBuffer[PartitionInfo]() + def getSinglePartitionInfoForVersion(table_id: String, range_value: String, version: Int): Array[PartitionInfoScala] = { + val partitionVersionBuffer = new ArrayBuffer[PartitionInfoScala]() val info = dbManager.getSinglePartitionInfo(table_id, range_value, version) - partitionVersionBuffer += PartitionInfo( + partitionVersionBuffer += PartitionInfoScala( table_id = info.getTableId, range_value = range_value, version = info.getVersion, @@ -146,12 +146,12 @@ object MetaVersion { } - def getOnePartitionVersions(table_id: String, range_value: String): Array[PartitionInfo] = { - val partitionVersionBuffer = new ArrayBuffer[PartitionInfo]() + def getOnePartitionVersions(table_id: String, range_value: String): Array[PartitionInfoScala] = { + val partitionVersionBuffer = new ArrayBuffer[PartitionInfoScala]() val res_itr = dbManager.getOnePartitionVersions(table_id, range_value).iterator() while (res_itr.hasNext) { val res = res_itr.next() - partitionVersionBuffer += PartitionInfo( + partitionVersionBuffer += PartitionInfoScala( table_id = res.getTableId, range_value = res.getPartitionDesc, version = res.getVersion, @@ -183,12 +183,12 @@ object MetaVersion { (false, "") } - def getAllPartitionInfo(table_id: String): Array[PartitionInfo] = { - val partitionVersionBuffer = new ArrayBuffer[PartitionInfo]() + def getAllPartitionInfo(table_id: String): Array[PartitionInfoScala] = { + val partitionVersionBuffer = new ArrayBuffer[PartitionInfoScala]() val res_itr = dbManager.getAllPartitionInfo(table_id).iterator() while (res_itr.hasNext) { val res = res_itr.next() - partitionVersionBuffer += PartitionInfo( + partitionVersionBuffer += PartitionInfoScala( table_id = res.getTableId, range_value = res.getPartitionDesc, version = res.getVersion, diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergeParquetScan.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergeParquetScan.scala index 03ed8f657..b5eb0bf33 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergeParquetScan.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergeParquetScan.scala @@ -5,7 +5,7 @@ package org.apache.spark.sql.execution.datasources.v2.merge import com.dmetasoul.lakesoul.meta.DBConfig.{LAKESOUL_FILE_EXISTS_COLUMN_SPLITTER, LAKESOUL_RANGE_PARTITION_SPLITTER} -import 
com.dmetasoul.lakesoul.meta.MetaVersion +import com.dmetasoul.lakesoul.meta.{DataFileInfo, MetaVersion} import java.util.{Locale, OptionalLong, TimeZone} import org.apache.hadoop.conf.Configuration @@ -28,7 +28,7 @@ import org.apache.spark.sql.sources.{EqualTo, Filter, Not} import org.apache.spark.sql.lakesoul._ import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf -import org.apache.spark.sql.lakesoul.utils.{DataFileInfo, SparkUtil, TableInfo, TimestampFormatter} +import org.apache.spark.sql.lakesoul.utils.{SparkUtil, TableInfo, TimestampFormatter} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.{AnalysisException, SparkSession} diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergePartitionedFileUtil.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergePartitionedFileUtil.scala index 86769d846..40e9c76d4 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergePartitionedFileUtil.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergePartitionedFileUtil.scala @@ -4,11 +4,12 @@ package org.apache.spark.sql.execution.datasources.v2.merge +import com.dmetasoul.lakesoul.meta.DataFileInfo import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors -import org.apache.spark.sql.lakesoul.utils.{DataFileInfo, TableInfo} +import org.apache.spark.sql.lakesoul.utils.TableInfo import org.apache.spark.sql.types.StructType object MergePartitionedFileUtil { diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCommitProtocol.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCommitProtocol.scala index 492d8b540..6177fba80 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCommitProtocol.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCommitProtocol.scala @@ -4,7 +4,7 @@ package org.apache.spark.sql.lakesoul -import com.dmetasoul.lakesoul.meta.MetaUtils +import com.dmetasoul.lakesoul.meta.{DataFileInfo, MetaUtils} import java.net.URI import java.util.UUID @@ -14,7 +14,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.sql.catalyst.expressions.Cast -import org.apache.spark.sql.lakesoul.utils.{DataFileInfo, DateFormatter, PartitionUtils, TimestampFormatter} +import org.apache.spark.sql.lakesoul.utils.{DateFormatter, PartitionUtils, TimestampFormatter} import org.apache.spark.sql.types.StringType import scala.collection.mutable.ArrayBuffer @@ -119,7 +119,7 @@ class DelayedCommitProtocol(jobId: String, val filePath = new Path(new URI(f._2)) val stat = fs.getFileStatus(filePath) - DataFileInfo(MetaUtils.getPartitionKeyFromList(f._1),fs.makeQualified(filePath).toString, "add", stat.getLen, stat.getModificationTime) + DataFileInfo(MetaUtils.getPartitionKeyFromList(f._1), fs.makeQualified(filePath).toString, "add", stat.getLen, stat.getModificationTime) } new TaskCommitMessage(statuses) diff --git 
a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/LakeSoulFileIndex.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/LakeSoulFileIndex.scala index f5042d670..fddf0611d 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/LakeSoulFileIndex.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/LakeSoulFileIndex.scala @@ -5,15 +5,14 @@ package org.apache.spark.sql.lakesoul import java.net.URI - import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, GenericInternalRow, Literal} import org.apache.spark.sql.execution.datasources.{PartitionDirectory, PartitionSpec, PartitioningAwareFileIndex} import org.apache.spark.sql.lakesoul.LakeSoulFileIndexUtils._ -import org.apache.spark.sql.lakesoul.utils.{DataFileInfo, SparkUtil} +import org.apache.spark.sql.lakesoul.utils.SparkUtil import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{AnalysisException, SparkSession} -import com.dmetasoul.lakesoul.meta.{DataOperation, MetaUtils} +import com.dmetasoul.lakesoul.meta.{DataFileInfo, DataOperation, MetaUtils} import scala.collection.mutable @@ -40,8 +39,8 @@ abstract class LakeSoulFileIndexV2(val spark: SparkSession, override def refresh(): Unit = {} /** - * Returns all matching/valid files by the given `partitionFilters` and `dataFilters` - */ + * Returns all matching/valid files by the given `partitionFilters` and `dataFilters` + */ def matchingFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression] = Nil): Seq[DataFileInfo] @@ -121,9 +120,9 @@ case class DataSoulFileIndexV2(override val spark: SparkSession, /** - * A [[LakeSoulFileIndexV2]] that generates the list of files from a given list of files - * that are within a version range of SnapshotManagement. - */ + * A [[LakeSoulFileIndexV2]] that generates the list of files from a given list of files + * that are within a version range of SnapshotManagement. 
+ */ case class BatchDataSoulFileIndexV2(override val spark: SparkSession, override val snapshotManagement: SnapshotManagement, files: Seq[DataFileInfo]) diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/LakeSoulPartFileMerge.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/LakeSoulPartFileMerge.scala index 0600a537e..f99cb1335 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/LakeSoulPartFileMerge.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/LakeSoulPartFileMerge.scala @@ -4,12 +4,12 @@ package org.apache.spark.sql.lakesoul +import com.dmetasoul.lakesoul.meta.DataFileInfo import org.apache.hadoop.fs.Path import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.lakesoul.catalog.LakeSoulTableV2 -import org.apache.spark.sql.lakesoul.exception.{MetaRerunException} -import org.apache.spark.sql.lakesoul.utils.DataFileInfo +import org.apache.spark.sql.lakesoul.exception.MetaRerunException import org.apache.spark.sql.util.CaseInsensitiveStringMap import scala.collection.JavaConversions._ diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/LakeSoulUtils.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/LakeSoulUtils.scala index 4954fe231..f3a5a2a5d 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/LakeSoulUtils.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/LakeSoulUtils.scala @@ -4,6 +4,7 @@ package org.apache.spark.sql.lakesoul +import com.dmetasoul.lakesoul.meta.DataFileInfo import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases @@ -18,7 +19,7 @@ import org.apache.spark.sql.lakesoul.catalog.{LakeSoulCatalog, LakeSoulTableV2} import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors import org.apache.spark.sql.lakesoul.rules.LakeSoulRelation import org.apache.spark.sql.lakesoul.sources.{LakeSoulBaseRelation, LakeSoulSourceUtils} -import org.apache.spark.sql.lakesoul.utils.{DataFileInfo, TableInfo} +import org.apache.spark.sql.lakesoul.utils.TableInfo import org.apache.spark.sql.sources.{EqualTo, Filter, Not} import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.util.Utils diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/PartitionFilter.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/PartitionFilter.scala index 7b8bc8bb7..b768af837 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/PartitionFilter.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/PartitionFilter.scala @@ -4,10 +4,10 @@ package org.apache.spark.sql.lakesoul -import com.dmetasoul.lakesoul.meta.{DataOperation, MetaUtils} +import com.dmetasoul.lakesoul.meta.{DataFileInfo, DataOperation, MetaUtils} import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute} import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal} -import org.apache.spark.sql.lakesoul.utils.{DataFileInfo, PartitionFilterInfo, SparkUtil} +import org.apache.spark.sql.lakesoul.utils.{PartitionFilterInfo, SparkUtil} import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession} @@ -38,7 +38,7 @@ object PartitionFilter { 
DataOperation.getTableDataInfo(partitionArray) } else { val partitionRangeValues = partitionsForScan(snapshot, filters).map(_.range_value).toSet - val partitionInfo =partitionArray.filter(p => partitionRangeValues.contains(p.range_value)) + val partitionInfo = partitionArray.filter(p => partitionRangeValues.contains(p.range_value)) DataOperation.getTableDataInfo(partitionInfo) } } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/Snapshot.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/Snapshot.scala index 1926b268c..5f5e4b681 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/Snapshot.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/Snapshot.scala @@ -4,6 +4,7 @@ package org.apache.spark.sql.lakesoul +import com.dmetasoul.lakesoul.meta.PartitionInfoScala import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat @@ -15,7 +16,7 @@ import org.apache.spark.sql.lakesoul.utils._ class Snapshot(table_info: TableInfo, - partition_info_arr: Array[PartitionInfo], + partition_info_arr: Array[PartitionInfoScala], is_first_commit: Boolean = false ) { private var partitionDesc: String = "" @@ -53,7 +54,7 @@ class Snapshot(table_info: TableInfo, def isFirstCommit: Boolean = is_first_commit - def getPartitionInfoArray: Array[PartitionInfo] = partition_info_arr + def getPartitionInfoArray: Array[PartitionInfoScala] = partition_info_arr override def toString: String = table_info + partition_info_arr.mkString(",") } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/SnapshotManagement.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/SnapshotManagement.scala index 7de9b6ae6..6dd5b652c 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/SnapshotManagement.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/SnapshotManagement.scala @@ -4,7 +4,7 @@ package org.apache.spark.sql.lakesoul -import com.dmetasoul.lakesoul.meta.{MetaUtils, MetaVersion} +import com.dmetasoul.lakesoul.meta.{MetaUtils, MetaVersion, PartitionInfoScala} import com.google.common.cache.{CacheBuilder, RemovalNotification} import javolution.util.ReentrantLock import org.apache.hadoop.fs.Path @@ -15,7 +15,7 @@ import org.apache.spark.sql.lakesoul.LakeSoulOptions.ReadType import org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors import org.apache.spark.sql.lakesoul.sources.{LakeSoulSQLConf, LakeSoulSourceUtils} -import org.apache.spark.sql.lakesoul.utils.{PartitionInfo, SparkUtil, TableInfo} +import org.apache.spark.sql.lakesoul.utils.{SparkUtil, TableInfo} import org.apache.spark.sql.{AnalysisException, SparkSession} import java.io.File @@ -48,7 +48,7 @@ class SnapshotManagement(path: String, namespace: String) extends Logging { val table_id = "table_" + UUID.randomUUID().toString val table_info = TableInfo(table_namespace, Some(table_path), table_id) val partition_arr = Array( - PartitionInfo(table_id, MetaUtils.DEFAULT_RANGE_PARTITION_VALUE, 0) + PartitionInfoScala(table_id, MetaUtils.DEFAULT_RANGE_PARTITION_VALUE, 0) ) new Snapshot(table_info, partition_arr, true) } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionCommit.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionCommit.scala index da1fce9fa..ce8c62011 
100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionCommit.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionCommit.scala @@ -208,7 +208,7 @@ trait Transaction extends TransactionalWrite with Logging { scan } - def getCompactionPartitionFiles(partitionInfo: PartitionInfo): Seq[DataFileInfo] = { + def getCompactionPartitionFiles(partitionInfo: PartitionInfoScala): Seq[DataFileInfo] = { val files = DataOperation.getSinglePartitionDataInfo(partitionInfo) readFiles ++= files @@ -228,7 +228,7 @@ trait Transaction extends TransactionalWrite with Logging { */ def commit(addFiles: Seq[DataFileInfo], expireFiles: Seq[DataFileInfo], - readPartitionInfo: Array[PartitionInfo]): Unit = { + readPartitionInfo: Array[PartitionInfoScala]): Unit = { commit(addFiles, expireFiles, "", -1, readPartitionInfo) } @@ -237,7 +237,7 @@ trait Transaction extends TransactionalWrite with Logging { expireFiles: Seq[DataFileInfo], query_id: String = "", //for streaming commit batch_id: Long = -1L, - readPartitionInfo: Array[PartitionInfo] = null): Unit = { + readPartitionInfo: Array[PartitionInfoScala] = null): Unit = { snapshotManagement.lockInterruptibly { assert(!committed, "Transaction already committed.") if (isFirstCommit) { @@ -265,7 +265,7 @@ trait Transaction extends TransactionalWrite with Logging { val add_file_arr_buf = new ArrayBuffer[DataCommitInfo]() - val add_partition_info_arr_buf = new ArrayBuffer[PartitionInfo]() + val add_partition_info_arr_buf = new ArrayBuffer[PartitionInfoScala]() val commit_type = commitType.getOrElse(CommitType("append")).name if (commit_type.equals(CommitType("update").name)) { @@ -302,7 +302,7 @@ trait Transaction extends TransactionalWrite with Logging { System.currentTimeMillis(), filter_files.toArray ) - add_partition_info_arr_buf += PartitionInfo( + add_partition_info_arr_buf += PartitionInfoScala( table_id = tableInfo.table_id, range_value = range_key, read_files = Array(addUUID) @@ -323,7 +323,7 @@ trait Transaction extends TransactionalWrite with Logging { System.currentTimeMillis(), changeFiles.toArray ) - add_partition_info_arr_buf += PartitionInfo( + add_partition_info_arr_buf += PartitionInfoScala( table_id = tableInfo.table_id, range_value = range_key, read_files = Array(addUUID) diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala index aef544661..1f4d78350 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala @@ -4,7 +4,7 @@ package org.apache.spark.sql.lakesoul -import com.dmetasoul.lakesoul.meta.CommitType +import com.dmetasoul.lakesoul.meta.{CommitType, DataFileInfo} import com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_RANGE_PARTITION_SPLITTER import org.apache.hadoop.fs.Path import org.apache.spark.sql.Dataset @@ -17,7 +17,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors import org.apache.spark.sql.lakesoul.schema.{InvariantCheckerExec, Invariants, SchemaUtils} import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf -import org.apache.spark.sql.lakesoul.utils.{DataFileInfo, SparkUtil} +import org.apache.spark.sql.lakesoul.utils.SparkUtil import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration @@ -146,16 +146,16 @@ trait 
TransactionalWrite { val physicalPlan = if (isCompaction) { val cdcCol = snapshot.getTableInfo.configuration.get(LakeSoulTableProperties.lakeSoulCDCChangePropKey) - if(cdcCol.nonEmpty){ + if (cdcCol.nonEmpty) { val tmpSparkPlan = queryExecution.executedPlan val outColumns = outputSpec.outputColumns - val nonCdcAttrCols = outColumns.filter(p=>(!p.name.equalsIgnoreCase(cdcCol.get))) - val cdcAttrCol = outColumns.filter(p=>p.name.equalsIgnoreCase(cdcCol.get)) - val cdcCaseWhen = CaseWhen.createFromParser(Seq(EqualTo(cdcAttrCol(0),Literal("update")),Literal("insert"),cdcAttrCol(0))) - val alias = Alias(cdcCaseWhen,cdcCol.get)() + val nonCdcAttrCols = outColumns.filter(p => (!p.name.equalsIgnoreCase(cdcCol.get))) + val cdcAttrCol = outColumns.filter(p => p.name.equalsIgnoreCase(cdcCol.get)) + val cdcCaseWhen = CaseWhen.createFromParser(Seq(EqualTo(cdcAttrCol(0), Literal("update")), Literal("insert"), cdcAttrCol(0))) + val alias = Alias(cdcCaseWhen, cdcCol.get)() val allAttrCols = nonCdcAttrCols :+ alias - ProjectExec(allAttrCols,tmpSparkPlan) - }else{ + ProjectExec(allAttrCols, tmpSparkPlan) + } else { queryExecution.executedPlan } } else { diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulScanBuilder.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulScanBuilder.scala index 3042e2de3..7ba14b1c6 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulScanBuilder.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulScanBuilder.scala @@ -4,6 +4,7 @@ package org.apache.spark.sql.lakesoul.catalog +import com.dmetasoul.lakesoul.meta.DataFileInfo import org.apache.hadoop.conf.Configuration import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, sources} @@ -17,7 +18,7 @@ import org.apache.spark.sql.execution.datasources.v2.merge.{MultiPartitionMergeB import org.apache.spark.sql.execution.datasources.v2.parquet.{EmptyParquetScan, NativeParquetScan, ParquetScan, StreamParquetScan} import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf -import org.apache.spark.sql.lakesoul.utils.{DataFileInfo, SparkUtil, TableInfo} +import org.apache.spark.sql.lakesoul.utils.{SparkUtil, TableInfo} import org.apache.spark.sql.lakesoul.{LakeSoulFileIndexV2, LakeSoulUtils} import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/Command.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/Command.scala index f411a60c6..77f84760a 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/Command.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/Command.scala @@ -5,12 +5,12 @@ package org.apache.spark.sql.lakesoul.commands import com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_RANGE_PARTITION_SPLITTER +import com.dmetasoul.lakesoul.meta.DataFileInfo import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.lakesoul.TransactionCommit import org.apache.spark.sql.lakesoul.sources.LakeSoulBaseRelation -import org.apache.spark.sql.lakesoul.utils.DataFileInfo import org.apache.spark.sql.{AnalysisException, SparkSession} /** diff --git 
a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala index 73acb28e6..21a737ba1 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala @@ -4,7 +4,7 @@ package org.apache.spark.sql.lakesoul.commands -import com.dmetasoul.lakesoul.meta.MetaVersion +import com.dmetasoul.lakesoul.meta.{DataFileInfo, MetaVersion, PartitionInfoScala} import com.dmetasoul.lakesoul.spark.clean.CleanOldCompaction.cleanOldCommitOpDiskData import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging @@ -17,7 +17,6 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, Data import org.apache.spark.sql.functions.expr import org.apache.spark.sql.lakesoul.catalog.LakeSoulTableV2 import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors -import org.apache.spark.sql.lakesoul.utils.{DataFileInfo, PartitionInfo} import org.apache.spark.sql.lakesoul.{BatchDataSoulFileIndexV2, SnapshotManagement, TransactionCommit} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.{Dataset, Row, SparkSession} @@ -39,11 +38,11 @@ case class CompactionCommand(snapshotManagement: SnapshotManagement, def filterPartitionNeedCompact(spark: SparkSession, force: Boolean, - partitionInfo: PartitionInfo): Boolean = { + partitionInfo: PartitionInfoScala): Boolean = { partitionInfo.read_files.length >= 1 } - def executeCompaction(spark: SparkSession, tc: TransactionCommit, files: Seq[DataFileInfo], readPartitionInfo: Array[PartitionInfo]): Unit = { + def executeCompaction(spark: SparkSession, tc: TransactionCommit, files: Seq[DataFileInfo], readPartitionInfo: Array[PartitionInfoScala]): Unit = { if (readPartitionInfo.forall(p => p.commit_op.equals("CompactionCommit") && p.read_files.length == 1)) { logInfo("=========== All Partitions Have Compacted, This Operation Will Cancel!!! 
===========") return diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CreateTableCommand.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CreateTableCommand.scala index 1ac3497a9..0daaa6a07 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CreateTableCommand.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CreateTableCommand.scala @@ -5,7 +5,7 @@ package org.apache.spark.sql.lakesoul.commands import com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_RANGE_PARTITION_SPLITTER -import com.dmetasoul.lakesoul.meta.MetaVersion +import com.dmetasoul.lakesoul.meta.{DataFileInfo, MetaVersion} import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql._ @@ -15,7 +15,7 @@ import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors import org.apache.spark.sql.lakesoul.schema.SchemaUtils -import org.apache.spark.sql.lakesoul.utils.{DataFileInfo, SparkUtil, TableInfo} +import org.apache.spark.sql.lakesoul.utils.{SparkUtil, TableInfo} import org.apache.spark.sql.lakesoul.{LakeSoulOptions, LakeSoulTableProperties, SnapshotManagement, TransactionCommit} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.NativeIOUtils diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/DeleteCommand.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/DeleteCommand.scala index 57f378904..17c2c6169 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/DeleteCommand.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/DeleteCommand.scala @@ -4,6 +4,7 @@ package org.apache.spark.sql.lakesoul.commands +import com.dmetasoul.lakesoul.meta.DataFileInfo import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Expression, InputFileName, Literal, Not} import org.apache.spark.sql.catalyst.plans.QueryPlan @@ -12,7 +13,6 @@ import org.apache.spark.sql.execution.command.{LeafRunnableCommand, RunnableComm import org.apache.spark.sql.lakesoul._ import org.apache.spark.sql.lakesoul.catalog.LakeSoulTableV2 import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors -import org.apache.spark.sql.lakesoul.utils.DataFileInfo import org.apache.spark.sql.types.BooleanType import org.apache.spark.sql.{Column, Dataset, Row, SparkSession} diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/DropTableCommand.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/DropTableCommand.scala index fbb8fad91..135ffee7a 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/DropTableCommand.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/DropTableCommand.scala @@ -9,7 +9,6 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper} import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors -import org.apache.spark.sql.lakesoul.utils.DataFileInfo import org.apache.spark.sql.lakesoul.{PartitionFilter, Snapshot, SnapshotManagement} import java.util.concurrent.TimeUnit diff --git 
a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/UpdateCommand.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/UpdateCommand.scala index ba5aafea1..a1958c0b5 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/UpdateCommand.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/UpdateCommand.scala @@ -4,13 +4,13 @@ package org.apache.spark.sql.lakesoul.commands +import com.dmetasoul.lakesoul.meta.DataFileInfo import org.apache.spark.SparkContext import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, If, Literal} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.functions.input_file_name -import org.apache.spark.sql.lakesoul.utils.DataFileInfo import org.apache.spark.sql.lakesoul.{LakeSoulUtils, SnapshotManagement, TransactionCommit} import org.apache.spark.sql.types.BooleanType import org.apache.spark.sql.{Column, Dataset, Row, SparkSession} diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/UpsertCommand.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/UpsertCommand.scala index ebf618664..7cf66e494 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/UpsertCommand.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/UpsertCommand.scala @@ -5,12 +5,13 @@ package org.apache.spark.sql.lakesoul.commands import com.dmetasoul.lakesoul.meta.DBConfig.{LAKESOUL_FILE_EXISTS_COLUMN_SPLITTER, LAKESOUL_RANGE_PARTITION_SPLITTER} +import com.dmetasoul.lakesoul.meta.DataFileInfo import org.apache.spark.sql.catalyst.expressions.And import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.lakesoul._ import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors -import org.apache.spark.sql.lakesoul.utils.{DataFileInfo, SparkUtil} +import org.apache.spark.sql.lakesoul.utils.SparkUtil //import org.apache.spark.sql.lakesoul.actions.AddFile import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression, PredicateHelper} diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/WriteIntoTable.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/WriteIntoTable.scala index 108fc3b62..d0df65531 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/WriteIntoTable.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/WriteIntoTable.scala @@ -4,12 +4,11 @@ package org.apache.spark.sql.lakesoul.commands -import com.dmetasoul.lakesoul.meta.MetaUtils +import com.dmetasoul.lakesoul.meta.DataFileInfo import org.apache.spark.sql._ -import org.apache.spark.sql.execution.command.{LeafRunnableCommand, RunnableCommand} +import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors import org.apache.spark.sql.lakesoul.schema.ImplicitMetadataOperation -import org.apache.spark.sql.lakesoul.utils.{DataFileInfo, PartitionFilterInfo} import org.apache.spark.sql.lakesoul.{LakeSoulOptions, PartitionFilter, SnapshotManagement, TransactionCommit} /** @@ -22,10 +21,10 @@ import org.apache.spark.sql.lakesoul.{LakeSoulOptions, 
PartitionFilter, Snapshot * Existing Table Semantics * - The save mode will control how existing data is handled (i.e. overwrite, append, etc) * - The schema will of the DataFrame will be checked and if there are new columns present - * they will be added to the tables schema. Conflicting columns (i.e. a INT, and a STRING) - * will result in an exception + * they will be added to the tables schema. Conflicting columns (i.e. a INT, and a STRING) + * will result in an exception * - The partition columns, if present are validated against the existing metadata. If not - * present, then the partitioning of the table is respected. + * present, then the partitioning of the table is respected. * * In combination with `Overwrite`, a `replaceWhere` option can be used to transactionally * replace data that matches a predicate. @@ -64,7 +63,6 @@ case class WriteIntoTable(snapshotManagement: SnapshotManagement, /** @return (newFiles, deletedFiles) */ def write(tc: TransactionCommit, sparkSession: SparkSession): (Seq[DataFileInfo], Seq[DataFileInfo]) = { - import sparkSession.implicits._ val hashCols = if (tc.isFirstCommit) { hashPartitions diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/alterTableCommands.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/alterTableCommands.scala index 48fa8dcb0..61cc82bba 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/alterTableCommands.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/alterTableCommands.scala @@ -4,6 +4,7 @@ package org.apache.spark.sql.lakesoul.commands +import com.dmetasoul.lakesoul.meta.DataFileInfo import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute} import org.apache.spark.sql.catalyst.plans.logical.IgnoreCachedData import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, First} @@ -13,7 +14,6 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Pa import org.apache.spark.sql.lakesoul.catalog.LakeSoulTableV2 import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors import org.apache.spark.sql.lakesoul.schema.SchemaUtils -import org.apache.spark.sql.lakesoul.utils.DataFileInfo import org.apache.spark.sql.lakesoul.{LakeSoulConfig, TransactionCommit} import org.apache.spark.sql.types._ import org.apache.spark.sql.{AnalysisException, Row, SparkSession} diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/exception/MetaRerunErrors.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/exception/MetaRerunErrors.scala index 160e41642..c4a9cb97c 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/exception/MetaRerunErrors.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/exception/MetaRerunErrors.scala @@ -4,11 +4,12 @@ package org.apache.spark.sql.lakesoul.exception -import org.apache.spark.sql.lakesoul.utils.PartitionInfo +import com.dmetasoul.lakesoul.meta.PartitionInfoScala + object MetaRerunErrors { - def fileChangedException(info: PartitionInfo, + def fileChangedException(info: PartitionInfoScala, file_path: String, write_version: Long, commit_id: String): MetaRerunException = { @@ -21,7 +22,7 @@ object MetaRerunErrors { commit_id) } - def fileDeletedException(info: PartitionInfo, + def fileDeletedException(info: PartitionInfoScala, file_path: String, write_version: Long, commit_id: String): MetaRerunException = { diff --git 
a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/sources/LakeSoulSourceUtils.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/sources/LakeSoulSourceUtils.scala index 325fa86ea..1f193c1cc 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/sources/LakeSoulSourceUtils.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/sources/LakeSoulSourceUtils.scala @@ -4,7 +4,7 @@ package org.apache.spark.sql.lakesoul.sources -import com.dmetasoul.lakesoul.meta.{MetaCommit, MetaUtils, MetaVersion} +import com.dmetasoul.lakesoul.meta.{DataFileInfo, MetaCommit, MetaUtils, MetaVersion} import java.util.Locale import org.apache.spark.rdd.RDD @@ -16,7 +16,7 @@ import org.apache.spark.sql.functions.col import org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation, PrunedFilteredScan} import org.apache.spark.sql.lakesoul.commands.WriteIntoTable -import org.apache.spark.sql.lakesoul.utils.{DataFileInfo, SparkUtil, TableInfo} +import org.apache.spark.sql.lakesoul.utils.{SparkUtil, TableInfo} import org.apache.spark.sql.lakesoul.{LakeSoulOptions, SnapshotManagement} import org.apache.spark.sql.types.StructType diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/utils/MetaData.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/utils/MetaData.scala index 35e06bd42..778b841b5 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/utils/MetaData.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/utils/MetaData.scala @@ -5,35 +5,22 @@ package org.apache.spark.sql.lakesoul.utils import com.dmetasoul.lakesoul.meta.DBConfig.{LAKESOUL_HASH_PARTITION_SPLITTER, LAKESOUL_RANGE_PARTITION_SPLITTER} -import com.dmetasoul.lakesoul.meta.{CommitState, CommitType} +import com.dmetasoul.lakesoul.meta.{CommitState, CommitType, DataFileInfo, PartitionInfoScala} import com.fasterxml.jackson.annotation.JsonIgnore import org.apache.hadoop.fs.Path -import org.apache.spark.sql.execution.datasources.BucketingUtils import org.apache.spark.sql.types.{DataType, StructType} import java.util.UUID case class MetaInfo(table_info: TableInfo, - partitionInfoArray: Array[PartitionInfo], + partitionInfoArray: Array[PartitionInfoScala], dataCommitInfo: Array[DataCommitInfo], commit_type: CommitType, commit_id: String = "", query_id: String = "", batch_id: Long = -1L, - readPartitionInfo: Array[PartitionInfo] = null) - -//range_value -> partition_desc -case class PartitionInfo(table_id: String, - range_value: String, - version: Int = -1, - read_files: Array[UUID] = Array.empty[UUID], - expression: String = "", - commit_op: String = "" - ) { - override def toString: String = { - s"partition info: {\ntable_name: $table_id,\nrange_value: $range_value}" - } -} + readPartitionInfo: Array[PartitionInfoScala] = null) + case class Format(provider: String = "parquet", options: Map[String, String] = Map.empty) @@ -104,25 +91,6 @@ case class TableInfo(namespace: String, lazy val format: Format = Format() } -//file_exist_cols col1,col2.col3 -case class DataFileInfo( - range_partitions: String, - path: String, - file_op: String, - size: Long, - modification_time: Long = -1L, - file_exist_cols: String = "" - ) { - lazy val range_version: String = range_partitions + "-" + file_exist_cols - - lazy val file_bucket_id: Int = BucketingUtils - .getBucketId(new Path(path).getName) - .getOrElse(sys.error(s"Invalid bucket file $path")) - - //trans to 
files which need to delete - def expire(deleteTime: Long): DataFileInfo = this.copy(modification_time = deleteTime) -} - //single file info case class DataCommitInfo(table_id: String, range_value: String, diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/utils/SparkUtil.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/utils/SparkUtil.scala index 650895a05..d08113411 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/utils/SparkUtil.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/utils/SparkUtil.scala @@ -4,7 +4,7 @@ package org.apache.spark.sql.lakesoul.utils -import com.dmetasoul.lakesoul.meta.{DataOperation, MetaUtils} +import com.dmetasoul.lakesoul.meta.{DataFileInfo, DataOperation, MetaUtils} import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.Expression diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/TableCreationTests.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/TableCreationTests.scala index 1a74f18a2..1cf726cd8 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/TableCreationTests.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/TableCreationTests.scala @@ -4,6 +4,7 @@ package org.apache.spark.sql.lakesoul +import com.dmetasoul.lakesoul.meta.DataFileInfo import com.dmetasoul.lakesoul.tables.LakeSoulTable import org.apache.hadoop.fs.Path import org.apache.spark.SparkException @@ -17,7 +18,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog import org.apache.spark.sql.lakesoul.sources.LakeSoulSourceUtils import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, LakeSoulTestUtils} -import org.apache.spark.sql.lakesoul.utils.{DataFileInfo, SparkUtil} +import org.apache.spark.sql.lakesoul.utils.SparkUtil import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/AlterTableTests.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/AlterTableTests.scala index 89f3d95e4..81313531d 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/AlterTableTests.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/AlterTableTests.scala @@ -4,6 +4,7 @@ package org.apache.spark.sql.lakesoul.commands +import com.dmetasoul.lakesoul.meta.DataFileInfo import com.dmetasoul.lakesoul.tables.LakeSoulTable import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.TableIdentifier @@ -14,7 +15,7 @@ import org.apache.spark.sql.lakesoul.SnapshotManagement import org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog import org.apache.spark.sql.lakesoul.sources.LakeSoulSourceUtils import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, LakeSoulTestUtils} -import org.apache.spark.sql.lakesoul.utils.{DataFileInfo, SparkUtil} +import org.apache.spark.sql.lakesoul.utils.SparkUtil import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest} @@ -1169,7 +1170,7 @@ trait AlterTableByNameTests extends AlterTableTests { sql("ALTER TABLE lakesoul_test SET TBLPROPERTIES ('lakesoul_cdc_column'='cdc_change_kind')") val table = 
catalog.loadTable(Identifier.of(Array("default"), "lakesoul_test")) - getProperties(table) should contain ("lakesoul_cdc_column" -> "cdc_change_kind") + getProperties(table) should contain("lakesoul_cdc_column" -> "cdc_change_kind") } } } @@ -1204,7 +1205,7 @@ trait AlterTableByPathTests extends AlterTableLakeSoulTestBase { override protected def getSnapshotManagement(identifier: String): SnapshotManagement = { SnapshotManagement( SparkUtil.makeQualifiedTablePath(new Path(identifier.split("\\.") - .last.stripPrefix("`").stripSuffix("`"))).toString) + .last.stripPrefix("`").stripSuffix("`"))).toString) } override protected def ddlTest(testName: String)(f: String => Unit): Unit = { diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/schema/CaseSensitivitySuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/schema/CaseSensitivitySuite.scala index 022105bac..4ea122eb5 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/schema/CaseSensitivitySuite.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/schema/CaseSensitivitySuite.scala @@ -4,6 +4,7 @@ package org.apache.spark.sql.lakesoul.schema +import com.dmetasoul.lakesoul.meta.DataFileInfo import org.apache.hadoop.fs.Path import java.io.File @@ -12,7 +13,7 @@ import org.apache.spark.sql.functions.col import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.lakesoul.SnapshotManagement import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest -import org.apache.spark.sql.lakesoul.utils.{DataFileInfo, SparkUtil} +import org.apache.spark.sql.lakesoul.utils.SparkUtil import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException} import org.apache.spark.sql.test.{SQLTestUtils, SharedSparkSession} import org.apache.spark.sql.types.StructType @@ -39,7 +40,7 @@ class CaseSensitivitySuite extends QueryTest } private def getPartitionValues(allFiles: Dataset[DataFileInfo], colName: String): Array[String] = { - allFiles.select(col(s"range_partitions")).distinct().as[String].collect() + allFiles.select(col(s"range_partitions")).distinct().as[String].collect() } diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/schema/InvariantEnforcementSuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/schema/InvariantEnforcementSuite.scala index d5443a135..6c02d3fe8 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/schema/InvariantEnforcementSuite.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/schema/InvariantEnforcementSuite.scala @@ -4,6 +4,7 @@ package org.apache.spark.sql.lakesoul.schema +import com.dmetasoul.lakesoul.meta.DataFileInfo import com.dmetasoul.lakesoul.tables.LakeSoulTable import org.apache.hadoop.fs.Path import org.apache.spark.SparkException @@ -11,7 +12,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.lakesoul.SnapshotManagement import org.apache.spark.sql.lakesoul.schema.Invariants.{ArbitraryExpression, NotNull, PersistedExpression} import org.apache.spark.sql.lakesoul.test.LakeSoulTestUtils -import org.apache.spark.sql.lakesoul.utils.{DataFileInfo, SparkUtil} +import org.apache.spark.sql.lakesoul.utils.SparkUtil import org.apache.spark.sql.test.{SQLTestUtils, SharedSparkSession} import org.apache.spark.sql.types._ import org.junit.runner.RunWith
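For context on the PartitionFilter.scala hunk above: after this cleanup, both partition metadata (`PartitionInfoScala`) and file listings (`DataFileInfo`) come from the shared `com.dmetasoul.lakesoul.meta` module rather than `org.apache.spark.sql.lakesoul.utils`. A minimal sketch of that pruning flow is below; `PartitionPruningSketch`, `filesForPartitions`, and `wantedRangeValues` are hypothetical names for illustration, and `DataOperation.getTableDataInfo` still needs a reachable LakeSoul meta store to actually return files.

```scala
// Sketch only: mirrors the pruning shape of PartitionFilter.filesForScan after
// this patch. All identifiers other than the imported meta classes are hypothetical.
import com.dmetasoul.lakesoul.meta.{DataFileInfo, DataOperation, PartitionInfoScala}

object PartitionPruningSketch {
  def filesForPartitions(allPartitions: Array[PartitionInfoScala],
                         wantedRangeValues: Set[String]): Array[DataFileInfo] = {
    if (wantedRangeValues.isEmpty) {
      // No partition predicate pushed down: expand every partition into files.
      DataOperation.getTableDataInfo(allPartitions)
    } else {
      // Keep only partitions whose range_value survived the filters,
      // then resolve just those into DataFileInfo entries.
      val pruned = allPartitions.filter(p => wantedRangeValues.contains(p.range_value))
      DataOperation.getTableDataInfo(pruned)
    }
  }
}
```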
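The TransactionalWrite.scala hunk above rewrites the CDC change column during compaction by wrapping the executed plan in a `ProjectExec` with a `CaseWhen`: rows marked "update" are re-emitted as "insert", while other change kinds pass through unchanged. A rough DataFrame-level equivalent of that rewrite (not the Catalyst-plan implementation itself; `CdcCompactionSketch`, `rewriteCdcColumn`, and the column name are hypothetical) looks like this:

```scala
// DataFrame-level illustration of the compaction-time CDC rewrite:
// CASE WHEN <cdcCol> = 'update' THEN 'insert' ELSE <cdcCol> END.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, when}

object CdcCompactionSketch {
  def rewriteCdcColumn(df: DataFrame, cdcCol: String): DataFrame =
    df.withColumn(cdcCol,
      when(col(cdcCol) === lit("update"), lit("insert")).otherwise(col(cdcCol)))
}
```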
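Similarly, the CompactionCommand.scala hunk above short-circuits `executeCompaction` when every partition it would read was already produced by a compaction and carries a single file group. A tiny sketch of that guard, assuming only the `PartitionInfoScala` fields used in the hunk (`CompactionGuardSketch` and `alreadyCompacted` are hypothetical names):

```scala
// Sketch of the compaction guard: true means the run can be skipped because each
// read partition's latest commit was a CompactionCommit holding one file group.
import com.dmetasoul.lakesoul.meta.PartitionInfoScala

object CompactionGuardSketch {
  def alreadyCompacted(readPartitions: Array[PartitionInfoScala]): Boolean =
    readPartitions.forall(p => p.commit_op == "CompactionCommit" && p.read_files.length == 1)
}
```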