optimize incremental read and fix data column disorder bug caused by compact operation

Signed-off-by: fphantam <[email protected]>
F-PHantam committed Oct 16, 2023
1 parent c4e9ba3 commit c01775f
Showing 4 changed files with 50 additions and 22 deletions.
@@ -200,7 +200,7 @@ public List<String> getDeleteFilePath(String tableId, String partitionDesc, long
if (StringUtils.isNotBlank(partitionDesc)) {
deleteSinglePartitionMetaInfo(tableId, partitionDesc, utcMills, fileOps, deleteFilePathList);
} else {
List<String> allPartitionDesc = partitionInfoDao.getAllPartitionDescByTableId(tableId);
List<String> allPartitionDesc = getTableAllPartitionDesc(tableId);
allPartitionDesc.forEach(partition -> deleteSinglePartitionMetaInfo(tableId, partition, utcMills, fileOps,
deleteFilePathList));
}
@@ -882,6 +882,10 @@ public void updateNamespaceProperties(String namespace, String properties) {
namespaceDao.updatePropertiesByNamespace(namespace, newProperties.toJSONString());
}

public List<String> getTableAllPartitionDesc(String tableId) {
return partitionInfoDao.getAllPartitionDescByTableId(tableId);
}

public void deleteNamespace(String namespace) {
namespaceDao.deleteByNamespace(namespace);
}
@@ -158,21 +158,40 @@ object DataOperation {

def getSinglePartitionDataInfo(table_id: String, partition_desc: String, startTimestamp: Long,
endTimestamp: Long, readType: String): ArrayBuffer[DataFileInfo] = {
if (readType.equals(LakeSoulOptions.ReadType.INCREMENTAL_READ) || readType
.equals(LakeSoulOptions.ReadType.SNAPSHOT_READ)) {
val files_all_partitions_buf = new ArrayBuffer[DataFileInfo]()
if (readType.equals(LakeSoulOptions.ReadType.SNAPSHOT_READ)) {
if (null == partition_desc || "".equals(partition_desc)) {
val partitions = dbManager.getAllPartitionInfo(table_id)
val files_all_partitions_buf = new ArrayBuffer[DataFileInfo]()
val partitions = dbManager.getTableAllPartitionDesc(table_id)
partitions.forEach(partition => {
val preVersionTimestamp = dbManager
.getLastedVersionTimestampUptoTime(table_id, partition.getPartitionDesc, startTimestamp)
files_all_partitions_buf ++= getSinglePartitionIncrementalDataInfos(table_id, partition.getPartitionDesc,
preVersionTimestamp, endTimestamp)
val version = dbManager.getLastedVersionUptoTime(table_id, partition, endTimestamp)
files_all_partitions_buf ++= getSinglePartitionDataInfo(table_id, partition, version)
})
files_all_partitions_buf
} else {
val version = dbManager.getLastedVersionUptoTime(table_id, partition_desc, endTimestamp)
getSinglePartitionDataInfo(table_id, partition_desc, version)
}
} else if (readType.equals(LakeSoulOptions.ReadType.INCREMENTAL_READ)) {
if (null == partition_desc || "".equals(partition_desc)) {
val partitions = dbManager.getTableAllPartitionDesc(table_id)
partitions.forEach(partition => {
val preVersionTimestamp = dbManager.getLastedVersionTimestampUptoTime(table_id, partition, startTimestamp)
if (preVersionTimestamp == 0) {
val version = dbManager.getLastedVersionUptoTime(table_id, partition, endTimestamp)
files_all_partitions_buf ++= getSinglePartitionDataInfo(table_id, partition, version)
} else {
files_all_partitions_buf ++= getSinglePartitionIncrementalDataInfos(table_id, partition, preVersionTimestamp, endTimestamp)
}
})
files_all_partitions_buf
} else {
val preVersionTimestamp = dbManager.getLastedVersionTimestampUptoTime(table_id, partition_desc, startTimestamp)
getSinglePartitionIncrementalDataInfos(table_id, partition_desc, preVersionTimestamp, endTimestamp)
if (preVersionTimestamp == 0) {
val version = dbManager.getLastedVersionUptoTime(table_id, partition_desc, endTimestamp)
getSinglePartitionDataInfo(table_id, partition_desc, version)
} else {
getSinglePartitionIncrementalDataInfos(table_id, partition_desc, preVersionTimestamp, endTimestamp)
}
}
} else {
val version = dbManager.getLastedVersionUptoTime(table_id, partition_desc, endTimestamp)
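Below is a minimal, self-contained Scala sketch of the fallback rule the incremental-read branch above introduces: when no version was committed before startTimestamp (preVersionTimestamp == 0), the read degrades to a full read up to endTimestamp instead of an incremental one. The functions are hypothetical stand-ins for the dbManager metadata calls, not the LakeSoul API.

object IncrementalReadSketch {
  // Hypothetical stand-in for dbManager.getLastedVersionTimestampUptoTime:
  // returns 0 when no version exists at or before startTs.
  def lastVersionTimestampUpTo(startTs: Long): Long =
    if (startTs < 1000L) 0L else startTs - 100L

  // Hypothetical stand-ins for the two read paths.
  def fullReadUpTo(endTs: Long): Seq[String] = Seq(s"all-files-up-to-$endTs")
  def incrementalRead(fromTs: Long, endTs: Long): Seq[String] = Seq(s"delta-$fromTs-to-$endTs")

  def readPartition(startTs: Long, endTs: Long): Seq[String] = {
    val preVersionTimestamp = lastVersionTimestampUpTo(startTs)
    if (preVersionTimestamp == 0L) fullReadUpTo(endTs)   // no earlier version: full read
    else incrementalRead(preVersionTimestamp, endTs)     // normal incremental read
  }

  def main(args: Array[String]): Unit = {
    println(readPartition(startTs = 500L, endTs = 2000L))  // List(all-files-up-to-2000)
    println(readPartition(startTs = 1500L, endTs = 2000L)) // List(delta-1400-to-2000)
  }
}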
@@ -10,13 +10,16 @@ import org.apache.spark.sql.SparkSession
import com.dmetasoul.lakesoul.spark.clean.CleanUtils.sqlToDataframe
import org.apache.hadoop.fs.Path

import java.time.{LocalDateTime, Period, ZoneOffset}
import java.time.{LocalDateTime, Period, ZoneId}

object CleanExpiredData {

private val conn = DBConnector.getConn
var serverTimeZone = ""

def main(args: Array[String]): Unit = {
val parameter = ParametersTool.fromArgs(args)
serverTimeZone = parameter.get("server.time.zone", serverTimeZone)

val spark: SparkSession = SparkSession.builder
.getOrCreate()
@@ -210,11 +213,11 @@ object CleanExpiredData {
}

def getExpiredDateZeroTimeStamp(days: Int): Long = {
val currentTime = LocalDateTime.now()
val currentTime = LocalDateTime.now(ZoneId.of(serverTimeZone))
val period = Period.ofDays(days)
val timeLine = currentTime.minus(period)
val zeroTime = timeLine.toLocalDate.atStartOfDay()
val expiredDateTimestamp = zeroTime.toInstant(ZoneOffset.UTC).toEpochMilli
val zeroTime = timeLine.toLocalDate.atStartOfDay(ZoneId.of(serverTimeZone))
val expiredDateTimestamp = zeroTime.toInstant().toEpochMilli
expiredDateTimestamp
}
}
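A minimal sketch of the timezone change above: the day-boundary cutoff is now computed in a configurable server time zone rather than hard-coded UTC. The zone id below is only an example; the real job reads it from the server.time.zone argument.

import java.time.{LocalDateTime, Period, ZoneId}

object ExpireCutoffSketch {
  // Midnight `days` days ago in the given zone, as epoch milliseconds.
  def expiredDateZeroTimestamp(days: Int, zone: String): Long = {
    val now = LocalDateTime.now(ZoneId.of(zone))
    val startOfDay = now.minus(Period.ofDays(days)).toLocalDate.atStartOfDay(ZoneId.of(zone))
    startOfDay.toInstant.toEpochMilli
  }

  def main(args: Array[String]): Unit = {
    println(expiredDateZeroTimestamp(3, "Asia/Shanghai")) // cutoff three days back, Shanghai midnight
  }
}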
@@ -7,18 +7,19 @@ package org.apache.spark.sql.lakesoul
import com.dmetasoul.lakesoul.meta.{CommitType, DataFileInfo}
import com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_RANGE_PARTITION_SPLITTER
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.{Column, Dataset}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, EqualTo, Literal}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, EqualNullSafe, EqualTo, Literal, Not}
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, WriteJobStatsTracker}
import org.apache.spark.sql.execution.{ProjectExec, QueryExecution, SQLExecution}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, QueryExecution, SQLExecution}
import org.apache.spark.sql.functions.{col, expr, lit}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors
import org.apache.spark.sql.lakesoul.schema.{InvariantCheckerExec, Invariants, SchemaUtils}
import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
import org.apache.spark.sql.lakesoul.utils.SparkUtil
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{BooleanType, StringType, StructType}
import org.apache.spark.util.SerializableConfiguration

import scala.collection.mutable
@@ -149,12 +150,13 @@ trait TransactionalWrite {
if (cdcCol.nonEmpty) {
val tmpSparkPlan = queryExecution.executedPlan
val outColumns = outputSpec.outputColumns
val nonCdcAttrCols = outColumns.filter(p => (!p.name.equalsIgnoreCase(cdcCol.get)))
val cdcAttrCol = outColumns.filter(p => p.name.equalsIgnoreCase(cdcCol.get))
val pos = outColumns.indexOf(cdcAttrCol(0))
val cdcCaseWhen = CaseWhen.createFromParser(Seq(EqualTo(cdcAttrCol(0), Literal("update")), Literal("insert"), cdcAttrCol(0)))
val alias = Alias(cdcCaseWhen, cdcCol.get)()
val allAttrCols = nonCdcAttrCols :+ alias
ProjectExec(allAttrCols, tmpSparkPlan)
val allAttrCols = outColumns.updated(pos, alias)
val filterCdcAdd = FilterExec(Not(EqualTo(cdcAttrCol(0), Literal("delete"))), tmpSparkPlan)
ProjectExec(allAttrCols, filterCdcAdd)
} else {
queryExecution.executedPlan
}
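The projection change above keeps the rewritten CDC column at its original position (outColumns.updated(pos, alias)) instead of appending it at the end, which was the source of the column disorder, and the new FilterExec drops "delete" rows before files are written. A minimal DataFrame-level sketch of the same two rules, assuming an illustrative CDC column named rowKinds; it is not the physical-plan code the commit touches.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, when}

object CompactCdcSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("cdc-compact-sketch").getOrCreate()
    import spark.implicits._

    // "rowKinds" stands in for the table's configured CDC change column.
    val rows = Seq((1, "a", "insert"), (2, "b", "update"), (3, "c", "delete"))
      .toDF("id", "value", "rowKinds")

    val compacted = rows
      .filter(col("rowKinds") =!= "delete") // delete rows do not survive the rewrite
      .withColumn("rowKinds",
        when(col("rowKinds") === "update", "insert").otherwise(col("rowKinds")))

    // withColumn replaces the column in place, so the schema stays (id, value, rowKinds);
    // appending the rewritten column instead is what reordered columns before this fix.
    compacted.show(false)
    spark.stop()
  }
}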