diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 760077af6d3e9..b8a43a03d8b62 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -21,5 +21,17 @@ package org.apache.spark.internal * All structured logging keys should be defined here for standardization. */ object LogKey extends Enumeration { - val EXECUTOR_ID, MIN_SIZE, MAX_SIZE = Value + val APPLICATION_ID = Value + val APPLICATION_STATE = Value + val BUCKET = Value + val CONTAINER_ID = Value + val EXECUTOR_ID = Value + val EXIT_CODE = Value + val MAX_EXECUTOR_FAILURES = Value + val MAX_SIZE = Value + val MIN_SIZE = Value + val REMOTE_ADDRESS = Value + val POD_ID = Value + + type LogKey = Value } diff --git a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala index 5765a6eed5420..84b9debb2afda 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala @@ -29,6 +29,7 @@ import org.apache.logging.log4j.core.filter.AbstractFilter import org.slf4j.{Logger, LoggerFactory} import org.apache.spark.internal.Logging.SparkShellLoggingFilter +import org.apache.spark.internal.LogKey.LogKey import org.apache.spark.util.SparkClassUtils /** @@ -36,7 +37,7 @@ import org.apache.spark.util.SparkClassUtils * The values of the MDC will be inline in the log message, while the key-value pairs will be * part of the ThreadContext. */ -case class MDC(key: LogKey.Value, value: String) +case class MDC(key: LogKey, value: Any) /** * Wrapper class for log messages that include a logging context. @@ -102,9 +103,9 @@ trait Logging { val context = new java.util.HashMap[String, String]() args.foreach { mdc => - sb.append(mdc.value) + sb.append(mdc.value.toString) if (Logging.isStructuredLoggingEnabled) { - context.put(mdc.key.toString.toLowerCase(Locale.ROOT), mdc.value) + context.put(mdc.key.toString.toLowerCase(Locale.ROOT), mdc.value.toString) } if (processedParts.hasNext) { diff --git a/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala new file mode 100644 index 0000000000000..1ac51e236080c --- /dev/null +++ b/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import scala.jdk.CollectionConverters._ + +import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite + +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.EXIT_CODE + +class MDCSuite + extends AnyFunSuite // scalastyle:ignore funsuite + with Logging { + + test("check MDC message") { + val log = log"This is a log, exitcode ${MDC(EXIT_CODE, 10086)}" + assert(log.message === "This is a log, exitcode 10086") + assert(log.context === Map("exit_code" -> "10086").asJava) + } + + test("custom object as MDC value") { + val cov = CustomObjectValue("spark", 10086) + val log = log"This is a log, exitcode ${MDC(EXIT_CODE, cov)}" + assert(log.message === "This is a log, exitcode CustomObjectValue: spark, 10086") + assert(log.context === Map("exit_code" -> "CustomObjectValue: spark, 10086").asJava) + } + + case class CustomObjectValue(key: String, value: Int) { + override def toString: String = { + "CustomObjectValue: " + key + ", " + value + } + } +} diff --git a/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala index 02895f708ff06..a75e01161d27b 100644 --- a/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala +++ b/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala @@ -34,11 +34,15 @@ class PatternLoggingSuite extends LoggingSuiteBase with BeforeAndAfterAll { s""".*$level $className: This is a log message\n""" } + override def expectedPatternForBasicMsgWithException(level: Level): String = { + s""".*$level $className: This is a log message\n[\\s\\S]*""" + } + override def expectedPatternForMsgWithMDC(level: Level): String = s""".*$level $className: Lost executor 1.\n""" override def expectedPatternForMsgWithMDCAndException(level: Level): String = - s""".*$level $className: Error in executor 1.\njava.lang.RuntimeException: OOM\n.*""" + s""".*$level $className: Error in executor 1.\njava.lang.RuntimeException: OOM\n[\\s\\S]*""" override def verifyMsgWithConcat(level: Level, logOutput: String): Unit = { val pattern = diff --git a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala index fe42c7fec990e..6bdd932561b5b 100644 --- a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala +++ b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.spark.util import java.io.File @@ -57,14 +58,20 @@ trait LoggingSuiteBase def msgWithMDCAndException: LogEntry = log"Error in executor ${MDC(EXECUTOR_ID, "1")}." - def expectedPatternForBasicMsg(level: Level): String - def msgWithConcat: LogEntry = log"Min Size: ${MDC(MIN_SIZE, "2")}, " + log"Max Size: ${MDC(MAX_SIZE, "4")}. " + log"Please double check." + // test for basic message (without any mdc) + def expectedPatternForBasicMsg(level: Level): String + + // test for basic message and exception + def expectedPatternForBasicMsgWithException(level: Level): String + + // test for message (with mdc) def expectedPatternForMsgWithMDC(level: Level): String + // test for message and exception def expectedPatternForMsgWithMDCAndException(level: Level): String def verifyMsgWithConcat(level: Level, logOutput: String): Unit @@ -79,6 +86,17 @@ trait LoggingSuiteBase } } + test("Basic logging with Exception") { + val exception = new RuntimeException("OOM") + Seq( + (Level.ERROR, () => logError(basicMsg, exception)), + (Level.WARN, () => logWarning(basicMsg, exception)), + (Level.INFO, () => logInfo(basicMsg, exception))).foreach { case (level, logFunc) => + val logOutput = captureLogOutput(logFunc) + assert(expectedPatternForBasicMsgWithException(level).r.matches(logOutput)) + } + } + test("Logging with MDC") { Seq( (Level.ERROR, () => logError(msgWithMDC)), @@ -98,7 +116,7 @@ trait LoggingSuiteBase (Level.INFO, () => logInfo(msgWithMDCAndException, exception))).foreach { case (level, logFunc) => val logOutput = captureLogOutput(logFunc) - assert(expectedPatternForMsgWithMDCAndException(level).r.findFirstIn(logOutput).isDefined) + assert(expectedPatternForMsgWithMDCAndException(level).r.matches(logOutput)) } } @@ -137,6 +155,22 @@ class StructuredLoggingSuite extends LoggingSuiteBase { }""") } + override def expectedPatternForBasicMsgWithException(level: Level): String = { + compactAndToRegexPattern( + s""" + { + "ts": "", + "level": "$level", + "msg": "This is a log message", + "exception": { + "class": "java.lang.RuntimeException", + "msg": "OOM", + "stacktrace": "" + }, + "logger": "$className" + }""") + } + override def expectedPatternForMsgWithMDC(level: Level): String = { compactAndToRegexPattern( s""" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index ae0eddb105493..f4d80c24d01ff 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -33,7 +33,7 @@ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.KubernetesConf import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKey, MDC} import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceProfile import org.apache.spark.scheduler.cluster.SchedulerBackendUtils.DEFAULT_NUMBER_EXECUTORS @@ -143,7 +143,8 @@ class ExecutorPodsAllocator( snapshotsStore.addSubscriber(podAllocationDelay) { executorPodsSnapshot => onNewSnapshots(applicationId, schedulerBackend, executorPodsSnapshot) if (failureTracker.numFailedExecutors > maxNumExecutorFailures) { - logError(s"Max number of executor failures ($maxNumExecutorFailures) reached") + logError(log"Max number of executor failures " + + log"(${MDC(LogKey.MAX_EXECUTOR_FAILURES, maxNumExecutorFailures)}) reached") stopApplication(EXCEED_MAX_EXECUTOR_FAILURES) } } @@ -532,7 +533,8 @@ class ExecutorPodsAllocator( currentTime - creationTime > executorIdleTimeout } catch { case e: Exception => - logError(s"Cannot get the creationTimestamp of the pod: ${state.pod}", e) + logError(log"Cannot get the creationTimestamp of the pod: " + + log"${MDC(LogKey.POD_ID, state.pod)}", e) true } } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala index 6636984256451..5f95b8daa66cb 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala @@ -38,6 +38,7 @@ import org.apache.spark.deploy.k8s.integrationtest.DepsTestsSuite.{DEPS_TIMEOUT, import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ import org.apache.spark.deploy.k8s.integrationtest.Utils.getExamplesJarName import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube +import org.apache.spark.internal.{LogKey, MDC} import org.apache.spark.internal.config.{ARCHIVES, PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON} private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => @@ -326,7 +327,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => s3client.createBucket(createBucketRequest) } catch { case e: Exception => - logError(s"Failed to create bucket $BUCKET", e) + logError(log"Failed to create bucket ${MDC(LogKey.BUCKET, BUCKET)}", e) throw new SparkException(s"Failed to create bucket $BUCKET.", e) } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 4f1ba3b9ed248..7864760777fcc 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -43,7 +43,8 @@ import org.apache.spark.deploy.{ExecutorFailureTracker, SparkHadoopUtil} import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{EXIT_CODE, REMOTE_ADDRESS} import org.apache.spark.internal.config._ import org.apache.spark.internal.config.UI._ import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances} @@ -745,9 +746,10 @@ private[spark] class ApplicationMaster( case _: InterruptedException => // Reporter thread can interrupt to stop user class case SparkUserAppException(exitCode) => - val msg = s"User application exited with status $exitCode" + val msg = log"User application exited with status " + + log"${MDC(EXIT_CODE, exitCode)}" logError(msg) - finish(FinalApplicationStatus.FAILED, exitCode, msg) + finish(FinalApplicationStatus.FAILED, exitCode, msg.message) case cause: Throwable => logError("User class threw exception: ", cause) finish(FinalApplicationStatus.FAILED, @@ -854,7 +856,8 @@ private[spark] class ApplicationMaster( logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress") finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) } else { - logError(s"Driver terminated with exit code ${exitCode}! Shutting down. $remoteAddress") + logError(log"Driver terminated with exit code ${MDC(EXIT_CODE, exitCode)}! " + + log"Shutting down. ${MDC(REMOTE_ADDRESS, remoteAddress)}") finish(FinalApplicationStatus.FAILED, exitCode) } } else { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1d10e85485130..6b1aa5cb44362 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -55,7 +55,8 @@ import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.deploy.yarn.ResourceRequestHelper._ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.APPLICATION_ID import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Python._ import org.apache.spark.launcher.{JavaModuleOptions, LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils} @@ -1198,7 +1199,7 @@ private[spark] class Client( getApplicationReport() } catch { case e: ApplicationNotFoundException => - logError(s"Application $appId not found.") + logError(log"Application ${MDC(APPLICATION_ID, appId)} not found.") cleanupStagingDir() return YarnAppReport(YarnApplicationState.KILLED, FinalApplicationStatus.KILLED, None) case NonFatal(e) if !e.isInstanceOf[InterruptedIOException] => diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 4616417cba314..1660db8903c73 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -40,7 +40,8 @@ import org.apache.spark.deploy.yarn.ResourceRequestHelper._ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.executor.ExecutorExitCode -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{CONTAINER_ID, EXECUTOR_ID} import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceProfile import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID @@ -789,7 +790,8 @@ private[yarn] class YarnAllocator( getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet() launchingExecutorContainerIds.remove(containerId) if (NonFatal(e)) { - logError(s"Failed to launch executor $executorId on container $containerId", e) + logError(log"Failed to launch executor ${MDC(EXECUTOR_ID, executorId)} " + + log"on container ${MDC(CONTAINER_ID, containerId)}", e) // Assigned container should be released immediately // to avoid unnecessary resource occupation. amClient.releaseAssignedContainer(containerId) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index af41d30c2cdb8..806a73eda76c5 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -26,7 +26,8 @@ import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, YarnApplicati import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnAppReport} import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey.APPLICATION_STATE import org.apache.spark.launcher.SparkAppHandle import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ @@ -116,8 +117,8 @@ private[spark] class YarnClientSchedulerBackend( try { val YarnAppReport(_, state, diags) = client.monitorApplication(logApplicationReport = false) - logError(s"YARN application has exited unexpectedly with state $state! " + - "Check the YARN application logs for more details.") + logError(log"YARN application has exited unexpectedly with state " + + log"${MDC(APPLICATION_STATE, state)}! Check the YARN application logs for more details.") diags.foreach { err => logError(s"Diagnostics message: $err") }