[Spark] Refine background jobs #565

Merged: 5 commits, Nov 24, 2024
@@ -286,9 +286,10 @@ public static LinkedHashMap<String, String> parsePartitionDesc(String partitionD
public static void fillDataSourceConfig(HikariConfig config) {
config.setConnectionTimeout(10000);
config.setIdleTimeout(10000);
- config.setMaximumPoolSize(2);
+ config.setMaximumPoolSize(8);
+ config.setKeepaliveTime(5000);
config.setMinimumIdle(0);
config.setMaxLifetime(1800000);
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
@@ -396,4 +397,4 @@ public static long parseMemoryExpression(String memoryExpression) {
return 0;
}

}
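The refined pool allows more concurrent metadata connections (8 instead of 2) while keeping the idle footprint low and recycling connections over time. A minimal sketch of what these settings mean in practice, assuming HikariCP on the classpath; the JDBC URL, credentials, and object name are illustrative placeholders, not part of this change:

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}

object PoolConfigSketch {
  def main(args: Array[String]): Unit = {
    val config = new HikariConfig()
    config.setJdbcUrl("jdbc:postgresql://localhost:5432/lakesoul_meta") // placeholder URL
    config.setUsername("lakesoul")                                      // placeholder
    config.setPassword("lakesoul")                                      // placeholder
    // Same knobs as fillDataSourceConfig above: at most 8 pooled connections,
    // no idle minimum (idle connections are released after idleTimeout),
    // periodic keepalive, and recycling after maxLifetime.
    config.setConnectionTimeout(10000)
    config.setIdleTimeout(10000)
    config.setMaximumPoolSize(8)
    config.setKeepaliveTime(5000) // note: HikariCP enforces a lower bound and may ignore very small values
    config.setMinimumIdle(0)
    config.setMaxLifetime(1800000)

    val ds = new HikariDataSource(config)
    try {
      val conn = ds.getConnection // borrows one of the (at most 8) connections
      try println(conn.getMetaData.getDatabaseProductName)
      finally conn.close()        // returns the connection to the pool
    } finally ds.close()
  }
}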
@@ -79,11 +79,7 @@ object SparkMetaVersion {
dbManager.listTablePathsByNamespace(namespace.mkString("."))
}

- def getTableInfo(table_path: String): TableInfo = {
-   getTableInfo(LakeSoulCatalog.showCurrentNamespace().mkString("."), table_path)
- }
-
- def getTableInfo(namespace: String, table_path: String): TableInfo = {
+ def getTableInfoByPath(table_path: String): TableInfo = {
val path = SparkUtil.makeQualifiedPath(table_path).toUri.toString
val info = dbManager.getTableInfoByPath(path)
if (info == null) {
@@ -105,7 +101,7 @@ object SparkMetaVersion {
case _ => -1
}
TableInfo(
- namespace,
+ info.getTableNamespace,
Some(table_path),
info.getTableId,
info.getTableSchema,
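With the namespace-taking overload removed, a path lookup now reads the namespace from the metadata row itself (info.getTableNamespace) rather than from the caller or the catalog's current namespace. A small illustrative sketch of the new call, assuming SparkMetaVersion is in scope (its package is not shown in this diff); the path and object name are placeholders:

object TableLookupSketch {
  def main(args: Array[String]): Unit = {
    // Resolve a LakeSoul table purely from its storage path.
    val info = SparkMetaVersion.getTableInfoByPath("file:/tmp/lakesoul/demo_table")
    println(s"namespace=${info.namespace}, table_id=${info.table_id}")
  }
}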
@@ -4,18 +4,16 @@

package com.dmetasoul.lakesoul.spark.clean

- import com.dmetasoul.lakesoul.meta.DBConnector
import com.dmetasoul.lakesoul.spark.ParametersTool
- import org.apache.spark.sql.SparkSession
- import com.dmetasoul.lakesoul.spark.clean.CleanUtils.sqlToDataframe
+ import com.dmetasoul.lakesoul.spark.clean.CleanUtils.{executeMetaSql, sqlToDataframe}
import org.apache.hadoop.fs.Path
+ import org.apache.spark.sql.SparkSession

import java.time.{LocalDateTime, Period, ZoneId}
import java.util.TimeZone

object CleanExpiredData {

- private val conn = DBConnector.getConn
var serverTimeZone = TimeZone.getDefault.getID
private var defaultPartitionTTL: Int = -1
private var defaultRedundantTTL: Int = -1
@@ -206,8 +204,7 @@ object CleanExpiredData {
| partition_desc='$partitionDesc')
|""".stripMargin

- val stmt = conn.prepareStatement(sql)
- stmt.execute()
+ executeMetaSql(sql)
}

def cleanSinglePartitionInfo(tableId: String, partitionDesc: String, deadTimestamp: Long): Unit = {
@@ -221,8 +218,7 @@ object CleanExpiredData {
|AND
| timestamp < $deadTimestamp
|""".stripMargin
- val stmt = conn.prepareStatement(sql)
- stmt.execute()
+ executeMetaSql(sql)
}

def getLatestCommitTimestamp(table_id: String, partitionDesc: String, spark: SparkSession): Long = {
@@ -299,4 +295,4 @@ object CleanExpiredData {
val expiredDateTimestamp = zeroTime.toInstant().toEpochMilli
expiredDateTimestamp
}
}
@@ -73,9 +73,12 @@ object CleanUtils {
}

def sqlToDataframe(sql: String, spark: SparkSession): DataFrame = {
- val stmt = conn.prepareStatement(sql)
- val resultSet = stmt.executeQuery()
- createResultSetToDF(resultSet, spark)
+ tryWithResource(DBConnector.getConn) { conn =>
+   tryWithResource(conn.prepareStatement(sql)) { stmt =>
+     val resultSet = stmt.executeQuery()
+     createResultSetToDF(resultSet, spark)
+   }
+ }
}

def setTableDataExpiredDays(tablePath: String, expiredDays: Int): Unit = {
@@ -85,8 +88,7 @@ object CleanUtils {
|SET properties = properties::jsonb || '{"partition.ttl": "$expiredDays"}'::jsonb
|WHERE table_id = (SELECT table_id from table_info where table_path='$tablePath');
|""".stripMargin
- val stmt = conn.prepareStatement(sql)
- stmt.execute()
+ executeMetaSql(sql)
}

def setCompactionExpiredDays(tablePath: String, expiredDays: Int): Unit = {
@@ -96,8 +98,7 @@ object CleanUtils {
|SET properties = properties::jsonb || '{"compaction.ttl": "$expiredDays"}'::jsonb
|WHERE table_id = (SELECT table_id from table_info where table_path='$tablePath');
|""".stripMargin
- val stmt = conn.prepareStatement(sql)
- stmt.execute()
+ executeMetaSql(sql)
}

def setTableOnlySaveOnceCompactionValue(tablePath: String, value: Boolean): Unit = {
@@ -107,8 +108,7 @@ object CleanUtils {
|SET properties = properties::jsonb || '{"only_save_once_compaction": "$value"}'::jsonb
|WHERE table_id = (SELECT table_id from table_info where table_path='$tablePath');
|""".stripMargin
- val stmt = conn.prepareStatement(sql)
- stmt.execute()
+ executeMetaSql(sql)
}

def cancelTableDataExpiredDays(tablePath: String): Unit = {
@@ -118,8 +118,7 @@ object CleanUtils {
|SET properties = properties::jsonb - 'partition.ttl'
|WHERE table_id = (SELECT table_id from table_info where table_path='$tablePath');
|""".stripMargin
- val stmt = conn.prepareStatement(sql)
- stmt.execute()
+ executeMetaSql(sql)
}

def cancelCompactionExpiredDays(tablePath: String): Unit = {
@@ -129,8 +128,7 @@ object CleanUtils {
|SET properties = properties::jsonb - 'compaction.ttl'
|WHERE table_id = (SELECT table_id from table_info where table_path='$tablePath');
|""".stripMargin
- val stmt = conn.prepareStatement(sql)
- stmt.execute()
+ executeMetaSql(sql)
}

def setPartitionInfoTimestamp(tableId: String, timestamp: Long, version: Int): Unit = {
@@ -141,8 +139,7 @@ object CleanUtils {
|WHERE table_id = '$tableId'
|AND version = $version
|""".stripMargin
- val stmt = conn.prepareStatement(sql)
- stmt.execute()
+ executeMetaSql(sql)
}

def readPartitionInfo(tableId: String, spark: SparkSession): DataFrame = {
@@ -164,4 +161,17 @@ object CleanUtils {
|""".stripMargin
sqlToDataframe(sql, spark)
}
+
+ def tryWithResource[R <: AutoCloseable, T](createResource: => R)(f: R => T): T = {
+   val resource = createResource
+   try f.apply(resource) finally resource.close()
+ }
+
+ def executeMetaSql(sql: String): Unit = {
+   tryWithResource(DBConnector.getConn) { conn =>
+     tryWithResource(conn.prepareStatement(sql)) { stmt =>
+       stmt.execute()
+     }
+   }
+ }
}
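Together these helpers replace the long-lived, process-wide connection used by the cleaning job with a borrow-use-close cycle per statement. A minimal sketch of the guarantee tryWithResource gives, using a purely illustrative Probe resource and object name:

import com.dmetasoul.lakesoul.spark.clean.CleanUtils.tryWithResource

object TryWithResourceSketch {
  // Illustrative resource that records whether close() ran.
  final class Probe extends AutoCloseable {
    var closed = false
    override def close(): Unit = closed = true
  }

  def main(args: Array[String]): Unit = {
    val probe = new Probe
    try tryWithResource(probe) { _ => throw new RuntimeException("boom") }
    catch { case _: RuntimeException => () } // the failure still propagates...
    assert(probe.closed) // ...but the resource is closed, so a failed meta
                         // statement cannot leak a pooled connection.
  }
}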
@@ -177,6 +177,10 @@ trait LakeSoulTableOperations extends AnalysisHelper {
fileNumLimit: Option[Int],
newBucketNum: Option[Int],
fileSizeLimit: Option[Long]): Unit = {
+ val t = snapshotManagement.getTableInfoOnly
+ sparkSession.sparkContext.setJobDescription(
+   s"Compact(${t.namespace}.${t.short_table_name.getOrElse(t.table_path)}/$condition" +
+   s",f=$force,c=$cleanOldCompaction,n=$fileNumLimit,s=$fileSizeLimit,b=$newBucketNum)")
toDataset(sparkSession, CompactionCommand(
snapshotManagement,
condition,
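setJobDescription stores a thread-local property that the Spark UI shows next to each job, so concurrent background compactions can be told apart by table, partition, and options. A minimal sketch of the mechanism with placeholder values (not this PR's code path):

import org.apache.spark.sql.SparkSession

object JobDescriptionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("desc-sketch").getOrCreate()
    // Mimics the description format built above, with illustrative values.
    spark.sparkContext.setJobDescription(
      "Compact(default.demo_table/None,f=false,c=true,n=None,s=None,b=None)")
    spark.range(10).count() // this job appears in the Spark UI under that description
    println(spark.sparkContext.getLocalProperty("spark.job.description"))
    spark.sparkContext.setJobDescription(null) // clear it so later jobs are not mislabeled
    spark.stop()
  }
}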
@@ -35,7 +35,7 @@ class SnapshotManagement(path: String, namespace: String) extends Logging {
def snapshot: Snapshot = currentSnapshot

private def createSnapshot: Snapshot = {
- val table_info = SparkMetaVersion.getTableInfo(table_namespace, table_path)
+ val table_info = SparkMetaVersion.getTableInfoByPath(table_path)
val partition_info_arr = SparkMetaVersion.getAllPartitionInfo(table_info.table_id)

if (table_info.table_schema.isEmpty) {
@@ -83,7 +83,7 @@ class SnapshotManagement(path: String, namespace: String) extends Logging {
//get table info only
def getTableInfoOnly: TableInfo = {
if (LakeSoulSourceUtils.isLakeSoulTableExists(table_path)) {
- SparkMetaVersion.getTableInfo(table_path)
+ SparkMetaVersion.getTableInfoByPath(table_path)
} else {
val table_id = "table_" + UUID.randomUUID().toString
TableInfo(table_namespace, Some(table_path), table_id)
@@ -130,7 +130,7 @@ class LakeSoulCatalog(val spark: SparkSession) extends TableCatalog
case _ => identifier
}
if (isPathIdentifier(ident)) {
- val tableInfo = SparkMetaVersion.getTableInfo(ident.namespace().mkString("."), ident.name())
+ val tableInfo = SparkMetaVersion.getTableInfoByPath(ident.name())
if (tableInfo == null) {
throw new NoSuchTableException(ident)
}
@@ -624,7 +624,7 @@ object LakeSoulCatalog {

def listTables(namespaces: Array[String]): Array[Identifier] = {
SparkMetaVersion.listTables(namespaces).asScala.map(tablePath => {
- val tableInfo = SparkMetaVersion.getTableInfo(tablePath)
+ val tableInfo = SparkMetaVersion.getTableInfoByPath(tablePath)
Identifier.of(namespaces, tableInfo.short_table_name.getOrElse(tableInfo.table_path.toUri.toString))
}).toArray
}
@@ -170,7 +170,7 @@ class LakeSoulSinkSuite extends StreamTest with LakeSoulTestUtils {
(1, 1000), (2, 2000), (3, 3000))

val snapshotManagement = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(outputDir.getCanonicalPath)).toString)
- val tableInfo = SparkMetaVersion.getTableInfo(snapshotManagement.table_path)
+ val tableInfo = SparkMetaVersion.getTableInfoByPath(snapshotManagement.table_path)
assert(tableInfo.hash_column.equals("id")
&& tableInfo.range_column.isEmpty
&& tableInfo.bucket_num == 2)
@@ -237,7 +237,7 @@ class LakeSoulSinkSuite extends StreamTest with LakeSoulTestUtils {
(1, 1000), (2, 2000), (3, 3000))

val snapshotManagement = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(outputDir.getCanonicalPath)).toString)
- val tableInfo = SparkMetaVersion.getTableInfo(snapshotManagement.table_path)
+ val tableInfo = SparkMetaVersion.getTableInfoByPath(snapshotManagement.table_path)
assert(tableInfo.range_column.equals("id")
&& tableInfo.hash_column.isEmpty
&& tableInfo.bucket_num == -1)
@@ -285,7 +285,7 @@ class LakeSoulSinkSuite extends StreamTest with LakeSoulTestUtils {
(1, 1, 1000), (2, 2, 2000), (3, 3, 3000))

val snapshotManagement = SnapshotManagement(SparkUtil.makeQualifiedTablePath(new Path(outputDir.getCanonicalPath)).toString)
- val tableInfo = SparkMetaVersion.getTableInfo(snapshotManagement.table_path)
+ val tableInfo = SparkMetaVersion.getTableInfoByPath(snapshotManagement.table_path)
assert(tableInfo.range_column.equals("range")
&& tableInfo.hash_column.equals("hash")
&& tableInfo.bucket_num == 2)