diff --git a/bin/livy-server b/bin/livy-server index 71b913440..928424f30 100755 --- a/bin/livy-server +++ b/bin/livy-server @@ -83,9 +83,6 @@ start_livy_server() { LIVY_CLASSPATH="$LIBDIR/*:$LIVY_CONF_DIR" - if [ -n "$SPARK_CONF_DIR" ]; then - LIVY_CLASSPATH="$LIVY_CLASSPATH:$SPARK_CONF_DIR" - fi if [ -n "$HADOOP_CONF_DIR" ]; then LIVY_CLASSPATH="$LIVY_CLASSPATH:$HADOOP_CONF_DIR" fi diff --git a/conf/livy.conf.template b/conf/livy.conf.template index d57717a13..a14951a68 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -98,9 +98,48 @@ # How often Livy polls YARN to refresh YARN app state. # livy.server.yarn.poll-interval = 1s # # Days to keep Livy server request logs. # livy.server.request-log-retain.days = 5 # If the Livy Web UI should be included in the Livy Server. Enabled by default. # livy.ui.enabled = true + +# Define Spark environments in the Livy server. Users can pre-define multiple Spark environments +# and pick one of them at runtime via the session creation request. +# +# A Spark environment is composed of several configurations: +# livy.server.spark-env.${sparkEnv}.spark-home = +# livy.server.spark-env.${sparkEnv}.spark-conf-dir = +# livy.server.spark-env.${sparkEnv}.scala-version = +# Whether to enable HiveContext in the interpreter session; false by default. +# livy.server.spark-env.${sparkEnv}.enable-hive-context = false +# livy.server.spark-env.${sparkEnv}.sparkr.package = +# livy.server.spark-env.${sparkEnv}.pyspark.archives = +# +# Only livy.server.spark-env.${sparkEnv}.spark-home is required; the others can be inferred from +# the provided spark-home. +# +# Environment variables like SPARK_HOME and SPARK_CONF_DIR can still be used; their values will be +# merged into the "default" environment. +# +# Users can also define "${SPARK_ENV}_SPARK_HOME" and "${SPARK_ENV}_SPARK_CONF_DIR"; these values +# will be merged into the ${sparkEnv} environment. +# +# ${sparkEnv} can be replaced with any desired name. When creating a session, the user can specify +# the name of the Spark environment, and the Livy server will internally pick the right Spark +# environment; the "default" Spark environment is used if none is specified. +# +# For backward compatibility, all of the previous configurations: +# +# livy.server.spark-home +# livy.server.spark-conf-dir +# livy.spark.scalaVersion +# livy.repl.enableHiveContext +# livy.sparkr.package +# livy.pyspark.archives +# +# can still be used and will automatically be merged into the "default" Spark environment.
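+#
+# For illustration only (the paths and environment names below are examples, not shipped
+# defaults), two environments could be configured side by side:
+#
+# livy.server.spark-env.default.spark-home = /opt/spark-2.1.0
+# livy.server.spark-env.staging.spark-home = /opt/spark-1.6.3
+# livy.server.spark-env.staging.enable-hive-context = true
+#
+# A session could then be created against the "staging" environment by setting the "sparkEnv"
+# field of the creation request, e.g.:
+# curl -X POST -H "Content-Type: application/json" \
+#   -d '{"kind": "spark", "sparkEnv": "staging"}' http://<livy-host>:8998/sessions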
diff --git a/integration-test/src/main/scala/com/cloudera/livy/test/framework/MiniCluster.scala b/integration-test/src/main/scala/com/cloudera/livy/test/framework/MiniCluster.scala index 973d7d757..748dfafab 100644 --- a/integration-test/src/main/scala/com/cloudera/livy/test/framework/MiniCluster.scala +++ b/integration-test/src/main/scala/com/cloudera/livy/test/framework/MiniCluster.scala @@ -156,12 +156,13 @@ object MiniLivyMain extends MiniClusterBase { var livyConf = Map( LivyConf.LIVY_SPARK_MASTER.key -> "yarn", LivyConf.LIVY_SPARK_DEPLOY_MODE.key -> "cluster", - LivyConf.LIVY_SPARK_SCALA_VERSION.key -> getSparkScalaVersion(), LivyConf.HEARTBEAT_WATCHDOG_INTERVAL.key -> "1s", + "livy.server.spark-env.default.scala-version" -> getSparkScalaVersion(), LivyConf.YARN_POLL_INTERVAL.key -> "500ms", LivyConf.RECOVERY_MODE.key -> "recovery", LivyConf.RECOVERY_STATE_STORE.key -> "filesystem", - LivyConf.RECOVERY_STATE_STORE_URL.key -> s"file://$configPath/state-store") + LivyConf.RECOVERY_STATE_STORE_URL.key -> s"file://$configPath/state-store", + "livy.server.spark-env.default.enable-hive-context" -> "true") if (Cluster.isRunningOnTravis) { livyConf ++= Map("livy.server.yarn.app-lookup-timeout" -> "2m") @@ -171,7 +172,6 @@ object MiniLivyMain extends MiniClusterBase { val server = new LivyServer() server.start() - server.livyConf.set(LivyConf.ENABLE_HIVE_CONTEXT, true) // Write a serverUrl.conf file to the conf directory with the location of the Livy // server. Do it atomically since it's used by MiniCluster to detect when the Livy server // is up and ready. diff --git a/repl/src/test/scala/com/cloudera/livy/repl/ReplDriverSuite.scala b/repl/src/test/scala/com/cloudera/livy/repl/ReplDriverSuite.scala index 40656c78e..a8ae6ad91 100644 --- a/repl/src/test/scala/com/cloudera/livy/repl/ReplDriverSuite.scala +++ b/repl/src/test/scala/com/cloudera/livy/repl/ReplDriverSuite.scala @@ -47,6 +47,7 @@ class ReplDriverSuite extends FunSuite with LivyBaseUnitTestSuite { .setURI(new URI("rsc:/")) .setConf(RSCConf.Entry.DRIVER_CLASS.key(), classOf[ReplDriver].getName()) .setConf(RSCConf.Entry.SESSION_KIND.key(), Spark().toString) + .setConf(RSCConf.Entry.SPARK_HOME.key(), System.getenv("SPARK_HOME")) .build() .asInstanceOf[RSCClient] diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java b/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java index 9a5e447ea..179e9bf88 100644 --- a/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java +++ b/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java @@ -28,13 +28,7 @@ import java.io.Reader; import java.io.Writer; import java.nio.file.Files; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -66,8 +60,6 @@ class ContextLauncher { private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode"; private static final String SPARK_JARS_KEY = "spark.jars"; - private static final String SPARK_ARCHIVES_KEY = "spark.yarn.dist.archives"; - private static final String SPARK_HOME_ENV = "SPARK_HOME"; static DriverProcessInfo create(RSCClientFactory factory, RSCConf conf) throws IOException { @@ -124,6 +116,7 @@ public void run() { this.timeout =
factory.getServer().getEventLoopGroup().schedule(timeoutTask, conf.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS); } catch (Exception e) { + LOG.error("Failed to launch RSC driver context.", e); dispose(true); throw Utils.propagate(e); } @@ -172,11 +165,10 @@ private static ChildProcess startDriver(final RSCConf conf, Promise promise) } merge(conf, SPARK_JARS_KEY, livyJars, ","); + Map<String, String> childEnv = new HashMap<>(); String kind = conf.get(SESSION_KIND); - if ("sparkr".equals(kind)) { - merge(conf, SPARK_ARCHIVES_KEY, conf.get(RSCConf.Entry.SPARKR_PACKAGE), ","); - } else if ("pyspark".equals(kind)) { - merge(conf, "spark.submit.pyFiles", conf.get(RSCConf.Entry.PYSPARK_ARCHIVES), ","); + if ("pyspark".equals(kind) && conf.get(RSCConf.Entry.PYSPARK_ARCHIVES) != null) { + childEnv.put("PYSPARK_ARCHIVES_PATH", conf.get(RSCConf.Entry.PYSPARK_ARCHIVES)); } // Disable multiple attempts since the RPC server doesn't yet support multiple @@ -220,7 +212,7 @@ public void run() { }; return new ChildProcess(conf, promise, child, confFile); } else { - final SparkLauncher launcher = new SparkLauncher(); + final SparkLauncher launcher = new SparkLauncher(childEnv); // Spark 1.x does not support specifying deploy mode in conf and needs special handling. String deployMode = conf.get(SPARK_DEPLOY_MODE); @@ -228,7 +220,7 @@ public void run() { launcher.setDeployMode(deployMode); } - launcher.setSparkHome(System.getenv(SPARK_HOME_ENV)); + launcher.setSparkHome(conf.get(SPARK_HOME)); launcher.setAppResource("spark-internal"); launcher.setPropertiesFile(confFile.getAbsolutePath()); launcher.setMainClass(RSCDriverBootstrapper.class.getName()); @@ -266,11 +258,7 @@ private static File writeConfToFile(RSCConf conf) throws IOException { } // Load the default Spark configuration. - String confDir = System.getenv("SPARK_CONF_DIR"); - if (confDir == null && System.getenv(SPARK_HOME_ENV) != null) { - confDir = System.getenv(SPARK_HOME_ENV) + File.separator + "conf"; - } - + String confDir = conf.get(SPARK_CONF_DIR); if (confDir != null) { File sparkDefaults = new File(confDir + File.separator + "spark-defaults.conf"); if (sparkDefaults.isFile()) { diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java b/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java index d1b8b399f..4e069558b 100644 --- a/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java +++ b/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java @@ -46,8 +46,10 @@ public static enum Entry implements ConfEntry { SESSION_KIND("session.kind", null), LIVY_JARS("jars", null), - SPARKR_PACKAGE("sparkr.package", null), + PYSPARK_ARCHIVES("pyspark.archives", null), + SPARK_HOME("spark_home", null), + SPARK_CONF_DIR("spark_conf_dir", null), // Address for the RSC driver to connect back with it's connection info.
LAUNCHER_ADDRESS("launcher.address", null), diff --git a/rsc/src/test/java/com/cloudera/livy/rsc/TestSparkClient.java b/rsc/src/test/java/com/cloudera/livy/rsc/TestSparkClient.java index 973d0125b..22dc6f9ed 100644 --- a/rsc/src/test/java/com/cloudera/livy/rsc/TestSparkClient.java +++ b/rsc/src/test/java/com/cloudera/livy/rsc/TestSparkClient.java @@ -76,6 +76,7 @@ private Properties createConf(boolean local) { } conf.put(LIVY_JARS.key(), ""); + conf.put(SPARK_HOME.key(), System.getenv("SPARK_HOME")); return conf; } diff --git a/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaClientTest.scala b/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaClientTest.scala index 1753c5564..f2c8e8add 100644 --- a/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaClientTest.scala +++ b/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaClientTest.scala @@ -185,6 +185,7 @@ object ScalaClientTest { conf.put(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH, classpath) } conf.put(LIVY_JARS.key, "") + conf.put(SPARK_HOME.key(), System.getenv("SPARK_HOME")) conf } diff --git a/server/src/main/scala/com/cloudera/livy/LivyConf.scala b/server/src/main/scala/com/cloudera/livy/LivyConf.scala index b347c2d19..b4db4b7a5 100644 --- a/server/src/main/scala/com/cloudera/livy/LivyConf.scala +++ b/server/src/main/scala/com/cloudera/livy/LivyConf.scala @@ -27,8 +27,8 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import com.cloudera.livy.client.common.ClientConf -import com.cloudera.livy.client.common.ClientConf.ConfEntry -import com.cloudera.livy.client.common.ClientConf.DeprecatedConf +import com.cloudera.livy.client.common.ClientConf._ +import com.cloudera.livy.utils.SparkEnvironment object LivyConf { @@ -42,21 +42,12 @@ object LivyConf { val TEST_MODE = ClientConf.TEST_MODE - val SPARK_HOME = Entry("livy.server.spark-home", null) val LIVY_SPARK_MASTER = Entry("livy.spark.master", "local") val LIVY_SPARK_DEPLOY_MODE = Entry("livy.spark.deploy-mode", null) - // Two configurations to specify Spark and related Scala version. These are internal - // configurations will be set by LivyServer and used in session creation. It is not required to - // set usually unless running with unofficial Spark + Scala versions - // (like Spark 2.0 + Scala 2.10, Spark 1.6 + Scala 2.11) - val LIVY_SPARK_SCALA_VERSION = Entry("livy.spark.scala-version", null) - val LIVY_SPARK_VERSION = Entry("livy.spark.version", null) - val SESSION_STAGING_DIR = Entry("livy.session.staging-dir", null) val FILE_UPLOAD_MAX_SIZE = Entry("livy.file.upload.max.size", 100L * 1024 * 1024) val LOCAL_FS_WHITELIST = Entry("livy.file.local-dir-whitelist", null) - val ENABLE_HIVE_CONTEXT = Entry("livy.repl.enable-hive-context", false) val ENVIRONMENT = Entry("livy.environment", "production") @@ -142,13 +133,6 @@ object LivyConf { // How long a finished session state will be kept in memory val SESSION_STATE_RETAIN_TIME = Entry("livy.server.session.state-retain.sec", "600s") - val SPARK_MASTER = "spark.master" - val SPARK_DEPLOY_MODE = "spark.submit.deployMode" - val SPARK_JARS = "spark.jars" - val SPARK_FILES = "spark.files" - val SPARK_ARCHIVES = "spark.yarn.dist.archives" - val SPARK_PY_FILES = "spark.submit.pyFiles" - /** * These are Spark configurations that contain lists of files that the user can add to * their jobs in one way or another. 
Livy needs to pre-process these to make sure the @@ -160,18 +144,6 @@ */ val SPARK_FILE_LISTS = Entry("livy.spark.file-list-configs", null) - private val HARDCODED_SPARK_FILE_LISTS = Seq( - SPARK_JARS, - SPARK_FILES, - SPARK_ARCHIVES, - SPARK_PY_FILES, - "spark.yarn.archive", - "spark.yarn.dist.files", - "spark.yarn.dist.jars", - "spark.yarn.jar", - "spark.yarn.jars" - ) - case class DepConf( override val key: String, override val version: String, @@ -180,8 +152,8 @@ private val configsWithAlternatives: Map[String, DeprecatedConf] = Map[String, DepConf]( LIVY_SPARK_DEPLOY_MODE.key -> DepConf("livy.spark.deployMode", "0.4"), - LIVY_SPARK_SCALA_VERSION.key -> DepConf("livy.spark.scalaVersion", "0.4"), - ENABLE_HIVE_CONTEXT.key -> DepConf("livy.repl.enableHiveContext", "0.4"), + "livy.spark.scala-version" -> DepConf("livy.spark.scalaVersion", "0.4"), + "livy.repl.enable-hive-context" -> DepConf("livy.repl.enableHiveContext", "0.4"), CSRF_PROTECTION.key -> DepConf("livy.server.csrf_protection.enabled", "0.4"), ACCESS_CONTROL_ENABLED.key -> DepConf("livy.server.access_control.enabled", "0.4"), ACCESS_CONTROL_USERS.key -> DepConf("livy.server.access_control.users", "0.4"), @@ -202,7 +174,6 @@ Map(configs.map { cfg => (cfg.key -> cfg) }: _*) } - } /** @@ -222,7 +193,8 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) { path.stripSuffix("/") + "/" } - lazy val sparkFileLists = HARDCODED_SPARK_FILE_LISTS ++ configToSeq(SPARK_FILE_LISTS) + lazy val sparkFileLists = SparkEnvironment.HARDCODED_SPARK_FILE_LISTS ++ + configToSeq(SPARK_FILE_LISTS) /** * Create a LivyConf that loads defaults from the system properties and the classpath. @@ -247,17 +219,9 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) { /** Return the spark deploy mode Livy sessions should use. */ def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty) - /** Return the location of the spark home directory */ - def sparkHome(): Option[String] = Option(get(SPARK_HOME)).orElse(sys.env.get("SPARK_HOME")) - /** Return the spark master Livy sessions should use. */ def sparkMaster(): String = get(LIVY_SPARK_MASTER) - /** Return the path to the spark-submit executable. */ - def sparkSubmit(): String = { - sparkHome().map { _ + File.separator + "bin" + File.separator + "spark-submit" }.get - } - /** Return the list of superusers. */ def superusers(): Seq[String] = _superusers diff --git a/server/src/main/scala/com/cloudera/livy/server/LivyServer.scala b/server/src/main/scala/com/cloudera/livy/server/LivyServer.scala index 7bfe9cea5..fa3006393 100644 --- a/server/src/main/scala/com/cloudera/livy/server/LivyServer.scala +++ b/server/src/main/scala/com/cloudera/livy/server/LivyServer.scala @@ -64,30 +64,6 @@ class LivyServer extends Logging { maxFileSize = Some(livyConf.getLong(LivyConf.FILE_UPLOAD_MAX_SIZE)) ).toMultipartConfigElement - // Make sure the `spark-submit` program exists, otherwise much of livy won't work. - testSparkHome(livyConf) - - // Test spark-submit and get Spark Scala version accordingly.
- val (sparkVersion, scalaVersionFromSparkSubmit) = sparkSubmitVersion(livyConf) - testSparkVersion(sparkVersion) - - // If Spark and Scala version is set manually, should verify if they're consistent with - // ones parsed from "spark-submit --version" - val formattedSparkVersion = formatSparkVersion(sparkVersion) - Option(livyConf.get(LIVY_SPARK_VERSION)).map(formatSparkVersion).foreach { version => - require(formattedSparkVersion == version, - s"Configured Spark version $version is not equal to Spark version $formattedSparkVersion " + - "got from spark-submit -version") - } - - // Set formatted Spark and Scala version into livy configuration, this will be used by - // session creation. - // TODO Create a new class to pass variables from LivyServer to sessions and remove these - // internal LivyConfs. - livyConf.set(LIVY_SPARK_VERSION.key, formattedSparkVersion.productIterator.mkString(".")) - livyConf.set(LIVY_SPARK_SCALA_VERSION.key, - sparkScalaVersion(formattedSparkVersion, scalaVersionFromSparkSubmit, livyConf)) - if (UserGroupInformation.isSecurityEnabled) { // If Hadoop security is enabled, run kinit periodically. runKinit() should be called // before any Hadoop operation, otherwise Kerberos exception will be thrown. diff --git a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala index 1a097b75b..500746dc0 100644 --- a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala @@ -29,7 +29,7 @@ import com.cloudera.livy.{LivyConf, Logging} import com.cloudera.livy.server.recovery.SessionStore import com.cloudera.livy.sessions.{Session, SessionState} import com.cloudera.livy.sessions.Session._ -import com.cloudera.livy.utils.{AppInfo, SparkApp, SparkAppListener, SparkProcessBuilder} +import com.cloudera.livy.utils._ @JsonIgnoreProperties(ignoreUnknown = true) case class BatchRecoveryMetadata( @@ -53,6 +53,7 @@ object BatchSession extends Logging { sessionStore: SessionStore, mockApp: Option[SparkApp] = None): BatchSession = { val appTag = s"livy-batch-$id-${Random.alphanumeric.take(8).mkString}" + val sparkEnv = SparkEnvironment.getSparkEnv(livyConf, request.sparkEnv) def createSparkApp(s: BatchSession): SparkApp = { val conf = SparkApp.prepareSparkConf( @@ -63,7 +64,7 @@ object BatchSession extends Logging { require(request.file != null, "File is required.") val builder = new SparkProcessBuilder(livyConf) - builder.conf(conf) + builder.conf(conf).executable(sparkEnv.sparkSubmit()) proxyUser.foreach(builder.proxyUser) request.className.foreach(builder.className) diff --git a/server/src/main/scala/com/cloudera/livy/server/batch/CreateBatchRequest.scala b/server/src/main/scala/com/cloudera/livy/server/batch/CreateBatchRequest.scala index 29c269c1b..39cd4f793 100644 --- a/server/src/main/scala/com/cloudera/livy/server/batch/CreateBatchRequest.scala +++ b/server/src/main/scala/com/cloudera/livy/server/batch/CreateBatchRequest.scala @@ -20,6 +20,7 @@ package com.cloudera.livy.server.batch class CreateBatchRequest { + var sparkEnv: String = "default" var file: String = _ var proxyUser: Option[String] = None var args: List[String] = List() diff --git a/server/src/main/scala/com/cloudera/livy/server/interactive/CreateInteractiveRequest.scala b/server/src/main/scala/com/cloudera/livy/server/interactive/CreateInteractiveRequest.scala index 726119964..30e98942f 100644 --- 
a/server/src/main/scala/com/cloudera/livy/server/interactive/CreateInteractiveRequest.scala +++ b/server/src/main/scala/com/cloudera/livy/server/interactive/CreateInteractiveRequest.scala @@ -22,6 +22,7 @@ import com.cloudera.livy.sessions.{Kind, Spark} class CreateInteractiveRequest { var kind: Kind = Spark() + var sparkEnv: String = "default" var proxyUser: Option[String] = None var jars: List[String] = List() var pyFiles: List[String] = List() diff --git a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala index 05bc7be75..fdb727f37 100644 --- a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala @@ -21,7 +21,6 @@ package com.cloudera.livy.server.interactive import java.io.{File, InputStream} import java.net.URI import java.nio.ByteBuffer -import java.nio.file.{Files, Paths} import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong @@ -44,8 +43,7 @@ import com.cloudera.livy.server.recovery.SessionStore import com.cloudera.livy.sessions._ import com.cloudera.livy.sessions.Session._ import com.cloudera.livy.sessions.SessionState.Dead -import com.cloudera.livy.util.LineBufferedProcess -import com.cloudera.livy.utils.{AppInfo, LivySparkUtils, SparkApp, SparkAppListener} +import com.cloudera.livy.utils._ @JsonIgnoreProperties(ignoreUnknown = true) case class InteractiveRecoveryMetadata( @@ -61,8 +59,6 @@ case class InteractiveRecoveryMetadata( extends RecoveryMetadata object InteractiveSession extends Logging { - private[interactive] val SPARK_YARN_IS_PYTHON = "spark.yarn.isPython" - val RECOVERY_SESSION_TYPE = "interactive" def create( @@ -75,12 +71,13 @@ object InteractiveSession extends Logging { mockApp: Option[SparkApp] = None, mockClient: Option[RSCClient] = None): InteractiveSession = { val appTag = s"livy-session-$id-${Random.alphanumeric.take(8).mkString}" + val sparkEnv = SparkEnvironment.getSparkEnv(livyConf, request.sparkEnv) val client = mockClient.orElse { val conf = SparkApp.prepareSparkConf(appTag, livyConf, prepareConf( request.conf, request.jars, request.files, request.archives, request.pyFiles, livyConf)) - val builderProperties = prepareBuilderProp(conf, request.kind, livyConf) + val builderProperties = prepareBuilderProp(conf, request.kind, livyConf, sparkEnv) val userOpts: Map[String, Option[String]] = Map( "spark.driver.cores" -> request.driverCores.map(_.toString), @@ -154,7 +151,8 @@ object InteractiveSession extends Logging { private[interactive] def prepareBuilderProp( conf: Map[String, String], kind: Kind, - livyConf: LivyConf): mutable.Map[String, String] = { + livyConf: LivyConf, + sparkEnv: SparkEnvironment): mutable.Map[String, String] = { val builderProperties = mutable.Map[String, String]() builderProperties ++= conf @@ -179,62 +177,13 @@ object InteractiveSession extends Logging { } } - def findSparkRArchive(): Option[String] = { - Option(livyConf.get(RSCConf.Entry.SPARKR_PACKAGE.key())).orElse { - sys.env.get("SPARK_HOME").map { case sparkHome => - val path = Seq(sparkHome, "R", "lib", "sparkr.zip").mkString(File.separator) - val rArchivesFile = new File(path) - require(rArchivesFile.exists(), "sparkr.zip not found; cannot run sparkr application.") - rArchivesFile.getAbsolutePath() - } - } - } - - def datanucleusJars(livyConf: LivyConf, sparkMajorVersion: Int): Seq[String] = { - if 
(sys.env.getOrElse("LIVY_INTEGRATION_TEST", "false").toBoolean) { - // datanucleus jars has already been in classpath in integration test - Seq.empty - } else { - val sparkHome = livyConf.sparkHome().get - val libdir = sparkMajorVersion match { - case 1 => - if (new File(sparkHome, "RELEASE").isFile) { - new File(sparkHome, "lib") - } else { - new File(sparkHome, "lib_managed/jars") - } - case 2 => - if (new File(sparkHome, "RELEASE").isFile) { - new File(sparkHome, "jars") - } else if (new File(sparkHome, "assembly/target/scala-2.11/jars").isDirectory) { - new File(sparkHome, "assembly/target/scala-2.11/jars") - } else { - new File(sparkHome, "assembly/target/scala-2.10/jars") - } - case v => - throw new RuntimeException("Unsupported spark major version:" + sparkMajorVersion) - } - val jars = if (!libdir.isDirectory) { - Seq.empty[String] - } else { - libdir.listFiles().filter(_.getName.startsWith("datanucleus-")) - .map(_.getAbsolutePath).toSeq - } - if (jars.isEmpty) { - warn("datanucleus jars can not be found") - } - jars - } - } - /** * Look for hive-site.xml (for now just ignore spark.files defined in spark-defaults.conf) * 1. First look for hive-site.xml in user request * 2. Then look for that under classpath - * @param livyConf * @return (hive-site.xml path, whether it is provided by user) */ - def hiveSiteFile(sparkFiles: Array[String], livyConf: LivyConf): (Option[File], Boolean) = { + def hiveSiteFile(sparkFiles: Array[String]): (Option[File], Boolean) = { if (sparkFiles.exists(_.split("/").last == "hive-site.xml")) { (None, true) } else { @@ -247,28 +196,6 @@ object InteractiveSession extends Logging { } } - def findPySparkArchives(): Seq[String] = { - Option(livyConf.get(RSCConf.Entry.PYSPARK_ARCHIVES)) - .map(_.split(",").toSeq) - .getOrElse { - sys.env.get("SPARK_HOME") .map { case sparkHome => - val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator) - val pyArchivesFile = new File(pyLibPath, "pyspark.zip") - require(pyArchivesFile.exists(), - "pyspark.zip not found; cannot run pyspark application in YARN mode.") - - val py4jFile = Files.newDirectoryStream(Paths.get(pyLibPath), "py4j-*-src.zip") - .iterator() - .next() - .toFile - - require(py4jFile.exists(), - "py4j-*-src.zip not found; cannot run pyspark application in YARN mode.") - Seq(pyArchivesFile.getAbsolutePath, py4jFile.getAbsolutePath) - }.getOrElse(Seq()) - } - } - def mergeConfList(list: Seq[String], key: String): Unit = { if (list.nonEmpty) { builderProperties.get(key) match { @@ -282,16 +209,18 @@ object InteractiveSession extends Logging { } def mergeHiveSiteAndHiveDeps(sparkMajorVersion: Int): Unit = { - val sparkFiles = conf.get("spark.files").map(_.split(",")).getOrElse(Array.empty[String]) - hiveSiteFile(sparkFiles, livyConf) match { + val sparkFiles = conf.get(SparkEnvironment.SPARK_FILES) + .map(_.split(",")) + .getOrElse(Array.empty[String]) + hiveSiteFile(sparkFiles) match { case (_, true) => debug("Enable HiveContext because hive-site.xml is found in user request.") - mergeConfList(datanucleusJars(livyConf, sparkMajorVersion), LivyConf.SPARK_JARS) + mergeConfList(sparkEnv.datanucleusJars(), SparkEnvironment.SPARK_JARS) case (Some(file), false) => debug("Enable HiveContext because hive-site.xml is found under classpath, " + file.getAbsolutePath) - mergeConfList(List(file.getAbsolutePath), LivyConf.SPARK_FILES) - mergeConfList(datanucleusJars(livyConf, sparkMajorVersion), LivyConf.SPARK_JARS) + mergeConfList(List(file.getAbsolutePath), SparkEnvironment.SPARK_FILES) + 
mergeConfList(sparkEnv.datanucleusJars(), SparkEnvironment.SPARK_JARS) case (None, false) => warn("Enable HiveContext but no hive-site.xml found under" + " classpath or user request.") @@ -300,40 +229,36 @@ object InteractiveSession extends Logging { kind match { case PySpark() | PySpark3() => - val pySparkFiles = if (!LivyConf.TEST_MODE) findPySparkArchives() else Nil - mergeConfList(pySparkFiles, LivyConf.SPARK_PY_FILES) - builderProperties.put(SPARK_YARN_IS_PYTHON, "true") + val pySparkFiles = if (!LivyConf.TEST_MODE) sparkEnv.findPySparkArchives() else Nil + mergeConfList(pySparkFiles, RSCConf.Entry.PYSPARK_ARCHIVES.key()) + builderProperties.put(SparkEnvironment.SPARK_YARN_IS_PYTHON, "true") case SparkR() => - val sparkRArchive = if (!LivyConf.TEST_MODE) findSparkRArchive() else None + val sparkRArchive = if (!LivyConf.TEST_MODE) Some(sparkEnv.findSparkRArchive()) else None sparkRArchive.foreach { archive => - builderProperties.put(RSCConf.Entry.SPARKR_PACKAGE.key(), archive + "#sparkr") + mergeConfList(List(archive + "#sparkr"), SparkEnvironment.SPARK_ARCHIVES) } case _ => } builderProperties.put(RSCConf.Entry.SESSION_KIND.key, kind.toString) + builderProperties.put(RSCConf.Entry.SPARK_HOME.key, sparkEnv.sparkHome()) + builderProperties.put(RSCConf.Entry.SPARK_CONF_DIR.key, sparkEnv.sparkConfDir()) // Set Livy.rsc.jars from livy conf to rsc conf, RSC conf will take precedence if both are set. Option(livyConf.get(LivyConf.RSC_JARS)).foreach( builderProperties.getOrElseUpdate(RSCConf.Entry.LIVY_JARS.key(), _)) - require(livyConf.get(LivyConf.LIVY_SPARK_VERSION) != null) - require(livyConf.get(LivyConf.LIVY_SPARK_SCALA_VERSION) != null) - - val (sparkMajorVersion, _) = - LivySparkUtils.formatSparkVersion(livyConf.get(LivyConf.LIVY_SPARK_VERSION)) - val scalaVersion = livyConf.get(LivyConf.LIVY_SPARK_SCALA_VERSION) + mergeConfList(livyJars(livyConf, sparkEnv.scalaVersion()), SparkEnvironment.SPARK_JARS) - mergeConfList(livyJars(livyConf, scalaVersion), LivyConf.SPARK_JARS) - val enableHiveContext = livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT) + val enableHiveContext = sparkEnv.getBoolean(SparkEnvironment.ENABLE_HIVE_CONTEXT) // pass spark.livy.spark_major_version to driver - builderProperties.put("spark.livy.spark_major_version", sparkMajorVersion.toString) + val sparkMajorVersion = sparkEnv.sparkVersion()._1 + builderProperties.put(SparkEnvironment.SPARK_MAJOR_VERSION, sparkMajorVersion.toString) if (sparkMajorVersion <= 1) { - builderProperties.put("spark.repl.enableHiveContext", - livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT).toString) + builderProperties.put(SparkEnvironment.SPARK_ENABLE_HIVE_CONTEXT, enableHiveContext.toString) } else { val confVal = if (enableHiveContext) "hive" else "in-memory" - builderProperties.put("spark.sql.catalogImplementation", confVal) + builderProperties.put(SparkEnvironment.SPARK2_ENABLE_HIVE_CONTEXT, confVal) } if (enableHiveContext) { diff --git a/server/src/main/scala/com/cloudera/livy/sessions/Session.scala b/server/src/main/scala/com/cloudera/livy/sessions/Session.scala index a0d0f8b10..d0a4c482e 100644 --- a/server/src/main/scala/com/cloudera/livy/sessions/Session.scala +++ b/server/src/main/scala/com/cloudera/livy/sessions/Session.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.security.UserGroupInformation import com.cloudera.livy.{LivyConf, Logging, Utils} -import com.cloudera.livy.utils.AppInfo +import com.cloudera.livy.utils.{AppInfo, SparkEnvironment} object Session { trait 
RecoveryMetadata { val id: Int } @@ -70,10 +70,10 @@ object Session { .map { key => (key -> Nil) }.toMap val userLists = confLists ++ Map( - LivyConf.SPARK_JARS -> jars, - LivyConf.SPARK_FILES -> files, - LivyConf.SPARK_ARCHIVES -> archives, - LivyConf.SPARK_PY_FILES -> pyFiles) + SparkEnvironment.SPARK_JARS -> jars, + SparkEnvironment.SPARK_FILES -> files, + SparkEnvironment.SPARK_ARCHIVES -> archives, + SparkEnvironment.SPARK_PY_FILES -> pyFiles) val merged = userLists.flatMap { case (key, list) => val confList = conf.get(key) @@ -89,8 +89,8 @@ object Session { } } - val masterConfList = Map(LivyConf.SPARK_MASTER -> livyConf.sparkMaster()) ++ - livyConf.sparkDeployMode().map(LivyConf.SPARK_DEPLOY_MODE -> _).toMap + val masterConfList = Map(SparkEnvironment.SPARK_MASTER -> livyConf.sparkMaster()) ++ + livyConf.sparkDeployMode().map(SparkEnvironment.SPARK_DEPLOY_MODE -> _).toMap conf ++ masterConfList ++ merged } diff --git a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedProcess.scala b/server/src/main/scala/com/cloudera/livy/utils/LineBufferedProcess.scala index 9f6b614b3..317d39c5d 100644 --- a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedProcess.scala +++ b/server/src/main/scala/com/cloudera/livy/utils/LineBufferedProcess.scala @@ -16,7 +16,7 @@ * limitations under the License. */ -package com.cloudera.livy.util +package com.cloudera.livy.utils import com.cloudera.livy.{Logging, Utils} diff --git a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala b/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala index a8949aff0..d7f9ce0b6 100644 --- a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala +++ b/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala @@ -16,7 +16,7 @@ * limitations under the License. */ -package com.cloudera.livy.util +package com.cloudera.livy.utils import java.io.InputStream import java.util.concurrent.locks.ReentrantLock diff --git a/server/src/main/scala/com/cloudera/livy/utils/LivySparkUtils.scala b/server/src/main/scala/com/cloudera/livy/utils/LivySparkUtils.scala index 766eb91ee..5758bf4f3 100644 --- a/server/src/main/scala/com/cloudera/livy/utils/LivySparkUtils.scala +++ b/server/src/main/scala/com/cloudera/livy/utils/LivySparkUtils.scala @@ -24,8 +24,6 @@ import scala.collection.SortedMap import scala.math.Ordering.Implicits._ import com.cloudera.livy.{LivyConf, Logging} -import com.cloudera.livy.LivyConf.LIVY_SPARK_SCALA_VERSION -import com.cloudera.livy.util.LineBufferedProcess object LivySparkUtils extends Logging { @@ -50,22 +48,18 @@ object LivySparkUtils extends Logging { /** * Test that Spark home is configured and configured Spark home is a directory. */ - def testSparkHome(livyConf: LivyConf): Unit = { - val sparkHome = livyConf.sparkHome().getOrElse { - throw new IllegalArgumentException("Livy requires the SPARK_HOME environment variable") - } - - require(new File(sparkHome).isDirectory(), "SPARK_HOME path does not exist") + def testSparkHome(sparkEnv: SparkEnvironment): Unit = { + require(new File(sparkEnv.sparkHome()).isDirectory(), "SPARK_HOME path does not exist") } /** * Test that the configured `spark-submit` executable exists. 
* - * @param livyConf + * @param sparkEnv */ - def testSparkSubmit(livyConf: LivyConf): Unit = { + def testSparkSubmit(sparkEnv: SparkEnvironment): Unit = { try { - testSparkVersion(sparkSubmitVersion(livyConf)._1) + testSparkVersion(sparkSubmitVersion(sparkEnv)._1) } catch { case e: IOException => throw new IOException("Failed to run spark-submit executable", e) @@ -87,11 +81,11 @@ object LivySparkUtils extends Logging { /** * Call `spark-submit --version` and parse its output for Spark and Scala version. * - * @param livyConf + * @param sparkEnv * @return Tuple with Spark and Scala version */ - def sparkSubmitVersion(livyConf: LivyConf): (String, Option[String]) = { - val sparkSubmit = livyConf.sparkSubmit() + def sparkSubmitVersion(sparkEnv: SparkEnvironment): (String, Option[String]) = { + val sparkSubmit = sparkEnv.sparkSubmit() val pb = new ProcessBuilder(sparkSubmit, "--version") pb.redirectErrorStream(true) pb.redirectInput(ProcessBuilder.Redirect.PIPE) @@ -122,8 +116,8 @@ object LivySparkUtils extends Logging { def sparkScalaVersion( formattedSparkVersion: (Int, Int), scalaVersionFromSparkSubmit: Option[String], - livyConf: LivyConf): String = { - val scalaVersionInLivyConf = Option(livyConf.get(LIVY_SPARK_SCALA_VERSION)) + sparkEnv: SparkEnvironment): String = { + val scalaVersionInLivyConf = Option(sparkEnv.get(SparkEnvironment.SPARK_SCALA_VERSION)) .filter(_.nonEmpty) .map(formatScalaVersion) diff --git a/server/src/main/scala/com/cloudera/livy/utils/SparkApp.scala b/server/src/main/scala/com/cloudera/livy/utils/SparkApp.scala index 57aa580c2..c79243109 100644 --- a/server/src/main/scala/com/cloudera/livy/utils/SparkApp.scala +++ b/server/src/main/scala/com/cloudera/livy/utils/SparkApp.scala @@ -21,7 +21,6 @@ package com.cloudera.livy.utils import scala.collection.JavaConverters._ import com.cloudera.livy.LivyConf -import com.cloudera.livy.util.LineBufferedProcess object AppInfo { val DRIVER_LOG_URL_NAME = "driverLogUrl" diff --git a/server/src/main/scala/com/cloudera/livy/utils/SparkEnvironment.scala b/server/src/main/scala/com/cloudera/livy/utils/SparkEnvironment.scala new file mode 100644 index 000000000..b4d8b3783 --- /dev/null +++ b/server/src/main/scala/com/cloudera/livy/utils/SparkEnvironment.scala @@ -0,0 +1,323 @@ +/* + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.cloudera.livy.utils + +import java.io.File +import java.lang.{Boolean => JBoolean} +import java.nio.file.{Files, Paths} +import java.util.{Map => JMap} + +import scala.collection.mutable +import scala.collection.JavaConverters._ + +import com.google.common.annotations.VisibleForTesting + +import com.cloudera.livy.{LivyConf, Logging} +import com.cloudera.livy.client.common.ClientConf +import com.cloudera.livy.client.common.ClientConf.{ConfEntry, DeprecatedConf} + +object SparkEnvironment extends Logging { + + case class Entry(override val key: String, override val dflt: AnyRef) extends ConfEntry + + object Entry { + def apply(key: String, dflt: Boolean): Entry = Entry(key, dflt: JBoolean) + } + + val DEFAULT_ENV_NAME = "default" + val SPARK_ENV_PREFIX = "livy.server.spark-env" + + val SPARK_HOME = Entry("spark-home", null) + val SPARK_CONF_DIR = Entry("spark-conf-dir", null) + + // This configuration is used to specify Spark's Scala version. It is an internal + // configuration used in session creation. It usually does not need to be set + // unless running with unofficial Spark + Scala combinations + // (like Spark 2.0 + Scala 2.10, or Spark 1.6 + Scala 2.11). + val SPARK_SCALA_VERSION = Entry("scala-version", null) + + val ENABLE_HIVE_CONTEXT = Entry("enable-hive-context", false) + + val SPARKR_PACKAGE = Entry("sparkr.package", null) + val PYSPARK_ARCHIVES = Entry("pyspark.archives", null) + + val backwardCompatibleConfs = Map( + "livy.server.spark-home" -> SPARK_HOME, + "livy.server.spark-conf-dir" -> SPARK_CONF_DIR, + "livy.spark.scala-version" -> SPARK_SCALA_VERSION, + "livy.spark.scalaVersion" -> SPARK_SCALA_VERSION, + "livy.repl.enable-hive-context" -> ENABLE_HIVE_CONTEXT, + "livy.repl.enableHiveContext" -> ENABLE_HIVE_CONTEXT, + "livy.sparkr.package" -> SPARKR_PACKAGE, + "livy.pyspark.archives" -> PYSPARK_ARCHIVES + ) + + val SPARK_MASTER = "spark.master" + val SPARK_DEPLOY_MODE = "spark.submit.deployMode" + val SPARK_JARS = "spark.jars" + val SPARK_FILES = "spark.files" + val SPARK_ARCHIVES = "spark.yarn.dist.archives" + val SPARK_PY_FILES = "spark.submit.pyFiles" + val SPARK_YARN_IS_PYTHON = "spark.yarn.isPython" + + val SPARK_ENABLE_HIVE_CONTEXT = "spark.repl.enableHiveContext" + val SPARK2_ENABLE_HIVE_CONTEXT = "spark.sql.catalogImplementation" + + // Spark major version passed from the server to the driver, used by the interpreter to load the right objects.
+ val SPARK_MAJOR_VERSION = "spark.livy.spark_major_version" + + val HARDCODED_SPARK_FILE_LISTS = Seq( + SPARK_JARS, + SPARK_FILES, + SPARK_ARCHIVES, + SPARK_PY_FILES, + "spark.yarn.archive", + "spark.yarn.dist.files", + "spark.yarn.dist.jars", + "spark.yarn.jar", + "spark.yarn.jars" + ) + + @VisibleForTesting + private[livy] val sparkEnvironments = new mutable.HashMap[String, SparkEnvironment] + + def getSparkEnv(livyConf: LivyConf, env: String): SparkEnvironment = { + if (sparkEnvironments.contains(env)) { + sparkEnvironments(env) + } else { + synchronized { + if (sparkEnvironments.contains(env)) { + sparkEnvironments(env) + } else { + val sparkEnv = createSparkEnv(livyConf, env) + sparkEnv.environmentCheck(livyConf) + sparkEnvironments(env) = sparkEnv + sparkEnv + } + } + } + } + + @VisibleForTesting + private[livy] def createSparkEnv(livyConf: LivyConf, env: String): SparkEnvironment = { + val livySparkConfKeys = getClass.getMethods.filter { + _.getReturnType.getCanonicalName == classOf[Entry].getCanonicalName + }.map(_.invoke(this).asInstanceOf[Entry].key).toSet + + val sparkEnv = new SparkEnvironment(env) + if (env == DEFAULT_ENV_NAME) { + livyConf.asScala + .filter { kv => backwardCompatibleConfs.contains(kv.getKey) } + .foreach { kv => sparkEnv.set(backwardCompatibleConfs(kv.getKey), kv.getValue) } + } + + livyConf.asScala + .filter { kv => kv.getKey.startsWith(s"$SPARK_ENV_PREFIX.$env.") && + livySparkConfKeys.contains(kv.getKey.stripPrefix(s"$SPARK_ENV_PREFIX.$env.")) } + .foreach { + kv => sparkEnv.set(kv.getKey.stripPrefix(s"$SPARK_ENV_PREFIX.$env."), kv.getValue) + } + + info(s"Created Spark environment $env with configuration ${sparkEnv.asScala.mkString(",")}") + sparkEnv + } +} + +/** + * An isolated Spark environment used to isolate Spark-related configurations and libraries. + * Livy can have multiple Spark environments differentiated by name, for example if the user + * configures the Livy conf like: + * + * livy.server.spark-env.test.spark-home = xxx + * livy.server.spark-env.test.spark-conf-dir = xxx + * + * livy.server.spark-env.production.spark-home = yyy + * livy.server.spark-env.production.spark-conf-dir = yyy + * + * Livy will internally have two isolated Spark environments, "test" and "production". When + * creating a batch or interactive session, the user can pick one through the "sparkEnv" field + * in the JSON body. The Livy server honors this environment name and picks the matching Spark + * environment, which allows Livy to work with different Spark clusters at runtime. + * + * The default Spark environment is "default". If the user configures + * + * livy.server.spark-home = xxx + * or: + * livy.server.spark-conf-dir = xxx + * + * the Livy server assigns the configuration to the "default" Spark environment to keep + * backward compatibility. This is equivalent to: + * + * livy.server.spark-env.default.spark-home = xxx + * + * The same applies to environment variables: + * + * SPARK_HOME or DEFAULT_SPARK_HOME is treated as part of the "default" Spark environment. + * TEST_SPARK_HOME or TEST_SPARK_CONF_DIR is assigned to the "test" Spark environment. + */ +class SparkEnvironment private(name: String) + extends ClientConf[SparkEnvironment](null) with Logging { + + import SparkEnvironment._ + + @VisibleForTesting + private[livy] var _sparkVersion: (Int, Int) = _ + @VisibleForTesting + private[livy] var _scalaVersion: String = _ + + /** + * Return the location of the Spark home directory. It checks the Livy conf as well as + * environment variables.
For the "default" Spark environment, it checks SPARK_HOME or + * DEFAULT_SPARK_HOME; for other Spark environments, it checks ${NAME}_SPARK_HOME. + */ + def sparkHome(): String = { + Option(get(SPARK_HOME)) + .orElse { + if (name == DEFAULT_ENV_NAME) { + sys.env.get(DEFAULT_ENV_NAME.toUpperCase + "_SPARK_HOME") + .orElse(sys.env.get("SPARK_HOME")) + } else { + sys.env.get(name.toUpperCase + "_SPARK_HOME") + } + }.getOrElse(throw new IllegalStateException("SPARK_HOME is not configured")) + } + + /** + * Return the location of the Spark conf directory. It checks the Livy conf + * "livy.server.spark-conf-dir" as well as environment variables. For the "default" Spark + * environment, it checks SPARK_CONF_DIR or DEFAULT_SPARK_CONF_DIR; for other Spark + * environments, it checks ${NAME}_SPARK_CONF_DIR. + */ + def sparkConfDir(): String = { + Option(get(SPARK_CONF_DIR)) + .orElse( + if (name == DEFAULT_ENV_NAME) { + sys.env.get(DEFAULT_ENV_NAME.toUpperCase + "_SPARK_CONF_DIR") + .orElse(sys.env.get("SPARK_CONF_DIR")) + } else { + sys.env.get(name.toUpperCase + "_SPARK_CONF_DIR") + } + ).getOrElse(sparkHome + File.separator + "conf") + } + + /** Return the path to the spark-submit executable. */ + def sparkSubmit(): String = { + sparkHome() + File.separator + "bin" + File.separator + "spark-submit" + } + + def sparkVersion(): (Int, Int) = { + require(_sparkVersion != null) + _sparkVersion + } + + def scalaVersion(): String = { + require(_scalaVersion != null) + _scalaVersion + } + + def environmentCheck(livyConf: LivyConf): Unit = { + // Make sure the `spark-submit` program exists, otherwise much of livy won't work. + LivySparkUtils.testSparkHome(this) + + // Test spark-submit and get Spark Scala version accordingly. + val (sparkVersionFromSparkSubmit, scalaVersionFromSparkSubmit) = + LivySparkUtils.sparkSubmitVersion(this) + + LivySparkUtils.testSparkVersion(sparkVersionFromSparkSubmit) + + _sparkVersion = LivySparkUtils.formatSparkVersion(sparkVersionFromSparkSubmit) + _scalaVersion = + LivySparkUtils.sparkScalaVersion(_sparkVersion, scalaVersionFromSparkSubmit, this) + } + + + def findSparkRArchive(): String = { + Option(get(SPARKR_PACKAGE)).getOrElse { + val path = Seq(sparkHome(), "R", "lib", "sparkr.zip").mkString(File.separator) + val rArchivesFile = new File(path) + require(rArchivesFile.exists(), "sparkr.zip not found; cannot run sparkr application.") + rArchivesFile.getAbsolutePath() + } + } + + def datanucleusJars(): Seq[String] = { + if (sys.env.getOrElse("LIVY_INTEGRATION_TEST", "false").toBoolean) { + // datanucleus jars are already on the classpath in integration tests + Seq.empty + } else { + val major = sparkVersion()._1 + val libdir = major match { + case 1 => + if (new File(sparkHome(), "RELEASE").isFile) { + new File(sparkHome(), "lib") + } else { + new File(sparkHome(), "lib_managed/jars") + } + case 2 => + if (new File(sparkHome(), "RELEASE").isFile) { + new File(sparkHome(), "jars") + } else if (new File(sparkHome(), "assembly/target/scala-2.11/jars").isDirectory) { + new File(sparkHome(), "assembly/target/scala-2.11/jars") + } else { + new File(sparkHome(), "assembly/target/scala-2.10/jars") + } + case _ => + throw new IllegalStateException(s"Unsupported spark major version: $major") + } + val jars = if (!libdir.isDirectory) { + Seq.empty[String] + } else { + libdir.listFiles().filter(_.getName.startsWith("datanucleus-")) + .map(_.getAbsolutePath).toSeq + } + if (jars.isEmpty) { + warn("datanucleus jars cannot be found") + } + jars + } + } + + def findPySparkArchives():
Seq[String] = { + Option(get(PYSPARK_ARCHIVES)) + .map(_.split(",").toSeq) + .getOrElse { + val pyLibPath = Seq(sparkHome(), "python", "lib").mkString(File.separator) + val pyArchivesFile = new File(pyLibPath, "pyspark.zip") + require(pyArchivesFile.exists(), + "pyspark.zip not found; cannot run pyspark application in YARN mode.") + + val py4jFile = Files.newDirectoryStream(Paths.get(pyLibPath), "py4j-*-src.zip") + .iterator() + .next() + .toFile + + require(py4jFile.exists(), + "py4j-*-src.zip not found; cannot run pyspark application in YARN mode.") + Seq(pyArchivesFile.getAbsolutePath, py4jFile.getAbsolutePath) + } + } + + override protected def getConfigsWithAlternatives: JMap[String, DeprecatedConf] = { + Map.empty[String, DeprecatedConf].asJava + } + + override protected def getDeprecatedConfigs: JMap[String, DeprecatedConf] = { + Map.empty[String, DeprecatedConf].asJava + } +} diff --git a/server/src/main/scala/com/cloudera/livy/utils/SparkProcApp.scala b/server/src/main/scala/com/cloudera/livy/utils/SparkProcApp.scala index ca27ab050..cfd420d7a 100644 --- a/server/src/main/scala/com/cloudera/livy/utils/SparkProcApp.scala +++ b/server/src/main/scala/com/cloudera/livy/utils/SparkProcApp.scala @@ -19,7 +19,6 @@ package com.cloudera.livy.utils import com.cloudera.livy.{Logging, Utils} -import com.cloudera.livy.util.LineBufferedProcess /** * Provide a class to control a Spark application using spark-submit. diff --git a/server/src/main/scala/com/cloudera/livy/utils/SparkProcessBuilder.scala b/server/src/main/scala/com/cloudera/livy/utils/SparkProcessBuilder.scala index a38a76177..e8a8dd8a1 100644 --- a/server/src/main/scala/com/cloudera/livy/utils/SparkProcessBuilder.scala +++ b/server/src/main/scala/com/cloudera/livy/utils/SparkProcessBuilder.scala @@ -23,11 +23,10 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import com.cloudera.livy.{LivyConf, Logging} -import com.cloudera.livy.util.LineBufferedProcess class SparkProcessBuilder(livyConf: LivyConf) extends Logging { - private[this] var _executable: String = livyConf.sparkSubmit() + private[this] var _executable: String = _ private[this] var _master: Option[String] = None private[this] var _deployMode: Option[String] = None private[this] var _className: Option[String] = None diff --git a/server/src/main/scala/com/cloudera/livy/utils/SparkYarnApp.scala b/server/src/main/scala/com/cloudera/livy/utils/SparkYarnApp.scala index fb47e5e4b..fa3ba784f 100644 --- a/server/src/main/scala/com/cloudera/livy/utils/SparkYarnApp.scala +++ b/server/src/main/scala/com/cloudera/livy/utils/SparkYarnApp.scala @@ -34,7 +34,6 @@ import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException import org.apache.hadoop.yarn.util.ConverterUtils import com.cloudera.livy.{LivyConf, Logging, Utils} -import com.cloudera.livy.util.LineBufferedProcess object SparkYarnApp extends Logging { diff --git a/server/src/test/scala/com/cloudera/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/com/cloudera/livy/server/batch/BatchSessionSpec.scala index 8ac46a77e..e5985e421 100644 --- a/server/src/test/scala/com/cloudera/livy/server/batch/BatchSessionSpec.scala +++ b/server/src/test/scala/com/cloudera/livy/server/batch/BatchSessionSpec.scala @@ -33,7 +33,7 @@ import org.scalatest.mock.MockitoSugar.mock import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf, Utils} import com.cloudera.livy.server.recovery.SessionStore import com.cloudera.livy.sessions.SessionState -import com.cloudera.livy.utils.{AppInfo, SparkApp} 
+import com.cloudera.livy.utils.{AppInfo, SparkApp, SparkEnvironment} class BatchSessionSpec extends FunSpec @@ -63,6 +63,10 @@ class BatchSessionSpec sessionStore = mock[SessionStore] } + after { + SparkEnvironment.sparkEnvironments.clear() + } + it("should create a process") { val req = new CreateBatchRequest() req.file = script.toString @@ -110,5 +114,26 @@ class BatchSessionSpec verify(sessionStore, atLeastOnce()).save( Matchers.eq(BatchSession.RECOVERY_SESSION_TYPE), anyObject()) } + + it("should use right Spark environment") { + assume(sys.env.get("SPARK_HOME").isDefined, "SPARK_HOME is not set.") + val conf = new LivyConf() + .set("livy.server.spark-home", sys.env("SPARK_HOME")) + .set(SparkEnvironment.SPARK_ENV_PREFIX + ".test." + SparkEnvironment.SPARK_HOME.key, + sys.env("SPARK_HOME")) + + val mockApp = mock[SparkApp] + + val req = new CreateBatchRequest() + req.sparkEnv = "default" + BatchSession.create(0, req, conf, null, None, sessionStore, Some(mockApp)) + + val req1 = new CreateBatchRequest() + req1.sparkEnv = "test" + BatchSession.create(1, req1, conf, null, None, sessionStore, Some(mockApp)) + + SparkEnvironment.sparkEnvironments.get("default") should not be (None) + SparkEnvironment.sparkEnvironments.get("test") should not be (None) + } } } diff --git a/server/src/test/scala/com/cloudera/livy/server/interactive/BaseInteractiveServletSpec.scala b/server/src/test/scala/com/cloudera/livy/server/interactive/BaseInteractiveServletSpec.scala index f08e09ea4..44cd9c5f4 100644 --- a/server/src/test/scala/com/cloudera/livy/server/interactive/BaseInteractiveServletSpec.scala +++ b/server/src/test/scala/com/cloudera/livy/server/interactive/BaseInteractiveServletSpec.scala @@ -51,8 +51,6 @@ abstract class BaseInteractiveServletSpec super.createConf() .set(LivyConf.SESSION_STAGING_DIR, tempDir.toURI().toString()) .set(LivyConf.REPL_JARS, "dummy.jar") - .set(LivyConf.LIVY_SPARK_VERSION, "1.6.0") - .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10.5") } protected def createRequest( diff --git a/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala index 28d715781..5f2c6dca3 100644 --- a/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala +++ b/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala @@ -30,7 +30,7 @@ import org.json4s.jackson.JsonMethods.parse import org.mockito.{Matchers => MockitoMatchers} import org.mockito.Matchers._ import org.mockito.Mockito.{atLeastOnce, verify, when} -import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers} +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSpec, Matchers} import org.scalatest.concurrent.Eventually._ import org.scalatest.mock.MockitoSugar.mock @@ -39,27 +39,26 @@ import com.cloudera.livy.rsc.{PingJob, RSCClient, RSCConf} import com.cloudera.livy.rsc.driver.StatementState import com.cloudera.livy.server.recovery.SessionStore import com.cloudera.livy.sessions.{PySpark, SessionState, Spark} -import com.cloudera.livy.utils.{AppInfo, SparkApp} +import com.cloudera.livy.utils.{AppInfo, SparkApp, SparkEnvironment} class InteractiveSessionSpec extends FunSpec - with Matchers with BeforeAndAfterAll with LivyBaseUnitTestSuite { + with Matchers with BeforeAndAfterAll with BeforeAndAfter with LivyBaseUnitTestSuite { private val livyConf = new LivyConf() livyConf.set(LivyConf.REPL_JARS, "dummy.jar") - .set(LivyConf.LIVY_SPARK_VERSION, "1.6.0") - 
.set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10.5") implicit val formats = DefaultFormats private var session: InteractiveSession = null private def createSession( + livyConf: LivyConf = this.livyConf, sessionStore: SessionStore = mock[SessionStore], - mockApp: Option[SparkApp] = None): InteractiveSession = { - assume(sys.env.get("SPARK_HOME").isDefined, "SPARK_HOME is not set.") - + mockApp: Option[SparkApp] = None, + sparkEnv: String = "default"): InteractiveSession = { val req = new CreateInteractiveRequest() req.kind = PySpark() + req.sparkEnv = sparkEnv req.driverMemory = Some("512m") req.driverCores = Some(1) req.executorMemory = Some("512m") @@ -81,6 +80,11 @@ class InteractiveSessionSpec extends FunSpec } } + override def beforeAll(): Unit = { + super.beforeAll() + assume(sys.env.get("SPARK_HOME").isDefined, "SPARK_HOME is not set.") + } + override def afterAll(): Unit = { if (session != null) { Await.ready(session.stop(), 30 seconds) @@ -112,17 +116,22 @@ class InteractiveSessionSpec extends FunSpec ) val livyConf = new LivyConf(false) .set(LivyConf.REPL_JARS, testedJars.mkString(",")) - .set(LivyConf.LIVY_SPARK_VERSION, "1.6.2") - .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10") - val properties = InteractiveSession.prepareBuilderProp(Map.empty, Spark(), livyConf) - assert(properties(LivyConf.SPARK_JARS).split(",").toSet === Set("test_2.10-0.1.jar", + val sparkEnv = SparkEnvironment.createSparkEnv(livyConf, "default") + sparkEnv._sparkVersion = (1, 6) + sparkEnv._scalaVersion = "2.10" + val properties = InteractiveSession.prepareBuilderProp(Map.empty, Spark(), livyConf, sparkEnv) + assert(properties(SparkEnvironment.SPARK_JARS).split(",").toSet === Set( + "test_2.10-0.1.jar", "local://dummy-path/test/test1_2.10-1.0.jar", "hdfs:///dummy-path/test/test3.jar", "dummy.jar")) - livyConf.set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.11") - val properties1 = InteractiveSession.prepareBuilderProp(Map.empty, Spark(), livyConf) - assert(properties1(LivyConf.SPARK_JARS).split(",").toSet === Set( + val newSparkEnv = SparkEnvironment.createSparkEnv(livyConf, "default") + newSparkEnv._sparkVersion = (1, 6) + newSparkEnv._scalaVersion = "2.11" + val properties1 = InteractiveSession.prepareBuilderProp( + Map.empty, Spark(), livyConf, newSparkEnv) + assert(properties1(SparkEnvironment.SPARK_JARS).split(",").toSet === Set( "file:///dummy-path/test/test2_2.11-1.0-SNAPSHOT.jar", "hdfs:///dummy-path/test/test3.jar", "dummy.jar")) @@ -138,9 +147,10 @@ class InteractiveSessionSpec extends FunSpec val livyConf = new LivyConf(false) .set(LivyConf.REPL_JARS, "dummy.jar") .set(LivyConf.RSC_JARS, rscJars.mkString(",")) - .set(LivyConf.LIVY_SPARK_VERSION, "1.6.2") - .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10") - val properties = InteractiveSession.prepareBuilderProp(Map.empty, Spark(), livyConf) + val sparkEnv = SparkEnvironment.createSparkEnv(livyConf, "default") + sparkEnv._sparkVersion = (1, 6) + sparkEnv._scalaVersion = "2.10" + val properties = InteractiveSession.prepareBuilderProp(Map.empty, Spark(), livyConf, sparkEnv) // if livy.rsc.jars is configured in LivyConf, it should be passed to RSCConf. 
       properties(RSCConf.Entry.LIVY_JARS.key()).split(",").toSet === rscJars
@@ -150,7 +160,7 @@ class InteractiveSessionSpec extends FunSpec
         "file:///dummy-path/foo2.jar",
         "hdfs:///dummy-path/foo3.jar")
       val properties1 = InteractiveSession.prepareBuilderProp(
-        Map(RSCConf.Entry.LIVY_JARS.key() -> rscJars1.mkString(",")), Spark(), livyConf)
+        Map(RSCConf.Entry.LIVY_JARS.key() -> rscJars1.mkString(",")), Spark(), livyConf, sparkEnv)
       // if rsc jars are configured both in LivyConf and RSCConf, RSCConf should take precedence.
       properties1(RSCConf.Entry.LIVY_JARS.key()).split(",").toSet === rscJars1
     }
@@ -163,7 +173,7 @@ class InteractiveSessionSpec extends FunSpec
     it("should update appId and appInfo and session store") {
       val mockApp = mock[SparkApp]
       val sessionStore = mock[SessionStore]
-      val session = createSession(sessionStore, Some(mockApp))
+      val session = createSession(sessionStore = sessionStore, mockApp = Some(mockApp))
       val expectedAppId = "APPID"
       session.appIdKnown(expectedAppId)
@@ -262,4 +272,47 @@ class InteractiveSessionSpec extends FunSpec
       s.logLines().mkString should include("RSCDriver URI is unknown")
     }
   }
+
+  describe("multiple Spark environments") {
+    import SparkEnvironment._
+
+    var session: InteractiveSession = null
+
+    after {
+      if (session != null) {
+        Await.ready(session.stop(), 30 seconds)
+        session = null
+        sparkEnvironments.clear()
+      }
+    }
+
+    it("should honor default Spark environment") {
+      val conf = new LivyConf()
+        .set(SPARK_ENV_PREFIX + ".default." + SPARK_HOME.key, sys.env("SPARK_HOME"))
+        .set(LivyConf.REPL_JARS, "dummy.jar")
+      session = createSession(livyConf = conf)
+      session.state should (be(a[SessionState.Starting]) or be(a[SessionState.Idle]))
+      sparkEnvironments.size should be (1)
+      sparkEnvironments.get("default") should not be (None)
+    }
+
+    it("should use customized Spark environment") {
+      val conf = new LivyConf()
+        .set(SPARK_ENV_PREFIX + ".test." + SPARK_HOME.key, sys.env("SPARK_HOME"))
+        .set(LivyConf.REPL_JARS, "dummy.jar")
+      session = createSession(livyConf = conf, sparkEnv = "test")
+      session.state should (be(a[SessionState.Starting]) or be(a[SessionState.Idle]))
+      sparkEnvironments.get("test") should not be (None)
+    }
+
+    it("should pick the right Spark environment") {
+      val conf = new LivyConf()
+        .set(SPARK_ENV_PREFIX + ".test." + SPARK_HOME.key, sys.env("SPARK_HOME"))
+        .set(SPARK_ENV_PREFIX + ".production." + SPARK_HOME.key, sys.env("SPARK_HOME"))
+        .set(LivyConf.REPL_JARS, "dummy.jar")
+      session = createSession(livyConf = conf, sparkEnv = "production")
+      session.state should (be(a[SessionState.Starting]) or be(a[SessionState.Idle]))
+      sparkEnvironments.get("production") should not be (None)
+    }
+  }
 }
diff --git a/server/src/test/scala/com/cloudera/livy/sessions/SessionSpec.scala b/server/src/test/scala/com/cloudera/livy/sessions/SessionSpec.scala
index 7e5c66d82..2ae56bea5 100644
--- a/server/src/test/scala/com/cloudera/livy/sessions/SessionSpec.scala
+++ b/server/src/test/scala/com/cloudera/livy/sessions/SessionSpec.scala
@@ -23,6 +23,7 @@ import java.net.URI
 import org.scalatest.FunSuite
 import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf}
+import com.cloudera.livy.utils.SparkEnvironment
 class SessionSpec extends FunSuite with LivyBaseUnitTestSuite {
@@ -89,8 +90,8 @@ class SessionSpec extends FunSuite with LivyBaseUnitTestSuite {
     val other = Seq("/file2.txt")
     val expected = Some(Seq("dummy://" + other(0), "dummy://" + base).mkString(","))
-    val userLists = Seq(LivyConf.SPARK_JARS, LivyConf.SPARK_FILES, LivyConf.SPARK_ARCHIVES,
-      LivyConf.SPARK_PY_FILES)
+    val userLists = Seq(SparkEnvironment.SPARK_JARS, SparkEnvironment.SPARK_FILES,
+      SparkEnvironment.SPARK_ARCHIVES, SparkEnvironment.SPARK_PY_FILES)
     val baseConf = userLists.map { key => (key -> base) }.toMap
     val result = Session.prepareConf(baseConf, other, other, other, other, conf)
     userLists.foreach { key => assert(result.get(key) === expected) }
diff --git a/server/src/test/scala/com/cloudera/livy/utils/LivySparkUtilsSuite.scala b/server/src/test/scala/com/cloudera/livy/utils/LivySparkUtilsSuite.scala
index 9981fa241..94a622d30 100644
--- a/server/src/test/scala/com/cloudera/livy/utils/LivySparkUtilsSuite.scala
+++ b/server/src/test/scala/com/cloudera/livy/utils/LivySparkUtilsSuite.scala
@@ -30,18 +30,22 @@ class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSu
   import LivySparkUtils._
   private val livyConf = new LivyConf()
+  private val sparkEnv = SparkEnvironment.createSparkEnv(livyConf, "default")
+
   private val livyConf210 = new LivyConf()
-  livyConf210.set(LIVY_SPARK_SCALA_VERSION, "2.10.6")
+  livyConf210.set("livy.spark.scala-version", "2.10.6")
+  private val sparkEnv210 = SparkEnvironment.createSparkEnv(livyConf210, "default")
   private val livyConf211 = new LivyConf()
-  livyConf211.set(LIVY_SPARK_SCALA_VERSION, "2.11.1")
+  livyConf211.set("livy.spark.scala-version", "2.11.1")
+  private val sparkEnv211 = SparkEnvironment.createSparkEnv(livyConf211, "default")
   test("check for SPARK_HOME") {
-    testSparkHome(livyConf)
+    testSparkHome(sparkEnv)
   }
   test("check spark-submit version") {
-    testSparkSubmit(livyConf)
+    testSparkSubmit(sparkEnv)
   }
   test("should support Spark 1.6") {
@@ -107,35 +111,35 @@ class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSu
   }
   test("sparkScalaVersion() should use spark-submit detected Scala version.") {
-    sparkScalaVersion(formatSparkVersion("2.0.1"), Some("2.10"), livyConf) shouldBe "2.10"
-    sparkScalaVersion(formatSparkVersion("1.6.0"), Some("2.11"), livyConf) shouldBe "2.11"
+    sparkScalaVersion(formatSparkVersion("2.0.1"), Some("2.10"), sparkEnv) shouldBe "2.10"
+    sparkScalaVersion(formatSparkVersion("1.6.0"), Some("2.11"), sparkEnv) shouldBe "2.11"
   }
   test("sparkScalaVersion() should throw if configured and detected Scala version mismatch.") {
     intercept[IllegalArgumentException] {
-      sparkScalaVersion(formatSparkVersion("2.0.1"), Some("2.11"), livyConf210)
+      sparkScalaVersion(formatSparkVersion("2.0.1"), Some("2.11"), sparkEnv210)
     }
     intercept[IllegalArgumentException] {
-      sparkScalaVersion(formatSparkVersion("1.6.1"), Some("2.10"), livyConf211)
+      sparkScalaVersion(formatSparkVersion("1.6.1"), Some("2.10"), sparkEnv211)
     }
   }
   test("sparkScalaVersion() should use configured Scala version if spark-submit doesn't tell.") {
-    sparkScalaVersion(formatSparkVersion("1.6.0"), None, livyConf210) shouldBe "2.10"
-    sparkScalaVersion(formatSparkVersion("1.6.2"), None, livyConf210) shouldBe "2.10"
-    sparkScalaVersion(formatSparkVersion("2.0.0"), None, livyConf210) shouldBe "2.10"
-    sparkScalaVersion(formatSparkVersion("2.0.1"), None, livyConf210) shouldBe "2.10"
-    sparkScalaVersion(formatSparkVersion("1.6.0"), None, livyConf211) shouldBe "2.11"
-    sparkScalaVersion(formatSparkVersion("1.6.2"), None, livyConf211) shouldBe "2.11"
-    sparkScalaVersion(formatSparkVersion("2.0.0"), None, livyConf211) shouldBe "2.11"
-    sparkScalaVersion(formatSparkVersion("2.0.1"), None, livyConf211) shouldBe "2.11"
+    sparkScalaVersion(formatSparkVersion("1.6.0"), None, sparkEnv210) shouldBe "2.10"
+    sparkScalaVersion(formatSparkVersion("1.6.2"), None, sparkEnv210) shouldBe "2.10"
+    sparkScalaVersion(formatSparkVersion("2.0.0"), None, sparkEnv210) shouldBe "2.10"
+    sparkScalaVersion(formatSparkVersion("2.0.1"), None, sparkEnv210) shouldBe "2.10"
+    sparkScalaVersion(formatSparkVersion("1.6.0"), None, sparkEnv211) shouldBe "2.11"
+    sparkScalaVersion(formatSparkVersion("1.6.2"), None, sparkEnv211) shouldBe "2.11"
+    sparkScalaVersion(formatSparkVersion("2.0.0"), None, sparkEnv211) shouldBe "2.11"
+    sparkScalaVersion(formatSparkVersion("2.0.1"), None, sparkEnv211) shouldBe "2.11"
   }
   test("sparkScalaVersion() should use default Spark Scala version.") {
-    sparkScalaVersion(formatSparkVersion("1.6.0"), None, livyConf) shouldBe "2.10"
-    sparkScalaVersion(formatSparkVersion("1.6.2"), None, livyConf) shouldBe "2.10"
-    sparkScalaVersion(formatSparkVersion("2.0.0"), None, livyConf) shouldBe "2.11"
-    sparkScalaVersion(formatSparkVersion("2.0.1"), None, livyConf) shouldBe "2.11"
-    sparkScalaVersion(formatSparkVersion("2.1.0"), None, livyConf) shouldBe "2.11"
+    sparkScalaVersion(formatSparkVersion("1.6.0"), None, sparkEnv) shouldBe "2.10"
+    sparkScalaVersion(formatSparkVersion("1.6.2"), None, sparkEnv) shouldBe "2.10"
+    sparkScalaVersion(formatSparkVersion("2.0.0"), None, sparkEnv) shouldBe "2.11"
+    sparkScalaVersion(formatSparkVersion("2.0.1"), None, sparkEnv) shouldBe "2.11"
+    sparkScalaVersion(formatSparkVersion("2.1.0"), None, sparkEnv) shouldBe "2.11"
   }
 }
diff --git a/server/src/test/scala/com/cloudera/livy/utils/SparkEnvironmentSuite.scala b/server/src/test/scala/com/cloudera/livy/utils/SparkEnvironmentSuite.scala
new file mode 100644
index 000000000..7c5d3b892
--- /dev/null
+++ b/server/src/test/scala/com/cloudera/livy/utils/SparkEnvironmentSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.livy.utils
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf}
+
+class SparkEnvironmentSuite extends FunSuite
+  with Matchers with BeforeAndAfterAll with LivyBaseUnitTestSuite {
+  import SparkEnvironment._
+
+  override def afterAll(): Unit = {
+    // clean the global data when this test suite is finished.
+    sparkEnvironments.clear()
+    super.afterAll()
+  }
+
+  test("default Spark environment") {
+    val livyConf = new LivyConf(false)
+      .set("livy.server.spark-home", "test-home")
+      .set("livy.server.spark-conf-dir", "test-conf-dir")
+      .set("livy.sparkr.package", "test-sparkr-package")
+      .set("livy.pyspark.archives", "test-pyspark-archives")
+    val sparkEnv = createSparkEnv(livyConf, "default")
+
+    sparkEnv.get(SPARK_HOME) should be ("test-home")
+    sparkEnv.get(SPARK_CONF_DIR) should be ("test-conf-dir")
+    sparkEnv.get(PYSPARK_ARCHIVES) should be ("test-pyspark-archives")
+    sparkEnv.get(SPARKR_PACKAGE) should be ("test-sparkr-package")
+
+    sparkEnv.sparkHome() should be ("test-home")
+    sparkEnv.sparkConfDir() should be ("test-conf-dir")
+  }
+
+  test("default Spark environment with environment specified") {
+    val livyConf = new LivyConf(false)
+      .set(SPARK_ENV_PREFIX + ".default." + SPARK_HOME.key, "test-default-home")
+      .set(SPARK_ENV_PREFIX + ".default." + SPARK_CONF_DIR.key, "test-default-conf-dir")
+    val sparkEnv = createSparkEnv(livyConf, "default")
+
+    sparkEnv.get(SPARK_HOME) should be ("test-default-home")
+    sparkEnv.get(SPARK_CONF_DIR) should be ("test-default-conf-dir")
+    sparkEnv.get(PYSPARK_ARCHIVES) should be (null)
+    sparkEnv.get(SPARKR_PACKAGE) should be (null)
+
+    sparkEnv.sparkHome() should be ("test-default-home")
+    sparkEnv.sparkConfDir() should be ("test-default-conf-dir")
+  }
+
+  test("default Spark environment using SPARK_HOME environment variable") {
+    val livyConf = new LivyConf(false)
+    val sparkEnv = createSparkEnv(livyConf, "default")
+
+    sparkEnv.get(SPARK_HOME) should be (null)
+    sparkEnv.get(SPARK_CONF_DIR) should be (null)
+    sparkEnv.get(PYSPARK_ARCHIVES) should be (null)
+    sparkEnv.get(SPARKR_PACKAGE) should be (null)
+
+    sparkEnv.sparkHome() should not be (null)
+    sparkEnv.sparkConfDir() should not be (null)
+  }
+
+  test("specify different Spark environments through configuration") {
+    val livyConf = new LivyConf(false)
+      .set(SPARK_ENV_PREFIX + ".test." + SPARK_HOME.key, "test-home")
+      .set(SPARK_ENV_PREFIX + ".test." + PYSPARK_ARCHIVES.key, "test-home/python/pyspark.tgz")
+      .set(SPARK_ENV_PREFIX + ".production." + SPARK_HOME.key, "production-home")
+      .set(SPARK_ENV_PREFIX + ".production." + SPARKR_PACKAGE.key, "production-home/R/sparkr.zip")
+      .set(SPARK_ENV_PREFIX + ".default." + SPARK_HOME.key, "default-home")
+      .set(SPARK_ENV_PREFIX + ".default." + SPARK_CONF_DIR.key, "default-conf-dir")
+
+    sparkEnvironments("test") = createSparkEnv(livyConf, "test")
+    sparkEnvironments("production") = createSparkEnv(livyConf, "production")
+    sparkEnvironments("default") = createSparkEnv(livyConf, "default")
+
+    val testSparkEnv = getSparkEnv(livyConf, "test")
+    testSparkEnv.sparkHome() should be ("test-home")
+    testSparkEnv.sparkConfDir() should be ("test-home/conf")
+    testSparkEnv.findPySparkArchives() should be (Seq("test-home/python/pyspark.tgz"))
+
+    val prodSparkEnv = getSparkEnv(livyConf, "production")
+    prodSparkEnv.sparkHome() should be ("production-home")
+    prodSparkEnv.findSparkRArchive() should be ("production-home/R/sparkr.zip")
+    prodSparkEnv.sparkConfDir() should be ("production-home/conf")
+
+    val defaultSparkEnv = getSparkEnv(livyConf, "default")
+    defaultSparkEnv.sparkHome() should be ("default-home")
+    defaultSparkEnv.sparkConfDir() should be ("default-conf-dir")
+  }
+
+  test("create non-existent Spark environment") {
+    val livyConf = new LivyConf(false)
+    val sparkEnv = createSparkEnv(livyConf, "non-exist")
+
+    sparkEnv.get(SPARK_HOME) should be (null)
+    sparkEnv.get(SPARK_CONF_DIR) should be (null)
+    sparkEnv.get(PYSPARK_ARCHIVES) should be (null)
+    sparkEnv.get(SPARKR_PACKAGE) should be (null)
+
+    intercept[Exception](sparkEnv.sparkHome())
+    intercept[Exception](sparkEnv.sparkConfDir())
+  }
+}
diff --git a/server/src/test/scala/com/cloudera/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/com/cloudera/livy/utils/SparkYarnAppSpec.scala
index 1d903677f..45aaebf59 100644
--- a/server/src/test/scala/com/cloudera/livy/utils/SparkYarnAppSpec.scala
+++ b/server/src/test/scala/com/cloudera/livy/utils/SparkYarnAppSpec.scala
@@ -35,7 +35,6 @@ import org.scalatest.FunSpec
 import org.scalatest.mock.MockitoSugar.mock
 import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf}
-import com.cloudera.livy.util.LineBufferedProcess
 import com.cloudera.livy.utils.SparkApp._
 class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
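
For reference, the pieces exercised by the tests above fit together roughly as follows. This is a usage sketch, not part of the patch, based only on the API shown in SparkEnvironmentSuite and InteractiveSessionSpec; the environment name "vendor-spark" and the path "/opt/vendor/spark" are made up for illustration:

    import com.cloudera.livy.LivyConf
    import com.cloudera.livy.utils.SparkEnvironment
    import com.cloudera.livy.utils.SparkEnvironment._

    // Declare a named Spark environment purely through configuration keys
    // (livy.server.spark-env.<name>.spark-home and friends).
    val conf = new LivyConf()
      .set(SPARK_ENV_PREFIX + ".vendor-spark." + SPARK_HOME.key, "/opt/vendor/spark")

    // Build the environment and register it in the server-wide map, as the tests do,
    // then look it up by name.
    sparkEnvironments("vendor-spark") = createSparkEnv(conf, "vendor-spark")
    getSparkEnv(conf, "vendor-spark").sparkHome()  // "/opt/vendor/spark"

    // A session creation request then selects the environment by name, mirroring
    // InteractiveSessionSpec; "default" is used when the request does not set one.
    val req = new CreateInteractiveRequest()
    req.kind = PySpark()
    req.sparkEnv = "vendor-spark"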