Previous fixes for Arrow memory leaks. Still need to review.
Signed-off-by: Jim Hughes <[email protected]>
jnh5y committed Sep 2, 2020
1 parent bdba41c commit e4049c4
Showing 7 changed files with 191 additions and 50 deletions.
4 changes: 4 additions & 0 deletions geomesa-arrow/geomesa-arrow-gt/pom.xml
@@ -11,6 +11,10 @@
<artifactId>geomesa-arrow-gt_2.11</artifactId>

<dependencies>
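<!-- netty-all provides io.netty.util.internal.PlatformDependent, used below in the arrow package object to report direct-memory usage -->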
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-arrow-jts</artifactId>

BatchWriter.scala
@@ -8,6 +8,7 @@

package org.locationtech.geomesa.arrow.io

import com.typesafe.scalalogging.LazyLogging
import org.apache.arrow.vector.ipc.message.IpcOption
import org.locationtech.geomesa.arrow.io.records.{RecordBatchLoader, RecordBatchUnloader}
import org.locationtech.geomesa.arrow.vector.SimpleFeatureVector.SimpleFeatureEncoding
@@ -17,6 +18,7 @@ import org.locationtech.geomesa.utils.geotools.SimpleFeatureOrdering
import org.locationtech.geomesa.utils.io.CloseWithLogging
import org.opengis.feature.simple.SimpleFeatureType

import scala.collection.mutable
import scala.math.Ordering

object BatchWriter {
@@ -78,7 +80,7 @@
batchSize: Int,
batches: CloseableIterator[Array[Byte]],
private var writeHeader: Boolean = true
) extends CloseableIterator[Array[Byte]] with LazyLogging {

private val result = SimpleFeatureVector.create(sft, dictionaries, encoding)
private val unloader = new RecordBatchUnloader(result, ipcOpts)
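// 'result' is the merge target: rows from each input batch are transferred into it and then serialized to Arrow IPC bytes by 'unloader'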
@@ -96,41 +98,72 @@
}
}

// debugging counters: number of batches loaded and their total serialized size in bytes
private var count = 0L
private var totalBatchSize: Long = 0L

// this is lazy to allow the query plan to be instantiated without pulling back all the batches first
private lazy val inputs: Array[(SimpleFeatureVector, (Int, Int) => Unit)] = {
val builder = Array.newBuilder[(SimpleFeatureVector, (Int, Int) => Unit)]
// track the current vector so it can be released if an exception interrupts the loop
var vector: SimpleFeatureVector = null
try {
  while (batches.hasNext) {
    vector = SimpleFeatureVector.create(sft, dictionaries, encoding)
    val batch = batches.next
    count += 1
    totalBatchSize += batch.length
    RecordBatchLoader.load(vector.underlying, batch)

    val transfers: Seq[(Int, Int) => Unit] = {
      val fromVectors = vector.underlying.getChildrenFromFields
      val toVectors = result.underlying.getChildrenFromFields
      val builder = Seq.newBuilder[(Int, Int) => Unit]
      builder.sizeHint(fromVectors.size())
      var i = 0
      while (i < fromVectors.size()) {
        builder += createTransferPair(fromVectors.get(i), toVectors.get(i))
        i += 1
      }
      builder.result()
    }
    val transfer: (Int, Int) => Unit = (from, to) => transfers.foreach(_.apply(from, to))
    builder += vector -> transfer
  }
  builder.result
} catch {
  case e: Exception =>
    logger.error(s"Caught ${e.getClass.getName} while building 'inputs', after opening $count vectors with a total size of $totalBatchSize bytes", e)
    // release everything opened so far before re-throwing
    CloseWithLogging(result, batches)
    builder.result().foreach { case (v, _) => CloseWithLogging(v) }
    if (vector != null) {
      CloseWithLogging(vector)
    }
    throw e
  case t: Throwable =>
    logger.error("Caught a non-Exception throwable while building 'inputs'", t)
    throw t
}
}
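// note: each entry in 'inputs' pairs a loaded batch with a transfer function that copies row 'from' of that batch into row 'to' of 'result' across every child field vector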

// we do a merge sort of each batch
// sorted queue of [(current batch value, current index in that batch, number of the batch)]
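// e.g. given sorted input batches [a, c] and [b, d], the queue yields a, b, c, d: each dequeue copies one value into 'result' and enqueues the next value from the same input batch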
private lazy val queue: mutable.PriorityQueue[(AnyRef, Int, Int)] = {
// populate with the first element from each batch
// note: need to flip ordering here as highest sorted values come off the queue first
val order = if (reverse) { ordering } else { ordering.reverse }
val heads = scala.collection.mutable.PriorityQueue.empty[(AnyRef, Int, Int)](order)
var i = 0
while (i < inputs.length) {
val vector: SimpleFeatureVector = inputs(i)._1
if (vector.reader.getValueCount > 0) {
heads.+=((getSortAttribute(vector, 0), 0, i))
} else {
@@ -143,36 +176,44 @@

// gets the next record batch to write - returns null if no further records
private def nextBatch(): Array[Byte] = {
try {
  if (queue.isEmpty) { null } else {
    result.clear()
    var resultIndex = 0
    // copy the next sorted value and then queue and sort the next element out of the batch we copied from
    while (queue.nonEmpty && resultIndex < batchSize) {
      val (_, i, batch) = queue.dequeue()
      val (vector: SimpleFeatureVector, transfer) = inputs(batch)
      transfer.apply(i, resultIndex)
      result.underlying.setIndexDefined(resultIndex)
      resultIndex += 1
      val nextBatchIndex = i + 1
      if (vector.reader.getValueCount > nextBatchIndex) {
        val value = getSortAttribute(vector, nextBatchIndex)
        queue.+=((value, nextBatchIndex, batch))
      } else {
        // this input batch is exhausted - release its buffers eagerly
        CloseWithLogging(vector)
      }
    }

    if (writeHeader) {
      writeHeader = false
      writeHeaderAndFirstBatch(result, dictionaries, ipcOpts, Some(sortBy -> reverse), resultIndex)
    } else {
      unloader.unload(resultIndex)
    }
  }
} catch {
  case e: Exception =>
    logger.error("Caught exception while writing a batch", e)
    throw e
}
}

override def hasNext: Boolean = {
if (batch == null) {
batch = nextBatch()
}
batch != null
}

DeltaWriter.scala
@@ -977,7 +977,7 @@ object DeltaWriter extends StrictLogging {
deltas: CloseableIterator[Array[Byte]]
) extends CloseableIterator[Array[Byte]] {

private lazy val reduced: CloseableIterator[Array[Byte]] = {
try {
val grouped = scala.collection.mutable.Map.empty[Long, scala.collection.mutable.ArrayBuilder[Array[Byte]]]
while (deltas.hasNext) {

SimpleFeatureArrowFileWriter.scala
@@ -12,6 +12,7 @@ import java.io.{Closeable, Flushable, OutputStream}
import java.nio.channels.Channels

import com.typesafe.scalalogging.LazyLogging
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.dictionary.{Dictionary, DictionaryProvider}
import org.apache.arrow.vector.ipc.ArrowStreamWriter
import org.apache.arrow.vector.ipc.message.IpcOption
@@ -39,7 +40,7 @@
) extends Closeable with Flushable with LazyLogging {

private val metadata = sort.map { case (field, reverse) => getSortAsMetadata(field, reverse) }.orNull
private val root: VectorSchemaRoot = createRoot(vector.underlying, metadata)
private val writer = new ArrowStreamWriter(root, provider, Channels.newChannel(os), ipcOpts)

private var index = 0
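// number of features buffered in the current, not-yet-flushed record batch (assumed from surrounding usage)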

package.scala (org.locationtech.geomesa.arrow)
@@ -8,7 +8,12 @@

package org.locationtech.geomesa

import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

import com.typesafe.scalalogging.LazyLogging
import io.netty.util.internal.PlatformDependent
import org.apache.arrow.memory.{AllocationListener, BufferAllocator, RootAllocator}
import org.locationtech.geomesa.arrow.vector.SimpleFeatureVector.SimpleFeatureEncoding
import org.locationtech.geomesa.features.serialization.ObjectType.ObjectType
import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemProperty
@@ -18,15 +23,83 @@ import org.opengis.feature.simple.SimpleFeatureType

package object arrow {

// gives each child allocator a unique name suffix, for tracking in the allocation logs
private val id = new AtomicInteger(0)
// need to be lazy to avoid class loading issues before init is called
lazy val ArrowEncodedSft: SimpleFeatureType =
SimpleFeatureTypes.createType("arrow", "batch:Bytes,*geom:Point:srid=4326")
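// features of this type carry a serialized Arrow record batch in the 'batch' attribute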

object ArrowAllocator extends LazyLogging {

// logs child-allocator lifecycle events, so that allocators which are created but never
// closed can be identified when hunting for leaks
val listener: AllocationListener = new AllocationListener with LazyLogging {

  override def onChildAdded(parentAllocator: BufferAllocator, childAllocator: BufferAllocator): Unit =
    logger.info(s"child allocator ${childAllocator.getName} has been added to ${parentAllocator.getName} in thread ${Thread.currentThread.getName}")

  override def onChildRemoved(parentAllocator: BufferAllocator, childAllocator: BufferAllocator): Unit =
    logger.info(s"child allocator ${childAllocator.getName} has been removed from ${parentAllocator.getName} in thread ${Thread.currentThread.getName}")
}

// root allocator, instrumented with the logging listener above
private val root = new RootAllocator(listener, Long.MaxValue)

sys.addShutdownHook {
  logger.error(s"At shutdown, root arrow allocator status: ${root.toVerboseString}")
  CloseWithLogging(root)
}

// periodically dumps direct-memory and allocator stats while hunting for leaks; uses a
// daemon thread so that the monitor does not prevent JVM shutdown
private val es = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val thread = new Thread(r, "arrow-memory-monitor")
    thread.setDaemon(true)
    thread
  }
})

es.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    logger.error(s"Direct memory status: MAX_DIRECT_MEMORY: ${PlatformDependent.maxDirectMemory()} DIRECT_MEMORY_COUNTER: ${getNettyMemoryCounter}")
    logger.error(s"Root arrow allocator status: ${root.toVerboseString}")
  }
}, 0, 1, TimeUnit.MINUTES)

// reads netty's internal direct-memory counter reflectively, falling back to the shaded
// class name used in the geomesa-accumulo distribution; returns -1 if unavailable
def getNettyMemoryCounter: Long = {
  try {
    val clazz = try {
      Class.forName("io.netty.util.internal.PlatformDependent")
    } catch {
      case _: Throwable =>
        Class.forName("org.locationtech.geomesa.accumulo.shade.io.netty.util.internal.PlatformDependent")
    }
    val field = clazz.getDeclaredField("DIRECT_MEMORY_COUNTER")
    field.setAccessible(true)
    field.get(clazz).asInstanceOf[java.util.concurrent.atomic.AtomicLong].get()
  } catch {
    case t: Throwable =>
      logger.error("Failed to get DIRECT_MEMORY_COUNTER", t)
      -1L
  }
}

/**
* Gets a new allocator from the root allocator. Allocator should be `close`d after use.
@@ -37,7 +110,9 @@ package object arrow {
* @param name name of the allocator, for bookkeeping
* @return
*/
def apply(name: String): BufferAllocator =
  root.newChildAllocator(s"$name-${id.getAndIncrement()}", 0L, Long.MaxValue)
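// usage sketch (hypothetical): every allocator handed out here must be closed, or the
// root allocator will report it as leaked at shutdown:
//   val alloc = ArrowAllocator("example-query")
//   try { /* create Arrow vectors against alloc */ } finally { alloc.close() }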

/**
* Forwards the getAllocatedMemory from the root Arrow Allocator

SimpleFeatureVector.scala
@@ -9,6 +9,7 @@
package org.locationtech.geomesa.arrow.vector

import java.io.Closeable
import java.util.concurrent.atomic.AtomicBoolean
import java.util.{Collections, Date}

import org.apache.arrow.memory.BufferAllocator
@@ -46,6 +47,7 @@
val encoding: SimpleFeatureEncoding,
allocator: Option[BufferAllocator]
) extends Closeable {
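// tracks whether close() has run, to detect double-closes and vectors that reach GC unclosed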
private val closed: AtomicBoolean = new AtomicBoolean(false)

// note: writer creates the map child vectors based on the sft, and should be instantiated before the reader
val writer = new Writer()
@@ -56,7 +58,24 @@
*/
def clear(): Unit = underlying.setValueCount(0)

override def close(): Unit = {
  // compareAndSet ensures the underlying buffers are only released once, even if close()
  // is invoked concurrently or repeatedly
  if (closed.compareAndSet(false, true)) {
    CloseWithLogging.raise(Seq(underlying) ++ allocator)
  } else {
    println("close() has been called twice on this SimpleFeatureVector!")
  }
}


// temporary leak detection: report whether vectors reaching GC were properly closed
override def finalize(): Unit = {
  if (!closed.get()) {
    println(s"Found an UNCLOSED SimpleFeatureVector in finalize. Underlying allocator: ${underlying.getAllocator.getName}, allocator: ${allocator.map(_.getName)}")
  } else {
    println(s"Finalizing a closed SimpleFeatureVector. Underlying allocator: ${underlying.getAllocator.getName}, allocator: ${allocator.map(_.getName)}")
  }
  super.finalize()
}

class Writer {

LocalQueryRunner.scala
@@ -310,6 +310,7 @@ object LocalQueryRunner {
ArrowScan.createDictionaries(stats, sft, filter, dictionaryFields, providedDictionaries, cachedDictionaries)
}

// TODO: wrap in WithClose so the vector is always released? (see sketch below)
val vector = SimpleFeatureVector.create(arrowSft, dictionaries, encoding)
val batchWriter = new RecordBatchUnloader(vector, ipcOpts)
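// sketch of the WithClose approach suggested above (assuming GeoMesa's
// org.locationtech.geomesa.utils.io.WithClose utility):
//   WithClose(SimpleFeatureVector.create(arrowSft, dictionaries, encoding)) { vector =>
//     val batchWriter = new RecordBatchUnloader(vector, ipcOpts)
//     // ... the vector is closed automatically when the block exits
//   }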
