From c8a40d97eb4af53f329a6833f216b5da83c12863 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Wed, 3 Apr 2024 10:15:03 -0700 Subject: [PATCH] [KYUUBI #5767][FOLLOWUP] Fix spark batch conf not convert issue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description ## Issue References ๐Ÿ”— Convert the spark batch conf key if not start with `spark.`, it is impacted by #5767 ``` protected def convertConfigKey(key: String): String = { if (key.startsWith("spark.")) { key } else if (key.startsWith("hadoop.")) { "spark.hadoop." + key } else { "spark." + key } } ``` ## Describe Your Solution ๐Ÿ”ง ## Types of changes :bookmark: - [x] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan ๐Ÿงช #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests --- # Checklist ๐Ÿ“ - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6256 from turboFei/convert_key. Closes #5767 d76655e05 [Wang, Fei] Fix break Authored-by: Wang, Fei Signed-off-by: Wang, Fei --- .../spark/SparkBatchProcessBuilder.scala | 9 +++-- .../engine/spark/SparkProcessBuilder.scala | 2 +- .../spark/SparkBatchProcessBuilderSuite.scala | 39 +++++++++++++++++++ .../spark/SparkProcessBuilderSuite.scala | 14 ++++++- 4 files changed, 58 insertions(+), 6 deletions(-) create mode 100644 kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala index 813e9dce775..713a34d0c87 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala @@ -38,7 +38,7 @@ class SparkBatchProcessBuilder( extends SparkProcessBuilder(proxyUser, true, conf, batchId, extraEngineLog) { import SparkProcessBuilder._ - override protected lazy val commands: Iterable[String] = { + override protected[kyuubi] lazy val commands: Iterable[String] = { val buffer = new mutable.ListBuffer[String]() buffer += executable Option(mainClass).foreach { cla => @@ -53,11 +53,12 @@ class SparkBatchProcessBuilder( // tag batch application KyuubiApplicationManager.tagApplication(batchId, "spark", clusterManager(), batchKyuubiConf) - val allConfigs = batchKyuubiConf.getAll ++ + (batchKyuubiConf.getAll ++ sparkAppNameConf() ++ engineLogPathConf() ++ - appendPodNameConf(batchConf) - buffer ++= confKeyValues(allConfigs) + appendPodNameConf(batchConf)).map { case (k, v) => + buffer ++= confKeyValue(convertConfigKey(k), v) + } setupKerberos(buffer) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index 4c06d7951a3..a651e99efbc 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -124,7 +124,7 @@ class SparkProcessBuilder( file.isDirectory && r.findFirstMatchIn(file.getName).isDefined } - override protected lazy val commands: Iterable[String] = { + override protected[kyuubi] lazy val commands: Iterable[String] = { // complete `spark.master` if absent on kubernetes completeMasterUrl(conf) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala new file mode 100644 index 00000000000..e3603e24ec9 --- /dev/null +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilderSuite.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark + +import java.util.UUID + +import org.apache.kyuubi.KyuubiFunSuite +import org.apache.kyuubi.config.KyuubiConf + +class SparkBatchProcessBuilderSuite extends KyuubiFunSuite { + test("spark batch conf should be converted with `spark.` prefix") { + val builder = new SparkBatchProcessBuilder( + "kyuubi", + KyuubiConf(false), + UUID.randomUUID().toString, + "test", + Some("test"), + "test", + Map("kyuubi.key" -> "value"), + Seq.empty, + None) + assert(builder.commands.toSeq.contains("spark.kyuubi.key=value")) + } +} diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala index 74c28539b2e..4ee98a0802a 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala @@ -449,9 +449,21 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { val commands2 = builder2.toString.split(' ') assert(commands2.contains("spark.yarn.maxAppAttempts=1")) } + + test("spark conf should be converted with `spark.` prefix") { + val kyuubiConf = KyuubiConf(false) + kyuubiConf.set("kyuubi.key", "value") + val builder = new SparkProcessBuilder( + "kyuubi", + false, + kyuubiConf, + UUID.randomUUID().toString, + None) + assert(builder.commands.toSeq.contains("spark.kyuubi.key=value")) + } } class FakeSparkProcessBuilder(config: KyuubiConf) extends SparkProcessBuilder("fake", true, config) { - override protected lazy val commands: Iterable[String] = Seq("ls") + override lazy val commands: Iterable[String] = Seq("ls") }