Commit

Previous fixes for Arrow memory leaks. Still need to review.

Signed-off-by: Jim Hughes <[email protected]>
jnh5y committed Sep 4, 2020
1 parent adf5cb7 commit 7a32a4c
Showing 8 changed files with 193 additions and 57 deletions.
4 changes: 4 additions & 0 deletions geomesa-arrow/geomesa-arrow-gt/pom.xml
@@ -11,6 +11,10 @@
<artifactId>geomesa-arrow-gt_2.11</artifactId>

<dependencies>
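<!-- netty-all supplies io.netty.util.internal.PlatformDependent, used elsewhere in this commit to track direct memory -->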
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-arrow-jts</artifactId>
org/locationtech/geomesa/arrow/io/BatchWriter.scala

@@ -8,6 +8,7 @@

package org.locationtech.geomesa.arrow.io

import com.typesafe.scalalogging.LazyLogging
import org.apache.arrow.vector.ipc.message.IpcOption
import org.locationtech.geomesa.arrow.io.records.{RecordBatchLoader, RecordBatchUnloader}
import org.locationtech.geomesa.arrow.vector.SimpleFeatureVector.SimpleFeatureEncoding
@@ -17,6 +18,7 @@ import org.locationtech.geomesa.utils.geotools.SimpleFeatureOrdering
import org.locationtech.geomesa.utils.io.CloseWithLogging
import org.opengis.feature.simple.SimpleFeatureType

import scala.collection.mutable
import scala.math.Ordering

object BatchWriter {
@@ -78,7 +80,7 @@
batchSize: Int,
batches: CloseableIterator[Array[Byte]],
private var writeHeader: Boolean = true
) extends CloseableIterator[Array[Byte]] {
) extends CloseableIterator[Array[Byte]] with LazyLogging {

private val result = SimpleFeatureVector.create(sft, dictionaries, encoding)
private val unloader = new RecordBatchUnloader(result, ipcOpts)
@@ -96,41 +98,72 @@
}
}

var count = 0L
var totalBatchSize: Long = 0L

// this is lazy to allow the query plan to be instantiated without pulling back all the batches first
private lazy val inputs: Array[(SimpleFeatureVector, (Int, Int) => Unit)] = {
val builder = Array.newBuilder[(SimpleFeatureVector, (Int, Int) => Unit)]
while (batches.hasNext) {
val vector = SimpleFeatureVector.create(sft, dictionaries, encoding)
RecordBatchLoader.load(vector.underlying, batches.next)

val transfers: Seq[(Int, Int) => Unit] = {
val fromVectors = vector.underlying.getChildrenFromFields
val toVectors = result.underlying.getChildrenFromFields
val builder = Seq.newBuilder[(Int, Int) => Unit]
builder.sizeHint(fromVectors.size())
var i = 0
while (i < fromVectors.size()) {
builder += createTransferPair(fromVectors.get(i), toVectors.get(i))
i += 1
var vector: SimpleFeatureVector = null
try {

while (batches.hasNext) {
vector = SimpleFeatureVector.create(sft, dictionaries, encoding)
val batch = batches.next
count += 1
totalBatchSize += batch.length
RecordBatchLoader.load(vector.underlying, batch)

val transfers: Seq[(Int, Int) => Unit] = {
val fromVectors = vector.underlying.getChildrenFromFields
val toVectors = result.underlying.getChildrenFromFields
val builder = Seq.newBuilder[(Int, Int) => Unit]
builder.sizeHint(fromVectors.size())
var i = 0
while (i < fromVectors.size()) {
builder += createTransferPair(fromVectors.get(i), toVectors.get(i))
i += 1
}
builder.result()
}
builder.result()
val transfer: (Int, Int) => Unit = (from, to) => transfers.foreach(_.apply(from, to))
builder += vector -> transfer
}
val transfer: (Int, Int) => Unit = (from, to) => transfers.foreach(_.apply(from, to))
builder += vector -> transfer
builder.result
} catch {
case e: Exception =>
logger.error(s"Caught exception ${e.getClass} while build 'inputs'. Opened $count vectors of total size: $totalBatchSize")
// logger.error("Closing 'result' and 'batches' first")
CloseWithLogging(result, batches)

// Trying to clean up
// logger.error("Closed the intermediate things. Trying to close 'result' and 'batches'")
val cleanup = builder.result()
cleanup.foreach { case (v, _) => CloseWithLogging(v) } // keep closing the remaining vectors even if one close fails
// logger.error("Closing the var 'vector'")
if (vector != null) {
CloseWithLogging(vector)
}

// logger.error("Ideally done cleaning up.")

throw e
case t: Throwable =>
logger.error(s"Caught a throwable $t")
throw t
}
builder.result
}

// we do a k-way merge sort across the sorted batches (see the standalone sketch after this file's diff)
// priority queue of [(current batch value, current index in that batch, number of the batch)]
private lazy val queue = {
private lazy val queue: mutable.PriorityQueue[(AnyRef, Int, Int)] = {
// populate with the first element from each batch
// note: need to flip ordering here as highest sorted values come off the queue first
val order = if (reverse) { ordering } else { ordering.reverse }
val heads = scala.collection.mutable.PriorityQueue.empty[(AnyRef, Int, Int)](order)
var i = 0
while (i < inputs.length) {
val vector = inputs(i)._1
val vector: SimpleFeatureVector = inputs(i)._1
if (vector.reader.getValueCount > 0) {
heads.+=((getSortAttribute(vector, 0), 0, i))
} else {
@@ -143,36 +176,44 @@

// gets the next record batch to write - returns null if no further records
private def nextBatch(): Array[Byte] = {
if (queue.isEmpty) { null } else {
result.clear()
var resultIndex = 0
// copy the next sorted value and then queue and sort the next element out of the batch we copied from
while (queue.nonEmpty && resultIndex < batchSize) {
val (_, i, batch) = queue.dequeue()
val (vector, transfer) = inputs(batch)
transfer.apply(i, resultIndex)
result.underlying.setIndexDefined(resultIndex)
resultIndex += 1
val nextBatchIndex = i + 1
if (vector.reader.getValueCount > nextBatchIndex) {
val value = getSortAttribute(vector, nextBatchIndex)
queue.+=((value, nextBatchIndex, batch))
try {
if (queue.isEmpty) { null } else {
result.clear()
var resultIndex = 0

// copy the next sorted value and then queue and sort the next element out of the batch we copied from
while (queue.nonEmpty && resultIndex < batchSize) {
val (_, i, batch) = queue.dequeue()
val (vector: SimpleFeatureVector, transfer) = inputs(batch)
transfer.apply(i, resultIndex)
result.underlying.setIndexDefined(resultIndex)
resultIndex += 1
val nextBatchIndex = i + 1
if (vector.reader.getValueCount > nextBatchIndex) {
val value = getSortAttribute(vector, nextBatchIndex)
queue.+=((value, nextBatchIndex, batch))
} else {
CloseWithLogging(vector)
}
}

if (writeHeader) {
writeHeader = false
writeHeaderAndFirstBatch(result, dictionaries, ipcOpts, Some(sortBy -> reverse), resultIndex)
} else {
CloseWithLogging(vector)
unloader.unload(resultIndex)
}
}
if (writeHeader) {
writeHeader = false
writeHeaderAndFirstBatch(result, dictionaries, ipcOpts, Some(sortBy -> reverse), resultIndex)
} else {
unloader.unload(resultIndex)
}
} catch {
case e: Exception =>
// logger.error(s"Caught exception in the BatchWriter", e)
throw e
}
}

override def hasNext: Boolean = {
if (batch == null) {
batch = nextBatch()
}
batch != null
}
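
For reference, the queue-based merge in nextBatch above is a standard k-way merge; below is a minimal standalone sketch over plain Scala collections (the mergeSorted helper and Int values are illustrative stand-ins for the vector-transfer machinery, not part of GeoMesa):

import scala.collection.mutable

// k-way merge: each input batch is already sorted ascending; the queue holds
// (current value, index within that batch, batch number), with the ordering
// reversed so the smallest value is dequeued first
def mergeSorted(batches: IndexedSeq[IndexedSeq[Int]]): Seq[Int] = {
  val order = Ordering.by[(Int, Int, Int), Int](_._1).reverse
  val queue = mutable.PriorityQueue.empty[(Int, Int, Int)](order)
  batches.indices.foreach(i => if (batches(i).nonEmpty) queue += ((batches(i)(0), 0, i)))
  val out = Seq.newBuilder[Int]
  while (queue.nonEmpty) {
    val (value, i, batch) = queue.dequeue()
    out += value
    if (i + 1 < batches(batch).length) { queue += ((batches(batch)(i + 1), i + 1, batch)) }
  }
  out.result()
}

// e.g. mergeSorted(IndexedSeq(IndexedSeq(1, 4), IndexedSeq(2, 3))) == Seq(1, 2, 3, 4)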
ConcatenatedFileWriter.scala

@@ -42,17 +42,17 @@ object ConcatenatedFileWriter
// NB: This is not a WithClose situation.
// If there is an empty/non-exceptional iterator, we wish to use it.
// If there are any issues, we wish to close the iterator to free memory (see the sketch after this file's diff).
try {
// try {
if (files.hasNext) {
files
} else {
generateEmptyResponse(sft, dictionaryFields, encoding, ipcOpts, sort)
}
} catch {
case t: Throwable =>
files.close()
throw t
}
// } catch {
// case t: Throwable =>
// files.close()
// throw t
// }
}

private def generateEmptyResponse(sft: SimpleFeatureType, dictionaryFields: Seq[String], encoding: SimpleFeatureEncoding, ipcOpts: IpcOption, sort: Option[(String, Boolean)]) = {
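The ownership rule in the NB comment above, return the iterator untouched on success and close it only on failure, generalizes to a small helper; a hedged sketch (passThroughOrClose is an illustrative name, not a GeoMesa API):

import java.io.Closeable

// close the resource only if `use` throws; on success the caller takes
// ownership and must close it later
def passThroughOrClose[A <: Closeable, B](resource: A)(use: A => B): B = {
  try { use(resource) } catch {
    case t: Throwable =>
      try { resource.close() } catch { case s: Throwable => t.addSuppressed(s) }
      throw t
  }
}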
DeltaWriter.scala

@@ -977,7 +977,7 @@ object DeltaWriter extends StrictLogging {
deltas: CloseableIterator[Array[Byte]]
) extends CloseableIterator[Array[Byte]] {

private lazy val reduced = {
private lazy val reduced: CloseableIterator[Array[Byte]] = {
try {
val grouped = scala.collection.mutable.Map.empty[Long, scala.collection.mutable.ArrayBuilder[Array[Byte]]]
while (deltas.hasNext) {
SimpleFeatureArrowFileWriter.scala

@@ -12,6 +12,7 @@ import java.io.{Closeable, Flushable, OutputStream}
import java.nio.channels.Channels

import com.typesafe.scalalogging.LazyLogging
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.dictionary.{Dictionary, DictionaryProvider}
import org.apache.arrow.vector.ipc.ArrowStreamWriter
import org.apache.arrow.vector.ipc.message.IpcOption
Expand Down Expand Up @@ -39,7 +40,7 @@ class SimpleFeatureArrowFileWriter private (
) extends Closeable with Flushable with LazyLogging {

private val metadata = sort.map { case (field, reverse) => getSortAsMetadata(field, reverse) }.orNull
private val root = createRoot(vector.underlying, metadata)
private val root: VectorSchemaRoot = createRoot(vector.underlying, metadata)
private val writer = new ArrowStreamWriter(root, provider, Channels.newChannel(os), ipcOpts)

private var index = 0
org/locationtech/geomesa/arrow/package.scala

@@ -10,6 +10,10 @@ package org.locationtech.geomesa

import java.util.concurrent.atomic.AtomicInteger

import java.util.concurrent.{Executors, TimeUnit}

import com.typesafe.scalalogging.LazyLogging
import io.netty.util.internal.PlatformDependent
import org.apache.arrow.memory.{AllocationListener, BufferAllocator, RootAllocator}
import org.locationtech.geomesa.arrow.vector.SimpleFeatureVector.SimpleFeatureEncoding
import org.locationtech.geomesa.features.serialization.ObjectType.ObjectType
@@ -54,12 +58,80 @@
}
}

object ArrowAllocator {


private val root = new RootAllocator(DelegatingAllocationListener, Long.MaxValue)
object ArrowAllocator extends LazyLogging {

val listener: AllocationListener = new AllocationListener with LazyLogging {
// override def onPreAllocation(size: Long): Unit = {
// println(s"Root is being called with onPreAllocation with argument $size")
// }
//
// override def onAllocation(size: Long): Unit = {
// println(s"Root is being called with onAllocation with argument $size")
// }
// override def onRelease(size: Long): Unit = {
// println(s"Root is being called with onRelease with argument $size")
// }
// override def onFailedAllocation(size: Long, outcome: AllocationOutcome): Boolean = {
// println(s"onFailedAllocation has been called")
// super.onFailedAllocation(size, outcome)
// }

override def onChildAdded(parentAllocator: BufferAllocator, childAllocator: BufferAllocator): Unit = {
logger.info(s"child allocator ${childAllocator.getName} has been added to ${parentAllocator.getName} in thread ${Thread.currentThread.getName}")
// val e = new Exception("Get Allocation Stack")
// logger.info(s"Creating allocator ${childAllocator.getName} in thread ${Thread.currentThread.getName} with stack ${e.getStackTrace.take(50).mkString("\n\t")}")

}

override def onChildRemoved(parentAllocator: BufferAllocator, childAllocator: BufferAllocator): Unit = {
logger.info(s"child allocator ${childAllocator.getName} has been removed from ${parentAllocator.getName} in thread ${Thread.currentThread.getName} ")
// if (childAllocator.getName.startsWith("simple-feature-vector")) {
// val e = new Exception("Get Removal Stack")
// logger.info(s"Removing allocator ${childAllocator.getName} in thread ${Thread.currentThread.getName} with stack ${e.getStackTrace.take(50).mkString("\n\t")}")
// }
}
}

sys.addShutdownHook(CloseWithLogging(root))
private val root = new RootAllocator(DelegatingAllocationListener, Long.MaxValue) // JNH: With lots of logging.
//private val root = new RootAllocator(Long.MaxValue)

sys.addShutdownHook({
logger.error(s"At shutdown root arrow status: ${root.toVerboseString}")
//println(s"Root arrow status: ${root.toVerboseString}")
CloseWithLogging(root)
})


// use a daemon thread so this periodic monitor does not block JVM shutdown
private val es = Executors.newSingleThreadScheduledExecutor(new java.util.concurrent.ThreadFactory {
override def newThread(r: Runnable): Thread = {
val t = new Thread(r, "arrow-direct-memory-monitor")
t.setDaemon(true)
t
}
})
es.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
logger.error(s"Direct Memory status: MAX_DIRECT_MEMORY: ${PlatformDependent.maxDirectMemory()} DIRECT_MEMORY_COUNTER: ${getNettyMemoryCounter}")

logger.error(s"Root arrow status: ${root.toVerboseString}")

}
}, 0, 1, TimeUnit.MINUTES)

//>>>>>>> e4049c4204... Previous fixes for Arrow memory leaks. Still need to review.

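// Netty tracks direct allocations in the private static AtomicLong DIRECT_MEMORY_COUNTER;
// read it reflectively, falling back to the class name used by the shaded Accumulo distribution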
def getNettyMemoryCounter: Long = {
try {
val clazz = try {
Class.forName("io.netty.util.internal.PlatformDependent")
} catch {
case _: Throwable =>
Class.forName("org.locationtech.geomesa.accumulo.shade.io.netty.util.internal.PlatformDependent")
}
val field = clazz.getDeclaredField("DIRECT_MEMORY_COUNTER")
field.setAccessible(true)
field.get(clazz).asInstanceOf[java.util.concurrent.atomic.AtomicLong].get()
} catch {
case t: Throwable =>
logger.error("failed to get DIRECT_MEMORY_COUNTER", t)
-1
}
}

/**
* Gets a new allocator from the root allocator. Allocator should be `close`d after use.
@@ -70,8 +142,6 @@
* @param name name of the allocator, for bookkeeping
* @return
*/
//def apply(name: String): BufferAllocator = root.newChildAllocator(name, 0L, Long.MaxValue)

def apply(name: String): BufferAllocator = {
root.newChildAllocator(s"$name-${id.getAndIncrement()}", 0L, Long.MaxValue)
}
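
Given that contract, a caller-side sketch (the allocator name is illustrative); a child allocator that is never closed shows up as a leak in the shutdown hook's root.toVerboseString dump:

// hypothetical usage: the child allocator must be closed when the query finishes
val allocator = ArrowAllocator("example-query")
try {
  // ... create Arrow vectors and buffers against `allocator` ...
} finally {
  allocator.close()
}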
org/locationtech/geomesa/arrow/vector/SimpleFeatureVector.scala

@@ -9,6 +9,7 @@
package org.locationtech.geomesa.arrow.vector

import java.io.Closeable
import java.util.concurrent.atomic.AtomicBoolean
import java.util.{Collections, Date}

import org.apache.arrow.memory.BufferAllocator
@@ -46,6 +47,7 @@ class SimpleFeatureVector private [arrow] (
val encoding: SimpleFeatureEncoding,
allocator: Option[BufferAllocator]
) extends Closeable {
private val closed: AtomicBoolean = new AtomicBoolean(false)

// note: writer creates the map child vectors based on the sft, and should be instantiated before the reader
val writer = new Writer()
@@ -56,7 +58,24 @@
*/
def clear(): Unit = underlying.setValueCount(0)

override def close(): Unit = CloseWithLogging.raise(Seq(underlying) ++ allocator)
override def close(): Unit = {
// compareAndSet makes close idempotent even with concurrent callers
if (closed.compareAndSet(false, true)) {
CloseWithLogging.raise(Seq(underlying) ++ allocator)
} else {
println("Closed has been called twice!")
}
}


override def finalize(): Unit = {
if (!closed.get()) {
println(s"GOT AN UNCLOSED SimpleFeatureVector in a finalize. Underlying allocator's name: ${underlying.getAllocator.getName} Allocator:? ${allocator.map(_.getName)}")
} else {
println(s"Finalizing a closed SimpleFeatureVector. Underlying allocator's name: ${underlying.getAllocator.getName} Allocator:? ${allocator.map(_.getName)}")
}
super.finalize()
}

class Writer {

LocalQueryRunner.scala

@@ -310,6 +310,7 @@ object LocalQueryRunner {
ArrowScan.createDictionaries(stats, sft, filter, dictionaryFields, providedDictionaries, cachedDictionaries)
}

// JNH: Wrap in WithClose approach? (see the sketch after this hunk)
val vector = SimpleFeatureVector.create(arrowSft, dictionaries, encoding)
val batchWriter = new RecordBatchUnloader(vector, ipcOpts)

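One possible shape for the WithClose approach floated in the comment above, assuming GeoMesa's org.locationtech.geomesa.utils.io.WithClose helper (which closes its argument when the block exits); a sketch, not the committed fix:

import org.locationtech.geomesa.utils.io.WithClose

// scope the vector to the block so it is closed on every exit path
WithClose(SimpleFeatureVector.create(arrowSft, dictionaries, encoding)) { vector =>
  val batchWriter = new RecordBatchUnloader(vector, ipcOpts)
  // ... unload record batches while the vector is still open ...
}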
