clean old compaction data and redundant data (#304)
* new branch to clean old compaction and redundant data

Signed-off-by: ChenYunHey <[email protected]>

* add file headers for new files

Signed-off-by: ChenYunHey <[email protected]>

---------

Signed-off-by: ChenYunHey <[email protected]>
ChenYunHey authored Aug 22, 2023
1 parent 8c72b73 commit f1328cf
Showing 9 changed files with 1,243 additions and 247 deletions.
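For orientation, here is a minimal sketch of how the new cleanup entry point can be driven from an existing SparkSession. The driver object and app name are hypothetical, and it assumes the LakeSoul PostgreSQL metadata connection used by DBConnector is already configured:

import org.apache.spark.sql.SparkSession
import com.dmetasoul.lakesoul.spark.clean.CleanExpiredData

// Hypothetical driver, not part of this commit.
object CleanExpiredDataExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("lakesoul-clean-expired-data") // illustrative app name
      .getOrCreate()
    // Walks every (table_id, partition_desc) pair and applies the
    // partition.ttl / compaction.ttl rules added in this commit.
    CleanExpiredData.cleanAllPartitionExpiredData(spark)
    spark.stop()
  }
}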
@@ -0,0 +1,220 @@
// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
//
// SPDX-License-Identifier: Apache-2.0

package com.dmetasoul.lakesoul.spark.clean

import com.dmetasoul.lakesoul.meta.DBConnector
import org.apache.spark.sql.SparkSession
import com.dmetasoul.lakesoul.spark.clean.CleanUtils.sqlToDataframe
import org.apache.hadoop.fs.Path

import java.time.{LocalDateTime, Period, ZoneOffset}

object CleanExpiredData {

  private val conn = DBConnector.getConn

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder
      .getOrCreate()
    cleanAllPartitionExpiredData(spark)
  }

  def cleanAllPartitionExpiredData(spark: SparkSession): Unit = {
    // Collect every (table, partition) pair together with its TTL settings,
    // which are stored as JSON properties on table_info.
    val sql =
      """
        |SELECT DISTINCT
        |   p.table_id,
        |   partition_desc,
        |   (properties::json)->>'partition.ttl' AS partition_ttl,
        |   (properties::json)->>'compaction.ttl' AS compaction_ttl
        |FROM
        |   partition_info p
        |JOIN
        |   table_info t
        |ON
        |   p.table_id = t.table_id;
        |""".stripMargin
    val partitionRows = sqlToDataframe(sql, spark).rdd.collect()
    partitionRows.foreach(p => {
      val tableId = p.get(0).toString
      val partitionDesc = p.get(1).toString
      val partitionTtl = p.get(2) // 'partition.ttl' in days, may be null
      val compactionTtl = p.get(3) // 'compaction.ttl' in days, may be null
      val latestCompactionTimestamp = getLatestCompactionTimestamp(tableId, partitionDesc, compactionTtl, spark)
      val latestCommitTimestamp = getLatestCommitTimestamp(tableId, partitionDesc, spark)
      // Case 1: no compaction has ever run; only partition.ttl can apply.
      if (latestCompactionTimestamp == 0L) {
        if (partitionTtl != null) {
          val partitionTtlMillis = getExpiredDateZeroTimeStamp(partitionTtl.toString.toInt)
          if (partitionTtlMillis > latestCommitTimestamp) {
            cleanSinglePartitionExpiredDiskData(tableId, partitionDesc, partitionTtlMillis, spark)
            cleanSingleDataCommitInfo(tableId, partitionDesc, partitionTtlMillis)
            cleanSinglePartitionInfo(tableId, partitionDesc, partitionTtlMillis)
          }
        }
      }
      // Case 2: only compaction.ttl is set; drop data superseded by the
      // latest compaction that is older than the compaction TTL cutoff.
      else if (partitionTtl == null && compactionTtl != null) {
        val compactionTtlMillis = getExpiredDateZeroTimeStamp(compactionTtl.toString.toInt)
        if (compactionTtlMillis > latestCompactionTimestamp) {
          cleanSinglePartitionExpiredDiskData(tableId, partitionDesc, latestCompactionTimestamp, spark)
          cleanSingleDataCommitInfo(tableId, partitionDesc, latestCompactionTimestamp)
          cleanSinglePartitionInfo(tableId, partitionDesc, latestCompactionTimestamp)
        }
      }
      // Case 3: only partition.ttl is set.
      else if (partitionTtl != null && compactionTtl == null) {
        val partitionTtlMillis = getExpiredDateZeroTimeStamp(partitionTtl.toString.toInt)
        if (partitionTtlMillis > latestCommitTimestamp) {
          cleanSinglePartitionExpiredDiskData(tableId, partitionDesc, partitionTtlMillis, spark)
          cleanSingleDataCommitInfo(tableId, partitionDesc, partitionTtlMillis)
          cleanSinglePartitionInfo(tableId, partitionDesc, partitionTtlMillis)
        }
      }
      // Case 4: both TTLs are set; whole-partition expiry takes precedence,
      // otherwise fall back to compaction expiry.
      else if (partitionTtl != null && compactionTtl != null) {
        val compactionTtlMillis = getExpiredDateZeroTimeStamp(compactionTtl.toString.toInt)
        val partitionTtlMillis = getExpiredDateZeroTimeStamp(partitionTtl.toString.toInt)
        if (partitionTtlMillis > latestCommitTimestamp) {
          cleanSinglePartitionExpiredDiskData(tableId, partitionDesc, partitionTtlMillis, spark)
          cleanSingleDataCommitInfo(tableId, partitionDesc, partitionTtlMillis)
          cleanSinglePartitionInfo(tableId, partitionDesc, partitionTtlMillis)
        }
        else if (partitionTtlMillis <= latestCommitTimestamp && compactionTtlMillis > latestCompactionTimestamp) {
          cleanSinglePartitionExpiredDiskData(tableId, partitionDesc, latestCompactionTimestamp, spark)
          cleanSingleDataCommitInfo(tableId, partitionDesc, latestCompactionTimestamp)
          cleanSinglePartitionInfo(tableId, partitionDesc, latestCompactionTimestamp)
        }
      }
    })
  }

  def cleanSinglePartitionExpiredDiskData(tableId: String, partitionDesc: String, deadTimestamp: Long, spark: SparkSession): Unit = {
    // Resolve every file path referenced by snapshots older than deadTimestamp,
    // then delete those files from the underlying filesystem.
    val sql =
      s"""
         |SELECT file_op.path AS path
         |FROM
         |(SELECT file_ops
         |FROM data_commit_info
         |WHERE commit_id
         |IN
         |(
         |SELECT
         |   DISTINCT unnest(snapshot) AS commit_id
         |FROM partition_info
         |WHERE
         |   timestamp < $deadTimestamp
         |AND
         |   table_id = '$tableId'
         |AND
         |   partition_desc = '$partitionDesc'
         | )
         | ) t
         |CROSS JOIN LATERAL (
         |   SELECT
         |     (file_op_data).path
         |   FROM unnest(file_ops) AS file_op_data
         |) AS file_op
         |""".stripMargin
    sqlToDataframe(sql, spark).rdd.collect().foreach(p => {
      val path = new Path(p.get(0).toString)
      val sessionHadoopConf = spark.sessionState.newHadoopConf()
      val fs = path.getFileSystem(sessionHadoopConf)
      fs.delete(path, true)
    })
  }

  def cleanSingleDataCommitInfo(tableId: String, partitionDesc: String, deadTimestamp: Long): Unit = {
    // Remove the commit metadata whose snapshots are now expired.
    val sql =
      s"""
         |DELETE FROM data_commit_info
         |WHERE commit_id IN
         |(
         |SELECT
         |   DISTINCT unnest(snapshot) AS commit_id
         |FROM partition_info
         |WHERE
         |   timestamp < $deadTimestamp
         |AND
         |   table_id = '$tableId'
         |AND
         |   partition_desc = '$partitionDesc')
         |""".stripMargin
    val stmt = conn.prepareStatement(sql)
    try stmt.execute()
    finally stmt.close()
  }

  def cleanSinglePartitionInfo(tableId: String, partitionDesc: String, deadTimestamp: Long): Unit = {
    // Remove the expired partition versions themselves.
    val sql =
      s"""
         |DELETE FROM partition_info
         |WHERE (table_id, partition_desc, version)
         |IN
         |(SELECT
         |   table_id,
         |   partition_desc,
         |   version
         |FROM partition_info
         |WHERE
         |   timestamp < $deadTimestamp
         |AND
         |   table_id = '$tableId'
         |AND
         |   partition_desc = '$partitionDesc')
         |""".stripMargin
    val stmt = conn.prepareStatement(sql)
    try stmt.execute()
    finally stmt.close()
  }

  def getLatestCommitTimestamp(table_id: String, partitionDesc: String, spark: SparkSession): Long = {
    val sql =
      s"""
         |SELECT
         |   max(timestamp) AS max_time
         |FROM
         |   partition_info
         |WHERE
         |   table_id = '$table_id'
         |AND partition_desc = '$partitionDesc'
         |""".stripMargin
    sqlToDataframe(sql, spark).select("max_time").first().getLong(0)
  }

  def getLatestCompactionTimestamp(table_id: String, partitionDesc: String, expiredDaysField: Any, spark: SparkSession): Long = {
    // Returns the newest compaction (or update) commit older than the
    // compaction TTL cutoff, or 0L if the partition has none.
    var latestTimestampMillis = 0L
    if (expiredDaysField != null) {
      val expiredDateZeroTimeMillis = getExpiredDateZeroTimeStamp(expiredDaysField.toString.toInt)
      val sql =
        s"""
           |SELECT DISTINCT ON (table_id)
           |   table_id,
           |   commit_op,
           |   timestamp
           |FROM
           |   partition_info
           |WHERE
           |   commit_op IN ('CompactionCommit', 'UpdateCommit')
           |AND partition_desc = '$partitionDesc'
           |AND table_id = '$table_id'
           |AND timestamp < $expiredDateZeroTimeMillis
           |ORDER BY
           |   table_id,
           |   timestamp DESC
           |""".stripMargin
      val df = sqlToDataframe(sql, spark)
      if (df.count() > 0)
        latestTimestampMillis = df.select("timestamp").first().getLong(0)
    }
    latestTimestampMillis
  }

  def getExpiredDateZeroTimeStamp(days: Int): Long = {
    // Midnight at the start of the day `days` days ago, as UTC epoch millis.
    val currentTime = LocalDateTime.now()
    val expiredDate = currentTime.minus(Period.ofDays(days)).toLocalDate
    val zeroTime = expiredDate.atStartOfDay()
    zeroTime.toInstant(ZoneOffset.UTC).toEpochMilli
  }
}
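To make the TTL cutoff arithmetic concrete, a short hedged sketch (the date is illustrative; getExpiredDateZeroTimeStamp takes the driver's local date at midnight and interprets it as UTC):

// Assuming the driver's local date is 2023-08-22, this returns the epoch
// millis of 2023-08-19T00:00:00Z: midnight at the start of the day three
// days ago, taken as UTC.
val cutoff = CleanExpiredData.getExpiredDateZeroTimeStamp(3)
// Rows in partition_info with timestamp < cutoff are treated as expired;
// when both TTLs are set, partition.ttl expires whole partitions first and
// compaction.ttl then trims snapshots older than the latest expired compaction.
println(s"cleaning data with timestamp < $cutoff")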
@@ -0,0 +1,101 @@
// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
//
// SPDX-License-Identifier: Apache-2.0

package com.dmetasoul.lakesoul.spark.clean

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import com.dmetasoul.lakesoul.spark.clean.CleanUtils.sqlToDataframe

import scala.collection.mutable

object CleanOldCompaction {

  def cleanOldCommitOpDiskData(tablePath: String, spark: SparkSession): Unit = {
    // Enumerate every partition of the table located at tablePath.
    val sql =
      s"""
         |SELECT DISTINCT table_id,
         |   partition_desc
         |FROM partition_info
         |WHERE table_id =
         |(
         |   SELECT table_id FROM table_info
         |   WHERE table_path = '$tablePath'
         |);
         |""".stripMargin
    val partitionRows = sqlToDataframe(sql, spark).rdd.collect()
    partitionRows.foreach(p => {
      val tableId = p.get(0).toString
      val partitionDesc = p.get(1).toString
      cleanSinglePartitionCompactionDataInDisk(tableId, partitionDesc, spark)
    })
  }

  def cleanSinglePartitionCompactionDataInDisk(tableId: String, partitionDesc: String, spark: SparkSession): Unit = {
    val pathSet = mutable.Set.empty[String]
    // Keep only the newest CompactionCommit (expiredCommit) and collect the
    // 'add' file paths of every older compaction snapshot in this partition.
    val sql =
      s"""
         |WITH expiredCommit AS (
         |   SELECT DISTINCT ON (table_id)
         |     table_id,
         |     commit_op,
         |     partition_desc,
         |     timestamp
         |   FROM partition_info
         |   WHERE commit_op = 'CompactionCommit'
         |   AND partition_desc = '$partitionDesc'
         |   AND table_id = '$tableId'
         |   ORDER BY
         |     table_id,
         |     timestamp DESC
         |)
         |SELECT file_op.path AS path
         |FROM (
         |   SELECT file_ops
         |   FROM data_commit_info
         |   WHERE commit_id IN (
         |     SELECT DISTINCT unnest(pi.snapshot) AS commit_id
         |     FROM partition_info pi
         |     LEFT JOIN expiredCommit ec ON pi.table_id = ec.table_id
         |     WHERE pi.timestamp < ec.timestamp
         |     AND pi.commit_op = 'CompactionCommit'
         |     AND pi.partition_desc = ec.partition_desc
         |   )
         |) t
         |CROSS JOIN LATERAL (
         |   SELECT
         |     (file_op_data).path,
         |     (file_op_data).file_op
         |   FROM unnest(file_ops) AS file_op_data
         |) AS file_op
         |WHERE file_op.file_op = 'add';
         |""".stripMargin

    sqlToDataframe(sql, spark).rdd.collect().foreach(p => {
      // Deduplicate at the compaction-directory level rather than per file.
      pathSet.add(getPath(p.get(0).toString))
    })
    pathSet.foreach(p => {
      val path = new Path(p)
      val sessionHadoopConf = spark.sessionState.newHadoopConf()
      val fs = path.getFileSystem(sessionHadoopConf)
      fs.delete(path, true)
    })
  }

  def getPath(filePath: String): String = {
    // Truncate a data file path to its enclosing compact_* directory so the
    // whole directory can be deleted recursively; returns "" when the path
    // contains no compact_ segment.
    val targetString = "compact_"
    var directoryPath = ""
    val lastCompactIndex = filePath.lastIndexOf(targetString)
    if (lastCompactIndex != -1) {
      val nextDirectoryIndex = filePath.indexOf("/", lastCompactIndex)
      if (nextDirectoryIndex != -1) {
        directoryPath = filePath.substring(0, nextDirectoryIndex)
      }
    }
    directoryPath
  }
}
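A brief illustration of getPath on hypothetical paths, since the deletion granularity depends on it:

// Hypothetical paths, for illustration only.
CleanOldCompaction.getPath(
  "s3://bucket/tbl/date=2023-08-01/compact_1690848000000/part-00000.parquet")
// => "s3://bucket/tbl/date=2023-08-01/compact_1690848000000"
// The whole compact_* directory is deleted recursively, removing every file
// written by that superseded compaction in one fs.delete call.
CleanOldCompaction.getPath("/tmp/plain/part-00000.parquet")
// => "" (no "compact_" segment in the path)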