From 0d1f88293cdcc895406ee535e48072c7d0c90447 Mon Sep 17 00:00:00 2001
From: Jim Hughes
Date: Thu, 15 Oct 2020 16:52:01 -0400
Subject: [PATCH] Fix BatchWriter memory leak

Close the result vector, the source iterator, and any SimpleFeatureVectors
that have already been loaded if an exception is thrown while the lazy
`inputs` array is being materialized. Add a regression test verifying that
no off-heap memory remains allocated when the batch iterator fails.

Signed-off-by: Jim Hughes
---
 .../geomesa/arrow/io/BatchWriter.scala        | 46 +++++++++------
 .../geomesa/arrow/io/BatchWriterTest.scala    | 59 +++++++++++++++----
 2 files changed, 77 insertions(+), 28 deletions(-)

diff --git a/geomesa-arrow/geomesa-arrow-gt/src/main/scala/org/locationtech/geomesa/arrow/io/BatchWriter.scala b/geomesa-arrow/geomesa-arrow-gt/src/main/scala/org/locationtech/geomesa/arrow/io/BatchWriter.scala
index 2cb45b6add64..dbd849645e48 100644
--- a/geomesa-arrow/geomesa-arrow-gt/src/main/scala/org/locationtech/geomesa/arrow/io/BatchWriter.scala
+++ b/geomesa-arrow/geomesa-arrow-gt/src/main/scala/org/locationtech/geomesa/arrow/io/BatchWriter.scala
@@ -101,26 +101,38 @@ object BatchWriter {
     // this is lazy to allow the query plan to be instantiated without pulling back all the batches first
     private lazy val inputs: Array[(SimpleFeatureVector, (Int, Int) => Unit)] = {
       val builder = Array.newBuilder[(SimpleFeatureVector, (Int, Int) => Unit)]
-      while (batches.hasNext) {
-        val vector = SimpleFeatureVector.create(sft, dictionaries, encoding)
-        RecordBatchLoader.load(vector.underlying, batches.next)
-
-        val transfers: Seq[(Int, Int) => Unit] = {
-          val fromVectors = vector.underlying.getChildrenFromFields
-          val toVectors = result.underlying.getChildrenFromFields
-          val builder = Seq.newBuilder[(Int, Int) => Unit]
-          builder.sizeHint(fromVectors.size())
-          var i = 0
-          while (i < fromVectors.size()) {
-            builder += createTransferPair(fromVectors.get(i), toVectors.get(i))
-            i += 1
+      var vector: SimpleFeatureVector = null
+      try {
+        while (batches.hasNext) {
+          vector = SimpleFeatureVector.create(sft, dictionaries, encoding)
+          val batch = batches.next
+          RecordBatchLoader.load(vector.underlying, batch)
+
+          val transfers: Seq[(Int, Int) => Unit] = {
+            val fromVectors = vector.underlying.getChildrenFromFields
+            val toVectors = result.underlying.getChildrenFromFields
+            val builder = Seq.newBuilder[(Int, Int) => Unit]
+            builder.sizeHint(fromVectors.size())
+            var i = 0
+            while (i < fromVectors.size()) {
+              builder += createTransferPair(fromVectors.get(i), toVectors.get(i))
+              i += 1
+            }
+            builder.result()
           }
-          builder.result()
+          val transfer: (Int, Int) => Unit = (from, to) => transfers.foreach(_.apply(from, to))
+          builder += vector -> transfer
         }
-        val transfer: (Int, Int) => Unit = (from, to) => transfers.foreach(_.apply(from, to))
-        builder += vector -> transfer
+        builder.result
+      } catch {
+        case t: Throwable =>
+          CloseWithLogging(result, batches)
+          builder.result().foreach(_._1.close())
+          if (vector != null) {
+            vector.close()
+          }
+          throw t
       }
-      builder.result
     }
 
     // we do a merge sort of each batch
diff --git a/geomesa-arrow/geomesa-arrow-gt/src/test/scala/org/locationtech/geomesa/arrow/io/BatchWriterTest.scala b/geomesa-arrow/geomesa-arrow-gt/src/test/scala/org/locationtech/geomesa/arrow/io/BatchWriterTest.scala
index 3c30a7ea6253..3a4d687c75bc 100644
--- a/geomesa-arrow/geomesa-arrow-gt/src/test/scala/org/locationtech/geomesa/arrow/io/BatchWriterTest.scala
+++ b/geomesa-arrow/geomesa-arrow-gt/src/test/scala/org/locationtech/geomesa/arrow/io/BatchWriterTest.scala
@@ -13,6 +13,7 @@ import java.util.Date
 
 import org.apache.arrow.vector.ipc.message.IpcOption
 import org.junit.runner.RunWith
+import org.locationtech.geomesa.arrow.ArrowAllocator
 import org.locationtech.geomesa.arrow.io.records.RecordBatchUnloader
 import org.locationtech.geomesa.arrow.vector.SimpleFeatureVector.SimpleFeatureEncoding
 import org.locationtech.geomesa.arrow.vector.{ArrowDictionary, SimpleFeatureVector}
@@ -24,6 +25,7 @@ import org.specs2.runner.JUnitRunner
 
 @RunWith(classOf[JUnitRunner])
 class BatchWriterTest extends Specification {
+  sequential
 
   val sft = SimpleFeatureTypes.createType("test", "name:String,foo:String,dtg:Date,*geom:Point:srid=4326")
 
@@ -41,17 +43,7 @@ class BatchWriterTest extends Specification {
     "merge sort arrow batches" >> {
       val encoding = SimpleFeatureEncoding.min(includeFids = true)
       val dictionaries = Map.empty[String, ArrowDictionary]
-      val batches = WithClose(SimpleFeatureVector.create(sft, dictionaries, encoding)) { vector =>
-        val unloader = new RecordBatchUnloader(vector, new IpcOption())
-        Seq(features0, features1, features2).map { features =>
-          var i = 0
-          while (i < features.length) {
-            vector.writer.set(i, features(i))
-            i += 1
-          }
-          unloader.unload(i)
-        }
-      }
+      val batches: Seq[Array[Byte]] = buildBatches(encoding, dictionaries)
 
       val bytes = WithClose(BatchWriter.reduce(sft, dictionaries, encoding, new IpcOption(), Some("dtg" -> false),
         sorted = false, 10, batches.iterator))(_.reduceLeft(_ ++ _))
@@ -64,5 +56,50 @@ class BatchWriterTest extends Specification {
       features.map(_.getAttributes) mustEqual
         (features0 ++ features1 ++ features2).sortBy(_.getAttribute("dtg").asInstanceOf[Date]).map(_.getAttributes)
     }
+
+    "not leak memory when iteration dies" >> {
+      val encoding = SimpleFeatureEncoding.min(includeFids = true)
+      val dictionaries = Map.empty[String, ArrowDictionary]
+      val batches: Seq[Array[Byte]] = buildBatches(encoding, dictionaries)
+
+      val iterator: Iterator[Array[Byte]] = new Iterator[Array[Byte]] {
+        private val internal = batches.iterator
+        override def hasNext: Boolean = {
+          if (internal.hasNext) {
+            true
+          } else {
+            throw new Exception("No more elements!")
+          }
+        }
+
+        override def next(): Array[Byte] = {
+          internal.next()
+        }
+      }
+
+      try {
+        WithClose(BatchWriter.reduce(sft, dictionaries, encoding, new IpcOption(), Some("dtg" -> false), sorted = false, 10, iterator))(_.reduceLeft(_ ++ _))
+      } catch {
+        case _: Exception =>
+          // The iterator passed in throws an exception. That stops the BatchWriter.
+          // The goal of this test is to show that off-heap memory is not leaked when that happens.
+      }
+      ArrowAllocator.getAllocatedMemory("test") mustEqual 0
+    }
   }
+
+  private def buildBatches(encoding: SimpleFeatureEncoding, dictionaries: Map[String, ArrowDictionary]) = {
+    val batches = WithClose(SimpleFeatureVector.create(sft, dictionaries, encoding)) { vector =>
+      val unloader = new RecordBatchUnloader(vector, new IpcOption())
+      Seq(features0, features1, features2).map { features =>
+        var i = 0
+        while (i < features.length) {
+          vector.writer.set(i, features(i))
+          i += 1
+        }
+        unloader.unload(i)
+      }
+    }
+    batches
+  }
 }
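
For readers outside the diff context, the fix follows a general pattern for materializing a collection of off-heap resources: if the build fails partway through, everything allocated so far must be closed before the exception propagates. Below is a minimal, self-contained Scala sketch of that pattern. It is not GeoMesa code: CleanupSketch, FakeVector, loadAll, and the "corrupt batch" failure are hypothetical stand-ins for SimpleFeatureVector, the lazy `inputs` loader, and a batch iterator that dies mid-scan.

object CleanupSketch {

  // Hypothetical stand-in for an Arrow-backed SimpleFeatureVector: an
  // AutoCloseable whose memory leaks unless close() is called.
  final class FakeVector(val id: Int) extends AutoCloseable {
    override def close(): Unit = println(s"closed vector $id")
  }

  // Mirrors the shape of BatchWriter's lazy `inputs`: allocate one resource
  // per batch; on any failure, close everything allocated so far, then re-throw.
  def loadAll(batches: Iterator[Array[Byte]]): Array[FakeVector] = {
    val builder = Array.newBuilder[FakeVector]
    var current: FakeVector = null // resource being loaded when a failure hits
    try {
      var i = 0
      while (batches.hasNext) {
        current = new FakeVector(i)
        val batch = batches.next() // may throw if the underlying scan dies
        if (batch.isEmpty) { throw new IllegalStateException(s"corrupt batch $i") }
        builder += current
        current = null // ownership moves to the builder, preventing a double close
        i += 1
      }
      builder.result()
    } catch {
      case t: Throwable =>
        builder.result().foreach(_.close())       // the fully loaded resources
        if (current != null) { current.close() }  // the partially loaded one
        throw t
    }
  }

  def main(args: Array[String]): Unit = {
    // Two good batches followed by a "corrupt" one: both loaded vectors are closed.
    val batches = Iterator(Array[Byte](1), Array[Byte](2), Array.empty[Byte])
    try { loadAll(batches) } catch {
      case e: Exception => println(s"failed cleanly: ${e.getMessage}")
    }
  }
}

The patch itself does the same thing with SimpleFeatureVector, and additionally closes the shared result vector and the source iterator via CloseWithLogging before re-throwing. The regression test then asserts `ArrowAllocator.getAllocatedMemory("test") mustEqual 0` to prove that nothing was left allocated after the failure.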