Skip to content

Commit

Permalink
impl
Browse files Browse the repository at this point in the history
  • Loading branch information
lzlfred committed Aug 28, 2024
1 parent 10ed75a commit ebf9d77
Showing 1 changed file with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ class IcebergConverter(spark: SparkSession)
var hasRemoves = false
var hasDataChange = false
var hasCommitInfo = false
var commitInfo: Option[CommitInfo] = None
breakable {
for (action <- actionsToCommit) {
action match {
Expand All @@ -370,7 +371,9 @@ class IcebergConverter(spark: SparkSession)
case r: RemoveFile =>
hasRemoves = true
if (r.dataChange) hasDataChange = true
case _: CommitInfo => hasCommitInfo = true
case ci: CommitInfo =>
commitInfo = Some(ci)
hasCommitInfo = true
case _ => // Do nothing
}
if (hasAdds && hasRemoves && hasDataChange && hasCommitInfo) break // Short-circuit
Expand Down Expand Up @@ -404,9 +407,14 @@ class IcebergConverter(spark: SparkSession)
}
overwriteHelper.commit()
} else if (hasAdds) {
val appendHelper = icebergTxn.getAppendOnlyHelper()
addsAndRemoves.foreach(action => appendHelper.add(action.add))
appendHelper.commit()
if (!hasRemoves && !hasDataChange && allDeltaActionsCaptured) {
logInfo(s"Skip Iceberg conversion for commit that only has AddFiles " +
s"without any RemoveFiles or data change. CommitInfo: $commitInfo")
} else {
val appendHelper = icebergTxn.getAppendOnlyHelper()
addsAndRemoves.foreach(action => appendHelper.add(action.add))
appendHelper.commit()
}
} else if (hasRemoves) {
val removeHelper = icebergTxn.getRemoveOnlyHelper()
addsAndRemoves.foreach(action => removeHelper.remove(action.remove))
Expand Down

0 comments on commit ebf9d77

Please sign in to comment.