[SPARK-17790][SPARKR] Support for parallelizing R data.frame larger than 2GB

## What changes were proposed in this pull request?
If the R data structure being parallelized is larger than `INT_MAX`, we use files to transfer the data to the JVM. The serialization protocol mimics Python pickling, which allows us to simply call `PythonRDD.readRDDFromFile` to create the RDD.
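For context, the temp-file framing writes each serialized slice as a 4-byte big-endian length followed by the raw bytes, which is the record format `PythonRDD.readRDDFromFile` parses on the JVM side. Below is a minimal sketch (not part of the patch; the helper name is hypothetical) of reading that framing back in R, just to illustrate the format:

```R
# Hypothetical helper: read back slices written in the length-prefixed format
# used by the temp-file transfer path. Each element can be unserialize()'d.
readFramedSlices <- function(fileName) {
  conn <- file(fileName, "rb")
  on.exit(close(conn))
  slices <- list()
  repeat {
    # 4-byte big-endian length prefix; readBin returns integer(0) at EOF
    len <- readBin(conn, what = "integer", n = 1, size = 4, endian = "big")
    if (length(len) == 0) break
    # followed by `len` raw bytes holding one serialized slice
    slices[[length(slices) + 1]] <- readBin(conn, what = "raw", n = len)
  }
  slices
}
```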

I tested this on my MacBook. The following code works with this patch:
```R
intMax <- .Machine$integer.max
largeVec <- 1:intMax
rdd <- SparkR:::parallelize(sc, largeVec, 2)
```
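The new unit test exercises the same file-based path without allocating gigabytes of data by lowering `spark.r.maxAllocationLimit`. A hedged sketch of that trick, assuming a fresh SparkR session (nothing below is part of the patch itself):

```R
# Sketch: start a session with a tiny allocation limit so even a small
# data.frame exceeds it and is transferred through a temporary file.
library(SparkR)
sparkR.session(sparkConfig = list(spark.r.maxAllocationLimit = "100"))
df <- createDataFrame(iris)              # serialized slices exceed 100 bytes
stopifnot(all(dim(df) == dim(iris)))     # data survives the round trip
```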

## How was this patch tested?
* [x] Unit tests

Author: Hossein <[email protected]>

Closes #15375 from falaki/SPARK-17790.
falaki authored and Felix Cheung committed Oct 12, 2016
1 parent d5580eb commit 5cc503f
Showing 4 changed files with 68 additions and 3 deletions.
45 changes: 43 additions & 2 deletions R/pkg/R/context.R
@@ -87,6 +87,10 @@ objectFile <- function(sc, path, minPartitions = NULL) {
#' in the list are split into \code{numSlices} slices and distributed to nodes
#' in the cluster.
#'
#' If the size of the serialized slices exceeds \code{spark.r.maxAllocationLimit} (about 200MB by
#' default), the function writes the data to disk and sends the file name to the JVM. The number
#' of slices may also be increased so that each slice stays below that limit.
#'
#' @param sc SparkContext to use
#' @param coll collection to parallelize
#' @param numSlices number of partitions to create in the RDD
@@ -120,6 +124,11 @@ parallelize <- function(sc, coll, numSlices = 1) {
coll <- as.list(coll)
}

sizeLimit <- getMaxAllocationLimit(sc)
objectSize <- object.size(coll)

# For large objects we make sure the size of each slice is also smaller than sizeLimit
numSlices <- max(numSlices, ceiling(objectSize / sizeLimit))
if (numSlices > length(coll))
numSlices <- length(coll)

@@ -130,12 +139,44 @@
# 2-tuples of raws
serializedSlices <- lapply(slices, serialize, connection = NULL)

jrdd <- callJStatic("org.apache.spark.api.r.RRDD",
"createRDDFromArray", sc, serializedSlices)
# The RPC backend cannot handle arguments larger than 2GB (INT_MAX).
# If the serialized data is safely below that threshold, we send it over the RPC channel.
# Otherwise, we write it to a file and send the file name
if (objectSize < sizeLimit) {
jrdd <- callJStatic("org.apache.spark.api.r.RRDD", "createRDDFromArray", sc, serializedSlices)
} else {
fileName <- writeToTempFile(serializedSlices)
jrdd <- tryCatch(callJStatic(
"org.apache.spark.api.r.RRDD", "createRDDFromFile", sc, fileName, as.integer(numSlices)),
finally = {
file.remove(fileName)
})
}

RDD(jrdd, "byte")
}

getMaxAllocationLimit <- function(sc) {
conf <- callJMethod(sc, "getConf")
as.numeric(
callJMethod(conf,
"get",
"spark.r.maxAllocationLimit",
toString(.Machine$integer.max / 10) # Default to a safe value: 200MB
))
}

writeToTempFile <- function(serializedSlices) {
fileName <- tempfile()
conn <- file(fileName, "wb")
for (slice in serializedSlices) {
writeBin(as.integer(length(slice)), conn, endian = "big")
writeBin(slice, conn, endian = "big")
}
close(conn)
fileName
}

#' Include this specified package on all workers
#'
#' This function can be used to include a package on all workers before the
11 changes: 11 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -208,6 +208,17 @@ test_that("create DataFrame from RDD", {
unsetHiveContext()
})

test_that("createDataFrame uses files for large objects", {
# To simulate a large file scenario, we set spark.r.maxAllocationLimit to a smaller value
conf <- callJMethod(sparkSession, "conf")
callJMethod(conf, "set", "spark.r.maxAllocationLimit", "100")
df <- createDataFrame(iris)

# Reset the conf back to the default value
callJMethod(conf, "set", "spark.r.maxAllocationLimit", toString(.Machine$integer.max / 10))
expect_equal(dim(df), dim(iris))
})

test_that("read/write csv as DataFrame", {
csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
mockLinesCsv <- c("year,make,model,comment,blank",
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -168,7 +168,7 @@ private[r] class RBackendHandler(server: RBackend)
}
} catch {
case e: Exception =>
logError(s"$methodName on $objId failed")
logError(s"$methodName on $objId failed", e)
writeInt(dos, -1)
// Writing the error message of the cause for the exception. This will be returned
// to user in the R process.
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/api/r/RRDD.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.api.python.PythonRDD
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
@@ -140,4 +141,16 @@ private[r] object RRDD {
def createRDDFromArray(jsc: JavaSparkContext, arr: Array[Array[Byte]]): JavaRDD[Array[Byte]] = {
JavaRDD.fromRDD(jsc.sc.parallelize(arr, arr.length))
}

/**
* Create an RRDD given a temporary file name. This is used to create RRDD when parallelize is
* called on large R objects.
*
* @param fileName name of temporary file on driver machine
* @param parallelism number of slices for the resulting RDD
*/
def createRDDFromFile(jsc: JavaSparkContext, fileName: String, parallelism: Int):
JavaRDD[Array[Byte]] = {
PythonRDD.readRDDFromFile(jsc, fileName, parallelism)
}
}
