Skip to content

Commit

Permalink
[KYUUBI #5767][FOLLOWUP] Fix spark batch conf not convert issue
Browse files Browse the repository at this point in the history
# 🔍 Description
## Issue References 🔗

Convert the spark batch conf key if not start with `spark.`, it is impacted by #5767

```
  protected def convertConfigKey(key: String): String = {
    if (key.startsWith("spark.")) {
      key
    } else if (key.startsWith("hadoop.")) {
      "spark.hadoop." + key
    } else {
      "spark." + key
    }
  }
```
## Describe Your Solution 🔧

## Types of changes 🔖

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6256 from turboFei/convert_key.

Closes #5767

d76655e [Wang, Fei] Fix break

Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
  • Loading branch information
turboFei committed Apr 3, 2024
1 parent 9b618c9 commit c8a40d9
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class SparkBatchProcessBuilder(
extends SparkProcessBuilder(proxyUser, true, conf, batchId, extraEngineLog) {
import SparkProcessBuilder._

override protected lazy val commands: Iterable[String] = {
override protected[kyuubi] lazy val commands: Iterable[String] = {
val buffer = new mutable.ListBuffer[String]()
buffer += executable
Option(mainClass).foreach { cla =>
Expand All @@ -53,11 +53,12 @@ class SparkBatchProcessBuilder(
// tag batch application
KyuubiApplicationManager.tagApplication(batchId, "spark", clusterManager(), batchKyuubiConf)

val allConfigs = batchKyuubiConf.getAll ++
(batchKyuubiConf.getAll ++
sparkAppNameConf() ++
engineLogPathConf() ++
appendPodNameConf(batchConf)
buffer ++= confKeyValues(allConfigs)
appendPodNameConf(batchConf)).map { case (k, v) =>
buffer ++= confKeyValue(convertConfigKey(k), v)
}

setupKerberos(buffer)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class SparkProcessBuilder(
file.isDirectory && r.findFirstMatchIn(file.getName).isDefined
}

override protected lazy val commands: Iterable[String] = {
override protected[kyuubi] lazy val commands: Iterable[String] = {
// complete `spark.master` if absent on kubernetes
completeMasterUrl(conf)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.spark

import java.util.UUID

import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf

class SparkBatchProcessBuilderSuite extends KyuubiFunSuite {
test("spark batch conf should be converted with `spark.` prefix") {
val builder = new SparkBatchProcessBuilder(
"kyuubi",
KyuubiConf(false),
UUID.randomUUID().toString,
"test",
Some("test"),
"test",
Map("kyuubi.key" -> "value"),
Seq.empty,
None)
assert(builder.commands.toSeq.contains("spark.kyuubi.key=value"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,21 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
val commands2 = builder2.toString.split(' ')
assert(commands2.contains("spark.yarn.maxAppAttempts=1"))
}

test("spark conf should be converted with `spark.` prefix") {
val kyuubiConf = KyuubiConf(false)
kyuubiConf.set("kyuubi.key", "value")
val builder = new SparkProcessBuilder(
"kyuubi",
false,
kyuubiConf,
UUID.randomUUID().toString,
None)
assert(builder.commands.toSeq.contains("spark.kyuubi.key=value"))
}
}

class FakeSparkProcessBuilder(config: KyuubiConf)
extends SparkProcessBuilder("fake", true, config) {
override protected lazy val commands: Iterable[String] = Seq("ls")
override lazy val commands: Iterable[String] = Seq("ls")
}

0 comments on commit c8a40d9

Please sign in to comment.