diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala index bd69ab3c3f1..81d8513243f 100644 --- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala +++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala @@ -72,6 +72,7 @@ class IcebergConversionTransaction( protected abstract class TransactionHelper(impl: PendingUpdate[_]) { private var committed = false + var writeSize = 0L def opType: String @@ -94,6 +95,7 @@ class IcebergConversionTransaction( override def opType: String = "append" def add(add: AddFile): Unit = { + writeSize += add.size appender.appendFile( convertDeltaAddFileToIcebergDataFile( add, @@ -134,6 +136,7 @@ class IcebergConversionTransaction( override def opType: String = "overwrite" def add(add: AddFile): Unit = { + writeSize += add.size overwriter.addFile( convertDeltaAddFileToIcebergDataFile( add, @@ -169,6 +172,7 @@ class IcebergConversionTransaction( override def opType: String = "rewrite" def rewrite(removes: Seq[RemoveFile], adds: Seq[AddFile]): Unit = { + writeSize += adds.map(_.size).sum val dataFilesToDelete = removes.map { f => assert(!f.dataChange, "Rewrite operation should not add data") convertDeltaRemoveFileToIcebergDataFile( @@ -487,7 +491,9 @@ class IcebergConversionTransaction( "version" -> postCommitSnapshot.version, "timestamp" -> postCommitSnapshot.timestamp, "tableOp" -> tableOp.getClass.getSimpleName.stripSuffix("$"), - "prevConvertedDeltaVersion" -> lastConvertedDeltaVersion + "prevConvertedDeltaVersion" -> lastConvertedDeltaVersion, + "tableSize" -> postCommitSnapshot.sizeInBytes, + "commitWriteSize" -> fileUpdates.map(_.writeSize).sum ) ++ icebergTxnTypes ++ errorData ) }