[SPARK-48773] Document config "spark.default.parallelism" by config builder framework

### What changes were proposed in this pull request?

Document config "spark.default.parallelism". This is Spark used config but not documented by config builder framework. This config is already in spark website: https://spark.apache.org/docs/latest/configuration.html.
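
As a minimal sketch (not part of this diff): the change lets callers reference a shared `ConfigEntry` instead of hard-coding the string key. The helper object and package name below are hypothetical, and the code has to live under `org.apache.spark` because `DEFAULT_PARALLELISM` is `private[spark]`.

```scala
package org.apache.spark.example // hypothetical subpackage of org.apache.spark

import org.apache.spark.SparkConf
import org.apache.spark.internal.config

object DefaultParallelismExample {
  def resolve(conf: SparkConf, fallback: Int): Int = {
    // Old style: a raw string key, where typos only surface at runtime.
    //   conf.getInt("spark.default.parallelism", fallback)

    // New style: the key comes from the ConfigEntry added in
    // core/src/main/scala/org/apache/spark/internal/config/package.scala,
    // which also carries the documentation and version information.
    conf.getInt(config.DEFAULT_PARALLELISM.key, fallback)
  }
}
```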

### Why are the changes needed?

Document Spark's config through the config builder framework.

### Does this PR introduce _any_ user-facing change?

NO.

### How was this patch tested?

N/A

### Was this patch authored or co-authored using generative AI tooling?

N/A

Closes apache#47171 from amaliujia/document_spark_default_paramllel.

Authored-by: Rui Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
amaliujia authored and cloud-fan committed Jul 11, 2024
1 parent 261dbf4 commit 896c15e
Showing 12 changed files with 41 additions and 23 deletions.
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -26,6 +26,7 @@ import scala.math.log10
import scala.reflect.ClassTag
import scala.util.hashing.byteswap32

import org.apache.spark.internal.config
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.{CollectionsUtils, Utils}
@@ -73,7 +74,7 @@ object Partitioner {
None
}

val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
val defaultNumPartitions = if (rdd.context.conf.contains(config.DEFAULT_PARALLELISM.key)) {
rdd.context.defaultParallelism
} else {
rdds.map(_.partitions.length).max
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -42,6 +42,18 @@ package object config {
private[spark] val SPARK_TASK_PREFIX = "spark.task"
private[spark] val LISTENER_BUS_EVENT_QUEUE_PREFIX = "spark.scheduler.listenerbus.eventqueue"

private[spark] val DEFAULT_PARALLELISM =
ConfigBuilder("spark.default.parallelism")
.doc("Default number of partitions in RDDs returned by transformations like " +
"join, reduceByKey, and parallelize when not set by user. " +
"For distributed shuffle operations like reduceByKey and join, the largest number of " +
"partitions in a parent RDD. For operations like parallelize with no parent RDDs, " +
"it depends on the cluster manager. For example in Local mode, it defaults to the " +
"number of cores on the local machine")
.version("0.5.0")
.intConf
.createOptional

private[spark] val RESOURCES_DISCOVERY_PLUGIN =
ConfigBuilder("spark.resources.discoveryPlugin")
.doc("Comma-separated list of class names implementing" +
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.executor.ExecutorLogUrlHandler
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.{config, Logging, MDC}
import org.apache.spark.internal.LogKeys
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config._
@@ -707,7 +707,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}

override def defaultParallelism(): Int = {
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
conf.getInt(config.DEFAULT_PARALLELISM.key, math.max(totalCoreCount.get(), 2))
}

/**
@@ -154,7 +154,7 @@ private[spark] class LocalSchedulerBackend(
}

override def defaultParallelism(): Int =
scheduler.conf.getInt("spark.default.parallelism", totalCores)
scheduler.conf.getInt(config.DEFAULT_PARALLELISM.key, totalCores)

override def killTask(
taskId: Long, executorId: String, interruptThread: Boolean, reason: String): Unit = {
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -327,7 +327,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
test("SPARK-22357 test binaryFiles minPartitions") {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")
.set("spark.files.openCostInBytes", "0")
.set("spark.default.parallelism", "1"))
.set(DEFAULT_PARALLELISM.key, "1"))

withTempDir { tempDir =>
val tempDirPath = tempDir.getAbsolutePath
7 changes: 4 additions & 3 deletions core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -22,6 +22,7 @@ import scala.math.abs

import org.scalatest.PrivateMethodTester

import org.apache.spark.internal.config._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.StatCounter
@@ -286,9 +287,9 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva
}

test("defaultPartitioner when defaultParallelism is set") {
assert(!sc.conf.contains("spark.default.parallelism"))
assert(!sc.conf.contains(DEFAULT_PARALLELISM.key))
try {
sc.conf.set("spark.default.parallelism", "4")
sc.conf.set(DEFAULT_PARALLELISM.key, "4")

val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 150)
val rdd2 = sc.parallelize(Seq((1, 2), (2, 3), (2, 4), (3, 4)))
@@ -317,7 +318,7 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva
assert(partitioner6.numPartitions == sc.defaultParallelism)
assert(partitioner7.numPartitions == sc.defaultParallelism)
} finally {
sc.conf.remove("spark.default.parallelism")
sc.conf.remove(DEFAULT_PARALLELISM.key)
}
}
}
@@ -19,6 +19,7 @@ package org.apache.spark

import org.scalatest.PrivateMethodTester

import org.apache.spark.internal.config
import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
import org.apache.spark.scheduler.local.LocalSchedulerBackend
@@ -123,7 +124,7 @@ class SparkContextSchedulerCreationSuite
}

test("local-default-parallelism") {
val conf = new SparkConf().set("spark.default.parallelism", "16")
val conf = new SparkConf().set(config.DEFAULT_PARALLELISM.key, "16")

val sched = createTaskScheduler("local", conf) { sched =>
sched.backend match {
@@ -34,6 +34,7 @@ import org.scalatest.Assertions

import org.apache.spark._
import org.apache.spark.Partitioner
import org.apache.spark.internal.config._
import org.apache.spark.util.ArrayImplicits._

class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
@@ -332,44 +333,44 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
}

test("cogroup between multiple RDD when defaultParallelism is set without proper partitioner") {
assert(!sc.conf.contains("spark.default.parallelism"))
assert(!sc.conf.contains(DEFAULT_PARALLELISM.key))
try {
sc.conf.set("spark.default.parallelism", "4")
sc.conf.set(DEFAULT_PARALLELISM.key, "4")
val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
val rdd2 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)), 10)
val joined = rdd1.cogroup(rdd2)
assert(joined.getNumPartitions == sc.defaultParallelism)
} finally {
sc.conf.remove("spark.default.parallelism")
sc.conf.remove(DEFAULT_PARALLELISM.key)
}
}

test("cogroup between multiple RDD when defaultParallelism is set with proper partitioner") {
assert(!sc.conf.contains("spark.default.parallelism"))
assert(!sc.conf.contains(DEFAULT_PARALLELISM.key))
try {
sc.conf.set("spark.default.parallelism", "4")
sc.conf.set(DEFAULT_PARALLELISM.key, "4")
val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
val rdd2 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
.partitionBy(new HashPartitioner(10))
val joined = rdd1.cogroup(rdd2)
assert(joined.getNumPartitions == rdd2.getNumPartitions)
} finally {
sc.conf.remove("spark.default.parallelism")
sc.conf.remove(DEFAULT_PARALLELISM.key)
}
}

test("cogroup between multiple RDD when defaultParallelism is set; with huge number of " +
"partitions in upstream RDDs") {
assert(!sc.conf.contains("spark.default.parallelism"))
assert(!sc.conf.contains(DEFAULT_PARALLELISM.key))
try {
sc.conf.set("spark.default.parallelism", "4")
sc.conf.set(DEFAULT_PARALLELISM.key, "4")
val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 1000)
val rdd2 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
.partitionBy(new HashPartitioner(10))
val joined = rdd1.cogroup(rdd2)
assert(joined.getNumPartitions == rdd2.getNumPartitions)
} finally {
sc.conf.remove("spark.default.parallelism")
sc.conf.remove(DEFAULT_PARALLELISM.key)
}
}

@@ -54,7 +54,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
test("serialized task larger than max RPC message size") {
val conf = new SparkConf
conf.set(RPC_MESSAGE_MAX_SIZE, 1)
conf.set("spark.default.parallelism", "1")
conf.set(DEFAULT_PARALLELISM.key, "1")
sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf)
val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize))
@@ -33,7 +33,7 @@ import org.scalatest.time.SpanSugar._

import org.apache.spark._
import org.apache.spark.TaskState._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.SCHEDULER_REVIVE_INTERVAL
import org.apache.spark.rdd.RDD
import org.apache.spark.resource.ResourceProfile
@@ -445,7 +445,7 @@ private[spark] class SingleCoreMockBackend(

val cores = 1

override def defaultParallelism(): Int = conf.getInt("spark.default.parallelism", cores)
override def defaultParallelism(): Int = conf.getInt(config.DEFAULT_PARALLELISM.key, cores)

freeCores = cores
val localExecutorId = SparkContext.DRIVER_IDENTIFIER
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
import org.apache.spark.{SparkContext, SparkFunSuite}
import org.apache.spark.graphx.Graph._
import org.apache.spark.graphx.PartitionStrategy._
import org.apache.spark.internal.config
import org.apache.spark.rdd._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -373,7 +374,7 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
val numEdgePartitions = 4
assert(defaultParallelism != numEdgePartitions)
val conf = new org.apache.spark.SparkConf()
.set("spark.default.parallelism", defaultParallelism.toString)
.set(config.DEFAULT_PARALLELISM.key, defaultParallelism.toString)
val sc = new SparkContext("local", "test", conf)
try {
val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)),
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path, RawLocalFileSystem
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkException
import org.apache.spark.internal.config
import org.apache.spark.paths.SparkPath.{fromUrlString => sp}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
@@ -44,7 +45,7 @@ import org.apache.spark.util.Utils
class FileSourceStrategySuite extends QueryTest with SharedSparkSession {
import testImplicits._

protected override def sparkConf = super.sparkConf.set("spark.default.parallelism", "1")
protected override def sparkConf = super.sparkConf.set(config.DEFAULT_PARALLELISM.key, "1")

test("unpartitioned table, single partition") {
val table =
