GEOMESA-3362 Adds optional flattened arrow output (#3128)
malinsinbigler authored Jul 24, 2024
1 parent 57d446b commit 7c8e818
Showing 9 changed files with 176 additions and 38 deletions.
8 changes: 8 additions & 0 deletions docs/user/datastores/analytic_queries.rst
@@ -271,6 +271,8 @@ following query hints:
+-------------------------------------+--------------------+------------------------------------+
| QueryHints.ARROW_PROCESS_DELTAS | Boolean (optional) | processDeltas |
+-------------------------------------+--------------------+------------------------------------+
| QueryHints.ARROW_FLATTEN_STRUCT | Boolean (optional) | flattenStruct |
+-------------------------------------+--------------------+------------------------------------+

Explanation of Hints
++++++++++++++++++++
@@ -328,6 +330,12 @@ This is an advanced hint, which can be used to disable normal processing on Arrow
data will be returned in a custom binary format, and needs to be processed before it can be read by standard
Arrow libraries. When returned un-processed, data can begin streaming back to the client immediately.

ARROW_FLATTEN_STRUCT
^^^^^^^^^^^^^^^^^^^^

This hint removes the outer struct, which is named after the feature type, and instead returns the attribute fields
directly in each RecordBatch. Note that this hint is currently only supported for PostGIS and GeoServer native stores.
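
For example, a minimal sketch of requesting flattened output through query hints (the feature type name here is
hypothetical, and the ``Query`` import path varies with the GeoTools version):

.. code-block:: scala

    import org.geotools.api.data.Query
    import org.locationtech.geomesa.index.conf.QueryHints

    val query = new Query("my-feature-type")
    // opt in to Arrow encoding, then request flattened record batches
    query.getHints.put(QueryHints.ARROW_ENCODE, java.lang.Boolean.TRUE)
    query.getHints.put(QueryHints.ARROW_FLATTEN_STRUCT, java.lang.Boolean.TRUE)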

Example Query
+++++++++++++

@@ -44,23 +44,23 @@ class ArrowConversionProcessTest extends TestWithFeatureType {

"ArrowConversionProcess" should {
"encode an empty feature collection" in {
val bytes = process.execute(new ListFeatureCollection(sft), null, null, null, null, null, null, null).asScala.reduce(_ ++ _)
val bytes = process.execute(new ListFeatureCollection(sft), null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _)
WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader =>
reader.sft mustEqual sft
SelfClosingIterator(reader.features()) must beEmpty
}
}

"encode an empty accumulo feature collection" in {
val bytes = process.execute(fs.getFeatures(ECQL.toFilter("bbox(geom,20,20,30,30)")), null, null, null, null, null, null, null).asScala.reduce(_ ++ _)
val bytes = process.execute(fs.getFeatures(ECQL.toFilter("bbox(geom,20,20,30,30)")), null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _)
WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader =>
reader.sft mustEqual sft
SelfClosingIterator(reader.features()) must beEmpty
}
}

"encode an accumulo feature collection in distributed fashion" in {
val bytes = process.execute(fs.getFeatures(Filter.INCLUDE), null, null, null, null, null, null, null).asScala.reduce(_ ++ _)
val bytes = process.execute(fs.getFeatures(Filter.INCLUDE), null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _)
WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader =>
reader.sft mustEqual sft
SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toSeq must
@@ -70,7 +70,7 @@ class ArrowConversionProcessTest extends TestWithFeatureType {

"encode an accumulo feature collection in distributed fashion with calculated dictionary values" in {
val filter = ECQL.toFilter("name = 'name0'")
val bytes = process.execute(fs.getFeatures(filter), null, null, null, Collections.singletonList("name"), null, null, null).asScala.reduce(_ ++ _)
val bytes = process.execute(fs.getFeatures(filter), null, null, null, Collections.singletonList("name"), null, null, null, null).asScala.reduce(_ ++ _)
WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader =>
reader.sft mustEqual sft
SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toSeq must
@@ -81,15 +81,15 @@ class ArrowConversionProcessTest extends TestWithFeatureType {
}

"sort and encode an accumulo feature collection in distributed fashion" in {
val bytes = process.execute(fs.getFeatures(Filter.INCLUDE), null, null, null, null, "dtg", null, null).asScala.reduce(_ ++ _)
val bytes = process.execute(fs.getFeatures(Filter.INCLUDE), null, null, null, null, "dtg", null, null, null).asScala.reduce(_ ++ _)
WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader =>
reader.sft mustEqual sft
SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toList mustEqual features
}
}

"reverse sort and encode an accumulo feature collection in distributed fashion" in {
val bytes = process.execute(fs.getFeatures(Filter.INCLUDE), null, null, null, null, "dtg", Boolean.box(true), null).asScala.reduce(_ ++ _)
val bytes = process.execute(fs.getFeatures(Filter.INCLUDE), null, null, null, null, "dtg", Boolean.box(true), null, null).asScala.reduce(_ ++ _)
WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader =>
reader.sft mustEqual sft
SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toList mustEqual features.reverse
@@ -43,7 +43,8 @@ class DictionaryBuildingWriter(
dictionaries: Seq[String],
encoding: SimpleFeatureEncoding,
ipcOpts: IpcOption,
maxSize: Int = Short.MaxValue
maxSize: Int = Short.MaxValue,
flattenStruct: Boolean = false
) extends Closeable {

import org.locationtech.geomesa.utils.geotools.RichAttributeDescriptors.RichAttributeDescriptor
@@ -63,7 +64,13 @@ class DictionaryBuildingWriter(

private val root = {
val fields = Collections.singletonList[Field](underlying.getField)
new VectorSchemaRoot(fields, Collections.singletonList[FieldVector](underlying), 0)

val potentialRoot = new VectorSchemaRoot(fields, Collections.singletonList[FieldVector](underlying), 0)
if (flattenStruct) {
new VectorSchemaRoot(potentialRoot.getVector(sft.getTypeName))
} else {
potentialRoot
}
}

private var index = 0
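
For context, the flattening above leans on Arrow's VectorSchemaRoot(FieldVector) constructor, which promotes a struct
vector's children to top-level columns; the same wrap is used in SimpleFeatureArrowFileWriter below. A standalone
sketch against plain arrow-vector (all names here are illustrative, not from this commit):

    import org.apache.arrow.memory.RootAllocator
    import org.apache.arrow.vector.{VarCharVector, VectorSchemaRoot}
    import org.apache.arrow.vector.complex.StructVector
    import org.apache.arrow.vector.types.pojo.{ArrowType, FieldType}

    val allocator = new RootAllocator()
    // a struct column named like a feature type, with a single child attribute
    val struct = StructVector.empty("my-sft", allocator)
    struct.addOrGet("name", FieldType.nullable(new ArrowType.Utf8), classOf[VarCharVector])
    struct.setValueCount(0)
    // wrapping the struct in a root hoists its children to top-level vectors
    val flattened = new VectorSchemaRoot(struct)
    println(flattened.getSchema) // prints Schema<name: Utf8>
    flattened.close()
    allocator.close()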
@@ -9,6 +9,7 @@
package org.locationtech.geomesa.arrow.io

import com.typesafe.scalalogging.LazyLogging
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.dictionary.{Dictionary, DictionaryProvider}
import org.apache.arrow.vector.ipc.ArrowStreamWriter
import org.apache.arrow.vector.ipc.message.IpcOption
@@ -35,11 +36,19 @@ class SimpleFeatureArrowFileWriter private (
provider: DictionaryProvider with Closeable,
os: OutputStream,
ipcOpts: IpcOption,
sort: Option[(String, Boolean)]
sort: Option[(String, Boolean)],
flattenStruct: Boolean = false
) extends Closeable with Flushable with LazyLogging {

private val metadata = sort.map { case (field, reverse) => getSortAsMetadata(field, reverse) }.orNull
private val root = createRoot(vector.underlying, metadata)
private val root = {
val potentialRoot = createRoot(vector.underlying, metadata)

if (flattenStruct) {
new VectorSchemaRoot(potentialRoot.getVector(sft.getTypeName))
} else {
potentialRoot
}
}
private val writer = new ArrowStreamWriter(root, provider, Channels.newChannel(os), ipcOpts)

private var index = 0
@@ -106,7 +115,8 @@ object SimpleFeatureArrowFileWriter {
dictionaries: Map[String, ArrowDictionary],
encoding: SimpleFeatureEncoding,
ipcOpts: IpcOption,
sort: Option[(String, Boolean)]): SimpleFeatureArrowFileWriter = {
sort: Option[(String, Boolean)],
flattenStruct: Boolean = false): SimpleFeatureArrowFileWriter = {
val vector = SimpleFeatureVector.create(sft, dictionaries, encoding)
// convert the dictionary values into arrow vectors
// make sure we load dictionaries before instantiating the stream writer
@@ -116,7 +126,7 @@
override def getDictionaryIds: java.util.Set[java.lang.Long] = dictionaries.keys.map(Long.box).toSet.asJava
override def close(): Unit = CloseWithLogging(dictionaries.values)
}
new SimpleFeatureArrowFileWriter(vector, provider, os, ipcOpts, sort)
new SimpleFeatureArrowFileWriter(vector, provider, os, ipcOpts, sort, flattenStruct)
}

// convert the dictionary values into arrow vectors
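
A hedged usage sketch of the extended apply, mirroring the test setup further down; sft, features, and ipcOpts are
assumed to be in scope, and the import paths are assumptions about GeoMesa's module layout:

    import java.io.ByteArrayOutputStream
    import org.locationtech.geomesa.arrow.vector.SimpleFeatureVector.SimpleFeatureEncoding
    import org.locationtech.geomesa.utils.io.WithClose

    val out = new ByteArrayOutputStream()
    // flattenStruct = true: the IPC stream's top-level columns are the attribute
    // fields themselves, not a single struct column named after the feature type
    WithClose(SimpleFeatureArrowFileWriter(out, sft, Map.empty, SimpleFeatureEncoding.Max, ipcOpts, None, flattenStruct = true)) { writer =>
      features.foreach(writer.add)
    }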
@@ -8,7 +8,10 @@

package org.locationtech.geomesa.arrow.io

import org.apache.arrow.vector.ipc.message.IpcOption
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VarCharVector
import org.apache.arrow.vector.ipc.{ArrowFileReader, ArrowStreamReader}
import org.apache.arrow.vector.ipc.message.{ArrowBlock, IpcOption}
import org.geotools.filter.text.ecql.ECQL
import org.junit.runner.RunWith
import org.locationtech.geomesa.arrow.vector.ArrowDictionary
@@ -22,7 +25,7 @@ import org.specs2.matcher.MatchResult
import org.specs2.mutable.Specification
import org.specs2.runner.JUnitRunner

import java.io.{File, FileInputStream, FileOutputStream}
import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
import java.nio.file.Files
import java.util.concurrent.atomic.AtomicInteger

@@ -102,6 +105,34 @@ class SimpleFeatureArrowFileTest extends Specification {
}
}
}
"write and read values with flatten" >> {
withTestFile("simple") { file =>
WithClose(SimpleFeatureArrowFileWriter(new FileOutputStream(file), sft, Map.empty, SimpleFeatureEncoding.Max, ipcOpts, None, true)) { writer =>
features0.foreach(writer.add)
writer.flush()
features1.foreach(writer.add)
}

var totalRecords = 0
val rootAllocator = new RootAllocator()

val bytes = Files.readAllBytes(file.toPath)
val reader = new ArrowStreamReader(new ByteArrayInputStream(bytes), rootAllocator)

while (reader.loadNextBatch()) {
val root = reader.getVectorSchemaRoot

// Flatten removes the outer StructVector, which is named after the SFT
root.getFieldVectors.get(0) must haveClass[VarCharVector]
totalRecords += root.getRowCount
}

reader.close()
rootAllocator.close()

totalRecords mustEqual (features0.size + features1.size)
}
}
"optimize queries for sorted files" >> {
withTestFile("sorted") { file =>
WithClose(SimpleFeatureArrowFileWriter(new FileOutputStream(file), sft, Map.empty, SimpleFeatureEncoding.Max, ipcOpts, Some(("dtg", false)))) { writer =>
@@ -166,6 +197,35 @@
}
}
}
"write and read dictionary encoded values with flatten" >> {
val dictionaries = Map("foo:String" -> ArrowDictionary.create(sft.getTypeName, 0, Array("foo0", "foo1", "foo2")))
withTestFile("dictionary") { file =>
WithClose(SimpleFeatureArrowFileWriter(new FileOutputStream(file), sft, dictionaries, SimpleFeatureEncoding.Max, ipcOpts, None, true)) { writer =>
features0.foreach(writer.add)
writer.flush()
features1.foreach(writer.add)
}

var totalRecords = 0
val rootAllocator = new RootAllocator()

val bytes = Files.readAllBytes(file.toPath)
val reader = new ArrowStreamReader(new ByteArrayInputStream(bytes), rootAllocator)

while (reader.loadNextBatch()) {
val root = reader.getVectorSchemaRoot

// Flatten removes the outer StructVector, which is named after the SFT
root.getFieldVectors.get(0) must haveClass[VarCharVector]
totalRecords += root.getRowCount
}

reader.close()
rootAllocator.close()

totalRecords mustEqual (features0.size + features1.size)
}
}
"write and read dictionary encoded ints" >> {
val dictionaries = Map("age" -> ArrowDictionary.create(sft.getTypeName, 0, Array(0, 1, 2, 3, 4, 5).map(Int.box)))
withTestFile("dictionary-int") { file =>
@@ -29,6 +29,7 @@ class ArrowExporter(out: OutputStream, hints: Hints) extends FeatureExporter {
private lazy val ipc = hints.getArrowFormatVersion.getOrElse(FormatVersion.ArrowFormatVersion.get)
private lazy val batchSize = hints.getArrowBatchSize.getOrElse(ArrowProperties.BatchSize.get.toInt)
private lazy val dictionaryFields = hints.getArrowDictionaryFields
private lazy val flattenFields = hints.isArrowFlatten

private var delegate: FeatureExporter = _

@@ -37,12 +38,12 @@
new EncodedDelegate(out)
} else if (dictionaryFields.isEmpty) {
// note: features should be sorted already, even if arrow encoding wasn't performed
new BatchDelegate(out, encoding, FormatVersion.options(ipc), sort, batchSize, Map.empty)
new BatchDelegate(out, encoding, FormatVersion.options(ipc), sort, batchSize, Map.empty, flattenFields)
} else {
if (sort.isDefined) {
throw new NotImplementedError("Sorting and calculating dictionaries at the same time is not supported")
}
new DictionaryDelegate(out, dictionaryFields, encoding, FormatVersion.options(ipc), batchSize)
new DictionaryDelegate(out, dictionaryFields, encoding, FormatVersion.options(ipc), batchSize, flattenFields)
}
delegate.start(sft)
}
@@ -72,14 +73,15 @@ object ArrowExporter {
dictionaryFields: Seq[String],
encoding: SimpleFeatureEncoding,
ipcOpts: IpcOption,
batchSize: Int
batchSize: Int,
flattenStruct: Boolean = false
) extends FeatureExporter {

private var writer: DictionaryBuildingWriter = _
private var count = 0L

override def start(sft: SimpleFeatureType): Unit = {
writer = new DictionaryBuildingWriter(sft, dictionaryFields, encoding, ipcOpts)
writer = new DictionaryBuildingWriter(sft, dictionaryFields, encoding, ipcOpts, flattenStruct = flattenStruct)
}

override def export(features: Iterator[SimpleFeature]): Option[Long] = {
@@ -112,14 +114,15 @@ object ArrowExporter {
ipcOpts: IpcOption,
sort: Option[(String, Boolean)],
batchSize: Int,
dictionaries: Map[String, ArrowDictionary]
dictionaries: Map[String, ArrowDictionary],
flattenStruct: Boolean = false
) extends FeatureExporter {

private var writer: SimpleFeatureArrowFileWriter = _
private var count = 0L

override def start(sft: SimpleFeatureType): Unit = {
writer = SimpleFeatureArrowFileWriter(os, sft, dictionaries, encoding, ipcOpts, sort)
writer = SimpleFeatureArrowFileWriter(os, sft, dictionaries, encoding, ipcOpts, sort, flattenStruct = flattenStruct)
}

override def export(features: Iterator[SimpleFeature]): Option[Long] = {
@@ -60,6 +60,7 @@ object QueryHints {
val ARROW_FORMAT_VERSION = new ClassKey(classOf[String])
val ARROW_DICTIONARY_FIELDS = new ClassKey(classOf[java.lang.String])
val ARROW_PROCESS_DELTAS = new ClassKey(classOf[java.lang.Boolean])
val ARROW_FLATTEN_STRUCT = new ClassKey(classOf[java.lang.Boolean])

val LAMBDA_QUERY_PERSISTENT = new ClassKey(classOf[java.lang.Boolean])
val LAMBDA_QUERY_TRANSIENT = new ClassKey(classOf[java.lang.Boolean])
@@ -141,6 +142,7 @@ object QueryHints {
def getArrowFormatVersion: Option[String] = Option(hints.get(ARROW_FORMAT_VERSION).asInstanceOf[String])
def isArrowProcessDeltas: Boolean =
Option(hints.get(ARROW_PROCESS_DELTAS).asInstanceOf[java.lang.Boolean]).forall(Boolean.unbox)
def isArrowFlatten: Boolean = Option(hints.get(ARROW_FLATTEN_STRUCT).asInstanceOf[java.lang.Boolean]).exists(Boolean.unbox)

def isStatsQuery: Boolean = hints.containsKey(STATS_STRING)
def getStatsQuery: String = hints.get(STATS_STRING).asInstanceOf[String]
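
Note the differing defaults in the two accessors above: isArrowProcessDeltas uses forall (true when the hint is
unset), while the new isArrowFlatten uses exists (false when unset), so flattening is strictly opt-in. A hedged
sketch of reading the hint back, assuming the RichHints implicit wrapper these methods are defined on and GeoTools'
org.geotools.util.factory.Hints:

    import org.geotools.util.factory.Hints
    import org.locationtech.geomesa.index.conf.QueryHints
    import org.locationtech.geomesa.index.conf.QueryHints.RichHints

    val hints = new Hints()
    new RichHints(hints).isArrowFlatten // false: unset hint defaults to no flattening
    hints.put(QueryHints.ARROW_FLATTEN_STRUCT, java.lang.Boolean.TRUE)
    new RichHints(hints).isArrowFlatten // true: flattening explicitly requested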