From 0aee601dcc9875928fc526c821985d2890bdc42f Mon Sep 17 00:00:00 2001
From: panbingkun
Date: Thu, 14 Nov 2024 15:55:06 +0100
Subject: [PATCH] [SPARK-50153][SQL] Add `name` to `RuleExecutor` to make printing `QueryExecutionMetrics`'s logs clearer

### What changes were proposed in this pull request?
This PR adds a `name` to `RuleExecutor` so that the logs printed for `QueryExecutionMetrics` are clearer. Without it, output like the following is hard to interpret, because it does not indicate which `RuleExecutor`'s metrics are being reported:
```shell
24/10/29 15:12:33 WARN PlanChangeLogger:
=== Metrics of Executed Rules ===
Total number of runs: 100
Total time: 0.8585 ms
Total number of effective runs: 0
Total time of effective runs: 0.0 ms

24/10/29 15:12:33 WARN PlanChangeLogger:
=== Metrics of Executed Rules ===
Total number of runs: 196
Total time: 0.78946 ms
Total number of effective runs: 0
Total time of effective runs: 0.0 ms
```

### Why are the changes needed?
Many similar blocks are printed in the log, and it is difficult for Spark developers to tell which `RuleExecutor` generated each of them.

- Before:
```shell
=== Metrics of Executed Rules ===
Total number of runs: 199
Total time: 1.394873 ms
Total number of effective runs: 2
Total time of effective runs: 0.916459 ms

=== Metrics of Executed Rules ===
Total number of runs: 196
Total time: 0.525134 ms
Total number of effective runs: 0
Total time of effective runs: 0.0 ms

=== Metrics of Executed Rules ===
Total number of runs: 1
Total time: 0.00175 ms
Total number of effective runs: 0
Total time of effective runs: 0.0 ms

=== Metrics of Executed Rules ===
Total number of runs: 166
Total time: 0.876414 ms
Total number of effective runs: 1
Total time of effective runs: 0.130166 ms

=== Metrics of Executed Rules ===
Total number of runs: 1
Total time: 0.007375 ms
Total number of effective runs: 0
Total time of effective runs: 0.0 ms
```

- After:
```shell
=== Metrics of Executed Rules org.apache.spark.sql.internal.BaseSessionStateBuilder$$anon$2 ===
Total number of runs: 199
Total time: 32.982158 ms
Total number of effective runs: 2
Total time of effective runs: 32.067459 ms

=== Metrics of Executed Rules org.apache.spark.sql.internal.BaseSessionStateBuilder$$anon$2 ===
Total number of runs: 196
Total time: 0.630705 ms
Total number of effective runs: 0
Total time of effective runs: 0.0 ms

=== Metrics of Executed Rules org.apache.spark.sql.catalyst.expressions.codegen.package$ExpressionCanonicalizer ===
Total number of runs: 1
Total time: 0.105459 ms
Total number of effective runs: 0
Total time of effective runs: 0.0 ms

=== Metrics of Executed Rules org.apache.spark.sql.hive.HiveSessionStateBuilder$$anon$1 ===
Total number of runs: 166
Total time: 2.308457 ms
Total number of effective runs: 1
Total time of effective runs: 1.22025 ms

=== Metrics of Executed Rules org.apache.spark.sql.catalyst.expressions.codegen.package$ExpressionCanonicalizer ===
Total number of runs: 1
Total time: 0.009166 ms
Total number of effective runs: 0
Total time of effective runs: 0.0 ms
```

### Does this PR introduce _any_ user-facing change?
Yes. The logs printed by `PlanChangeLogger#logMetrics` now identify the `RuleExecutor` they come from, so their meaning is clearer to Spark developers.

### How was this patch tested?
Manually checked.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48688 from panbingkun/SPARK-50153.
Authored-by: panbingkun
Signed-off-by: Max Gekk
---
 .../scala/org/apache/spark/internal/LogKey.scala |  1 +
 .../spark/sql/catalyst/rules/RuleExecutor.scala  | 12 +++++++++---
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 12d456a371d07..c365797cec690 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -692,6 +692,7 @@ private[spark] object LogKeys {
   case object RPC_ENDPOINT_REF extends LogKey
   case object RPC_MESSAGE_CAPACITY extends LogKey
   case object RPC_SSL_ENABLED extends LogKey
+  case object RULE_EXECUTOR_NAME extends LogKey
   case object RULE_NAME extends LogKey
   case object RUN_ID extends LogKey
   case object SCALA_VERSION extends LogKey
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 256e1440122d8..76d36fab2096a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -87,13 +87,13 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
     }
   }
 
-  def logMetrics(metrics: QueryExecutionMetrics): Unit = {
+  def logMetrics(name: String, metrics: QueryExecutionMetrics): Unit = {
     val totalTime = metrics.time / NANOS_PER_MILLIS.toDouble
     val totalTimeEffective = metrics.timeEffective / NANOS_PER_MILLIS.toDouble
 
     // scalastyle:off line.size.limit
     val message: MessageWithContext = log"""
-      |=== Metrics of Executed Rules ===
+      |=== Metrics of Executed Rules ${MDC(RULE_EXECUTOR_NAME, name)} ===
       |Total number of runs: ${MDC(NUM_RULE_OF_RUNS, metrics.numRuns)}
       |Total time: ${MDC(TOTAL_TIME, totalTime)} ms
       |Total number of effective runs: ${MDC(NUM_EFFECTIVE_RULE_OF_RUNS, metrics.numEffectiveRuns)}
@@ -118,6 +118,12 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
 
 abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
 
+  /** Name for this rule executor, automatically inferred based on class name. */
+  protected def name: String = {
+    val className = getClass.getName
+    if (className endsWith "$") className.dropRight(1) else className
+  }
+
   /**
    * An execution strategy for rules that indicates the maximum number of executions. If the
    * execution reaches fix point (i.e. converge) before maxIterations, it will stop.
@@ -307,7 +313,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
 
       planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan)
     }
-    planChangeLogger.logMetrics(RuleExecutor.getCurrentMetrics() - beforeMetrics)
+    planChangeLogger.logMetrics(name, RuleExecutor.getCurrentMetrics() - beforeMetrics)
 
     curPlan
   }
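
For context, a minimal sketch (not part of this patch) of how a hypothetical custom executor would pick up the inferred `name`. `MyCustomExecutor`, `NoopRule`, and the batch name are made-up illustrations; only `RuleExecutor`, `Rule`, `Batch`, `Once`, and the log header format come from the code above.

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

// Hypothetical rule that leaves the plan unchanged; it still contributes to
// the run counts and timings reported in QueryExecutionMetrics.
object NoopRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Hypothetical executor. `name` is inherited from RuleExecutor and inferred
// from the class name (the trailing "$" of a Scala object's class is stripped),
// so the metrics block for this executor is labeled with its qualified name.
object MyCustomExecutor extends RuleExecutor[LogicalPlan] {
  override protected def batches: Seq[Batch] =
    Batch("Noop batch", Once, NoopRule) :: Nil
}

// With plan change logging enabled (e.g. spark.sql.planChangeLog.level=WARN),
// MyCustomExecutor.execute(somePlan) now logs a header like
//   === Metrics of Executed Rules <package>.MyCustomExecutor ===
// instead of the previously anonymous "=== Metrics of Executed Rules ===".
```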