
[BUG][SPARK] Delta Optimize CommitInfo's operationalParameters incompatible with Delta Kernel API CommitInfo #3888

Open
MaicoTimmerman opened this issue Nov 19, 2024 · 2 comments · May be fixed by #3889
Labels
bug Something isn't working

Comments


MaicoTimmerman commented Nov 19, 2024

Bug

Delta Kernel is unable to read the CommitInfo, and therefore its in-commit timestamp, after running OPTIMIZE in Spark.

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Describe the problem

Steps to reproduce

Test case for kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/InCommitTimestampSuite.scala:

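  test("CommitInfo getCommitInfoOpt should work after Spark optimize") {
    withTempDir { dir =>
      // Version 0: write the data as (typically) two files so that OPTIMIZE has something to compact.
      spark.range(10).repartition(2).write.format("delta").save(dir.toString)
      // Version 1: OPTIMIZE writes a commitInfo whose operationParameters contain a boolean ("auto").
      spark.sql(s"OPTIMIZE delta.`$dir`").collect()

      val engine = DefaultEngine.create(new Configuration())

      // Reading the commitInfo of version 1 through Kernel throws instead of returning it.
      CommitInfo.getCommitInfoOpt(engine, new Path(dir.getCanonicalPath, "_delta_log"), 1L)
    }
  }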

Observed results

The Delta Kernel API is unable to read the CommitInfo: the operationParameters written by OPTIMIZE now contain a non-string (boolean) value, while Kernel reads operationParameters as a map of strings.

Couldn't decode false, expected a string
java.lang.RuntimeException: Couldn't decode false, expected a string
	at io.delta.kernel.defaults.internal.data.DefaultJsonRow.throwIfTypeMismatch(DefaultJsonRow.java:132)
	at io.delta.kernel.defaults.internal.data.DefaultJsonRow.decodeElement(DefaultJsonRow.java:241)
	at io.delta.kernel.defaults.internal.data.DefaultJsonRow.decodeElement(DefaultJsonRow.java:314)
	at io.delta.kernel.defaults.internal.data.DefaultJsonRow.decodeField(DefaultJsonRow.java:356)
	at io.delta.kernel.defaults.internal.data.DefaultJsonRow.<init>(DefaultJsonRow.java:49)
	at io.delta.kernel.defaults.internal.data.DefaultJsonRow.decodeElement(DefaultJsonRow.java:268)
	at io.delta.kernel.defaults.internal.data.DefaultJsonRow.decodeField(DefaultJsonRow.java:356)
	at io.delta.kernel.defaults.internal.data.DefaultJsonRow.<init>(DefaultJsonRow.java:49)
	at io.delta.kernel.defaults.engine.DefaultJsonHandler.parseJson(DefaultJsonHandler.java:196)
	at io.delta.kernel.defaults.engine.DefaultJsonHandler.access$000(DefaultJsonHandler.java:44)
	at io.delta.kernel.defaults.engine.DefaultJsonHandler$1.next(DefaultJsonHandler.java:128)
	at io.delta.kernel.defaults.engine.DefaultJsonHandler$1.next(DefaultJsonHandler.java:84)
	at io.delta.kernel.internal.actions.CommitInfo.getCommitInfoOpt(CommitInfo.java:228)
	at io.delta.kernel.defaults.InCommitTimestampSuite.$anonfun$new$42(InCommitTimestampSuite.scala:595)
	at io.delta.kernel.defaults.InCommitTimestampSuite.$anonfun$new$42$adapted(InCommitTimestampSuite.scala:589)
	at io.delta.kernel.defaults.utils.TestUtils.withTempDir(TestUtils.scala:487)
	at io.delta.kernel.defaults.utils.TestUtils.withTempDir$(TestUtils.scala:485)
	at io.delta.kernel.defaults.InCommitTimestampSuite.withTempDir(InCommitTimestampSuite.scala:46)
	at io.delta.kernel.defaults.InCommitTimestampSuite.$anonfun$new$41(InCommitTimestampSuite.scala:589)
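A minimal sketch of this failure mode, assuming a hypothetical JSON model rather than Kernel's real DefaultJsonRow logic: when the read schema declares operationParameters as map<string, string>, a strict decoder accepts JSON strings and throws on anything else, so a single boolean parameter makes the whole commitInfo unreadable. The names JsonValue, JStr, JBool, StrictDecoding and decodeStringMap are illustrative only.

```scala
// Hypothetical minimal JSON model, used only for illustration.
sealed trait JsonValue
final case class JStr(value: String) extends JsonValue
final case class JBool(value: Boolean) extends JsonValue
final case class JNum(value: BigDecimal) extends JsonValue
case object JNull extends JsonValue
final case class JArr(items: List[JsonValue]) extends JsonValue
final case class JObj(fields: List[(String, JsonValue)]) extends JsonValue

object StrictDecoding {
  // Text used in the error message for a non-string value.
  private def describe(v: JsonValue): String = v match {
    case JBool(b) => b.toString
    case JNum(n)  => n.toString
    case JNull    => "null"
    case other    => other.toString
  }

  // Strict decoding against a map<string, string> schema:
  // any non-string value aborts the read with an exception.
  def decodeStringMap(obj: JObj): Map[String, String] =
    obj.fields.map {
      case (key, JStr(s)) => key -> s
      case (_, other) =>
        throw new RuntimeException(s"Couldn't decode ${describe(other)}, expected a string")
    }.toMap
}
```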

Expected results

Kernel should be able to read the CommitInfo without errors.

Further details

According to the protocol, the commitInfo action can contain arbitrary JSON:

Implementations are free to store any valid JSON-formatted data via the commitInfo action.

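The relevant part of the Delta log JSON file for the OPTIMIZE commit looks like this:

{
  "commitInfo": {
    // ... other fields omitted for brevity
    "operationParameters": {
      // ... other parameters omitted for brevity
      "auto": true
    }
  }
}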

Environment information

  • Delta Lake version: 3.2.0
  • Spark version: 3.5.1
  • Scala version: 2.12

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • Yes. I can contribute a fix for this bug independently.
  • Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • No. I cannot contribute a bug fix at this time.
MaicoTimmerman added the bug label Nov 19, 2024
MaicoTimmerman (Author) commented

My proposal would be to change the operationParameters map back to MapType<StringType, StringType>:

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
index 644a468af..ffc4d2ae9 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
@@ -699,7 +699,7 @@ object DeltaOperations {
       // When clustering columns are specified, set the zOrderBy key to empty.
       ZORDER_PARAMETER_KEY -> JsonUtils.toJson(if (clusterBy.isEmpty) zOrderBy else Seq.empty),
       CLUSTERING_PARAMETER_KEY -> JsonUtils.toJson(clusterBy.getOrElse(Seq.empty)),
-      AUTO_COMPACTION_PARAMETER_KEY -> auto
+      AUTO_COMPACTION_PARAMETER_KEY -> auto.toString
     )
     // `isFull` is not relevant for non-clustering tables, so skip it.
     .++(clusterBy.filter(_.nonEmpty).map(_ => CLUSTERING_IS_FULL_KEY -> isFull))
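The proposed fix amounts to making every operationParameters value a string before it is serialized. As a hedged sketch of that idea (the helper stringifyParameters is hypothetical and not part of DeltaOperations), values that are already strings are kept, and any other value, such as the Boolean auto flag, is converted with toString, just as the one-line diff does for AUTO_COMPACTION_PARAMETER_KEY.

```scala
object ParameterStringification {
  // Hypothetical helper: coerce every operation parameter value to a String so that
  // readers with a map<string, string> schema (such as Delta Kernel) can decode it.
  def stringifyParameters(params: Map[String, Any]): Map[String, String] =
    params.map {
      // Already a string, e.g. JSON text produced by JsonUtils.toJson.
      case (key, s: String) => key -> s
      // Scalars such as Boolean false become "false"; null becomes "null".
      case (key, other) => key -> (if (other == null) "null" else other.toString)
    }
}
```

A plain toString is only appropriate for scalars: a Seq would render as something like List(a, b) rather than JSON, which is why collection-valued parameters still go through JsonUtils.toJson in the existing code.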

Alternatively, we could adapt how io.delta.kernel.internal.actions.CommitInfo is read or defined. However, at the moment I don't see an easy way to force arbitrary JSON into a fixed schema. Kernel could be changed to coerce all fields into strings, but those changes would need to go deep into the io.delta.kernel.defaults.internal.data package.
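For the Kernel-side alternative, "coerce all fields into strings" could look roughly like the sketch below. It is not based on Kernel's actual io.delta.kernel.defaults.internal.data code; the JsonValue model and the names LenientDecoding, toJson and coerceToStringMap are hypothetical. JSON strings are unwrapped, and every other value keeps its compact JSON text, so a map<string, string> schema can hold it.

```scala
// Hypothetical minimal JSON model, used only for illustration.
sealed trait JsonValue
final case class JStr(value: String) extends JsonValue
final case class JBool(value: Boolean) extends JsonValue
final case class JNum(value: BigDecimal) extends JsonValue
case object JNull extends JsonValue
final case class JArr(items: List[JsonValue]) extends JsonValue
final case class JObj(fields: List[(String, JsonValue)]) extends JsonValue

object LenientDecoding {
  // Compact JSON serialization; string escaping is kept minimal
  // (backslashes and double quotes only), which is enough for this sketch.
  def toJson(v: JsonValue): String = v match {
    case JStr(s)      => "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""
    case JBool(b)     => b.toString
    case JNum(n)      => n.toString
    case JNull        => "null"
    case JArr(items)  => items.map(toJson).mkString("[", ",", "]")
    case JObj(fields) => fields.map { case (k, x) => toJson(JStr(k)) + ":" + toJson(x) }.mkString("{", ",", "}")
  }

  // Coerce each value to a string: JSON strings are unwrapped,
  // everything else (booleans, numbers, null, arrays, objects) keeps its JSON text.
  def coerceToStringMap(obj: JObj): Map[String, String] =
    obj.fields.map {
      case (key, JStr(s)) => key -> s
      case (key, other)   => key -> toJson(other)
    }.toMap
}
```

Re-serializing non-string values keeps them recoverable (a consumer can parse "false" or "[\"a\",1]" back), at the cost of no longer distinguishing the string "false" from the boolean false.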

MaicoTimmerman pushed a commit to MaicoTimmerman/delta that referenced this issue Nov 19, 2024
scovich (Collaborator) commented Nov 19, 2024

My proposal would be to change the operationParameters map back to MapType<StringType, StringType>

That would resolve the immediate symptom, but --

This issue is super annoying because there's no guarantee from the Delta spec that operationParameters itself is a JSON object -- "arbitrary JSON data" could be e.g. an array containing arbitrary elements:

{
  "commitInfo": {
    "operationParameters": ["I", "committed", "version", 23, "append-only", false]
  }
}

Technically, the spec doesn't even mandate that the top-level commitInfo is a JSON object -- a writer could legally produce:

{
  "commitInfo": "yup, I committed!"
}
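Since the spec only guarantees "any valid JSON", a reader that needs specific fields arguably has to treat every level as optional and shape-checked. A minimal sketch, again with a hypothetical JsonValue model and hypothetical helper names (DefensiveCommitInfo, field, operationParameters, inCommitTimestamp): fields are looked up only when commitInfo is a JSON object, operationParameters is exposed only when it is itself an object, and inCommitTimestamp (the field used by the In-Commit Timestamps feature) is used only when it is an integral number.

```scala
// Hypothetical minimal JSON model, used only for illustration.
sealed trait JsonValue
final case class JStr(value: String) extends JsonValue
final case class JBool(value: Boolean) extends JsonValue
final case class JNum(value: BigDecimal) extends JsonValue
case object JNull extends JsonValue
final case class JArr(items: List[JsonValue]) extends JsonValue
final case class JObj(fields: List[(String, JsonValue)]) extends JsonValue

object DefensiveCommitInfo {
  // Returns a field of commitInfo only when commitInfo is a JSON object;
  // a bare string such as "yup, I committed!" yields None.
  def field(commitInfo: JsonValue, name: String): Option[JsonValue] = commitInfo match {
    case JObj(fields) => fields.collectFirst { case (`name`, v) => v }
    case _            => None
  }

  // Exposes operationParameters as raw key/value pairs only when it is a JSON object;
  // an array, a scalar, or a missing field yields None instead of failing the whole read.
  def operationParameters(commitInfo: JsonValue): Option[Map[String, JsonValue]] =
    field(commitInfo, "operationParameters").collect { case JObj(fields) => fields.toMap }

  // Uses inCommitTimestamp only when it is present and fits in a Long.
  def inCommitTimestamp(commitInfo: JsonValue): Option[Long] =
    field(commitInfo, "inCommitTimestamp").collect { case JNum(n) if n.isValidLong => n.toLong }
}
```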
