merge paral_compact_editting into merge_main
MR-title: compaction with file size condition in parallel
Created-by: hw_syl_zenghua
Author-id: 7155563
MR-id: 7278395
Commit-by: zenghua
Merged-by: hw_syl_zenghua
Description: merge "paral_compact_editting" into "merge_main"
fix ci

Signed-off-by: zenghua <[email protected]>
compaction with file size condition in parallel

Signed-off-by: zenghua <[email protected]>

See merge request: 42b369588d84469d95d7b738fc58da8e/LakeSoul/for-nanhang!7
hw_syl_zenghua authored and zenghua committed Nov 1, 2024
1 parent 334c76a commit f7a4d0e
Showing 15 changed files with 258 additions and 175 deletions.
@@ -34,14 +34,23 @@ import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
import org.apache.spark.sql.vectorized.ArrowFakeRowAdaptor
import org.apache.spark.util.{SerializableConfiguration, Utils}
import com.dmetasoul.lakesoul.meta.DBConfig.{LAKESOUL_NON_PARTITION_TABLE_PART_DESC, LAKESOUL_RANGE_PARTITION_SPLITTER}
import com.dmetasoul.lakesoul.meta.DBUtil
import com.dmetasoul.lakesoul.spark.clean.CleanOldCompaction.splitCompactFilePath
import org.apache.spark.sql.lakesoul.DelayedCopyCommitProtocol
import org.apache.spark.sql.execution.datasources.v2.parquet.{NativeParquetCompactionColumnarOutputWriter, NativeParquetOutputWriter}
import org.apache.spark.sql.lakesoul.{DelayedCommitProtocol, DelayedCopyCommitProtocol}
import org.apache.spark.sql.types.{DataTypes, StringType, StructField, StructType}

import java.util.{Date, UUID}
import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable`

/** A helper object for writing FileFormat data out to a location. */
object LakeSoulFileWriter extends Logging {
val MAX_FILE_SIZE_KEY = "max_file_size"
val HASH_BUCKET_ID_KEY = "hash_bucket_id"
val SNAPPY_COMPRESS_RATIO = 3
val COPY_FILE_WRITER_KEY = "copy_file_writer"

/**
* Basic work flow of this command is:
* 1. Driver side setup, including output committer initialization and data source specific
@@ -178,7 +187,11 @@ object LakeSoulFileWriter extends Logging {
val nativeIOEnable = sparkSession.sessionState.conf.getConf(LakeSoulSQLConf.NATIVE_IO_ENABLE)

def nativeWrap(plan: SparkPlan): RDD[InternalRow] = {
if (isCompaction && !isCDC && !isBucketNumChanged && nativeIOEnable) {
if (isCompaction
&& staticBucketId != -1
&& !isCDC
&& !isBucketNumChanged
&& nativeIOEnable) {
plan match {
case withPartitionAndOrdering(_, _, child) =>
return nativeWrap(child)
@@ -203,8 +216,8 @@ object LakeSoulFileWriter extends Logging {
try {
// for compaction, we won't break ordering from batch scan
val (rdd, concurrentOutputWriterSpec) =
if (isCompaction && options.getOrElse("copyCompactedFile", "").nonEmpty) {
val data = Seq(InternalRow(options("copyCompactedFile")))
if (isCompaction && options.getOrElse(COPY_FILE_WRITER_KEY, "false").toBoolean) {
val data = Seq(InternalRow(COPY_FILE_WRITER_KEY))
(sparkSession.sparkContext.parallelize(data), None)
} else if (!isBucketNumChanged && (orderingMatched || isCompaction)) {
(nativeWrap(empty2NullPlan), None)
@@ -410,6 +423,7 @@ object LakeSoulFileWriter extends Logging {
private var recordsInFile: Long = _
private val partValue: Option[String] = options.get("partValue").filter(_ != LAKESOUL_NON_PARTITION_TABLE_PART_DESC)
.map(_.replace(LAKESOUL_RANGE_PARTITION_SPLITTER, "/"))
private val maxFileSize = options.get(MAX_FILE_SIZE_KEY)

/** Given an input row, returns the corresponding `bucketId` */
protected lazy val getBucketId: InternalRow => Int = {
@@ -419,26 +433,56 @@
row => proj(row).getInt(0)
}

override protected def releaseCurrentWriter(): Unit = {
if (currentWriter != null) {
try {
currentWriter.close()
if (maxFileSize.isDefined) {
currentWriter.asInstanceOf[NativeParquetOutputWriter].flushResult.foreach(result => {
val (partitionDesc, flushResult) = result
val partitionDescList = if (partitionDesc == "-4") {
DBUtil.parsePartitionDesc(options.getOrElse("partValue", LAKESOUL_NON_PARTITION_TABLE_PART_DESC)).asScala.toList
} else {
DBUtil.parsePartitionDesc(partitionDesc).asScala.toList
}
committer.asInstanceOf[DelayedCommitProtocol].addOutputFile(partitionDescList, flushResult.map(_.getFilePath).toList)
})
}
statsTrackers.foreach(_.closeFile(currentWriter.path()))
} finally {
currentWriter = null
}
}
}

private def newOutputWriter(record: InternalRow): Unit = {
recordsInFile = 0
releaseResources()

val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext)
val suffix = if (bucketSpec.isDefined) {
val bucketIdStr = if (partitionId == -1) {
BucketingUtils.bucketIdToString(getBucketId(record))
val bucketId = if (partitionId == -1) {
getBucketId(record)
} else {
BucketingUtils.bucketIdToString(partitionId)
partitionId
}
taskAttemptContext.getConfiguration.set(HASH_BUCKET_ID_KEY, bucketId.toString)

val bucketIdStr = BucketingUtils.bucketIdToString(bucketId)
f"$bucketIdStr.c$fileCounter%03d" + ext
} else {
f"-c$fileCounter%03d" + ext
}

if (maxFileSize.isDefined) {
taskAttemptContext.getConfiguration.set(MAX_FILE_SIZE_KEY, maxFileSize.get)
}

val currentPath = committer.newTaskTempFile(
taskAttemptContext,
partValue,
suffix)
if (maxFileSize.isDefined) "" else suffix
)

currentWriter = description.outputWriterFactory.newInstance(
path = currentPath,
@@ -473,21 +517,8 @@
customMetrics: Map[String, SQLMetric] = Map.empty)
extends FileFormatDataWriter(description, taskAttemptContext, committer, customMetrics) {

private val partValue: Option[String] = options.get("partValue").filter(_ != LAKESOUL_NON_PARTITION_TABLE_PART_DESC)
.map(_.replace(LAKESOUL_RANGE_PARTITION_SPLITTER, "/"))

/** Given an input row, returns the corresponding `bucketId` */
protected lazy val getSrcPath: InternalRow => String = {
row => row.get(0, StringType).asInstanceOf[String]
}

override def write(record: InternalRow): Unit = {
val dstPath = committer.newTaskTempFile(
taskAttemptContext,
partValue,
getSrcPath(record))

statsTrackers.foreach(_.newFile(dstPath))
logInfo("copy file")
}
}
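The writer changes above thread a max_file_size option through to the native writer: when it is set, one task's output may be split into several files, and releaseCurrentWriter registers every flushed file with the commit protocol. A minimal sketch of that registration step, with FlushedFile and Committer as stand-in types rather than LakeSoul classes:

object ReleaseWriterSketch {
  final case class FlushedFile(filePath: String)

  trait Committer {
    def addOutputFile(partition: List[(String, String)], files: List[String]): Unit
  }

  // For every partition description returned by the native writer's flush, register the
  // files it produced with the commit protocol ("-4" marks a non-partitioned table upstream).
  def registerFlushedFiles(flushResults: Map[String, Seq[FlushedFile]],
                           parsePartitionDesc: String => List[(String, String)],
                           committer: Committer): Unit =
    flushResults.foreach { case (partitionDesc, files) =>
      committer.addOutputFile(parsePartitionDesc(partitionDesc), files.map(_.filePath).toList)
    }
}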

@@ -28,6 +28,7 @@ import org.apache.spark.sql.sources.{EqualTo, Filter, Not}
import org.apache.spark.sql.lakesoul._
import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors
import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf.{COMPACTION_TASK, SCAN_FILE_NUMBER_LIMIT}
import org.apache.spark.sql.lakesoul.utils.{SparkUtil, TableInfo, TimestampFormatter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -412,15 +413,34 @@ case class OnePartitionMergeBucketScan(sparkSession: SparkSession,
val fileWithBucketId = groupByPartition.head._2
.groupBy(_.fileBucketId).map(f => (f._1, f._2.toArray))

val fileNumLimit = options.getOrDefault(SCAN_FILE_NUMBER_LIMIT.key, Int.MaxValue.toString).toInt
val isCompactionTask = options.getOrDefault(COMPACTION_TASK.key, COMPACTION_TASK.defaultValueString).toBoolean

Seq.tabulate(bucketNum) { bucketId =>
var files = fileWithBucketId.getOrElse(bucketId, Array.empty)
val isSingleFile = files.length == 1
var groupedFiles = if (fileNumLimit < Int.MaxValue && isCompactionTask) {
val groupedFiles = new ArrayBuffer[Array[MergePartitionedFile]]
for (i <- files.indices by fileNumLimit) {
groupedFiles += files.slice(i, i + fileNumLimit)
}
groupedFiles.toArray
} else {
Array(files)
}

if (!isSingleFile) {
val versionFiles = for (version <- files.indices) yield files(version).copy(writeVersion = version + 1)
files = versionFiles.toArray
var allPartitionIsSingleFile = true
var isSingleFile = false

for (index <- groupedFiles.indices) {
isSingleFile = groupedFiles(index).length == 1
if (!isSingleFile) {
val versionFiles = for (elem <- groupedFiles(index).indices) yield groupedFiles(index)(elem).copy(writeVersion = elem)
groupedFiles(index) = versionFiles.toArray
allPartitionIsSingleFile = false
}
}
MergeFilePartition(bucketId, Array(files), isSingleFile)

MergeFilePartition(bucketId, groupedFiles, allPartitionIsSingleFile)
}
}
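A small, self-contained sketch of the grouping introduced above: when a compaction task sets SCAN_FILE_NUMBER_LIMIT, each bucket's files are cut into slices of at most that many files, and the partition counts as single-file only if every slice holds exactly one file. Names here are illustrative, not LakeSoul APIs:

object FileGroupingSketch {
  // Mirrors the condition above: grouping only applies to compaction tasks that set a
  // scan file number limit; otherwise all files stay in one group.
  def groupByLimit[A](files: Seq[A], fileNumLimit: Int, isCompactionTask: Boolean): Seq[Seq[A]] =
    if (isCompactionTask && fileNumLimit < Int.MaxValue) files.grouped(fileNumLimit).toSeq
    else Seq(files)

  def main(args: Array[String]): Unit = {
    val groups = groupByLimit(1 to 7, fileNumLimit = 3, isCompactionTask = true)
    println(groups.map(_.mkString("[", ",", "]")).mkString(" ")) // [1,2,3] [4,5,6] [7]
    // A bucket is treated as "all single file" only if every group has exactly one file.
    println(groups.forall(_.length == 1)) // false
  }
}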

@@ -31,7 +31,8 @@ class NativeParquetFileFormat extends FileFormat

if (options.getOrElse("isCompaction", "false").toBoolean &&
!options.getOrElse("isCDC", "false").toBoolean &&
!options.getOrElse("isBucketNumChanged", "false").toBoolean
!options.getOrElse("isBucketNumChanged", "false").toBoolean &&
options.contains("staticBucketId")
) {
new OutputWriterFactory {
override def newInstance(
@@ -5,6 +5,7 @@
package org.apache.spark.sql.execution.datasources.v2.parquet

import com.dmetasoul.lakesoul.lakesoul.io.NativeIOWriter
import com.dmetasoul.lakesoul.lakesoul.io.NativeIOWriter.FlushResult
import org.apache.arrow.memory.BufferAllocator
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.types.pojo.Schema
@@ -18,19 +19,29 @@ import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{GlutenUtils, NativeIOUtils}

import java.util
import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.collection.mutable

class NativeParquetOutputWriter(val path: String, dataSchema: StructType, timeZoneId: String, context: TaskAttemptContext) extends OutputWriter {

val NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE: Int = SQLConf.get.getConf(LakeSoulSQLConf.NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE)

private var recordCount = 0

var flushResult: mutable.Map[String, util.List[FlushResult]] = mutable.Map.empty

val arrowSchema: Schema = ArrowUtils.toArrowSchema(dataSchema, timeZoneId)

protected val nativeIOWriter: NativeIOWriter = new NativeIOWriter(arrowSchema)

GlutenUtils.setArrowAllocator(nativeIOWriter)
nativeIOWriter.setRowGroupRowNumber(NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE)
nativeIOWriter.addFile(path)
if (path.endsWith(".parquet")) {
nativeIOWriter.addFile(path)
} else {
nativeIOWriter.withPrefix(path)
}

NativeIOUtils.setNativeIOOptions(nativeIOWriter, NativeIOUtils.getNativeIOOptions(context, new Path(path)))

@@ -59,7 +70,7 @@ class NativeParquetOutputWriter(val path: String, dataSchema: StructType, timeZo
recordWriter.finish()

nativeIOWriter.write(root)
nativeIOWriter.flush()
flushResult = nativeIOWriter.flush().asScala

recordWriter.reset()
root.close()
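The constructor change above branches on the shape of the path: a path ending in ".parquet" is written as one concrete file (addFile), while any other path is handed to the native writer as a prefix (withPrefix) under which several size-bounded files may be emitted. A tiny illustrative sketch of that decision, with hypothetical type names:

object OutputTargetSketch {
  sealed trait OutputTarget
  final case class SingleFile(path: String) extends OutputTarget
  final case class DirectoryPrefix(dir: String) extends OutputTarget

  // ".parquet" paths become one concrete file; anything else is treated as a directory
  // prefix for several size-bounded output files.
  def resolveTarget(path: String): OutputTarget =
    if (path.endsWith(".parquet")) SingleFile(path) else DirectoryPrefix(path)

  def main(args: Array[String]): Unit = {
    println(resolveTarget("/table/date=2024-11-01/part-00000.parquet"))
    println(resolveTarget("/table/date=2024-11-01/compact_dir"))
  }
}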
@@ -90,26 +90,39 @@ class DelayedCommitProtocol(jobId: String,
}

override def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
val filename = getFileName(taskContext, ext)
val partitionValues = dir.map(parsePartitions).getOrElse(List.empty[(String, String)])
val unescapedDir = if (partitionValues.nonEmpty) {
Some(partitionValues.map(partitionValue => partitionValue._1 + "=" + partitionValue._2).mkString("/"))
} else {
dir
}
val relativePath = randomPrefixLength.map { prefixLength =>
getRandomPrefix(prefixLength) // Generate a random prefix as a first choice
}.orElse {
// or else write into the partition unescaped directory if it is partitioned
if (ext.isEmpty) {
unescapedDir
}.map { subDir =>
new Path(subDir, filename)
}.getOrElse(new Path(filename)) // or directly write out to the output path

val absolutePath = new Path(path, relativePath).toUri.toString
//returns the absolute path to the file
addedFiles.append((partitionValues, absolutePath))
absolutePath
.map(new Path(path, _))
.getOrElse(new Path(path))
.toUri.toString
} else {
val filename = getFileName(taskContext, ext)

val relativePath = randomPrefixLength.map { prefixLength =>
getRandomPrefix(prefixLength) // Generate a random prefix as a first choice
}.orElse {
// or else write into the partition unescaped directory if it is partitioned
unescapedDir
}.map { subDir =>
new Path(subDir, filename)
}.getOrElse(new Path(filename)) // or directly write out to the output path


val absolutePath = new Path(path, relativePath).toUri.toString
//returns the absolute path to the file
addedFiles.append((partitionValues, absolutePath))
absolutePath
}
}

def addOutputFile(partitionValues: List[(String, String)], files: List[String]): Unit = {
files.foreach(file => addedFiles.append((partitionValues, file)))
}

override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
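A sketch of the branch added to newTaskTempFile above, under the assumption that an empty extension marks the size-bounded compaction case: the task then returns only a directory, and the concrete file names are appended later through addOutputFile; otherwise a single file name is generated and recorded at once. The helper names below are stand-ins, not the protocol's real members:

object TaskTempPathSketch {
  // `makeFileName` stands in for getFileName and `record` for addedFiles.append.
  def taskTempPath(outputPath: String,
                   partitionDir: Option[String],
                   ext: String,
                   makeFileName: String => String,
                   record: String => Unit): String =
    if (ext.isEmpty) {
      // Size-bounded compaction: return only a directory; concrete file names are
      // registered later through addOutputFile.
      partitionDir.fold(outputPath)(d => s"$outputPath/$d")
    } else {
      // Regular case: one file per task, recorded immediately.
      val relative = partitionDir.fold(makeFileName(ext))(d => s"$d/${makeFileName(ext)}")
      val absolute = s"$outputPath/$relative"
      record(absolute)
      absolute
    }
}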
@@ -23,31 +23,16 @@ import scala.util.Random
/**
* Writes out the files to `path` and returns a list of them in `addedStatuses`.
*/
class DelayedCopyCommitProtocol(jobId: String,
class DelayedCopyCommitProtocol(srcFiles: Seq[DataFileInfo],
jobId: String,
dstPath: String,
randomPrefixLength: Option[Int])
extends DelayedCommitProtocol(jobId, dstPath, randomPrefixLength)
with Serializable with Logging {

@transient private var copyFiles: ArrayBuffer[(String, String)] = _

override def setupJob(jobContext: JobContext): Unit = {

}

override def abortJob(jobContext: JobContext): Unit = {
// TODO: Best effort cleanup
}

override def setupTask(taskContext: TaskAttemptContext): Unit = {
copyFiles = new ArrayBuffer[(String, String)]
}


override def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
val (srcCompactDir, srcBasePath) = splitCompactFilePath(ext)
copyFiles += dir.getOrElse("-5") -> ext
new Path(dstPath, srcBasePath).toString
throw new UnsupportedOperationException(
s"$this does not support adding files with an absolute path")
}

override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
@@ -57,15 +42,14 @@ class DelayedCopyCommitProtocol(jobId: String,

override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {

if (copyFiles.nonEmpty) {
val fs = new Path(copyFiles.head._2).getFileSystem(taskContext.getConfiguration)
val statuses = copyFiles.map { f =>
val (partitionDesc, srcPath) = f
val (srcCompactDir, srcBasePath) = splitCompactFilePath(srcPath)
if (srcFiles.nonEmpty) {
val fs = new Path(srcFiles.head.path).getFileSystem(taskContext.getConfiguration)
val statuses = srcFiles.map { srcFile =>
val (srcCompactDir, srcBasePath) = splitCompactFilePath(srcFile.path)
val dstFile = new Path(dstPath, srcBasePath)
FileUtil.copy(fs, new Path(srcPath), fs, new Path(dstPath, srcBasePath), false, taskContext.getConfiguration)
FileUtil.copy(fs, new Path(srcFile.path), fs, new Path(dstPath, srcBasePath), false, taskContext.getConfiguration)
val status = fs.getFileStatus(dstFile)
DataFileInfo(partitionDesc, fs.makeQualified(dstFile).toString, "add", status.getLen, status.getModificationTime)
DataFileInfo(srcFile.range_partitions, fs.makeQualified(dstFile).toString, "add", status.getLen, status.getModificationTime)
}

new TaskCommitMessage(statuses)
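A self-contained sketch of what the rewritten commitTask does: the already compacted source files are copied unchanged into the destination table directory and their new size and modification time are reported back. java.nio is used here instead of Hadoop's FileSystem only to keep the example runnable; it is not the code path LakeSoul uses:

import java.nio.file.{Files, Path, StandardCopyOption}

object CopyCompactedSketch {
  final case class CopiedFile(dstPath: String, sizeBytes: Long, modifiedAtMs: Long)

  // Copy each compacted source file into the destination directory and report its status.
  def copyCompactedFiles(srcFiles: Seq[Path], dstDir: Path): Seq[CopiedFile] =
    srcFiles.map { src =>
      val dst = dstDir.resolve(src.getFileName)
      Files.copy(src, dst, StandardCopyOption.REPLACE_EXISTING)
      CopiedFile(dst.toString, Files.size(dst), Files.getLastModifiedTime(dst).toMillis)
    }
}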
@@ -12,6 +12,7 @@ import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.datasources.LakeSoulFileWriter.COPY_FILE_WRITER_KEY
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, LakeSoulFileWriter, WriteJobStatsTracker}
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
import org.apache.spark.sql.functions.{col, when}
@@ -103,7 +104,8 @@ trait TransactionalWrite {
*/
def writeFiles(oriData: Dataset[_],
writeOptions: Option[LakeSoulOptions],
isCompaction: Boolean): (Seq[DataFileInfo], Path) = {
isCompaction: Boolean,
copyCompactedFile: Seq[DataFileInfo] = Seq.empty): (Seq[DataFileInfo], Path) = {
val spark = oriData.sparkSession
// LakeSoul always writes timestamp data with timezone=UTC
spark.conf.set("spark.sql.session.timeZone", "UTC")
@@ -160,7 +162,7 @@ trait TransactionalWrite {
options.put("isBucketNumChanged", "false")
}
val cdcCol = snapshot.getTableInfo.configuration.get(LakeSoulTableProperties.lakeSoulCDCChangePropKey)
if (cdcCol.nonEmpty) {
if (cdcCol.nonEmpty && copyCompactedFile.isEmpty) {
options.put("isCDC", "true")
val cdcColName = cdcCol.get
if (writeOptions.forall(_.options.getOrElse("fullCompaction", "true").equals("true"))) {
@@ -214,10 +216,9 @@ trait TransactionalWrite {
output.length < data.schema.size)
}

val committer = if (writeOptions.exists(_.options.getOrElse("copyCompactedFile", "").nonEmpty)) {
val srcPath = writeOptions.get.options.get("copyCompactedFile")
options.put("copyCompactedFile", srcPath.get)
new DelayedCopyCommitProtocol("lakesoul", outputPath.toString, None)
val committer = if (copyCompactedFile.nonEmpty) {
options.put(COPY_FILE_WRITER_KEY, "true")
new DelayedCopyCommitProtocol(copyCompactedFile, "lakesoul", outputPath.toString, None)
} else {
getCommitter(outputPath)
}
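A sketch of the committer selection added above: a non-empty copyCompactedFile list switches the write to the copy-only protocol and flags the writer side through COPY_FILE_WRITER_KEY ("copy_file_writer"), while an empty list keeps the regular commit protocol. The types below are stand-ins, not LakeSoul classes:

object CommitterChoiceSketch {
  sealed trait CommitterLike
  final case class CopyOnlyCommitter(files: Seq[String]) extends CommitterLike
  case object RegularCommitter extends CommitterLike

  def chooseCommitter(copyCompactedFiles: Seq[String],
                      options: scala.collection.mutable.Map[String, String]): CommitterLike =
    if (copyCompactedFiles.nonEmpty) {
      // Tell the writer side to skip writing data and only report the copied files.
      options.put("copy_file_writer", "true")
      CopyOnlyCommitter(copyCompactedFiles)
    } else {
      RegularCommitter
    }
}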