Enable customized and isolated Python environment for PySpark
saLeox committed Nov 2, 2022
1 parent 4dc4247 commit d2df0e5
Showing 3 changed files with 30 additions and 26 deletions.
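
This commit lets a PySpark job run against a user-supplied Python environment instead of a binary hard-coded into the engine (the old pyspark.python3.path default): three Spark options are registered as passthrough configuration keys, and SparkPythonExecutor now resolves the driver and executor interpreters from them. A typical setup for an isolated environment built with a tool such as conda-pack might look like the following (the archive path and alias are hypothetical):

spark.yarn.dist.archives=hdfs:///tmp/linkis/pyspark_env.tar.gz#pyenv
spark.pyspark.python=./pyenv/bin/python
spark.pyspark.driver.python=/appcom/Install/anaconda3/bin/python

On YARN, the archive is extracted into each executor's working directory under the #pyenv alias, so the relative ./pyenv/bin/python is valid on every executor while the driver uses a locally installed interpreter.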
linkis-dist/package/db/linkis_dml.sql: 4 additions & 0 deletions
@@ -77,6 +77,10 @@ INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`,
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.tidb.port', NULL, NULL, '4000', 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.tidb.user', NULL, NULL, 'root', 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.python.version', '取值范围:python2,python3', 'python版本','python2', 'OFT', '[\"python3\",\"python2\"]', '0', '0', '1', 'spark引擎设置', 'spark');
+ INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.yarn.dist.archives', 'Comma separated list of archives to be extracted into the working directory of each executor.', NULL, NULL, 'None', NULL, '0', '0', '1', 'spark引擎设置', 'spark');
+ INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.pyspark.python', 'Python binary executable to use for PySpark in both driver and executors.', NULL, NULL, 'None', NULL, '0', '0', '1', 'spark引擎设置', 'spark');
+ INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.pyspark.driver.python', 'Python binary executable to use for PySpark in driver.', NULL, NULL, 'None', NULL, '0', '0', '1', 'spark引擎设置', 'spark');

-- hive
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.rm.instance', '范围:1-20,单位:个', 'hive引擎最大并发数', '10', 'NumInterval', '[1,20]', '0', '0', '1', '队列资源', 'hive');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.engineconn.java.driver.memory', '取值范围:1-10,单位:G', 'hive引擎初始化内存大小','1g', 'Regex', '^([1-9]|10)(G|g)$', '0', '0', '1', 'hive引擎设置', 'hive');
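Once the DML has been applied, the new rows can be sanity-checked with a query against the same table (the schema name depends on the deployment):

SELECT `key`, `default_value`, `engine_conn_type`
FROM `linkis_ps_configuration_config_key`
WHERE `engine_conn_type` = 'spark'
  AND `key` IN ('spark.yarn.dist.archives', 'spark.pyspark.python', 'spark.pyspark.driver.python');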
SparkConfiguration.scala
@@ -108,9 +108,6 @@ object SparkConfiguration extends Logging {
val ENGINE_SHUTDOWN_LOGS =
CommonVars("wds.linkis.spark.engineconn.fatal.log", "error writing class;OutOfMemoryError")

- val PYSPARK_PYTHON3_PATH =
-   CommonVars[String]("pyspark.python3.path", "/appcom/Install/anaconda3/bin/python")

val ENABLE_REPLACE_PACKAGE_NAME =
CommonVars("wds.linkis.spark.engine.scala.replace_package_header.enable", true)

SparkPythonExecutor.scala
@@ -145,18 +145,31 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
.toString
.toLowerCase()
val sparkPythonVersion =
-   if (
-     StringUtils
-       .isNotBlank(userDefinePythonVersion) && userDefinePythonVersion.equals("python3")
-   ) {
-     SparkConfiguration.PYSPARK_PYTHON3_PATH.getValue
-   } else {
-     userDefinePythonVersion
-   }
- val pythonExec = CommonVars("PYSPARK_DRIVER_PYTHON", sparkPythonVersion).getValue
+   if (StringUtils.isNotBlank(userDefinePythonVersion)) userDefinePythonVersion else "python"
+ val pySparkDriverPythonFromVersion =
+   if (new File(sparkPythonVersion).exists()) sparkPythonVersion else ""
+
+ // extra pyspark driver Python
+ val pySparkDriverPythonConf = "spark.pyspark.driver.python"
+ val userDefinePySparkDriverPython =
+   sc.getConf.getOption(pySparkDriverPythonConf).getOrElse(pySparkDriverPythonFromVersion)
+ val defaultPySparkDriverPython = CommonVars("PYSPARK_DRIVER_PYTHON", "").getValue
+ // spark.pyspark.driver.python > spark.python.version > PYSPARK_DRIVER_PYTHON
+ val pySparkDriverPython =
+   if (StringUtils.isNotBlank(userDefinePySparkDriverPython)) userDefinePySparkDriverPython
+   else defaultPySparkDriverPython
+ logger.info(s"PYSPARK_DRIVER_PYTHON => $pySparkDriverPython")
+
+ // extra pyspark Python
+ val pySparkPythonConf = "spark.pyspark.python"
+ val userDefinePySparkPython = sc.getConf.getOption(pySparkPythonConf).getOrElse("")
+ val defaultPySparkPython = CommonVars("PYSPARK_PYTHON", "").getValue
+ val pySparkPython =
+   if (StringUtils.isNotBlank(userDefinePySparkPython)) userDefinePySparkPython
+   else defaultPySparkPython
+ logger.info(s"PYSPARK_PYTHON => $pySparkPython")

val pythonScriptPath = CommonVars("python.script.path", "python/mix_pyspark.py").getValue

val port: Int = EngineUtils.findAvailPort
gatewayServer = gwBuilder.entryPoint(this).javaPort(port).build()
gatewayServer.start()
@@ -168,6 +181,7 @@
)
val pythonClasspath = new StringBuilder(pythonPath)

+ // extra spark files
val files = sc.getConf.get("spark.files", "")
logger.info(s"output spark files ${files}")
if (StringUtils.isNotEmpty(files)) {
@@ -186,7 +200,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
.filter(_.endsWith(".zip"))
.foreach(pythonClasspath ++= File.pathSeparator ++= _)

- val cmd = CommandLine.parse(pythonExec)
+ val cmd = CommandLine.parse(pySparkDriverPython)
cmd.addArgument(createFakeShell(pythonScriptPath).getAbsolutePath, false)
cmd.addArgument(port.toString, false)
cmd.addArgument(EngineUtils.sparkSubmitVersion().replaceAll("\\.", ""), false)
@@ -195,19 +209,8 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
cmd.addArgument(pyFiles, false)

val builder = new ProcessBuilder(cmd.toStrings.toSeq.toList.asJava)

val env = builder.environment()
- if (StringUtils.isBlank(sc.getConf.get("spark.pyspark.python", ""))) {
-   logger.info("spark.pyspark.python is null")
-   if (userDefinePythonVersion.equals("python3")) {
-     logger.info(s"userDefinePythonVersion is $pythonExec will be set to PYSPARK_PYTHON")
-     env.put("PYSPARK_PYTHON", pythonExec)
-   }
- } else {
-   val executorPython = sc.getConf.get("spark.pyspark.python")
-   logger.info(s"set PYSPARK_PYTHON spark.pyspark.python is $executorPython")
-   env.put("PYSPARK_PYTHON", executorPython)
- }
+ if (StringUtils.isNotBlank(pySparkPython)) env.put("PYSPARK_PYTHON", pySparkPython)
env.put("PYTHONPATH", pythonClasspath.toString())
env.put("PYTHONUNBUFFERED", "YES")
env.put("PYSPARK_GATEWAY_PORT", "" + port)
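
For reference, here is a minimal standalone sketch of the resolution order the new code implements. The resolver object, the Map-based lookups, and the plain "python" fallback are illustrative simplifications; the real executor reads SparkConf and Linkis CommonVars.

import java.io.File

object PySparkPythonResolver {

  // Driver side: spark.pyspark.driver.python > spark.python.version (only when
  // it points at an existing binary) > the PYSPARK_DRIVER_PYTHON variable.
  def resolveDriverPython(conf: Map[String, String], env: Map[String, String]): String = {
    val fromVersion = conf.get("spark.python.version")
      .map(_.trim.toLowerCase)
      .filter(p => p.nonEmpty && new File(p).exists())
    conf.get("spark.pyspark.driver.python").filter(_.nonEmpty)
      .orElse(fromVersion)
      .orElse(env.get("PYSPARK_DRIVER_PYTHON").filter(_.nonEmpty))
      .getOrElse("python") // simplification; the commit itself falls back to an empty default
  }

  // Executor side: spark.pyspark.python > the PYSPARK_PYTHON variable; the
  // result is exported to the Python process only when it is non-blank.
  def resolveExecutorPython(conf: Map[String, String], env: Map[String, String]): String =
    conf.get("spark.pyspark.python").filter(_.nonEmpty)
      .orElse(env.get("PYSPARK_PYTHON").filter(_.nonEmpty))
      .getOrElse("python")
}

For example, resolveDriverPython(Map("spark.pyspark.driver.python" -> "./pyenv/bin/python"), sys.env) yields "./pyenv/bin/python", matching the precedence comment in the diff above.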
