From c01775f626488ca7d8c4a3feef40b74a90c22d76 Mon Sep 17 00:00:00 2001
From: fphantam
Date: Mon, 16 Oct 2023 18:16:37 +0800
Subject: [PATCH] optimize incremental read and fix data column disorder bug
 caused by compact operation

Signed-off-by: fphantam
---
 .../dmetasoul/lakesoul/meta/DBManager.java    |  6 ++-
 .../lakesoul/meta/DataOperation.scala         | 37 ++++++++++++++-----
 .../spark/clean/CleanExpiredData.scala        | 11 ++++--
 .../sql/lakesoul/TransactionalWrite.scala     | 18 +++++----
 4 files changed, 50 insertions(+), 22 deletions(-)

diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java
index 46ffc040e..d42bb88d4 100644
--- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java
+++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java
@@ -200,7 +200,7 @@ public List<String> getDeleteFilePath(String tableId, String partitionDesc, long
         if (StringUtils.isNotBlank(partitionDesc)) {
             deleteSinglePartitionMetaInfo(tableId, partitionDesc, utcMills, fileOps, deleteFilePathList);
         } else {
-            List<String> allPartitionDesc = partitionInfoDao.getAllPartitionDescByTableId(tableId);
+            List<String> allPartitionDesc = getTableAllPartitionDesc(tableId);
             allPartitionDesc.forEach(partition ->
                     deleteSinglePartitionMetaInfo(tableId, partition, utcMills, fileOps, deleteFilePathList));
         }
@@ -882,6 +882,10 @@ public void updateNamespaceProperties(String namespace, String properties) {
         namespaceDao.updatePropertiesByNamespace(namespace, newProperties.toJSONString());
     }
 
+    public List<String> getTableAllPartitionDesc(String tableId) {
+        return partitionInfoDao.getAllPartitionDescByTableId(tableId);
+    }
+
     public void deleteNamespace(String namespace) {
         namespaceDao.deleteByNamespace(namespace);
     }
diff --git a/lakesoul-common/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala b/lakesoul-common/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala
index 04c0250a3..f35de94d6 100644
--- a/lakesoul-common/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala
+++ b/lakesoul-common/src/main/scala/com/dmetasoul/lakesoul/meta/DataOperation.scala
@@ -158,21 +158,40 @@ object DataOperation {
 
   def getSinglePartitionDataInfo(table_id: String, partition_desc: String, startTimestamp: Long,
                                  endTimestamp: Long, readType: String): ArrayBuffer[DataFileInfo] = {
-    if (readType.equals(LakeSoulOptions.ReadType.INCREMENTAL_READ) || readType
-      .equals(LakeSoulOptions.ReadType.SNAPSHOT_READ)) {
+    val files_all_partitions_buf = new ArrayBuffer[DataFileInfo]()
+    if (readType.equals(LakeSoulOptions.ReadType.SNAPSHOT_READ)) {
       if (null == partition_desc || "".equals(partition_desc)) {
-        val partitions = dbManager.getAllPartitionInfo(table_id)
-        val files_all_partitions_buf = new ArrayBuffer[DataFileInfo]()
+        val partitions = dbManager.getTableAllPartitionDesc(table_id)
         partitions.forEach(partition => {
-          val preVersionTimestamp = dbManager
-            .getLastedVersionTimestampUptoTime(table_id, partition.getPartitionDesc, startTimestamp)
-          files_all_partitions_buf ++= getSinglePartitionIncrementalDataInfos(table_id, partition.getPartitionDesc,
-            preVersionTimestamp, endTimestamp)
+          val version = dbManager.getLastedVersionUptoTime(table_id, partition, endTimestamp)
+          files_all_partitions_buf ++= getSinglePartitionDataInfo(table_id, partition, version)
+        })
+        files_all_partitions_buf
+      } else {
+        val version = dbManager.getLastedVersionUptoTime(table_id, partition_desc, endTimestamp)
+        getSinglePartitionDataInfo(table_id, partition_desc, version)
+      }
+    } else if (readType.equals(LakeSoulOptions.ReadType.INCREMENTAL_READ)) {
+      if (null == partition_desc || "".equals(partition_desc)) {
+        val partitions = dbManager.getTableAllPartitionDesc(table_id)
+        partitions.forEach(partition => {
+          val preVersionTimestamp = dbManager.getLastedVersionTimestampUptoTime(table_id, partition, startTimestamp)
+          if (preVersionTimestamp == 0) {
+            val version = dbManager.getLastedVersionUptoTime(table_id, partition, endTimestamp)
+            files_all_partitions_buf ++= getSinglePartitionDataInfo(table_id, partition, version)
+          } else {
+            files_all_partitions_buf ++= getSinglePartitionIncrementalDataInfos(table_id, partition, preVersionTimestamp, endTimestamp)
+          }
         })
         files_all_partitions_buf
       } else {
         val preVersionTimestamp = dbManager.getLastedVersionTimestampUptoTime(table_id, partition_desc, startTimestamp)
-        getSinglePartitionIncrementalDataInfos(table_id, partition_desc, preVersionTimestamp, endTimestamp)
+        if (preVersionTimestamp == 0) {
+          val version = dbManager.getLastedVersionUptoTime(table_id, partition_desc, endTimestamp)
+          getSinglePartitionDataInfo(table_id, partition_desc, version)
+        } else {
+          getSinglePartitionIncrementalDataInfos(table_id, partition_desc, preVersionTimestamp, endTimestamp)
+        }
       }
     } else {
       val version = dbManager.getLastedVersionUptoTime(table_id, partition_desc, endTimestamp)
diff --git a/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/spark/clean/CleanExpiredData.scala b/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/spark/clean/CleanExpiredData.scala
index 277a67355..f79a91570 100644
--- a/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/spark/clean/CleanExpiredData.scala
+++ b/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/spark/clean/CleanExpiredData.scala
@@ -10,13 +10,16 @@ import org.apache.spark.sql.SparkSession
 import com.dmetasoul.lakesoul.spark.clean.CleanUtils.sqlToDataframe
 import org.apache.hadoop.fs.Path
 
-import java.time.{LocalDateTime, Period, ZoneOffset}
+import java.time.{LocalDateTime, Period, ZoneId}
 
 object CleanExpiredData {
 
   private val conn = DBConnector.getConn
+  var serverTimeZone = ""
 
   def main(args: Array[String]): Unit = {
+    val parameter = ParametersTool.fromArgs(args)
+    serverTimeZone = parameter.get("server.time.zone", serverTimeZone)
     val spark: SparkSession = SparkSession.builder
       .getOrCreate()
 
@@ -210,11 +213,11 @@ object CleanExpiredData {
   }
 
   def getExpiredDateZeroTimeStamp(days: Int): Long = {
-    val currentTime = LocalDateTime.now()
+    val currentTime = LocalDateTime.now(ZoneId.of(serverTimeZone))
     val period = Period.ofDays(days)
     val timeLine = currentTime.minus(period)
-    val zeroTime = timeLine.toLocalDate.atStartOfDay()
-    val expiredDateTimestamp = zeroTime.toInstant(ZoneOffset.UTC).toEpochMilli
+    val zeroTime = timeLine.toLocalDate.atStartOfDay(ZoneId.of(serverTimeZone))
+    val expiredDateTimestamp = zeroTime.toInstant().toEpochMilli
     expiredDateTimestamp
   }
 }
\ No newline at end of file
diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala
index 1f4d78350..0cba56a72 100644
--- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala
+++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala
@@ -7,18 +7,19 @@ package org.apache.spark.sql.lakesoul
 import com.dmetasoul.lakesoul.meta.{CommitType, DataFileInfo}
 import com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_RANGE_PARTITION_SPLITTER
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.{Column, Dataset}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, EqualTo, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, EqualNullSafe, EqualTo, Literal, Not}
+import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, WriteJobStatsTracker}
-import org.apache.spark.sql.execution.{ProjectExec, QueryExecution, SQLExecution}
-import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec, QueryExecution, SQLExecution}
+import org.apache.spark.sql.functions.{col, expr, lit}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors
 import org.apache.spark.sql.lakesoul.schema.{InvariantCheckerExec, Invariants, SchemaUtils}
 import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
 import org.apache.spark.sql.lakesoul.utils.SparkUtil
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{BooleanType, StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
 import scala.collection.mutable
@@ -149,12 +150,13 @@ trait TransactionalWrite {
     if (cdcCol.nonEmpty) {
       val tmpSparkPlan = queryExecution.executedPlan
       val outColumns = outputSpec.outputColumns
-      val nonCdcAttrCols = outColumns.filter(p => (!p.name.equalsIgnoreCase(cdcCol.get)))
       val cdcAttrCol = outColumns.filter(p => p.name.equalsIgnoreCase(cdcCol.get))
+      val pos = outColumns.indexOf(cdcAttrCol(0))
       val cdcCaseWhen = CaseWhen.createFromParser(Seq(EqualTo(cdcAttrCol(0), Literal("update")), Literal("insert"), cdcAttrCol(0)))
       val alias = Alias(cdcCaseWhen, cdcCol.get)()
-      val allAttrCols = nonCdcAttrCols :+ alias
-      ProjectExec(allAttrCols, tmpSparkPlan)
+      val allAttrCols = outColumns.updated(pos, alias)
+      val filterCdcAdd = FilterExec(Not(EqualTo(cdcAttrCol(0), Literal("delete"))), tmpSparkPlan)
+      ProjectExec(allAttrCols, filterCdcAdd)
     } else {
       queryExecution.executedPlan
     }
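
The column-disorder fix in TransactionalWrite.scala comes down to where the rewritten CDC column lands in the output projection: the old code dropped the CDC column and appended the rewritten alias at the end, which changes the column order, while the new code replaces it at its original position via updated(pos, alias) and additionally filters out "delete" rows before projecting. A minimal standalone Scala sketch of that ordering difference, using made-up column names (id, range_col, cdc_op, value) rather than real LakeSoul attributes:

object CdcColumnOrderSketch {
  def main(args: Array[String]): Unit = {
    // Toy stand-ins for outputSpec.outputColumns and the CDC column name;
    // the real code operates on Spark Attribute/Alias expressions instead.
    val outColumns = Seq("id", "range_col", "cdc_op", "value")
    val cdcCol = "cdc_op"
    val alias = "CASE WHEN cdc_op = 'update' THEN 'insert' ELSE cdc_op END AS cdc_op"

    // Old behaviour: drop the CDC column, then append the alias -> column order changes.
    val appended = outColumns.filterNot(_.equalsIgnoreCase(cdcCol)) :+ alias
    // New behaviour: replace the CDC column in place -> original order preserved.
    val pos = outColumns.indexOf(cdcCol)
    val inPlace = outColumns.updated(pos, alias)

    println(appended.mkString(", ")) // id, range_col, value, CASE WHEN ... AS cdc_op
    println(inPlace.mkString(", "))  // id, range_col, CASE WHEN ... AS cdc_op, value
  }
}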