Spark supports the spark.conf parameter to configure multiple parameters #4935

Merged · 8 commits · Oct 16, 2023
11 changes: 9 additions & 2 deletions linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
@@ -34,6 +34,10 @@

<Send name="Send" >
<Filters>
<RegexFilter regex=".*Hive-on-MR is deprecated in Hive \d+ and may not be available in the future versions\..*" onMatch="DENY" onMismatch="NEUTRAL"/>
<RegexFilter regex=".*Hadoop command-line option parsing not.*" onMatch="DENY" onMismatch="NEUTRAL"/>
<RegexFilter regex="Group.*is deprecated. Use.*instead" onMatch="DENY" onMismatch="NEUTRAL"/>
<RegexFilter regex="Failed to get files with ID; using regular API.*" onMatch="DENY" onMismatch="NEUTRAL"/>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
</Filters>
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
@@ -97,8 +101,11 @@
<logger name="org.apache.hadoop.mapreduce.Job" level="INFO" additivity="true">
<appender-ref ref="YarnAppIdOutputFile"/>
</logger>
<logger name="org.apache.tez.client.TezClient" level="INFO" additivity="true">
<appender-ref ref="YarnAppIdOutputFile"/>
<logger name="org.apache.hadoop.hive.conf.HiveConf" level="ERROR" additivity="true">
<appender-ref ref="Send"/>
</logger>
<logger name="org.apache.hadoop.mapreduce.split.JobSplitWriter" level="ERROR" additivity="true">
<appender-ref ref="Send"/>
</logger>
</loggers>
</configuration>
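The added filters rely on standard log4j2 RegexFilter semantics: a message that matches a pattern is denied outright, a non-matching one stays NEUTRAL and falls through to the final ThresholdFilter, which accepts WARN and above. A quick, illustrative Scala check (not part of the PR) that the new DENY patterns really match the noisy Hive/Hadoop warnings they are meant to drop:

```scala
// Illustrative only: verify the new DENY regexes against a typical suppressed message.
object RegexFilterCheck extends App {
  val patterns = Seq(
    ".*Hive-on-MR is deprecated in Hive \\d+ and may not be available in the future versions\\..*",
    ".*Hadoop command-line option parsing not.*",
    "Group.*is deprecated. Use.*instead",
    "Failed to get files with ID; using regular API.*"
  )
  val sample =
    "Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine."
  // onMatch="DENY": the message is dropped; onMismatch="NEUTRAL": the next filter decides.
  val suppressed = patterns.exists(p => sample.matches(p))
  println(s"suppressed = $suppressed") // true
}
```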
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.common.HiveInterruptUtils
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
import org.apache.hadoop.hive.ql.{Driver, QueryPlan}
import org.apache.hadoop.hive.ql.exec.Task.TaskState
import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper
import org.apache.hadoop.hive.ql.exec.tez.TezJobExecHelper
@@ -204,9 +205,11 @@ class HiveEngineConnExecutor(
var compileRet = -1
Utils.tryCatch {
compileRet = driver.compile(realCode)
logger.info(s"driver compile realCode : ${realCode} finished, status : ${compileRet}")
logger.info(
s"driver compile realCode : \n ${realCode} \n finished, status : ${compileRet}"
)
if (0 != compileRet) {
logger.warn(s"compile realCode : ${realCode} error status : ${compileRet}")
logger.warn(s"compile realCode : \n ${realCode} \n error status : ${compileRet}")
throw HiveQueryFailedException(
COMPILE_HIVE_QUERY_ERROR.getErrorCode,
COMPILE_HIVE_QUERY_ERROR.getErrorDesc
@@ -469,17 +472,35 @@ class HiveEngineConnExecutor(
val totalSQLs = engineExecutorContext.getTotalParagraph
val currentSQL = engineExecutorContext.getCurrentParagraph
val currentBegin = (currentSQL - 1) / totalSQLs.asInstanceOf[Float]
HadoopJobExecHelper.runningJobs synchronized {
HadoopJobExecHelper.runningJobs.asScala foreach { runningJob =>
val name = runningJob.getID.toString
val _progress = runningJob.reduceProgress() + runningJob.mapProgress()
singleSqlProgressMap.put(name, _progress / 2)
val finishedStage =
if (null != driver && null != driver.getPlan() && !driver.getPlan().getRootTasks.isEmpty) {
Utils.tryQuietly(
Utilities
.getMRTasks(driver.getPlan().getRootTasks)
.asScala
.count(task => task.isMapRedTask && task.getTaskState == TaskState.FINISHED)
)
} else {
0
}
}
var totalProgress: Float = 0.0f
if (!HadoopJobExecHelper.runningJobs.isEmpty) {
val runningJob = HadoopJobExecHelper.runningJobs.get(0)
val _progress = Utils.tryCatch(runningJob.reduceProgress() + runningJob.mapProgress()) {
case e: Exception =>
logger.info(s"Failed to get job(${runningJob.getJobName}) progress ", e)
0.2f
}
if (!_progress.isNaN) {
totalProgress = _progress / 2
}
}
logger.info(
s"Running stage progress is $totalProgress, and finished stage is $finishedStage"
)
val hiveRunJobs = if (numberOfMRJobs <= 0) 1 else numberOfMRJobs
singleSqlProgressMap.asScala foreach { case (_name, _progress) =>
totalProgress += _progress
if (finishedStage <= hiveRunJobs) {
totalProgress = totalProgress + finishedStage
}
try {
totalProgress = totalProgress / (hiveRunJobs * totalSQLs)
@@ -488,10 +509,10 @@
case _ => totalProgress = 0.0f
}

logger.debug(s"hive progress is $totalProgress")
val newProgress =
if (totalProgress.isNaN || totalProgress.isInfinite) currentBegin
else totalProgress + currentBegin
logger.info(s"Hive progress is $newProgress, and finished stage is $finishedStage")
val oldProgress = ProgressUtils.getOldProgress(this.engineExecutorContext)
if (newProgress < oldProgress) oldProgress
else {
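The progress hunk above appears to replace the old per-job accumulation in singleSqlProgressMap with two inputs: a finishedStage count taken from the finished MR tasks in the driver's plan, and the map/reduce progress of the first running job only. A condensed, hedged sketch of the resulting arithmetic, factored into one function for readability (not the actual Linkis method):

```scala
// Sketch of the new progress estimate, assuming one running MR job.
def estimateProgress(
    mapProgress: Float,    // runningJob.mapProgress()
    reduceProgress: Float, // runningJob.reduceProgress()
    finishedStage: Int,    // finished MR tasks counted from the query plan
    numberOfMRJobs: Int,   // MR stages planned for the current SQL
    totalSQLs: Int,        // paragraphs in the whole submission
    currentSQL: Int        // 1-based index of the paragraph being run
): Float = {
  val currentBegin = (currentSQL - 1) / totalSQLs.toFloat
  val hiveRunJobs = if (numberOfMRJobs <= 0) 1 else numberOfMRJobs
  var totalProgress = (mapProgress + reduceProgress) / 2 // running stage, 0.0 ~ 1.0
  if (finishedStage <= hiveRunJobs) totalProgress += finishedStage
  totalProgress /= (hiveRunJobs * totalSQLs)
  // fall back to the paragraph's starting offset on NaN/Infinity, as the diff does
  if (totalProgress.isNaN || totalProgress.isInfinite) currentBegin
  else totalProgress + currentBegin
}
```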
@@ -29,6 +29,7 @@ import org.apache.linkis.manager.common.entity.resource.{
LoadResource,
NodeResource
}
import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import org.apache.linkis.manager.engineplugin.io.conf.IOEngineConnConfiguration
import org.apache.linkis.manager.engineplugin.io.domain.FSInfo
@@ -283,29 +284,44 @@ class IoEngineConnExecutor(val id: Int, val outputLimit: Int = 10)
val fsType = methodEntity.getFsType
val proxyUser = methodEntity.getProxyUser
if (!userFSInfos.containsKey(proxyUser)) {
throw new StorageErrorException(
StorageErrorCode.FS_NOT_INIT.getCode,
s"not exist storage $fsType, please init first."
)
if (methodEntity.getId != -1) {
createUserFS(methodEntity)
} else {
throw new StorageErrorException(
StorageErrorCode.FS_NOT_INIT.getCode,
s"not exist storage $fsType, ${StorageErrorCode.FS_NOT_INIT.getMessage}"
)
}
}
var fs: Fs = null
userFSInfos.get(proxyUser) synchronized {
val userFsInfo = userFSInfos
val userFsInfoOption = userFSInfos
.get(proxyUser)
.find(fsInfo => fsInfo != null && fsInfo.id == methodEntity.getId)
.getOrElse(
throw new StorageErrorException(
StorageErrorCode.FS_NOT_INIT.getCode,
s"not exist storage $fsType, please init first."
)
if (userFsInfoOption.isDefined) {
val userFsInfo = userFsInfoOption.get
userFsInfo.lastAccessTime = System.currentTimeMillis()
fs = userFsInfo.fs
}
}
if (null == fs) {
if (methodEntity.getId != -1) {
createUserFS(methodEntity)
getUserFS(methodEntity)
} else {
throw new StorageErrorException(
StorageErrorCode.FS_NOT_INIT.getCode,
s"not exist storage $fsType, ${StorageErrorCode.FS_NOT_INIT.getMessage}"
)
userFsInfo.lastAccessTime = System.currentTimeMillis()
userFsInfo.fs
}
} else {
fs
}
}

private def createUserFS(methodEntity: MethodEntity): Long = {
logger.info(
s"Creator ${methodEntity.getCreatorUser}准备为用户${methodEntity.getProxyUser}初始化FS:$methodEntity"
s"Creator ${methodEntity.getCreatorUser} for user ${methodEntity.getProxyUser} init fs:$methodEntity"
)
var fsId = methodEntity.getId
val properties = methodEntity.getParams()(0).asInstanceOf[Map[String, String]]
@@ -330,12 +346,12 @@
if (!userFsInfo.exists(fsInfo => fsInfo != null && fsInfo.id == methodEntity.getId)) {
val fs = FSFactory.getFs(methodEntity.getFsType)
fs.init(properties.asJava)
fsId = getFSId()
fsId = if (fsId == -1) getFSId() else fsId
userFsInfo += new FSInfo(fsId, fs)
}
}
logger.info(
s"Creator ${methodEntity.getCreatorUser}为用户${methodEntity.getProxyUser}初始化结束 fsId=$fsId"
s"Creator ${methodEntity.getCreatorUser}for user ${methodEntity.getProxyUser} end init fs fsId=$fsId"
)
fsId
}
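The net effect of the getUserFS/createUserFS changes is lazy initialization: a request that carries a concrete FS id no longer fails when the proxy user has nothing cached, it creates the FS and retries the lookup, and createUserFS now reuses the requested id instead of always allocating a new one. A self-contained toy model of that behaviour (FsHandle and LazyFsCache are stand-ins invented for this sketch, not Linkis types):

```scala
// Toy model of the new lazy-init flow; types and locking are simplified.
import scala.collection.mutable

final case class FsHandle(id: Long, var lastAccessTime: Long = System.currentTimeMillis())

object LazyFsCache {
  private val cache = mutable.Map.empty[String, mutable.ListBuffer[FsHandle]]

  private def create(user: String, requestedId: Long): Unit = {
    // mirrors createUserFS: reuse the requested id when one was supplied, otherwise allocate
    val id = if (requestedId == -1) System.nanoTime() else requestedId
    cache.getOrElseUpdate(user, mutable.ListBuffer.empty) += FsHandle(id)
  }

  def get(user: String, requestedId: Long): FsHandle =
    cache.get(user).flatMap(_.find(_.id == requestedId)) match {
      case Some(handle) =>
        handle.lastAccessTime = System.currentTimeMillis()
        handle
      case None if requestedId != -1 =>
        create(user, requestedId) // old code threw FS_NOT_INIT here; new code initializes on demand
        get(user, requestedId)    // second lookup now succeeds
      case None =>
        throw new IllegalStateException(s"not exist storage for $user, please init first")
    }
}
```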
@@ -18,6 +18,7 @@
package org.apache.linkis.manager.engineplugin.pipeline.executor

import org.apache.linkis.common.io.FsPath
import org.apache.linkis.common.utils.ResultSetUtils
import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
import org.apache.linkis.manager.engineplugin.pipeline.conf.PipelineEngineConfiguration
import org.apache.linkis.manager.engineplugin.pipeline.conf.PipelineEngineConfiguration.PIPELINE_OUTPUT_ISOVERWRITE_SWITCH
Expand Down Expand Up @@ -84,8 +85,9 @@ class ExcelExecutor extends PipeLineExecutor {
if (fsPathListWithError == null) {
throw new PipeLineErrorException(EMPTY_DIR.getErrorCode, EMPTY_DIR.getErrorDesc)
}
fileSource =
FileSource.create(fsPathListWithError.getFsPaths.toArray(Array[FsPath]()), sourceFs)
val fsPathList = fsPathListWithError.getFsPaths
ResultSetUtils.sortByNameNum(fsPathList)
fileSource = FileSource.create(fsPathList.toArray(Array[FsPath]()), sourceFs)
}
if (!FileSource.isTableResultSet(fileSource)) {
throw new PipeLineErrorException(
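Sorting the result-set paths with ResultSetUtils.sortByNameNum before building the FileSource matters because a plain lexicographic listing of files such as _1.dolphin, _2.dolphin, _10.dolphin puts _10 before _2, which would scramble the sheet order in the exported Excel file. A hedged illustration of numeric-aware ordering (file names are made up; this is not the Linkis helper itself):

```scala
// Illustrative only: lexicographic vs numeric ordering of result set file names.
val names = Seq("_10.dolphin", "_2.dolphin", "_1.dolphin")

val lexicographic = names.sorted
// List(_1.dolphin, _10.dolphin, _2.dolphin)  <- "_10" sorts before "_2"

val numberIn = "\\d+".r
val byNumber = names.sortBy(n => numberIn.findFirstIn(n).map(_.toInt).getOrElse(Int.MaxValue))
// List(_1.dolphin, _2.dolphin, _10.dolphin)  <- intended sheet order
```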
@@ -76,78 +76,3 @@ object PythonInterpreter {
}

}

object SQLSession extends Logging {

def showDF(sc: SparkContext, jobGroup: String, df: Any, maxResult: Int = Int.MaxValue): String = {
val startTime = System.currentTimeMillis()

val iterator = Utils.tryThrow(df.asInstanceOf[DataFrame].toLocalIterator)(t => {
sc.clearJobGroup()
t
})

var columns: List[Attribute] = null
// get field names
Utils.tryThrow({
val qe = df.getClass.getMethod("queryExecution").invoke(df)
val a = qe.getClass.getMethod("analyzed").invoke(qe)
val seq = a.getClass.getMethod("output").invoke(a).asInstanceOf[Seq[Any]]
columns = seq.toList.asInstanceOf[List[Attribute]]
})(t => {
sc.clearJobGroup()
t
})
var schema = new StringBuilder
schema ++= "%TABLE\n"
val nameSet = new mutable.HashSet[String]()
for (col <- columns) {
nameSet.add(col.name)
schema ++= col.name ++ "\t"
}
val trim = if (nameSet.size < columns.length) {
var schemaWithAlis = new StringBuilder
schemaWithAlis ++= "%TABLE\n"
for (col <- columns) {
val colName = col.qualifiedName
schemaWithAlis ++= colName ++ "\t"
}
logger.info("I AM IN LESS")
logger.info(schemaWithAlis.toString.trim)
schemaWithAlis.toString.trim
} else {
logger.info("I AM IN MORE")
logger.info(schema.toString.trim)
schema.toString.trim
}
val msg = FSFactory.getFs("").write(new FsPath(""), true)
msg.write(trim.getBytes("utf-8"))

var index = 0
Utils.tryThrow({
while (index < maxResult && iterator.hasNext) {
msg.write("\n".getBytes("utf-8"))
val row = iterator.next()
columns.indices.foreach { i =>
if (row.isNullAt(i)) msg.write("NULL".getBytes("utf-8"))
else msg.write(row.apply(i).asInstanceOf[Object].toString.getBytes("utf-8"))
if (i != columns.size - 1) {
msg.write("\t".getBytes("utf-8"))
}
}
index += 1
}
})(t => {
sc.clearJobGroup()
t
})
val colCount = if (columns != null) columns.size else 0
logger.warn(s"Fetched $colCount col(s) : $index row(s).")
sc.clearJobGroup()
Utils.tryFinally({
msg.flush()
msg.toString
}) { () => IOUtils.closeQuietly(msg) }
}

}
@@ -165,6 +165,7 @@ object SparkConfiguration extends Logging {

val REPLACE_PACKAGE_TO_HEADER = "org.apache.linkis"

val LINKIS_SPARK_CONF = CommonVars[String]("spark.conf", "")
val SPARK_APPLICATION_ARGS = CommonVars("spark.app.args", "")
val SPARK_APPLICATION_MAIN_CLASS = CommonVars("spark.app.main.class", "")

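LINKIS_SPARK_CONF is the configuration key that gives this PR its title: a single spark.conf entry whose value carries several Spark settings at once. The parsing side is not in this hunk, so the value format below is an assumption (a semicolon-separated key=value list) used only to illustrate the idea:

```scala
// Assumed format, for illustration only: "k1=v1;k2=v2;..." passed as the spark.conf value.
val raw =
  "spark.sql.shuffle.partitions=200;spark.dynamicAllocation.enabled=true;spark.executor.memoryOverhead=1g"

val settings: Map[String, String] = raw
  .split(";")
  .iterator
  .map(_.trim)
  .filter(kv => kv.nonEmpty && kv.contains("="))
  .map { kv =>
    val Array(k, v) = kv.split("=", 2)
    k.trim -> v.trim
  }
  .toMap
// each pair would then be applied to the job's SparkConf / submit options before launch
```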
@@ -146,7 +146,7 @@ object SQLSession extends Logging {
)
}
val taken = ByteTimeUtils.msDurationToString(System.currentTimeMillis - startTime)
logger.warn(s"Time taken: ${taken}, Fetched $index row(s).")
logger.info(s"Time taken: ${taken}, Fetched $index row(s).")
// to register TempTable
// Utils.tryAndErrorMsg(CSTableRegister.registerTempTable(engineExecutorContext, writer, alias, columns))("Failed to register tmp table:")
engineExecutionContext.appendStdout(
@@ -178,7 +178,7 @@
}
.mkString("{", ",", "}")
case (seq: Seq[_], ArrayType(typ, _)) =>
seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
seq.map(v => (v, typ)).map(toHiveStructString).mkString("[\"", "\",\"", "\"]")
case (map: Map[_, _], MapType(kType, vType, _)) =>
map
.map { case (key, value) =>
Expand All @@ -188,7 +188,7 @@ object SQLSession extends Logging {
.sorted
.mkString("{", ",", "}")
case (null, _) => "null"
case (str: String, StringType) => str.replaceAll("\n|\t", " ")
// case (str: String, StringType) => str.replaceAll("\n|\t", " ")
case (double: Double, DoubleType) => nf.format(double)
case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal)
case (other: Any, tpe) => other.toString
@@ -203,7 +203,7 @@
}
.mkString("{", ",", "}")
case (seq: Seq[_], ArrayType(typ, _)) =>
seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
seq.map(v => (v, typ)).map(toHiveStructString).mkString("[\"", "\",\"", "\"]")
case (map: Map[_, _], MapType(kType, vType, _)) =>
map
.map { case (key, value) =>
Expand All @@ -213,8 +213,13 @@ object SQLSession extends Logging {
.sorted
.mkString("{", ",", "}")

case (str: String, StringType) => str.replaceAll("\n|\t", " ")
case (double: Double, DoubleType) => nf.format(double)
// case (str: String, StringType) => str.replaceAll("\n|\t", " ")
case (double: Double, DoubleType) =>
if (double.isNaN) {
"NaN"
} else {
nf.format(double)
}
case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal)
case (other: Any, tpe) => other.toString
case _ => null
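Three rendering changes are visible in this file: array values are now wrapped element-by-element in double quotes, NaN doubles are printed as the literal string "NaN" instead of being passed through the locale-sensitive NumberFormat, and the replaceAll("\n|\t", " ") rewriting of string values is commented out. The snippet below illustrates the first two with made-up values:

```scala
import java.text.NumberFormat

// Array rendering: before -> [a,b,c]   after -> ["a","b,c",""]
// (quoting keeps empty or comma-containing elements distinguishable)
val elems = Seq("a", "b,c", "")
val rendered = elems.mkString("[\"", "\",\"", "\"]") // ["a","b,c",""]

// Double rendering: NaN is emitted literally, everything else still goes through NumberFormat
val nf = NumberFormat.getInstance()
def show(d: Double): String = if (d.isNaN) "NaN" else nf.format(d)
show(Double.NaN) // "NaN"
show(1234.5)     // locale dependent, e.g. "1,234.5"
```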
@@ -100,9 +100,14 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long)
}
val kind: Kind = getKind
var preCode = code
engineExecutorContext.appendStdout(
LogUtils.generateInfo(s"yarn application id: ${sc.applicationId}")
)

val isFirstParagraph = (engineExecutorContext.getCurrentParagraph == 1)
if (isFirstParagraph == true) {
engineExecutorContext.appendStdout(
LogUtils.generateInfo(s"yarn application id: ${sc.applicationId}")
)
}

// Pre-execution hook
var executionHook: SparkPreExecutionHook = null
Utils.tryCatch {
@@ -138,6 +143,37 @@
logger.info("Set jobGroup to " + jobGroup)
sc.setJobGroup(jobGroup, _code, true)

// print job configuration, only the first paragraph
if (isFirstParagraph == true) {
Utils.tryCatch({
val executorNum: Int = sc.getConf.get("spark.executor.instances").toInt
val executorMem: Long =
ByteTimeUtils.byteStringAsGb(sc.getConf.get("spark.executor.memory"))
val driverMem: Long = ByteTimeUtils.byteStringAsGb(sc.getConf.get("spark.driver.memory"))
val sparkExecutorCores = sc.getConf.get("spark.executor.cores", "2").toInt
val sparkDriverCores = sc.getConf.get("spark.driver.cores", "1").toInt
val queue = sc.getConf.get("spark.yarn.queue")
// the value keeps its unit when it was configured with one;
// if it is not set, sc.getConf falls back to spark.yarn.executor.memoryOverhead, which is a bare number such as 512 (no unit)
val memoryOverhead = sc.getConf.get("spark.executor.memoryOverhead", "1G")

val sb = new StringBuilder
sb.append(s"spark.executor.instances=$executorNum\n")
sb.append(s"spark.executor.memory=${executorMem}G\n")
sb.append(s"spark.driver.memory=${driverMem}G\n")
sb.append(s"spark.executor.cores=$sparkExecutorCores\n")
sb.append(s"spark.driver.cores=$sparkDriverCores\n")
sb.append(s"spark.yarn.queue=$queue\n")
sb.append(s"spark.executor.memoryOverhead=${memoryOverhead}\n")
sb.append("\n")
engineExecutionContext.appendStdout(
LogUtils.generateInfo(s" Your spark job exec with configs:\n${sb.toString()}")
)
})(t => {
logger.warn("Get actual used resource exception", t)
})
}

val response = Utils.tryFinally(runCode(this, _code, engineExecutorContext, jobGroup)) {
// Utils.tryAndWarn(this.engineExecutionContext.pushProgress(1, getProgressInfo("")))
jobGroup = null
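The new block prints the resolved Spark resources to the job's stdout once, only for the first paragraph, so later paragraphs of the same submission do not repeat the banner. A minimal sketch of the summary string being assembled (the values below are illustrative, not read from a real SparkContext, and the memory figures are assumed to already be converted to GB as ByteTimeUtils.byteStringAsGb would do):

```scala
// Illustrative values; the real code reads these from sc.getConf.
val conf = Seq(
  "spark.executor.instances" -> "4",
  "spark.executor.memory" -> "4G",
  "spark.driver.memory" -> "2G",
  "spark.executor.cores" -> "2",
  "spark.driver.cores" -> "1",
  "spark.yarn.queue" -> "default",
  "spark.executor.memoryOverhead" -> "1G" // may be a bare MB count if set without a unit
)

val summary = conf.map { case (k, v) => s"$k=$v" }.mkString("", "\n", "\n")
println(s"Your spark job exec with configs:\n$summary")
// appended to the job stdout once per submission (first paragraph only)
```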