Skip to content

Commit

Permalink
Spark once task supports engingeConnRuntimeMode label (#4896)
Browse files Browse the repository at this point in the history
* Spark once task supports engingeConnRuntimeMode label

* isYarnClusterMode extracts to LabelUtil

* Modify SparkEngineConnFactory
  • Loading branch information
ChengJie1053 authored Sep 18, 2023
1 parent 5ca6726 commit 8f149e1
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.linkis.manager.label.utils

import org.apache.linkis.manager.label.constant.LabelValueConstant
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.{
CodeLanguageLabel,
Expand Down Expand Up @@ -135,4 +136,13 @@ object LabelUtil {
null.asInstanceOf[A]
}

def isYarnClusterMode(labels: util.List[Label[_]]): Boolean = {
val label = LabelUtil.getEngingeConnRuntimeModeLabel(labels)
val isYarnClusterMode: Boolean = {
if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true
else false
}
isYarnClusterMode
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import org.apache.linkis.manager.engineplugin.common.creation.{
}
import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
import org.apache.linkis.manager.engineplugin.common.launch.process.Environment.variable
import org.apache.linkis.manager.label.constant.LabelValueConstant
import org.apache.linkis.manager.label.entity.engine.EngineType
import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
import org.apache.linkis.manager.label.utils.LabelUtil
Expand Down Expand Up @@ -86,12 +85,13 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
val hadoopConfDir = EnvConfiguration.HADOOP_CONF_DIR.getValue(options)
val sparkHome = SPARK_HOME.getValue(options)
val sparkConfDir = SPARK_CONF_DIR.getValue(options)
val sparkConfig: SparkConfig = getSparkConfig(options)
val sparkConfig: SparkConfig =
getSparkConfig(options, LabelUtil.isYarnClusterMode(engineCreationContext.getLabels()))
val context = new EnvironmentContext(sparkConfig, hadoopConfDir, sparkConfDir, sparkHome, null)
context
}

def getSparkConfig(options: util.Map[String, String]): SparkConfig = {
def getSparkConfig(options: util.Map[String, String], isYarnClusterMode: Boolean): SparkConfig = {
logger.info("options: " + JsonUtils.jackson.writeValueAsString(options))
val sparkConfig: SparkConfig = new SparkConfig()
sparkConfig.setJavaHome(variable(Environment.JAVA_HOME))
Expand All @@ -114,7 +114,14 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
sparkConfig.setK8sDriverRequestCores(SPARK_K8S_DRIVER_REQUEST_CORES.getValue(options))
sparkConfig.setK8sExecutorRequestCores(SPARK_K8S_EXECUTOR_REQUEST_CORES.getValue(options))
}
sparkConfig.setDeployMode(SPARK_DEPLOY_MODE.getValue(options))

if (master.startsWith("yarn")) {
if (isYarnClusterMode) {
sparkConfig.setDeployMode(SparkConfiguration.SPARK_YARN_CLUSTER)
} else {
sparkConfig.setDeployMode(SparkConfiguration.SPARK_YARN_CLIENT)
}
}
sparkConfig.setAppResource(SPARK_APP_RESOURCE.getValue(options))
sparkConfig.setAppName(SPARK_APP_NAME.getValue(options))
sparkConfig.setJars(SPARK_EXTRA_JARS.getValue(options)) // todo
Expand Down Expand Up @@ -149,10 +156,7 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
sparkConf.getOption("spark.master").getOrElse(CommonVars("spark.master", "yarn").getValue)
logger.info(s"------ Create new SparkContext {$master} -------")

val label = LabelUtil.getEngingeConnRuntimeModeLabel(engineCreationContext.getLabels())
val isYarnClusterMode: Boolean =
if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true
else false
val isYarnClusterMode = LabelUtil.isYarnClusterMode(engineCreationContext.getLabels())

if (isYarnClusterMode) {
sparkConf.set("spark.submit.deployMode", "cluster")
Expand Down

0 comments on commit 8f149e1

Please sign in to comment.