implement optimized write
weiluo-db committed Oct 6, 2023
1 parent 879df3c commit f94cc4c
Showing 9 changed files with 867 additions and 6 deletions.
12 changes: 12 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
@@ -684,6 +684,18 @@ trait DeltaConfigsBase extends DeltaLogging {
_ => true,
"needs to be a boolean."
)

/**
* Enable optimized writes into a Delta table. An optimized write adds an adaptive shuffle
* before the write so that compacted files are written to the Delta table.
*/
val OPTIMIZE_WRITE = buildConfig[Option[Boolean]](
"autoOptimize.optimizeWrite",
null,
v => Option(v).map(_.toBoolean),
_ => true,
"needs to be a boolean."
)
}

object DeltaConfigs extends DeltaConfigsBase
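
For context, a minimal sketch of turning the new table property on for an existing table. The property key comes from the `autoOptimize.optimizeWrite` config added above; the standard `delta.` prefix that `DeltaConfigs` applies to table properties and the table name `events` are assumptions for illustration.

// Hedged sketch: enable optimized writes for one table via the new property.
// "events" is a placeholder table name; the "delta." key prefix is assumed.
spark.sql(
  """ALTER TABLE events
    |SET TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true')""".stripMargin)
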
@@ -24,7 +24,7 @@ import org.apache.spark.sql.delta.metric.IncrementMetric
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{AddFile, FileAction}
import org.apache.spark.sql.delta.commands.merge.{MergeIntoMaterializeSource, MergeIntoMaterializeSourceReason, MergeStats}
import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeFileIndex}
import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeFileIndex, TransactionalWrite}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaUtils}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -228,7 +228,9 @@ abstract class MergeIntoCommandBase extends LeafRunnableCommand
txn: OptimisticTransaction,
outputDF: DataFrame): Seq[FileAction] = {
val partitionColumns = txn.metadata.partitionColumns
if (partitionColumns.nonEmpty && spark.conf.get(DeltaSQLConf.MERGE_REPARTITION_BEFORE_WRITE)) {
// If this will be an optimized write, which already shuffles the data, don't repartition again.
if (partitionColumns.nonEmpty && spark.conf.get(DeltaSQLConf.MERGE_REPARTITION_BEFORE_WRITE)
&& !TransactionalWrite.shouldOptimizeWrite(txn.metadata, spark.sessionState.conf)) {
txn.writeFiles(outputDF.repartition(partitionColumns.map(col): _*))
} else {
txn.writeFiles(outputDF)
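
A hedged usage sketch of the change above: with optimized writes enabled for the session, MERGE skips its explicit repartition and relies on the adaptive shuffle added at write time instead. The session conf key (backing `DeltaSQLConf.DELTA_OPTIMIZE_WRITE_ENABLED`) and the table names are assumptions for illustration.

// Assumed conf key; "target" and "source" are placeholder tables.
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.sql(
  """MERGE INTO target t
    |USING source s
    |ON t.id = s.id
    |WHEN MATCHED THEN UPDATE SET *
    |WHEN NOT MATCHED THEN INSERT *""".stripMargin)
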
@@ -412,7 +412,7 @@ class OptimizeExecutor(
sparkSession.sparkContext.getLocalProperty(SPARK_JOB_GROUP_ID),
description)

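// OPTIMIZE passes isOptimize = true below so the write path does not add a second,
// optimized-write shuffle on top of the bin-packing repartition already done here.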
val addFiles = txn.writeFiles(repartitionDF).collect {
val addFiles = txn.writeFiles(repartitionDF, None, isOptimize = true, Nil).collect {
case a: AddFile =>
a.copy(dataChange = false)
case other =>
@@ -16,15 +16,14 @@

package org.apache.spark.sql.delta.files

import java.net.URI

import scala.collection.mutable.ListBuffer

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.constraints.{Constraint, Constraints, DeltaInvariantCheckerExec}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.perf.DeltaOptimizedWriterExec
import org.apache.spark.sql.delta.schema._
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.sources.DeltaSQLConf.DELTA_COLLECT_STATS_USING_TABLE_SCHEMA
@@ -228,6 +227,13 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
writeFiles(data, Nil)
}

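/**
* Convenience overload for regular writes: delegates to the full `writeFiles` below with
* `isOptimize = false`, so the optimized-write shuffle can still be added when enabled.
*/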
def writeFiles(
data: Dataset[_],
deltaOptions: Option[DeltaOptions],
additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
writeFiles(data, deltaOptions, isOptimize = false, additionalConstraints)
}

/**
* Returns a tuple of (data, partition schema). For CDC writes, a `__is_cdc` column is added to
* the data and `__is_cdc=true/false` is added to the front of the partition schema.
@@ -348,6 +354,7 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
def writeFiles(
inputData: Dataset[_],
writeOptions: Option[DeltaOptions],
isOptimize: Boolean,
additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
hasWritten = true

@@ -379,7 +386,13 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl

val empty2NullPlan = convertEmptyToNullIfNeeded(queryExecution.executedPlan,
partitioningColumns, constraints)
val physicalPlan = DeltaInvariantCheckerExec(empty2NullPlan, constraints)
val checkInvariants = DeltaInvariantCheckerExec(empty2NullPlan, constraints)
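// OPTIMIZE compacts files with its own repartition, so skip the optimized-write shuffle when
// isOptimize = true; otherwise wrap the plan in DeltaOptimizedWriterExec when it is enabled.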
val physicalPlan = if (!isOptimize &&
shouldOptimizeWrite(writeOptions, spark.sessionState.conf)) {
DeltaOptimizedWriterExec(checkInvariants, metadata.partitionColumns, deltaLog)
} else {
checkInvariants
}

val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()

@@ -449,4 +462,27 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl

resultFiles.toSeq ++ committer.changeFiles
}

/**
* Optimized writes can be enabled or disabled, with the following order of precedence:
* - DataFrameWriter options
* - Table property
* - SQL session configuration
*/
private def shouldOptimizeWrite(
writeOptions: Option[DeltaOptions], sessionConf: SQLConf): Boolean = {
writeOptions.flatMap(_.optimizeWrite)
.getOrElse(TransactionalWrite.shouldOptimizeWrite(metadata, sessionConf))
}

}

object TransactionalWrite {
def shouldOptimizeWrite(metadata: Metadata, sessionConf: SQLConf): Boolean = {
// We want table properties to take precedence over the session/default conf.
DeltaConfigs.OPTIMIZE_WRITE
.fromMetaData(metadata)
.orElse(sessionConf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_ENABLED))
.getOrElse(false)
}
}
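
To illustrate the precedence above, a hedged sketch: the per-write DataFrameWriter option wins over the table property, which in turn wins over the session conf. The option and conf key names (`optimizeWrite`, `spark.databricks.delta.optimizeWrite.enabled`) and the output path are assumptions for illustration.

// Session default says off (assumed conf key), but the per-write option forces it on.
val df = spark.range(1000).toDF("id")
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "false")
df.write
  .format("delta")
  .option("optimizeWrite", "true") // highest precedence; read via DeltaOptions.optimizeWrite
  .mode("append")
  .save("/tmp/delta/events")       // placeholder path
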