From f7a4d0e5d78f4a98c62691b412778ac9dadfd689 Mon Sep 17 00:00:00 2001 From: hw_syl_zenghua Date: Thu, 31 Oct 2024 19:51:11 +0800 Subject: [PATCH] merge paral_compact_editting into merge_main MR-title: compaction with file size condition in parallel Created-by: hw_syl_zenghua Author-id: 7155563 MR-id: 7278395 Commit-by: zenghua Merged-by: hw_syl_zenghua Description: merge "paral_compact_editting" into "merge_main" fix ci Signed-off-by: zenghua , compaction with file size condition in parallel Signed-off-by: zenghua See merge request: 42b369588d84469d95d7b738fc58da8e/LakeSoul/for-nanhang!7 --- .../datasources/LakeSoulFileWriter.scala | 75 ++++++++++----- .../v2/merge/MergeParquetScan.scala | 30 +++++- .../v2/parquet/NativeParquetFileFormat.scala | 3 +- .../parquet/NativeParquetOutputWriter.scala | 15 ++- .../sql/lakesoul/DelayedCommitProtocol.scala | 39 +++++--- .../lakesoul/DelayedCopyCommitProtocol.scala | 36 ++----- .../sql/lakesoul/TransactionalWrite.scala | 13 +-- .../catalog/LakeSoulScanBuilder.scala | 1 - .../lakesoul/commands/CompactionCommand.scala | 96 +++++++------------ .../lakesoul/sources/LakeSoulSQLConf.scala | 19 ++++ .../spark/sql/vectorized/NativeIOUtils.scala | 18 +++- .../lakesoul/commands/CompactionSuite.scala | 71 ++++++++------ .../lakesoul/lakesoul/io/NativeIOWriter.java | 9 ++ rust/lakesoul-io/src/lakesoul_io_config.rs | 4 + rust/lakesoul-io/src/lakesoul_writer.rs | 4 + 15 files changed, 258 insertions(+), 175 deletions(-) diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/LakeSoulFileWriter.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/LakeSoulFileWriter.scala index 880a99649..da78e4b5b 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/LakeSoulFileWriter.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/LakeSoulFileWriter.scala @@ -34,14 +34,23 @@ import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf import org.apache.spark.sql.vectorized.ArrowFakeRowAdaptor import org.apache.spark.util.{SerializableConfiguration, Utils} import com.dmetasoul.lakesoul.meta.DBConfig.{LAKESOUL_NON_PARTITION_TABLE_PART_DESC, LAKESOUL_RANGE_PARTITION_SPLITTER} +import com.dmetasoul.lakesoul.meta.DBUtil import com.dmetasoul.lakesoul.spark.clean.CleanOldCompaction.splitCompactFilePath -import org.apache.spark.sql.lakesoul.DelayedCopyCommitProtocol +import org.apache.spark.sql.execution.datasources.v2.parquet.{NativeParquetCompactionColumnarOutputWriter, NativeParquetOutputWriter} +import org.apache.spark.sql.lakesoul.{DelayedCommitProtocol, DelayedCopyCommitProtocol} import org.apache.spark.sql.types.{DataTypes, StringType, StructField, StructType} import java.util.{Date, UUID} +import scala.collection.JavaConverters.mapAsScalaMapConverter +import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable` /** A helper object for writing FileFormat data out to a location. */ object LakeSoulFileWriter extends Logging { + val MAX_FILE_SIZE_KEY = "max_file_size" + val HASH_BUCKET_ID_KEY = "hash_bucket_id" + val SNAPPY_COMPRESS_RATIO = 3 + val COPY_FILE_WRITER_KEY = "copy_file_writer" + /** * Basic work flow of this command is: * 1. 
Driver side setup, including output committer initialization and data source specific @@ -178,7 +187,11 @@ object LakeSoulFileWriter extends Logging { val nativeIOEnable = sparkSession.sessionState.conf.getConf(LakeSoulSQLConf.NATIVE_IO_ENABLE) def nativeWrap(plan: SparkPlan): RDD[InternalRow] = { - if (isCompaction && !isCDC && !isBucketNumChanged && nativeIOEnable) { + if (isCompaction + && staticBucketId != -1 + && !isCDC + && !isBucketNumChanged + && nativeIOEnable) { plan match { case withPartitionAndOrdering(_, _, child) => return nativeWrap(child) @@ -203,8 +216,8 @@ object LakeSoulFileWriter extends Logging { try { // for compaction, we won't break ordering from batch scan val (rdd, concurrentOutputWriterSpec) = - if (isCompaction && options.getOrElse("copyCompactedFile", "").nonEmpty) { - val data = Seq(InternalRow(options("copyCompactedFile"))) + if (isCompaction && options.getOrElse(COPY_FILE_WRITER_KEY, "false").toBoolean) { + val data = Seq(InternalRow(COPY_FILE_WRITER_KEY)) (sparkSession.sparkContext.parallelize(data), None) } else if (!isBucketNumChanged && (orderingMatched || isCompaction)) { (nativeWrap(empty2NullPlan), None) @@ -410,6 +423,7 @@ object LakeSoulFileWriter extends Logging { private var recordsInFile: Long = _ private val partValue: Option[String] = options.get("partValue").filter(_ != LAKESOUL_NON_PARTITION_TABLE_PART_DESC) .map(_.replace(LAKESOUL_RANGE_PARTITION_SPLITTER, "/")) + private val maxFileSize = options.get(MAX_FILE_SIZE_KEY) /** Given an input row, returns the corresponding `bucketId` */ protected lazy val getBucketId: InternalRow => Int = { @@ -419,26 +433,56 @@ object LakeSoulFileWriter extends Logging { row => proj(row).getInt(0) } + override protected def releaseCurrentWriter(): Unit = { + if (currentWriter != null) { + try { + currentWriter.close() + if (maxFileSize.isDefined) { + currentWriter.asInstanceOf[NativeParquetOutputWriter].flushResult.foreach(result => { + val (partitionDesc, flushResult) = result + val partitionDescList = if (partitionDesc == "-4") { + DBUtil.parsePartitionDesc(options.getOrElse("partValue", LAKESOUL_NON_PARTITION_TABLE_PART_DESC)).asScala.toList + } else { + DBUtil.parsePartitionDesc(partitionDesc).asScala.toList + } + committer.asInstanceOf[DelayedCommitProtocol].addOutputFile(partitionDescList, flushResult.map(_.getFilePath).toList) + }) + } + statsTrackers.foreach(_.closeFile(currentWriter.path())) + } finally { + currentWriter = null + } + } + } + private def newOutputWriter(record: InternalRow): Unit = { recordsInFile = 0 releaseResources() val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext) val suffix = if (bucketSpec.isDefined) { - val bucketIdStr = if (partitionId == -1) { - BucketingUtils.bucketIdToString(getBucketId(record)) + val bucketId = if (partitionId == -1) { + getBucketId(record) } else { - BucketingUtils.bucketIdToString(partitionId) + partitionId } + taskAttemptContext.getConfiguration.set(HASH_BUCKET_ID_KEY, bucketId.toString) + + val bucketIdStr = BucketingUtils.bucketIdToString(bucketId) f"$bucketIdStr.c$fileCounter%03d" + ext } else { f"-c$fileCounter%03d" + ext } + if (maxFileSize.isDefined) { + taskAttemptContext.getConfiguration.set(MAX_FILE_SIZE_KEY, maxFileSize.get) + } + val currentPath = committer.newTaskTempFile( taskAttemptContext, partValue, - suffix) + if (maxFileSize.isDefined) "" else suffix + ) currentWriter = description.outputWriterFactory.newInstance( path = currentPath, @@ -473,21 +517,8 @@ object LakeSoulFileWriter extends Logging { 
customMetrics: Map[String, SQLMetric] = Map.empty) extends FileFormatDataWriter(description, taskAttemptContext, committer, customMetrics) { - private val partValue: Option[String] = options.get("partValue").filter(_ != LAKESOUL_NON_PARTITION_TABLE_PART_DESC) - .map(_.replace(LAKESOUL_RANGE_PARTITION_SPLITTER, "/")) - - /** Given an input row, returns the corresponding `bucketId` */ - protected lazy val getSrcPath: InternalRow => String = { - row => row.get(0, StringType).asInstanceOf[String] - } - override def write(record: InternalRow): Unit = { - val dstPath = committer.newTaskTempFile( - taskAttemptContext, - partValue, - getSrcPath(record)) - - statsTrackers.foreach(_.newFile(dstPath)) + logInfo("copy file") } } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergeParquetScan.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergeParquetScan.scala index 5137c2007..0f339ea0d 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergeParquetScan.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergeParquetScan.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.sources.{EqualTo, Filter, Not} import org.apache.spark.sql.lakesoul._ import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf +import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf.{COMPACTION_TASK, SCAN_FILE_NUMBER_LIMIT} import org.apache.spark.sql.lakesoul.utils.{SparkUtil, TableInfo, TimestampFormatter} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -412,15 +413,34 @@ case class OnePartitionMergeBucketScan(sparkSession: SparkSession, val fileWithBucketId = groupByPartition.head._2 .groupBy(_.fileBucketId).map(f => (f._1, f._2.toArray)) + val fileNumLimit = options.getOrDefault(SCAN_FILE_NUMBER_LIMIT.key, Int.MaxValue.toString).toInt + val isCompactionTask = options.getOrDefault(COMPACTION_TASK.key, COMPACTION_TASK.defaultValueString).toBoolean + Seq.tabulate(bucketNum) { bucketId => var files = fileWithBucketId.getOrElse(bucketId, Array.empty) - val isSingleFile = files.length == 1 + var groupedFiles = if (fileNumLimit < Int.MaxValue && isCompactionTask) { + val groupedFiles = new ArrayBuffer[Array[MergePartitionedFile]] + for (i <- files.indices by fileNumLimit) { + groupedFiles += files.slice(i, i + fileNumLimit) + } + groupedFiles.toArray + } else { + Array(files) + } - if (!isSingleFile) { - val versionFiles = for (version <- files.indices) yield files(version).copy(writeVersion = version + 1) - files = versionFiles.toArray + var allPartitionIsSingleFile = true + var isSingleFile = false + + for (index <- groupedFiles.indices) { + isSingleFile = groupedFiles(index).length == 1 + if (!isSingleFile) { + val versionFiles = for (elem <- groupedFiles(index).indices) yield groupedFiles(index)(elem).copy(writeVersion = elem) + groupedFiles(index) = versionFiles.toArray + allPartitionIsSingleFile = false + } } - MergeFilePartition(bucketId, Array(files), isSingleFile) + + MergeFilePartition(bucketId, groupedFiles, allPartitionIsSingleFile) } } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetFileFormat.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetFileFormat.scala index e47044cb5..bc817a9a2 100644 --- 
a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetFileFormat.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetFileFormat.scala @@ -31,7 +31,8 @@ class NativeParquetFileFormat extends FileFormat if (options.getOrElse("isCompaction", "false").toBoolean && !options.getOrElse("isCDC", "false").toBoolean && - !options.getOrElse("isBucketNumChanged", "false").toBoolean + !options.getOrElse("isBucketNumChanged", "false").toBoolean && + options.contains("staticBucketId") ) { new OutputWriterFactory { override def newInstance( diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetOutputWriter.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetOutputWriter.scala index c4d599ef8..9cf9f9139 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetOutputWriter.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetOutputWriter.scala @@ -5,6 +5,7 @@ package org.apache.spark.sql.execution.datasources.v2.parquet import com.dmetasoul.lakesoul.lakesoul.io.NativeIOWriter +import com.dmetasoul.lakesoul.lakesoul.io.NativeIOWriter.FlushResult import org.apache.arrow.memory.BufferAllocator import org.apache.arrow.vector.VectorSchemaRoot import org.apache.arrow.vector.types.pojo.Schema @@ -18,19 +19,29 @@ import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.{GlutenUtils, NativeIOUtils} +import java.util +import scala.collection.JavaConverters.mapAsScalaMapConverter +import scala.collection.mutable + class NativeParquetOutputWriter(val path: String, dataSchema: StructType, timeZoneId: String, context: TaskAttemptContext) extends OutputWriter { val NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE: Int = SQLConf.get.getConf(LakeSoulSQLConf.NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE) private var recordCount = 0 + var flushResult: mutable.Map[String, util.List[FlushResult]] = mutable.Map.empty + val arrowSchema: Schema = ArrowUtils.toArrowSchema(dataSchema, timeZoneId) protected val nativeIOWriter: NativeIOWriter = new NativeIOWriter(arrowSchema) GlutenUtils.setArrowAllocator(nativeIOWriter) nativeIOWriter.setRowGroupRowNumber(NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE) - nativeIOWriter.addFile(path) + if (path.endsWith(".parquet")) { + nativeIOWriter.addFile(path) + } else { + nativeIOWriter.withPrefix(path) + } NativeIOUtils.setNativeIOOptions(nativeIOWriter, NativeIOUtils.getNativeIOOptions(context, new Path(path))) @@ -59,7 +70,7 @@ class NativeParquetOutputWriter(val path: String, dataSchema: StructType, timeZo recordWriter.finish() nativeIOWriter.write(root) - nativeIOWriter.flush() + flushResult = nativeIOWriter.flush().asScala recordWriter.reset() root.close() diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCommitProtocol.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCommitProtocol.scala index 086012cf0..0c82d0215 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCommitProtocol.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCommitProtocol.scala @@ -90,26 +90,39 @@ class DelayedCommitProtocol(jobId: String, } override def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): 
String = { - val filename = getFileName(taskContext, ext) val partitionValues = dir.map(parsePartitions).getOrElse(List.empty[(String, String)]) val unescapedDir = if (partitionValues.nonEmpty) { Some(partitionValues.map(partitionValue => partitionValue._1 + "=" + partitionValue._2).mkString("/")) } else { dir } - val relativePath = randomPrefixLength.map { prefixLength => - getRandomPrefix(prefixLength) // Generate a random prefix as a first choice - }.orElse { - // or else write into the partition unescaped directory if it is partitioned + if (ext.isEmpty) { unescapedDir - }.map { subDir => - new Path(subDir, filename) - }.getOrElse(new Path(filename)) // or directly write out to the output path - - val absolutePath = new Path(path, relativePath).toUri.toString - //returns the absolute path to the file - addedFiles.append((partitionValues, absolutePath)) - absolutePath + .map(new Path(path, _)) + .getOrElse(new Path(path)) + .toUri.toString + } else { + val filename = getFileName(taskContext, ext) + + val relativePath = randomPrefixLength.map { prefixLength => + getRandomPrefix(prefixLength) // Generate a random prefix as a first choice + }.orElse { + // or else write into the partition unescaped directory if it is partitioned + unescapedDir + }.map { subDir => + new Path(subDir, filename) + }.getOrElse(new Path(filename)) // or directly write out to the output path + + + val absolutePath = new Path(path, relativePath).toUri.toString + //returns the absolute path to the file + addedFiles.append((partitionValues, absolutePath)) + absolutePath + } + } + + def addOutputFile(partitionValues: List[(String, String)], files: List[String]): Unit = { + files.foreach(file => addedFiles.append((partitionValues, file))) } override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = { diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCopyCommitProtocol.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCopyCommitProtocol.scala index b46be810e..4284f3026 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCopyCommitProtocol.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCopyCommitProtocol.scala @@ -23,31 +23,16 @@ import scala.util.Random /** * Writes out the files to `path` and returns a list of them in `addedStatuses`. 
*/ -class DelayedCopyCommitProtocol(jobId: String, +class DelayedCopyCommitProtocol(srcFiles: Seq[DataFileInfo], + jobId: String, dstPath: String, randomPrefixLength: Option[Int]) extends DelayedCommitProtocol(jobId, dstPath, randomPrefixLength) with Serializable with Logging { - @transient private var copyFiles: ArrayBuffer[(String, String)] = _ - - override def setupJob(jobContext: JobContext): Unit = { - - } - - override def abortJob(jobContext: JobContext): Unit = { - // TODO: Best effort cleanup - } - - override def setupTask(taskContext: TaskAttemptContext): Unit = { - copyFiles = new ArrayBuffer[(String, String)] - } - - override def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { - val (srcCompactDir, srcBasePath) = splitCompactFilePath(ext) - copyFiles += dir.getOrElse("-5") -> ext - new Path(dstPath, srcBasePath).toString + throw new UnsupportedOperationException( + s"$this does not support adding files with an absolute path") } override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = { @@ -57,15 +42,14 @@ class DelayedCopyCommitProtocol(jobId: String, override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = { - if (copyFiles.nonEmpty) { - val fs = new Path(copyFiles.head._2).getFileSystem(taskContext.getConfiguration) - val statuses = copyFiles.map { f => - val (partitionDesc, srcPath) = f - val (srcCompactDir, srcBasePath) = splitCompactFilePath(srcPath) + if (srcFiles.nonEmpty) { + val fs = new Path(srcFiles.head.path).getFileSystem(taskContext.getConfiguration) + val statuses = srcFiles.map { srcFile => + val (srcCompactDir, srcBasePath) = splitCompactFilePath(srcFile.path) val dstFile = new Path(dstPath, srcBasePath) - FileUtil.copy(fs, new Path(srcPath), fs, new Path(dstPath, srcBasePath), false, taskContext.getConfiguration) + FileUtil.copy(fs, new Path(srcFile.path), fs, new Path(dstPath, srcBasePath), false, taskContext.getConfiguration) val status = fs.getFileStatus(dstFile) - DataFileInfo(partitionDesc, fs.makeQualified(dstFile).toString, "add", status.getLen, status.getModificationTime) + DataFileInfo(srcFile.range_partitions, fs.makeQualified(dstFile).toString, "add", status.getLen, status.getModificationTime) } new TaskCommitMessage(statuses) diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala index 455e652b8..66109a1d3 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala @@ -12,6 +12,7 @@ import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.Dataset import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.datasources.LakeSoulFileWriter.COPY_FILE_WRITER_KEY import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, LakeSoulFileWriter, WriteJobStatsTracker} import org.apache.spark.sql.execution.{QueryExecution, SQLExecution} import org.apache.spark.sql.functions.{col, when} @@ -103,7 +104,8 @@ trait TransactionalWrite { */ def writeFiles(oriData: Dataset[_], writeOptions: Option[LakeSoulOptions], - isCompaction: Boolean): (Seq[DataFileInfo], Path) = { + isCompaction: Boolean, + copyCompactedFile: Seq[DataFileInfo] = Seq.empty): 
(Seq[DataFileInfo], Path) = { val spark = oriData.sparkSession // LakeSoul always writes timestamp data with timezone=UTC spark.conf.set("spark.sql.session.timeZone", "UTC") @@ -160,7 +162,7 @@ trait TransactionalWrite { options.put("isBucketNumChanged", "false") } val cdcCol = snapshot.getTableInfo.configuration.get(LakeSoulTableProperties.lakeSoulCDCChangePropKey) - if (cdcCol.nonEmpty) { + if (cdcCol.nonEmpty && copyCompactedFile.isEmpty) { options.put("isCDC", "true") val cdcColName = cdcCol.get if (writeOptions.forall(_.options.getOrElse("fullCompaction", "true").equals("true"))) { @@ -214,10 +216,9 @@ trait TransactionalWrite { output.length < data.schema.size) } - val committer = if (writeOptions.exists(_.options.getOrElse("copyCompactedFile", "").nonEmpty)) { - val srcPath = writeOptions.get.options.get("copyCompactedFile") - options.put("copyCompactedFile", srcPath.get) - new DelayedCopyCommitProtocol("lakesoul", outputPath.toString, None) + val committer = if (copyCompactedFile.nonEmpty) { + options.put(COPY_FILE_WRITER_KEY, "true") + new DelayedCopyCommitProtocol(copyCompactedFile, "lakesoul", outputPath.toString, None) } else { getCommitter(outputPath) } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulScanBuilder.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulScanBuilder.scala index eb47fa81d..830371192 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulScanBuilder.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/catalog/LakeSoulScanBuilder.scala @@ -116,7 +116,6 @@ case class LakeSoulScanBuilder(sparkSession: SparkSession, } val writableOptions = mutable.Map.empty[String, String] ++ options.asScala if (fileIndex.snapshotManagement.snapshot.getPartitionInfoArray.forall(p => p.commit_op.equals("CompactionCommit"))) { - println(s"set NATIVE_IO_IS_COMPACTED with ${fileIndex.snapshotManagement.snapshot.getPartitionInfoArray.mkString("Array(", ", ", ")")}") writableOptions.put(NATIVE_IO_IS_COMPACTED.key, "true") } val updatedOptions = new CaseInsensitiveStringMap(writableOptions.asJava) diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala index f74a35bf5..b0df3475c 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala @@ -14,13 +14,14 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.PredicateHelper import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.execution.command.LeafRunnableCommand +import org.apache.spark.sql.execution.datasources.LakeSoulFileWriter.{MAX_FILE_SIZE_KEY, SNAPPY_COMPRESS_RATIO} import org.apache.spark.sql.execution.datasources.v2.merge.MergeDeltaParquetScan import org.apache.spark.sql.execution.datasources.v2.parquet.{NativeParquetScan, ParquetScan} import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation} -import org.apache.spark.sql.functions.{expr, forall} +import org.apache.spark.sql.functions.expr import org.apache.spark.sql.lakesoul.catalog.LakeSoulTableV2 import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors -import 
org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf.RENAME_COMPACTED_FILE +import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf.{COMPACTION_TASK, SCAN_FILE_NUMBER_LIMIT} import org.apache.spark.sql.lakesoul.utils.TableInfo import org.apache.spark.sql.lakesoul.{BatchDataSoulFileIndexV2, LakeSoulOptions, SnapshotManagement, TransactionCommit} import org.apache.spark.sql.types.StructType @@ -29,9 +30,9 @@ import org.apache.spark.sql.{Dataset, Row, SparkSession} import org.apache.spark.util.Utils import java.util.UUID +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import scala.collection.JavaConverters._ case class CompactionCommand(snapshotManagement: SnapshotManagement, conditionString: String, @@ -69,8 +70,7 @@ case class CompactionCommand(snapshotManagement: SnapshotManagement, files: Seq[DataFileInfo], readPartitionInfo: Array[PartitionInfoScala], compactionPath: String, - fullCompaction: Boolean, - copyCompactedFile: String = ""): List[DataCommitInfo] = { + copySrcFiles: Boolean = false): List[DataCommitInfo] = { if (newBucketNum.isEmpty && readPartitionInfo.forall(p => p.commit_op.equals("CompactionCommit") && p.read_files.length == 1)) { logInfo("=========== All Partitions Have Compacted, This Operation Will Cancel!!! ===========") return List.empty @@ -85,7 +85,11 @@ case class CompactionCommand(snapshotManagement: SnapshotManagement, Option(mergeOperatorInfo) ) val option = new CaseInsensitiveStringMap( - Map("basePath" -> tc.tableInfo.table_path_s.get, "isCompaction" -> "true").asJava) + Map("basePath" -> tc.tableInfo.table_path_s.get, + "isCompaction" -> "true", + SCAN_FILE_NUMBER_LIMIT.key -> fileNumLimit.getOrElse(Int.MaxValue).toString, + COMPACTION_TASK.key -> "true" + ).asJava) val partitionNames = readPartitionInfo.head.range_value.split(',').map(p => { p.split('=').head @@ -123,13 +127,19 @@ case class CompactionCommand(snapshotManagement: SnapshotManagement, val map = mutable.HashMap[String, String]() map.put("isCompaction", "true") map.put("compactionPath", compactionPath) - map.put("fullCompaction", fullCompaction.toString) - if (copyCompactedFile.nonEmpty) { - map.put("copyCompactedFile", copyCompactedFile) + + val copyCompactedFiles = if (copySrcFiles) { + files + } else { + Seq.empty } if (readPartitionInfo.nonEmpty) { map.put("partValue", readPartitionInfo.head.range_value) } + if (fileSizeLimit.isDefined) { + map.put("fullCompaction", "false") + map.put(MAX_FILE_SIZE_KEY, (fileSizeLimit.get * SNAPPY_COMPRESS_RATIO).toString) + } if (bucketNumChanged) { map.put("newBucketNum", newBucketNum.get.toString) } else if (tableInfo.hash_partition_columns.nonEmpty) { @@ -140,7 +150,7 @@ case class CompactionCommand(snapshotManagement: SnapshotManagement, } logInfo(s"write CompactData with Option=$map") - val (newFiles, path) = tc.writeFiles(compactDF, Some(new LakeSoulOptions(map.toMap, spark.sessionState.conf)), isCompaction = true) + val (newFiles, path) = tc.writeFiles(compactDF, Some(new LakeSoulOptions(map.toMap, spark.sessionState.conf)), isCompaction = true, copyCompactedFiles) tc.createDataCommitInfo(newFiles, Seq.empty, "", -1)._1 } @@ -195,63 +205,21 @@ case class CompactionCommand(snapshotManagement: SnapshotManagement, def compactSinglePartition(sparkSession: SparkSession, tc: TransactionCommit, files: Seq[DataFileInfo], sourcePartition: PartitionInfoScala): String = { logInfo(s"Compacting Single Partition=${sourcePartition} with ${files.length} files") - val bucketedFiles = if 
(tableInfo.hash_partition_columns.isEmpty || bucketNumChanged) { - Seq(-1 -> files) + val (copyFiles, scanFiles) = if (fileSizeLimit.isEmpty || bucketNumChanged || force) { + (Seq.empty, files) } else { - files.groupBy(_.file_bucket_id) + files.splitAt(files.indexWhere(_.size < fileSizeLimit.get * 0.5)) } - val compactionPath = newCompactPath - val allDataCommitInfo = bucketedFiles.flatMap(groupByBucketId => { - val (bucketId, files) = groupByBucketId - val groupedFiles = if (fileNumLimit.isDefined || fileSizeLimit.isDefined) { - val groupedFiles = new ArrayBuffer[Seq[DataFileInfo]] - var groupHead = 0 - var groupSize = 0L - var groupFileCount = 0 - for (i <- files.indices) { - // each group contains at least one file - if (i == groupHead) { - groupSize += files(i).size - groupFileCount += 1 - } else if (fileSizeLimit.exists(groupSize + files(i).size > _) || fileNumLimit.exists(groupFileCount + 1 > _)) { - // if the file size limit is reached, or the file count limit is reached, we need to start a new group - groupedFiles += files.slice(groupHead, i) - groupHead = i - groupSize = files(i).size - groupFileCount = 1 - } else { - // otherwise, we add the file to the current group - groupSize += files(i).size - groupFileCount += 1 - } - } - // add the last group to the groupedFiles - groupedFiles += files.slice(groupHead, files.length) - groupedFiles - } else { - Seq(files) - } - val fullCompaction = groupedFiles.size == 1 - groupedFiles.flatMap(files => { - lazy val incrementFiles = if (force || newBucketNum.isDefined) { - false - } else { - files.size == 1 && splitCompactFilePath(files.head.path)._1.nonEmpty - } - if (!incrementFiles) { - executeCompaction(sparkSession, tc, files, Array(sourcePartition), compactionPath, fullCompaction) - } else { - logInfo(s"== Partition ${sourcePartition.range_value} has no increment file.") - val origCompactedFile = files.head - if (sparkSession.sessionState.conf.getConf(RENAME_COMPACTED_FILE)) { - renameOldCompactedFile(tc, origCompactedFile, sourcePartition.range_value, compactionPath) - } else { - executeCompaction(sparkSession, tc, files, Array(sourcePartition), compactionPath, fullCompaction, origCompactedFile.path) - } - } - }) - }) + + val compactionPath = newCompactPath + val allDataCommitInfo = new ArrayBuffer[DataCommitInfo] + if (copyFiles.nonEmpty) { + allDataCommitInfo ++= executeCompaction(sparkSession, tc, copyFiles, Array(sourcePartition), compactionPath, true) + } + if (scanFiles.nonEmpty) { + allDataCommitInfo ++= executeCompaction(sparkSession, tc, scanFiles, Array(sourcePartition), compactionPath) + } if (allDataCommitInfo.nonEmpty) { val compactDataCommitInfoId = UUID.randomUUID diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/sources/LakeSoulSQLConf.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/sources/LakeSoulSQLConf.scala index 9bdc4893e..49e50b431 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/sources/LakeSoulSQLConf.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/sources/LakeSoulSQLConf.scala @@ -167,4 +167,23 @@ object LakeSoulSQLConf { """.stripMargin) .booleanConf .createWithDefault(false) + + val SCAN_FILE_NUMBER_LIMIT: ConfigEntry[Int] = + buildConf("scan.file.number.limit") + .doc( + """ + |If SCAN_FILE_NUMBER_LIMIT < Int.MaxValue, Scan will scan file with number less than SCAN_FILE_NUMBER_LIMIT per file group + """.stripMargin) + .intConf + .createWithDefault(Int.MaxValue) + + + val COMPACTION_TASK: ConfigEntry[Boolean] = + 
buildConf("scan.file.size.limit") + .doc( + """ + |If SCAN_FILE_NUMBER_LIMIT < Int.MaxValue, Scan will scan file with number less than SCAN_FILE_NUMBER_LIMIT per file group + """.stripMargin) + .booleanConf + .createWithDefault(false) } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/vectorized/NativeIOUtils.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/vectorized/NativeIOUtils.scala index 1f550ef7f..c01e7a61f 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/vectorized/NativeIOUtils.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/vectorized/NativeIOUtils.scala @@ -15,6 +15,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.parquet.hadoop.ParquetInputFormat import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources.LakeSoulFileWriter.{HASH_BUCKET_ID_KEY, MAX_FILE_SIZE_KEY} import org.apache.spark.sql.execution.datasources.parquet.{ParquetReadSupport, ParquetWriteSupport} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -32,10 +33,11 @@ class NativeIOOptions(val s3Bucket: String, val s3Region: String, val fsUser: String, val defaultFS: String, - val virtual_path_style: Boolean + val virtual_path_style: Boolean, + val others: Map[String, String] = Map.empty ) -object NativeIOUtils{ +object NativeIOUtils { def asArrayColumnVector(vectorSchemaRoot: VectorSchemaRoot): Array[ColumnVector] = { asScalaIteratorConverter(vectorSchemaRoot.getFieldVectors.iterator()) @@ -62,6 +64,13 @@ object NativeIOUtils{ var defaultFS = taskAttemptContext.getConfiguration.get("fs.defaultFS") if (defaultFS == null) defaultFS = taskAttemptContext.getConfiguration.get("fs.default.name") val fileSystem = file.getFileSystem(taskAttemptContext.getConfiguration) + var otherOptions = Map[String, String]() + if (taskAttemptContext.getConfiguration.get(HASH_BUCKET_ID_KEY, "").nonEmpty) { + otherOptions += HASH_BUCKET_ID_KEY -> taskAttemptContext.getConfiguration.get(HASH_BUCKET_ID_KEY) + } + if (taskAttemptContext.getConfiguration.get(MAX_FILE_SIZE_KEY, "").nonEmpty) { + otherOptions += MAX_FILE_SIZE_KEY -> taskAttemptContext.getConfiguration.get(MAX_FILE_SIZE_KEY) + } if (hasS3AFileSystemClass) { fileSystem match { case s3aFileSystem: S3AFileSystem => @@ -71,11 +80,11 @@ object NativeIOUtils{ val s3aAccessKey = taskAttemptContext.getConfiguration.get("fs.s3a.access.key") val s3aSecretKey = taskAttemptContext.getConfiguration.get("fs.s3a.secret.key") val virtualPathStyle = taskAttemptContext.getConfiguration.getBoolean("fs.s3a.path.style.access", false) - return new NativeIOOptions(awsS3Bucket, s3aAccessKey, s3aSecretKey, s3aEndpoint, s3aRegion, user, defaultFS, virtualPathStyle) + return new NativeIOOptions(awsS3Bucket, s3aAccessKey, s3aSecretKey, s3aEndpoint, s3aRegion, user, defaultFS, virtualPathStyle, otherOptions) case _ => } } - new NativeIOOptions(null, null, null, null, null, user, defaultFS, false) + new NativeIOOptions(null, null, null, null, null, user, defaultFS, false, otherOptions) } def setNativeIOOptions(nativeIO: NativeIOBase, options: NativeIOOptions): Unit = { @@ -89,6 +98,7 @@ object NativeIOUtils{ options.defaultFS, options.virtual_path_style ) + options.others.foreach(options => nativeIO.setOption(options._1, options._2)) } def setParquetConfigurations(sparkSession: SparkSession, hadoopConf: Configuration, readDataSchema: StructType): Unit = { diff --git 
a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala index 810f10db9..cfad5989b 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala @@ -547,29 +547,30 @@ class CompactionSuite extends QueryTest // Get initial PartitionInfo count val initialFileCount = getFileList(tablePath).length - println(s"before compact initialPartitionInfoCount=$initialFileCount") + println(s"before ${c}th time compact file count=$initialFileCount") lakeSoulTable.toDF.show // Perform limited compaction (group every compactGroupSize PartitionInfo) lakeSoulTable.compaction(fileNumLimit = Some(compactGroupSize)) // Get PartitionInfo count after compaction - val compactedFileCount = getFileList(tablePath).length + val compactedFileList = getFileList(tablePath) + val compactedFileCount = compactedFileList.length - println(s"after compact compactedPartitionInfoCount=$compactedFileCount") + println(s"after ${c}th time compact file count=$compactedFileCount") lakeSoulTable.toDF.show // Verify results - assert(compactedFileCount < initialFileCount, - s"Compaction should reduce the number of files, but it changed from ${initialFileCount} to $compactedFileCount") - + assert(compactedFileCount <= hashBucketNum, + s"Compaction should have hashBucketNum files, but it has $compactedFileCount") - assert(compactedFileCount >= (initialFileCount - 1) / compactGroupSize + 1, - s"Compaction should produce files above a lower bound, but there are ${compactedFileCount} files") - assert(compactedFileCount <= (initialFileCount - 1) / compactGroupSize + 1 + hashBucketNum, - s"Compaction should produce files below a upper bound, but there are ${compactedFileCount} files") + // assert(compactedFileCount >= (initialFileCount - 1) / compactGroupSize + 1, + // s"Compaction should produce files above a lower bound, but there are ${compactedFileCount} files") + // + // assert(compactedFileCount <= (initialFileCount - 1) / compactGroupSize + 1 + hashBucketNum, + // s"Compaction should produce files below a upper bound, but there are ${compactedFileCount} files") } // Verify data integrity @@ -648,15 +649,15 @@ class CompactionSuite extends QueryTest lakeSoulTable.toDF.show // Verify results - assert(compactedFileCount < initialFileCount, - s"Compaction should reduce the number of files, but it changed from ${initialFileCount} to $compactedFileCount") + assert(compactedFileCount <= hashBucketNum, + s"Compaction should have hashBucketNum files, but it has $compactedFileCount") - assert(compactedFileCount >= (initialFileCount - 1) / compactGroupSize + 1, - s"Compaction should produce files above a lower bound, but there are ${compactedFileCount} files") - - assert(compactedFileCount <= (initialFileCount - 1) / compactGroupSize + 1 + hashBucketNum, - s"Compaction should produce files below a upper bound, but there are ${compactedFileCount} files") + // assert(compactedFileCount >= (initialFileCount - 1) / compactGroupSize + 1, + // s"Compaction should produce files above a lower bound, but there are ${compactedFileCount} files") + // + // assert(compactedFileCount <= (initialFileCount - 1) / compactGroupSize + 1 + hashBucketNum, + // s"Compaction should produce files below a upper bound, but there are ${compactedFileCount} files") } // Verify data integrity @@ -713,23 +714,26 @@ 
class CompactionSuite extends QueryTest // Get initial PartitionInfo count val initialMaxFileSize = getFileList(tablePath).map(_.size).max - println(s"before compact initialMaxFileSize=$initialMaxFileSize") - - // Perform limited compaction (group every compactGroupSize PartitionInfo) + println(s"before ${c}th compact initialMaxFileSize=$initialMaxFileSize") LakeSoulTable.uncached(tablePath) - lakeSoulTable.compaction(fileSizeLimit = Some(compactFileSize), force = false) + spark.time({ + // Perform limited compaction (group every compactGroupSize PartitionInfo) + lakeSoulTable.compaction(fileSizeLimit = Some(compactFileSize), force = false) + // lakeSoulTable.compaction(fileSizeLimit = Some(compactFileSize), force = true) + // lakeSoulTable.compaction() + }) // Get PartitionInfo count after compaction val compactedFiles = getFileList(tablePath) val compactedFileMax = compactedFiles.map(_.size).max - println(s"after compact compactedFileMax=$compactedFileMax") + println(s"after ${c}th compact compactedFileMax=$compactedFileMax") // Verify results - assert(compactedFileMax >= initialMaxFileSize, - s"Compaction should reduce the number of files, but it changed from ${initialMaxFileSize} to $compactedFileMax") + // assert(compactedFileMax >= initialMaxFileSize, + // s"Compaction should increase the max size of files, but it changed from ${initialMaxFileSize} to $compactedFileMax") - assert(compactedFileMax <= DBUtil.parseMemoryExpression(compactFileSize) * 1.2, + assert(compactedFileMax <= DBUtil.parseMemoryExpression(compactFileSize) * 1.1, s"Compaction should produce file with upper-bounded size, but there is a larger ${compactedFileMax} file size") val (compactDir, _) = splitCompactFilePath(compactedFiles.head.path) @@ -755,7 +759,7 @@ class CompactionSuite extends QueryTest val hashBucketNum = 4 val compactRounds = 5 val upsertPerRounds = 10 - val rowsPerUpsert = 1002 + val rowsPerUpsert = 1000 val compactFileSize = "10KB" // Create test data @@ -799,23 +803,26 @@ class CompactionSuite extends QueryTest // Get initial PartitionInfo count val initialMaxFileSize = getFileList(tablePath).map(_.size).max - println(s"before compact initialMaxFileSize=$initialMaxFileSize") + println(s"before ${c}th compact initialMaxFileSize=$initialMaxFileSize") // Perform limited compaction (group every compactGroupSize PartitionInfo) LakeSoulTable.uncached(tablePath) - lakeSoulTable.compaction(fileSizeLimit = Some(compactFileSize), force = false) + spark.time({ + lakeSoulTable.compaction(fileSizeLimit = Some(compactFileSize), force = false) + // lakeSoulTable.compaction(fileSizeLimit = Some(compactFileSize), force = true) + }) // Get PartitionInfo count after compaction val compactedFiles = getFileList(tablePath) val compactedFileMax = compactedFiles.map(_.size).max - println(s"after compact compactedFileMax=$compactedFileMax") + println(s"after ${c}th compact compactedFileMax=$compactedFileMax") // Verify results // assert(compactedFileMax >= initialMaxFileSize, // s"Compaction should reduce the number of files, but it changed from ${initialMaxFileSize} to $compactedFileMax") - assert(compactedFileMax <= DBUtil.parseMemoryExpression(compactFileSize) * 1.2, + assert(compactedFileMax <= DBUtil.parseMemoryExpression(compactFileSize) * 1.1, s"Compaction should produce file with upper-bounded size, but there is a larger ${compactedFileMax} file size") val (compactDir, _) = splitCompactFilePath(compactedFiles.head.path) @@ -825,7 +832,9 @@ class CompactionSuite extends QueryTest // Verify data integrity 
LakeSoulTable.uncached(tablePath) - val compactedData = lakeSoulTable.toDF.orderBy("id", "date").collect() + val finalData = lakeSoulTable.toDF.orderBy("id", "date") + // println(finalData.queryExecution) + val compactedData = finalData.collect() // println(compactedData.mkString("Array(", ", ", ")")) assert(compactedData.length == 6 + rowsPerUpsert * upsertPerRounds * compactRounds / 2, s"The compacted data should have ${6 + rowsPerUpsert * upsertPerRounds * compactRounds / 2} rows, but it actually has ${compactedData.length} rows") diff --git a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java index 5af7f9023..7a30b369a 100644 --- a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java +++ b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java @@ -166,6 +166,15 @@ public String getFilePath() { public String getFileExistCols() { return fileExistCols; } + + @Override + public String toString() { + return "FlushResult{" + + "filePath='" + filePath + '\'' + + ", fileSize=" + fileSize + + ", fileExistCols='" + fileExistCols + '\'' + + '}'; + } } public static FlushResult decodeFlushResult(String encoded) { diff --git a/rust/lakesoul-io/src/lakesoul_io_config.rs b/rust/lakesoul-io/src/lakesoul_io_config.rs index 4230e6cc7..b6dde9c0f 100644 --- a/rust/lakesoul-io/src/lakesoul_io_config.rs +++ b/rust/lakesoul-io/src/lakesoul_io_config.rs @@ -173,6 +173,10 @@ impl LakeSoulIOConfig { self.option(OPTION_KEY_MEM_LIMIT).map(|x| x.parse().unwrap()) } + pub fn max_file_size_option(&self) -> Option<u64> { + self.option(OPTION_KEY_MAX_FILE_SIZE).map(|x| x.parse().unwrap()) + } + pub fn pool_size(&self) -> Option<usize> { self.option(OPTION_KEY_POOL_SIZE).map(|x| x.parse().unwrap()) } diff --git a/rust/lakesoul-io/src/lakesoul_writer.rs b/rust/lakesoul-io/src/lakesoul_writer.rs index 370a461d4..3d034c59b 100644 --- a/rust/lakesoul-io/src/lakesoul_writer.rs +++ b/rust/lakesoul-io/src/lakesoul_writer.rs @@ -41,6 +41,10 @@ impl SyncSendableMutableLakeSoulWriter { let writer = Self::create_writer(writer_config).await?; let schema = writer.schema(); + if let Some(max_file_size) = config.max_file_size_option() { + config.max_file_size = Some(max_file_size); + } + if let Some(mem_limit) = config.mem_limit() { if config.use_dynamic_partition { config.max_file_size = Some((mem_limit as f64 * 0.15) as u64);
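
Usage sketch (not part of the patch): the entry points this MR exercises in CompactionSuite are the fileNumLimit and fileSizeLimit arguments of LakeSoulTable.compaction. A minimal Scala sketch, assuming an existing LakeSoul table at tablePath; the limit values (3, "10KB") are illustrative only.

import com.dmetasoul.lakesoul.tables.LakeSoulTable

// Load an existing LakeSoul table; tablePath is assumed to point at a table written earlier.
val lakeSoulTable = LakeSoulTable.forPath(tablePath)

// Group at most 3 files per hash bucket into each compaction scan (SCAN_FILE_NUMBER_LIMIT path).
lakeSoulTable.compaction(fileNumLimit = Some(3))

// Bound the size of compacted output files; with force = false, files already near the limit
// (at least half of it) are copied by DelayedCopyCommitProtocol instead of being rewritten.
lakeSoulTable.compaction(fileSizeLimit = Some("10KB"), force = false)

// Drop any cached snapshot before re-reading, as the tests do.
LakeSoulTable.uncached(tablePath)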