diff --git a/docs/user/datastores/analytic_queries.rst b/docs/user/datastores/analytic_queries.rst index a68bbf21b24a..8b755661b7f3 100644 --- a/docs/user/datastores/analytic_queries.rst +++ b/docs/user/datastores/analytic_queries.rst @@ -273,6 +273,8 @@ following query hints: +-------------------------------------+--------------------+------------------------------------+ | QueryHints.ARROW_FLATTEN_STRUCT | Boolean (optional) | flattenStruct | +-------------------------------------+--------------------+------------------------------------+ +| QueryHints.FLIP_AXIS_ORDER | Boolean (optional) | flipAxisOrder | ++-------------------------------------+--------------------+------------------------------------+ Explanation of Hints ++++++++++++++++++++ @@ -336,6 +338,11 @@ ARROW_FLATTEN_STRUCT This hint will remove the outer struct named after the feature type and will instead return the attribute fields directly in the RecordBatch. Note that this hint is currently only supported for PostGIS and geoserver native stores. +FLIP_AXIS_ORDER +^^^^^^^^^^^^^^^ + +This hint flips the axis order of returned coordinates from latitude/longitude (default) to longitude/latitude. + Example Query +++++++++++++ diff --git a/geomesa-arrow/geomesa-arrow-gt/src/main/scala/org/locationtech/geomesa/arrow/vector/ArrowAttributeWriter.scala b/geomesa-arrow/geomesa-arrow-gt/src/main/scala/org/locationtech/geomesa/arrow/vector/ArrowAttributeWriter.scala index ace45e7a5ca5..8fad59d91d24 100644 --- a/geomesa-arrow/geomesa-arrow-gt/src/main/scala/org/locationtech/geomesa/arrow/vector/ArrowAttributeWriter.scala +++ b/geomesa-arrow/geomesa-arrow-gt/src/main/scala/org/locationtech/geomesa/arrow/vector/ArrowAttributeWriter.scala @@ -192,7 +192,7 @@ object ArrowAttributeWriter { case ObjectType.LONG => new ArrowLongWriter(name, metadata, factory) case ObjectType.FLOAT => new ArrowFloatWriter(name, metadata, factory) case ObjectType.DOUBLE => new ArrowDoubleWriter(name, metadata, factory) - case ObjectType.GEOMETRY => geometry(name, bindings(1), encoding.geometry, metadata, factory) + case ObjectType.GEOMETRY => geometry(name, bindings(1), encoding, metadata, factory) case ObjectType.BOOLEAN => new ArrowBooleanWriter(name, metadata, factory) case ObjectType.LIST => new ArrowListWriter(name, bindings(1), encoding, metadata, factory) case ObjectType.MAP => new ArrowMapWriter(name, bindings(1), bindings(2), encoding, metadata, factory) @@ -234,11 +234,11 @@ object ArrowAttributeWriter { private def geometry( name: String, binding: ObjectType, - encoding: Encoding, + encoding: SimpleFeatureEncoding, metadata: Map[String, String], factory: VectorFactory): ArrowGeometryWriter = { val m = metadata.asJava - val vector = (binding, encoding, factory) match { + val vector = (binding, encoding.geometry, factory) match { case (ObjectType.POINT, Encoding.Min, FromStruct(c)) => new PointFloatVector(name, c, m) case (ObjectType.POINT, Encoding.Min, FromAllocator(c)) => new PointFloatVector(name, c, m) case (ObjectType.POINT, Encoding.Max, FromStruct(c)) => new PointVector(name, c, m) @@ -269,7 +269,10 @@ object ArrowAttributeWriter { case (_, _, FromList(_)) => throw new NotImplementedError("Geometry lists are not supported") case _ => throw new IllegalArgumentException(s"Unexpected geometry type $binding") } - new ArrowGeometryWriter(name, vector.asInstanceOf[GeometryVector[Geometry, FieldVector]]) + val geometryVector = vector.asInstanceOf[GeometryVector[Geometry, FieldVector]] + geometryVector.setFlipAxisOrder(encoding.flipAxisOrder) + + new ArrowGeometryWriter(name, geometryVector) } trait ArrowDictionaryWriter extends ArrowAttributeWriter { diff --git a/geomesa-arrow/geomesa-arrow-gt/src/main/scala/org/locationtech/geomesa/arrow/vector/SimpleFeatureVector.scala b/geomesa-arrow/geomesa-arrow-gt/src/main/scala/org/locationtech/geomesa/arrow/vector/SimpleFeatureVector.scala index 688fe61fba02..fe902368b2cb 100644 --- a/geomesa-arrow/geomesa-arrow-gt/src/main/scala/org/locationtech/geomesa/arrow/vector/SimpleFeatureVector.scala +++ b/geomesa-arrow/geomesa-arrow-gt/src/main/scala/org/locationtech/geomesa/arrow/vector/SimpleFeatureVector.scala @@ -132,16 +132,16 @@ object SimpleFeatureVector { val DescriptorKey = "descriptor" val OptionsKey = "options" - case class SimpleFeatureEncoding(fids: Option[Encoding], geometry: Encoding, date: Encoding) + case class SimpleFeatureEncoding(fids: Option[Encoding], geometry: Encoding, date: Encoding, flipAxisOrder: Boolean) object SimpleFeatureEncoding { - val Min: SimpleFeatureEncoding = SimpleFeatureEncoding(Some(Encoding.Min), Encoding.Min, Encoding.Min) - val Max: SimpleFeatureEncoding = SimpleFeatureEncoding(Some(Encoding.Max), Encoding.Max, Encoding.Max) + val Min: SimpleFeatureEncoding = SimpleFeatureEncoding(Some(Encoding.Min), Encoding.Min, Encoding.Min, flipAxisOrder = false) + val Max: SimpleFeatureEncoding = SimpleFeatureEncoding(Some(Encoding.Max), Encoding.Max, Encoding.Max, flipAxisOrder = false) - def min(includeFids: Boolean, proxyFids: Boolean = false): SimpleFeatureEncoding = { + def min(includeFids: Boolean, proxyFids: Boolean = false, flipAxisOrder: Boolean = false): SimpleFeatureEncoding = { val fids = if (includeFids) { Some(if (proxyFids) { Encoding.Min } else { Encoding.Max }) } else { None } - SimpleFeatureEncoding(fids, Encoding.Min, Encoding.Min) + SimpleFeatureEncoding(fids, Encoding.Min, Encoding.Min, flipAxisOrder) } object Encoding extends Enumeration { @@ -245,7 +245,7 @@ object SimpleFeatureVector { val isLong = dateVector.exists(_.isInstanceOf[BigIntVector]) if (isLong) { Encoding.Max } else { Encoding.Min } } - val encoding = SimpleFeatureEncoding(fidEncoding, geomPrecision, datePrecision) + val encoding = SimpleFeatureEncoding(fidEncoding, geomPrecision, datePrecision, flipAxisOrder = false) (sft, encoding) } diff --git a/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/GeometryVector.java b/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/GeometryVector.java index 42bdd86dc47e..c187fc1024a4 100644 --- a/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/GeometryVector.java +++ b/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/GeometryVector.java @@ -32,4 +32,7 @@ public interface GeometryVector exten int getNullCount(); void transfer(int fromIndex, int toIndex, GeometryVector to); + + boolean isFlipAxisOrder(); + void setFlipAxisOrder(boolean flip); } diff --git a/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/WKBGeometryVector.java b/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/WKBGeometryVector.java index 33a6cb731553..4068d1609463 100644 --- a/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/WKBGeometryVector.java +++ b/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/WKBGeometryVector.java @@ -29,6 +29,7 @@ public class WKBGeometryVector implements GeometryVector s"$f ${if (r) "DESC" else "ASC" }"}.mkString(", ") @@ -178,5 +180,8 @@ object QueryHints { } } } + + def isFlipAxisOrder: Boolean = + Option(hints.get(FLIP_AXIS_ORDER).asInstanceOf[java.lang.Boolean]).exists(Boolean.unbox) } } diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/iterators/ArrowScan.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/iterators/ArrowScan.scala index 68783c578184..f9eda67c9d08 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/iterators/ArrowScan.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/iterators/ArrowScan.scala @@ -43,9 +43,10 @@ trait ArrowScan extends AggregatingScan[ArrowScan.ArrowAggregate] { val arrowSft = transform.getOrElse(sft) val includeFids = options(IncludeFidsKey).toBoolean val proxyFids = options.get(ProxyFidsKey).exists(_.toBoolean) + val flipAxisOrder = options.get(FlipAxisOrderKey).exists(_.toBoolean) + val encoding = SimpleFeatureEncoding.min(includeFids, proxyFids, flipAxisOrder) val dictionary = options(DictionaryKey) val sort = options.get(SortKey).map(name => (name, options.get(SortReverseKey).exists(_.toBoolean))) - val encoding = SimpleFeatureEncoding.min(includeFids, proxyFids) val ipcOpts = FormatVersion.options(options(IpcVersionKey)) val dictionaries = dictionary.split(",").filterNot(_.isEmpty) new DeltaAggregate(arrowSft, dictionaries, encoding, ipcOpts, sort, batchSize) @@ -60,12 +61,13 @@ object ArrowScan extends LazyLogging { object Configuration { - val IncludeFidsKey = "fids" - val ProxyFidsKey = "proxy" - val DictionaryKey = "dict" - val IpcVersionKey = "ipc" - val SortKey = "sort" - val SortReverseKey = "sort-rev" + val IncludeFidsKey = "fids" + val ProxyFidsKey = "proxy" + val FlipAxisOrderKey = "flip-axis-order" + val DictionaryKey = "dict" + val IpcVersionKey = "ipc" + val SortKey = "sort" + val SortReverseKey = "sort-rev" } /** @@ -91,9 +93,10 @@ object ArrowScan extends LazyLogging { val arrowSft = hints.getTransformSchema.getOrElse(sft) val includeFids = hints.isArrowIncludeFid val proxyFids = hints.isArrowProxyFid + val flipAxisOrder = hints.isFlipAxisOrder + val encoding = SimpleFeatureEncoding.min(includeFids, proxyFids, flipAxisOrder) val sort = hints.getArrowSort val batchSize = getBatchSize(hints) - val encoding = SimpleFeatureEncoding.min(includeFids, proxyFids) val ipc = hints.getArrowFormatVersion.getOrElse(FormatVersion.ArrowFormatVersion.get) val ipcOpts = FormatVersion.options(ipc) val dictionaryFields = hints.getArrowDictionaryFields @@ -101,12 +104,13 @@ object ArrowScan extends LazyLogging { val config = { val base = AggregatingScan.configure(sft, index, ecql, hints.getTransform, hints.getSampling, batchSize) base ++ AggregatingScan.optionalMap( - IncludeFidsKey -> includeFids.toString, - ProxyFidsKey -> proxyFids.toString, - IpcVersionKey -> ipc, - SortKey -> sort.map(_._1), - SortReverseKey -> sort.map(_._2.toString), - DictionaryKey -> dictionaryFields.mkString(",") + IncludeFidsKey -> includeFids.toString, + ProxyFidsKey -> proxyFids.toString, + FlipAxisOrderKey -> flipAxisOrder.toString, + IpcVersionKey -> ipc, + SortKey -> sort.map(_._1), + SortReverseKey -> sort.map(_._2.toString), + DictionaryKey -> dictionaryFields.mkString(",") ) } @@ -223,6 +227,7 @@ object ArrowScan extends LazyLogging { ReducerConfig.sftSpec(sft), ReducerConfig.DictionariesKey -> StringSerialization.encodeSeq(dictionaryFields), ReducerConfig.encoding(encoding), + ReducerConfig.flipAxisOrder(encoding), ReducerConfig.ipcOption(ipcOpts), ReducerConfig.batch(batchSize), ReducerConfig.sort(sort), @@ -254,15 +259,16 @@ object ArrowScan extends LazyLogging { object ReducerConfig { - val SftKey = "sft" - val SpecKey = "spec" - val DictionariesKey = "dicts" - val EncodingKey = "enc" - val IpcKey = "ipc" - val BatchKey = "batch" - val SortKey = "sort" - val SortedKey = "sorted" - val ProcessKey = "process" + val SftKey = "sft" + val SpecKey = "spec" + val DictionariesKey = "dicts" + val EncodingKey = "enc" + val FlipAxisOrderKey = "flip-axis-order" + val IpcKey = "ipc" + val BatchKey = "batch" + val SortKey = "sort" + val SortedKey = "sorted" + val ProcessKey = "process" def sftName(sft: SimpleFeatureType): (String, String) = SftKey -> sft.getTypeName def sftSpec(sft: SimpleFeatureType): (String, String) = @@ -274,10 +280,14 @@ object ArrowScan extends LazyLogging { def encoding(e: SimpleFeatureEncoding): (String, String) = EncodingKey -> s"${e.fids.getOrElse("")}:${e.geometry}:${e.date}" + def flipAxisOrder(e: SimpleFeatureEncoding): (String, String) = + FlipAxisOrderKey -> s"${e.flipAxisOrder}" + def encoding(options: Map[String, String]): SimpleFeatureEncoding = { val Array(fids, geom, dtg) = options(EncodingKey).split(":") val fidOpt = Option(fids).filterNot(_.isEmpty).map(Encoding.withName) - SimpleFeatureEncoding(fidOpt, Encoding.withName(geom), Encoding.withName(dtg)) + val flipAxisOrder = options.get(FlipAxisOrderKey).exists(_.toBoolean) + SimpleFeatureEncoding(fidOpt, Encoding.withName(geom), Encoding.withName(dtg), flipAxisOrder) } def ipcOption(options: Map[String, String]): IpcOption = FormatVersion.options(options(IpcKey)) diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/planning/LocalQueryRunner.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/planning/LocalQueryRunner.scala index 5aee8795ee4c..a4cbd2d56c4c 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/planning/LocalQueryRunner.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/planning/LocalQueryRunner.scala @@ -264,7 +264,7 @@ object LocalQueryRunner extends LazyLogging { val sort = hints.getArrowSort.map(Seq.fill(1)(_)) val batchSize = ArrowScan.getBatchSize(hints) - val encoding = SimpleFeatureEncoding.min(hints.isArrowIncludeFid, hints.isArrowProxyFid) + val encoding = SimpleFeatureEncoding.min(hints.isArrowIncludeFid, hints.isArrowProxyFid, hints.isFlipAxisOrder) val ipcOpts = FormatVersion.options(hints.getArrowFormatVersion.getOrElse(FormatVersion.ArrowFormatVersion.get)) val (features, arrowSft) = transform match { diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/view/MergedQueryRunner.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/view/MergedQueryRunner.scala index 6369644ddf7f..0beee7ea940e 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/view/MergedQueryRunner.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/view/MergedQueryRunner.scala @@ -142,7 +142,7 @@ class MergedQueryRunner( val arrowSft = QueryPlanner.extractQueryTransforms(sft, query).map(_._1).getOrElse(sft) val sort = hints.getArrowSort val batchSize = ArrowScan.getBatchSize(hints) - val encoding = SimpleFeatureEncoding.min(hints.isArrowIncludeFid, hints.isArrowProxyFid) + val encoding = SimpleFeatureEncoding.min(hints.isArrowIncludeFid, hints.isArrowProxyFid, hints.isFlipAxisOrder) val ipcOpts = FormatVersion.options(hints.getArrowFormatVersion.getOrElse(FormatVersion.ArrowFormatVersion.get)) val dictionaryFields = hints.getArrowDictionaryFields diff --git a/geomesa-process/geomesa-process-vector/src/main/scala/org/locationtech/geomesa/process/transform/ArrowConversionProcess.scala b/geomesa-process/geomesa-process-vector/src/main/scala/org/locationtech/geomesa/process/transform/ArrowConversionProcess.scala index cc3ed07f1e33..7c945e6ff030 100644 --- a/geomesa-process/geomesa-process-vector/src/main/scala/org/locationtech/geomesa/process/transform/ArrowConversionProcess.scala +++ b/geomesa-process/geomesa-process-vector/src/main/scala/org/locationtech/geomesa/process/transform/ArrowConversionProcess.scala @@ -67,7 +67,9 @@ class ArrowConversionProcess extends GeoMesaProcess with LazyLogging { @DescribeParameter(name = "batchSize", description = "Number of features to include in each record batch", min = 0) batchSize: java.lang.Integer, @DescribeParameter(name = "flattenStruct", description = "Removes the outer SFT struct yielding top level feature access", min = 0) - flattenStruct: java.lang.Boolean + flattenStruct: java.lang.Boolean, + @DescribeParameter(name = "flipAxisOrder", description = "Flip the axis order of returned coordinates from latitude/longitude (default) to longitude/latitude", min = 0, defaultValue = "false") + flipAxisOrder: java.lang.Boolean = false ): java.util.Iterator[Array[Byte]] = { import scala.collection.JavaConverters._ @@ -84,7 +86,7 @@ class ArrowConversionProcess extends GeoMesaProcess with LazyLogging { } } - val encoding = SimpleFeatureEncoding.min(includeFids == null || includeFids, proxyFids != null && proxyFids) + val encoding = SimpleFeatureEncoding.min(includeFids == null || includeFids, proxyFids != null && proxyFids, Option(flipAxisOrder).exists(_.booleanValue)) val ipcVersion = Option(formatVersion).getOrElse(FormatVersion.ArrowFormatVersion.get) val reverse = Option(sortReverse).map(_.booleanValue()) val batch = Option(batchSize).map(_.intValue).getOrElse(ArrowProperties.BatchSize.get.toInt) @@ -152,6 +154,7 @@ object ArrowConversionProcess { query.getHints.put(QueryHints.ARROW_BATCH_SIZE, batchSize) query.getHints.put(QueryHints.ARROW_FORMAT_VERSION, ipcVersion) query.getHints.put(QueryHints.ARROW_FLATTEN_STRUCT, flattenStruct) + query.getHints.put(QueryHints.FLIP_AXIS_ORDER, encoding.flipAxisOrder) sortField.foreach(query.getHints.put(QueryHints.ARROW_SORT_FIELD, _)) sortReverse.foreach(query.getHints.put(QueryHints.ARROW_SORT_REVERSE, _)) diff --git a/geomesa-process/geomesa-process-vector/src/test/scala/org/locationtech/geomesa/process/transform/ArrowConversionProcessTest.scala b/geomesa-process/geomesa-process-vector/src/test/scala/org/locationtech/geomesa/process/transform/ArrowConversionProcessTest.scala index 28f7190507cb..8c32cf834ff2 100644 --- a/geomesa-process/geomesa-process-vector/src/test/scala/org/locationtech/geomesa/process/transform/ArrowConversionProcessTest.scala +++ b/geomesa-process/geomesa-process-vector/src/test/scala/org/locationtech/geomesa/process/transform/ArrowConversionProcessTest.scala @@ -17,6 +17,7 @@ import org.junit.runner.RunWith import org.locationtech.geomesa.arrow.ArrowAllocator import org.locationtech.geomesa.arrow.io.SimpleFeatureArrowFileReader import org.locationtech.geomesa.features.ScalaSimpleFeature +import org.locationtech.geomesa.utils.bin.AxisOrder import org.locationtech.geomesa.utils.collection.SelfClosingIterator import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes import org.locationtech.geomesa.utils.io.WithClose @@ -46,6 +47,15 @@ class ArrowConversionProcessTest extends Specification { val collection = new ListFeatureCollection(sft, new Random(-1L).shuffle(features.asInstanceOf[Seq[SimpleFeature]]).asJava) + private val (featuresLatLon: Seq[ScalaSimpleFeature], featuresLonLat: Seq[ScalaSimpleFeature]) = + (0 until 10).map { i => + (ScalaSimpleFeature.create(sft, s"0$i", s"name${i % 2}", s"2017-02-20T00:00:0$i.000Z", s"POINT(45 ${55 + i})"), + ScalaSimpleFeature.create(sft, s"0$i", s"name${i % 2}", s"2017-02-20T00:00:0$i.000Z", s"POINT(${55 + i} 45)")) + }.unzip + + val collectionLatLon = new ListFeatureCollection(sft, new Random(-1L).shuffle(featuresLatLon.asInstanceOf[Seq[SimpleFeature]]).asJava) + val collectionLonLat = new ListFeatureCollection(sft, new Random(-1L).shuffle(featuresLonLat.asInstanceOf[Seq[SimpleFeature]]).asJava) + "ArrowConversionProcess" should { "encode an empty feature collection" in { val bytes = process.execute(new ListFeatureCollection(sft), null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _) @@ -147,6 +157,30 @@ class ArrowConversionProcessTest extends Specification { reader.dictionaries.get("name") must beSome } } + + "encode generic feature collection with axis order: default (Lat/Lon)" in { + val bytes = process.execute(collectionLatLon, null, null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _) + WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader => + reader.sft mustEqual sft + SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toSeq must containTheSameElementsAs(featuresLatLon) + } + } + + "encode generic feature collection with axis order: Lat/Lon" in { + val bytes = process.execute(collectionLatLon, null, null, null, null, null, null, null, null, false).asScala.reduce(_ ++ _) + WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader => + reader.sft mustEqual sft + SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toSeq must containTheSameElementsAs(featuresLatLon) + } + } + + "encode generic feature collection with axis order: Lon/Lat" in { + val bytes = process.execute(collectionLatLon, null, null, null, null, null, null, null, null, true).asScala.reduce(_ ++ _) + WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader => + reader.sft mustEqual sft + SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toSeq must containTheSameElementsAs(featuresLonLat) + } + } } step {