Commit

BatchWriter memory leak.
Signed-off-by: Jim Hughes <[email protected]>
jnh5y committed Oct 16, 2020
1 parent 8421461 commit 0d1f882
Showing 2 changed files with 77 additions and 28 deletions.
@@ -101,26 +101,38 @@ object BatchWriter {
     // this is lazy to allow the query plan to be instantiated without pulling back all the batches first
     private lazy val inputs: Array[(SimpleFeatureVector, (Int, Int) => Unit)] = {
       val builder = Array.newBuilder[(SimpleFeatureVector, (Int, Int) => Unit)]
-      while (batches.hasNext) {
-        val vector = SimpleFeatureVector.create(sft, dictionaries, encoding)
-        RecordBatchLoader.load(vector.underlying, batches.next)
-
-        val transfers: Seq[(Int, Int) => Unit] = {
-          val fromVectors = vector.underlying.getChildrenFromFields
-          val toVectors = result.underlying.getChildrenFromFields
-          val builder = Seq.newBuilder[(Int, Int) => Unit]
-          builder.sizeHint(fromVectors.size())
-          var i = 0
-          while (i < fromVectors.size()) {
-            builder += createTransferPair(fromVectors.get(i), toVectors.get(i))
-            i += 1
-          }
-          builder.result()
-        }
-        val transfer: (Int, Int) => Unit = (from, to) => transfers.foreach(_.apply(from, to))
-        builder += vector -> transfer
-      }
-      builder.result
+      var vector: SimpleFeatureVector = null
+      try {
+        while (batches.hasNext) {
+          vector = SimpleFeatureVector.create(sft, dictionaries, encoding)
+          val batch = batches.next
+          RecordBatchLoader.load(vector.underlying, batch)
+
+          val transfers: Seq[(Int, Int) => Unit] = {
+            val fromVectors = vector.underlying.getChildrenFromFields
+            val toVectors = result.underlying.getChildrenFromFields
+            val builder = Seq.newBuilder[(Int, Int) => Unit]
+            builder.sizeHint(fromVectors.size())
+            var i = 0
+            while (i < fromVectors.size()) {
+              builder += createTransferPair(fromVectors.get(i), toVectors.get(i))
+              i += 1
+            }
+            builder.result()
+          }
+          val transfer: (Int, Int) => Unit = (from, to) => transfers.foreach(_.apply(from, to))
+          builder += vector -> transfer
+        }
+        builder.result
+      } catch {
+        case t: Throwable =>
+          CloseWithLogging(result, batches)
+          builder.result().foreach(_._1.close())
+          if (vector != null) {
+            vector.close()
+          }
+          throw t
+      }
     }
 
     // we do a merge sort of each batch
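The fix above is the standard cleanup-on-failure pattern for off-heap resources: each SimpleFeatureVector is allocated inside the loop, so if loading a later batch throws, every vector built so far (plus the one still in flight) must be closed before the exception propagates; the merged result vector and the remaining batches are closed via CloseWithLogging so that a secondary failure during cleanup is logged rather than thrown. A minimal, self-contained sketch of the pattern, with hypothetical open/fill functions standing in for SimpleFeatureVector.create and RecordBatchLoader.load:

    import scala.collection.mutable.ArrayBuffer

    // Sketch of the cleanup-on-failure pattern (hypothetical names). If any
    // step throws mid-iteration, everything allocated so far - including the
    // resource still in flight - is closed before the exception propagates,
    // so no off-heap memory is orphaned.
    def openAll[A <: AutoCloseable](
        batches: Iterator[Array[Byte]],
        open: () => A,
        fill: (A, Array[Byte]) => Unit): Seq[A] = {
      val opened = ArrayBuffer.empty[A]
      var inFlight: A = null.asInstanceOf[A]
      try {
        while (batches.hasNext) {
          inFlight = open()               // allocates off-heap memory
          fill(inFlight, batches.next())  // may throw after the allocation
          opened += inFlight
          inFlight = null.asInstanceOf[A] // ownership now tracked by `opened`
        }
        opened.toSeq
      } catch {
        case t: Throwable =>
          // best-effort cleanup: close everything fully loaded so far
          opened.foreach(r => try { r.close() } catch { case _: Throwable => })
          // then the resource that was allocated but never handed off
          if (inFlight != null) { inFlight.close() }
          throw t
      }
    }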
@@ -13,6 +13,7 @@ import java.util.Date
 
 import org.apache.arrow.vector.ipc.message.IpcOption
 import org.junit.runner.RunWith
+import org.locationtech.geomesa.arrow.ArrowAllocator
 import org.locationtech.geomesa.arrow.io.records.RecordBatchUnloader
 import org.locationtech.geomesa.arrow.vector.SimpleFeatureVector.SimpleFeatureEncoding
 import org.locationtech.geomesa.arrow.vector.{ArrowDictionary, SimpleFeatureVector}
@@ -24,6 +25,7 @@ import org.specs2.runner.JUnitRunner
 
 @RunWith(classOf[JUnitRunner])
 class BatchWriterTest extends Specification {
+  sequential
 
   val sft = SimpleFeatureTypes.createType("test", "name:String,foo:String,dtg:Date,*geom:Point:srid=4326")
 
@@ -41,17 +43,7 @@
     "merge sort arrow batches" >> {
       val encoding = SimpleFeatureEncoding.min(includeFids = true)
       val dictionaries = Map.empty[String, ArrowDictionary]
-      val batches = WithClose(SimpleFeatureVector.create(sft, dictionaries, encoding)) { vector =>
-        val unloader = new RecordBatchUnloader(vector, new IpcOption())
-        Seq(features0, features1, features2).map { features =>
-          var i = 0
-          while (i < features.length) {
-            vector.writer.set(i, features(i))
-            i += 1
-          }
-          unloader.unload(i)
-        }
-      }
+      val batches: Seq[Array[Byte]] = buildBatches(encoding, dictionaries)
 
       val bytes = WithClose(BatchWriter.reduce(sft, dictionaries, encoding, new IpcOption(), Some("dtg" -> false), sorted = false, 10, batches.iterator))(_.reduceLeft(_ ++ _))
 
@@ -64,5 +56,50 @@
       features.map(_.getAttributes) mustEqual
         (features0 ++ features1 ++ features2).sortBy(_.getAttribute("dtg").asInstanceOf[Date]).map(_.getAttributes)
     }
+
+    "not leak memory when iteration dies" >> {
+      val encoding = SimpleFeatureEncoding.min(includeFids = true)
+      val dictionaries = Map.empty[String, ArrowDictionary]
+      val batches: Seq[Array[Byte]] = buildBatches(encoding, dictionaries)
+
+      val iterator: Iterator[Array[Byte]] = new Iterator[Array[Byte]] {
+        private val internal = batches.iterator
+        override def hasNext: Boolean = {
+          if (internal.hasNext) {
+            true
+          } else {
+            throw new Exception("No more elements!")
+          }
+        }
+
+        override def next(): Array[Byte] = {
+          internal.next()
+        }
+      }
+
+      try {
+        WithClose(BatchWriter.reduce(sft, dictionaries, encoding, new IpcOption(), Some("dtg" -> false), sorted = false, 10, iterator))(_.reduceLeft(_ ++ _))
+      } catch {
+        case _: Exception =>
+          // The iterator passed in throws an exception. That stops the BatchWriter.
+          // The goal of this test is to show that off-heap memory is not leaked when that happens.
+      }
+      ArrowAllocator.getAllocatedMemory("test") mustEqual 0
+    }
   }
+
+  private def buildBatches(encoding: SimpleFeatureEncoding, dictionaries: Map[String, ArrowDictionary]) = {
+    val batches = WithClose(SimpleFeatureVector.create(sft, dictionaries, encoding)) { vector =>
+      val unloader = new RecordBatchUnloader(vector, new IpcOption())
+      Seq(features0, features1, features2).map { features =>
+        var i = 0
+        while (i < features.length) {
+          vector.writer.set(i, features(i))
+          i += 1
+        }
+        unloader.unload(i)
+      }
+    }
+    batches
+  }
 }
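The new test's final assertion relies on Arrow's allocator accounting: every off-heap buffer is charged to the allocator that created it, so once all vectors are closed the allocated count must return to zero, and a non-zero value after the induced failure means a leak (the "test" argument presumably selects the child allocator(s) associated with the feature type named "test" in GeoMesa's ArrowAllocator). A minimal sketch of that accounting using only stock Arrow APIs, independent of the GeoMesa wrapper:

    import org.apache.arrow.memory.RootAllocator

    // Arrow charges every off-heap buffer to its allocator, so the
    // allocated-memory counter doubles as a leak detector.
    val root = new RootAllocator(Long.MaxValue)
    val buf = root.buffer(1024)             // allocate 1 KiB off heap
    assert(root.getAllocatedMemory == 1024) // accounted for
    buf.close()                             // release it
    assert(root.getAllocatedMemory == 0)    // nothing leaked
    root.close()                            // would throw if buffers were still open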
