Skip to content

Commit

Permalink
GEOMESA-3409: Arrow: axis order request parameter (#3224)
Browse files Browse the repository at this point in the history
  • Loading branch information
epyatkevich authored Nov 12, 2024
1 parent 6e0908c commit 21d797d
Show file tree
Hide file tree
Showing 19 changed files with 223 additions and 66 deletions.
7 changes: 7 additions & 0 deletions docs/user/datastores/analytic_queries.rst
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ following query hints:
+-------------------------------------+--------------------+------------------------------------+
| QueryHints.ARROW_FLATTEN_STRUCT | Boolean (optional) | flattenStruct |
+-------------------------------------+--------------------+------------------------------------+
| QueryHints.FLIP_AXIS_ORDER | Boolean (optional) | flipAxisOrder |
+-------------------------------------+--------------------+------------------------------------+

Explanation of Hints
++++++++++++++++++++
Expand Down Expand Up @@ -336,6 +338,11 @@ ARROW_FLATTEN_STRUCT
This hint will remove the outer struct named after the feature type and will instead return the attribute fields directly
in the RecordBatch. Note that this hint is currently only supported for PostGIS and geoserver native stores.

FLIP_AXIS_ORDER
^^^^^^^^^^^^^^^

This hint flips the axis order of returned coordinates from latitude/longitude (default) to longitude/latitude.

Example Query
+++++++++++++

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ object ArrowAttributeWriter {
case ObjectType.LONG => new ArrowLongWriter(name, metadata, factory)
case ObjectType.FLOAT => new ArrowFloatWriter(name, metadata, factory)
case ObjectType.DOUBLE => new ArrowDoubleWriter(name, metadata, factory)
case ObjectType.GEOMETRY => geometry(name, bindings(1), encoding.geometry, metadata, factory)
case ObjectType.GEOMETRY => geometry(name, bindings(1), encoding, metadata, factory)
case ObjectType.BOOLEAN => new ArrowBooleanWriter(name, metadata, factory)
case ObjectType.LIST => new ArrowListWriter(name, bindings(1), encoding, metadata, factory)
case ObjectType.MAP => new ArrowMapWriter(name, bindings(1), bindings(2), encoding, metadata, factory)
Expand Down Expand Up @@ -234,11 +234,11 @@ object ArrowAttributeWriter {
private def geometry(
name: String,
binding: ObjectType,
encoding: Encoding,
encoding: SimpleFeatureEncoding,
metadata: Map[String, String],
factory: VectorFactory): ArrowGeometryWriter = {
val m = metadata.asJava
val vector = (binding, encoding, factory) match {
val vector = (binding, encoding.geometry, factory) match {
case (ObjectType.POINT, Encoding.Min, FromStruct(c)) => new PointFloatVector(name, c, m)
case (ObjectType.POINT, Encoding.Min, FromAllocator(c)) => new PointFloatVector(name, c, m)
case (ObjectType.POINT, Encoding.Max, FromStruct(c)) => new PointVector(name, c, m)
Expand Down Expand Up @@ -269,7 +269,10 @@ object ArrowAttributeWriter {
case (_, _, FromList(_)) => throw new NotImplementedError("Geometry lists are not supported")
case _ => throw new IllegalArgumentException(s"Unexpected geometry type $binding")
}
new ArrowGeometryWriter(name, vector.asInstanceOf[GeometryVector[Geometry, FieldVector]])
val geometryVector = vector.asInstanceOf[GeometryVector[Geometry, FieldVector]]
geometryVector.setFlipAxisOrder(encoding.flipAxisOrder)

new ArrowGeometryWriter(name, geometryVector)
}

trait ArrowDictionaryWriter extends ArrowAttributeWriter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,16 @@ object SimpleFeatureVector {
val DescriptorKey = "descriptor"
val OptionsKey = "options"

case class SimpleFeatureEncoding(fids: Option[Encoding], geometry: Encoding, date: Encoding)
case class SimpleFeatureEncoding(fids: Option[Encoding], geometry: Encoding, date: Encoding, flipAxisOrder: Boolean)

object SimpleFeatureEncoding {

val Min: SimpleFeatureEncoding = SimpleFeatureEncoding(Some(Encoding.Min), Encoding.Min, Encoding.Min)
val Max: SimpleFeatureEncoding = SimpleFeatureEncoding(Some(Encoding.Max), Encoding.Max, Encoding.Max)
val Min: SimpleFeatureEncoding = SimpleFeatureEncoding(Some(Encoding.Min), Encoding.Min, Encoding.Min, flipAxisOrder = false)
val Max: SimpleFeatureEncoding = SimpleFeatureEncoding(Some(Encoding.Max), Encoding.Max, Encoding.Max, flipAxisOrder = false)

def min(includeFids: Boolean, proxyFids: Boolean = false): SimpleFeatureEncoding = {
def min(includeFids: Boolean, proxyFids: Boolean = false, flipAxisOrder: Boolean = false): SimpleFeatureEncoding = {
val fids = if (includeFids) { Some(if (proxyFids) { Encoding.Min } else { Encoding.Max }) } else { None }
SimpleFeatureEncoding(fids, Encoding.Min, Encoding.Min)
SimpleFeatureEncoding(fids, Encoding.Min, Encoding.Min, flipAxisOrder)
}

object Encoding extends Enumeration {
Expand Down Expand Up @@ -245,7 +245,7 @@ object SimpleFeatureVector {
val isLong = dateVector.exists(_.isInstanceOf[BigIntVector])
if (isLong) { Encoding.Max } else { Encoding.Min }
}
val encoding = SimpleFeatureEncoding(fidEncoding, geomPrecision, datePrecision)
val encoding = SimpleFeatureEncoding(fidEncoding, geomPrecision, datePrecision, flipAxisOrder = false)

(sft, encoding)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ public interface GeometryVector<T extends Geometry, V extends FieldVector> exten
int getNullCount();

void transfer(int fromIndex, int toIndex, GeometryVector<T, V> to);

boolean isFlipAxisOrder();
void setFlipAxisOrder(boolean flip);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class WKBGeometryVector implements GeometryVector<Geometry, VarBinaryVect
private VarBinaryVector vector;
private WKBWriter writer = null;
private WKBReader reader = null;
private boolean flipAxisOrder = false;

public static final Field field = Field.nullablePrimitive("wkb", ArrowType.Binary.INSTANCE);

Expand Down Expand Up @@ -104,6 +105,16 @@ public void transfer(int fromIndex, int toIndex, GeometryVector<Geometry, VarBin
to.set(toIndex, get(fromIndex));
}

@Override
public boolean isFlipAxisOrder() {
return flipAxisOrder;
}

@Override
public void setFlipAxisOrder(boolean flip) {
flipAxisOrder = flip;
}

@Override
public void close() throws Exception {
vector.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public abstract class AbstractGeometryVector<T extends Geometry, U extends Field

private V ordinal;
protected U vector;
private boolean flipAxisOrder = false;

protected AbstractGeometryVector(U vector) {
this.vector = vector;
Expand Down Expand Up @@ -51,6 +52,16 @@ public int getNullCount() {
return Math.max(count, 0);
}

@Override
public boolean isFlipAxisOrder() {
return flipAxisOrder;
}

@Override
public void setFlipAxisOrder(boolean flip) {
flipAxisOrder = flip;
}

@Override
public void close() throws Exception {
vector.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,13 @@ public void set(int index, LineString geom) {
for (int i = 0; i < geom.getNumPoints(); i++) {
final Coordinate p = geom.getCoordinateN(i);
tuples.setNotNull(position + i);
writeOrdinal((position + i) * 2, p.y);
writeOrdinal((position + i) * 2 + 1, p.x);
if (isFlipAxisOrder()) {
writeOrdinal((position + i) * 2, p.x);
writeOrdinal((position + i) * 2 + 1, p.y);
} else {
writeOrdinal((position + i) * 2, p.y);
writeOrdinal((position + i) * 2 + 1, p.x);
}
}
vector.endValue(index, geom.getNumPoints());
}
Expand All @@ -81,8 +86,14 @@ public LineString get(int index) {
final int offsetEnd = vector.getOffsetBuffer().getInt((index + 1) * ListVector.OFFSET_WIDTH);
final Coordinate[] coordinates = new Coordinate[offsetEnd - offsetStart];
for (int i = 0; i < coordinates.length; i++) {
final double y = readOrdinal((offsetStart + i) * 2);
final double x = readOrdinal((offsetStart + i) * 2 + 1);
final double y, x;
if (isFlipAxisOrder()) {
y = readOrdinal((offsetStart + i) * 2 + 1);
x = readOrdinal((offsetStart + i) * 2);
} else {
y = readOrdinal((offsetStart + i) * 2);
x = readOrdinal((offsetStart + i) * 2 + 1);
}
coordinates[i] = new Coordinate(x, y);
}
return factory.createLineString(coordinates);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,13 @@ public void set(int index, MultiLineString geom) {
for (int j = 0; j < line.getNumPoints(); j++) {
final Coordinate p = line.getCoordinateN(j);
tuples.setNotNull(position + j);
writeOrdinal((position + j) * 2, p.y);
writeOrdinal((position + j) * 2 + 1, p.x);
if (isFlipAxisOrder()) {
writeOrdinal((position + j) * 2, p.x);
writeOrdinal((position + j) * 2 + 1, p.y);
} else {
writeOrdinal((position + j) * 2, p.y);
writeOrdinal((position + j) * 2 + 1, p.x);
}
}
innerVector.endValue(innerIndex + i, line.getNumPoints());
}
Expand All @@ -94,8 +99,14 @@ public MultiLineString get(int index) {
final int offsetEnd = innerVector.getOffsetBuffer().getInt((outerOffsetStart + j + 1) * ListVector.OFFSET_WIDTH);
final Coordinate[] coordinates = new Coordinate[offsetEnd - offsetStart];
for (int i = 0; i < coordinates.length; i++) {
final double y = readOrdinal((offsetStart + i) * 2);
final double x = readOrdinal((offsetStart + i) * 2 + 1);
final double y, x;
if (isFlipAxisOrder()) {
y = readOrdinal((offsetStart + i) * 2 + 1);
x = readOrdinal((offsetStart + i) * 2);
} else {
y = readOrdinal((offsetStart + i) * 2);
x = readOrdinal((offsetStart + i) * 2 + 1);
}
coordinates[i] = new Coordinate(x, y);
}
lines[j] = factory.createLineString(coordinates);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,13 @@ public void set(int index, MultiPoint geom) {
for (int i = 0; i < geom.getNumPoints(); i++) {
final Point p = (Point) geom.getGeometryN(i);
tuples.setNotNull(position + i);
writeOrdinal((position + i) * 2, p.getY());
writeOrdinal((position + i) * 2 + 1, p.getX());
if (isFlipAxisOrder()) {
writeOrdinal((position + i) * 2, p.getX());
writeOrdinal((position + i) * 2 + 1, p.getY());
} else {
writeOrdinal((position + i) * 2, p.getY());
writeOrdinal((position + i) * 2 + 1, p.getX());
}
}
vector.endValue(index, geom.getNumPoints());
}
Expand All @@ -82,8 +87,14 @@ public MultiPoint get(int index) {
final int offsetEnd = vector.getOffsetBuffer().getInt((index + 1) * ListVector.OFFSET_WIDTH);
final Coordinate[] coordinates = new Coordinate[offsetEnd - offsetStart];
for (int i = 0; i < coordinates.length; i++) {
final double y = readOrdinal((offsetStart + i) * 2);
final double x = readOrdinal((offsetStart + i) * 2 + 1);
final double y, x;
if (isFlipAxisOrder()) {
y = readOrdinal((offsetStart + i) * 2 + 1);
x = readOrdinal((offsetStart + i) * 2);
} else {
y = readOrdinal((offsetStart + i) * 2);
x = readOrdinal((offsetStart + i) * 2 + 1);
}
coordinates[i] = new Coordinate(x, y);
}
return factory.createMultiPointFromCoords(coordinates);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,13 @@ public void set(int index, MultiPolygon geom) {
for (int k = 0; k < line.getNumPoints(); k++) {
Coordinate p = line.getCoordinateN(k);
tuples.setNotNull(position + k);
writeOrdinal((position + k) * 2, p.y);
writeOrdinal((position + k) * 2 + 1, p.x);
if (isFlipAxisOrder()) {
writeOrdinal((position + k) * 2, p.x);
writeOrdinal((position + k) * 2 + 1, p.y);
} else {
writeOrdinal((position + k) * 2, p.y);
writeOrdinal((position + k) * 2 + 1, p.x);
}
}
innerInnerVector.endValue(innerInnerIndex + j, line.getNumPoints());
}
Expand Down Expand Up @@ -109,8 +114,14 @@ public MultiPolygon get(int index) {
final int offsetEnd = innerInnerVector.getOffsetBuffer().getInt((outerOffsetStart + j + 1) * ListVector.OFFSET_WIDTH);
final Coordinate[] coordinates = new Coordinate[offsetEnd - offsetStart];
for (int i = 0; i < coordinates.length; i++) {
final double y = readOrdinal((offsetStart + i) * 2);
final double x = readOrdinal((offsetStart + i) * 2 + 1);
final double y, x;
if (isFlipAxisOrder()) {
y = readOrdinal((offsetStart + i) * 2 + 1);
x = readOrdinal((offsetStart + i) * 2);
} else {
y = readOrdinal((offsetStart + i) * 2);
x = readOrdinal((offsetStart + i) * 2 + 1);
}
coordinates[i] = new Coordinate(x, y);
}
final LinearRing ring = factory.createLinearRing(coordinates);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,13 @@ public void set(int index, Point geom) {
vector.setNull(index);
} else {
vector.setNotNull(index);
writeOrdinal(index * 2, geom.getY());
writeOrdinal(index * 2 + 1, geom.getX());
if (isFlipAxisOrder()) {
writeOrdinal(index * 2, geom.getX());
writeOrdinal(index * 2 + 1, geom.getY());
} else {
writeOrdinal(index * 2, geom.getY());
writeOrdinal(index * 2 + 1, geom.getX());
}
}
}

Expand All @@ -63,8 +68,14 @@ public Point get(int index) {
if (vector.isNull(index)) {
return null;
} else {
final double y = readOrdinal(index * 2);
final double x = readOrdinal(index * 2 + 1);
final double y, x;
if (isFlipAxisOrder()) {
y = readOrdinal(index * 2 + 1);
x = readOrdinal(index * 2);
} else {
y = readOrdinal(index * 2);
x = readOrdinal(index * 2 + 1);
}
return factory.createPoint(new Coordinate(x, y));
}
}
Expand All @@ -76,8 +87,13 @@ public void transfer(int fromIndex, int toIndex, GeometryVector<Point, FixedSize
((FixedSizeListVector) typed.vector).setNull(toIndex);
} else {
((FixedSizeListVector) typed.vector).setNotNull(toIndex);
typed.writeOrdinal(toIndex * 2, readOrdinal(fromIndex * 2));
typed.writeOrdinal(toIndex * 2 + 1, readOrdinal(fromIndex * 2 + 1));
if (isFlipAxisOrder() != typed.isFlipAxisOrder()) {
typed.writeOrdinal(toIndex * 2, readOrdinal(fromIndex * 2 + 1));
typed.writeOrdinal(toIndex * 2 + 1, readOrdinal(fromIndex * 2));
} else {
typed.writeOrdinal(toIndex * 2, readOrdinal(fromIndex * 2));
typed.writeOrdinal(toIndex * 2 + 1, readOrdinal(fromIndex * 2 + 1));
}
}
}

Expand All @@ -89,7 +105,6 @@ public void transfer(int fromIndex, int toIndex, GeometryVector<Point, FixedSize
*/
public double getCoordinateY(int index) {
return readOrdinal(index * 2);

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,13 @@ public void set(int index, Polygon geom) {
for (int j = 0; j < line.getNumPoints(); j++) {
final Coordinate p = line.getCoordinateN(j);
tuples.setNotNull(position + j);
writeOrdinal((position + j) * 2, p.y);
writeOrdinal((position + j) * 2 + 1, p.x);
if (isFlipAxisOrder()) {
writeOrdinal((position + j) * 2, p.x);
writeOrdinal((position + j) * 2 + 1, p.y);
} else {
writeOrdinal((position + j) * 2, p.y);
writeOrdinal((position + j) * 2 + 1, p.x);
}
}
innerVector.endValue(innerIndex + i, line.getNumPoints());
}
Expand All @@ -96,8 +101,14 @@ public Polygon get(int index) {
final int offsetEnd = innerVector.getOffsetBuffer().getInt((outerOffsetStart + j + 1) * ListVector.OFFSET_WIDTH);
final Coordinate[] coordinates = new Coordinate[offsetEnd - offsetStart];
for (int i = 0; i < coordinates.length; i++) {
final double y = readOrdinal((offsetStart + i) * 2);
final double x = readOrdinal((offsetStart + i) * 2 + 1);
final double y, x;
if (isFlipAxisOrder()) {
y = readOrdinal((offsetStart + i) * 2 + 1);
x = readOrdinal((offsetStart + i) * 2);
} else {
y = readOrdinal((offsetStart + i) * 2);
x = readOrdinal((offsetStart + i) * 2 + 1);
}
coordinates[i] = new Coordinate(x, y);
}
final LinearRing ring = factory.createLinearRing(coordinates);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class ArrowExporter(out: OutputStream, hints: Hints) extends FeatureExporter {
import org.locationtech.geomesa.index.conf.QueryHints.RichHints

private lazy val sort = hints.getArrowSort
private lazy val encoding = SimpleFeatureEncoding.min(hints.isArrowIncludeFid, hints.isArrowProxyFid)
private lazy val encoding = SimpleFeatureEncoding.min(hints.isArrowIncludeFid, hints.isArrowProxyFid, hints.isFlipAxisOrder)
private lazy val ipc = hints.getArrowFormatVersion.getOrElse(FormatVersion.ArrowFormatVersion.get)
private lazy val batchSize = hints.getArrowBatchSize.getOrElse(ArrowProperties.BatchSize.get.toInt)
private lazy val dictionaryFields = hints.getArrowDictionaryFields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ object QueryHints {

val FILTER_COMPAT = new ClassKey(classOf[java.lang.String])

val FLIP_AXIS_ORDER = new ClassKey(classOf[java.lang.Boolean])

def sortReadableString(sort: Seq[(String, Boolean)]): String =
sort.map { case (f, r) => s"$f ${if (r) "DESC" else "ASC" }"}.mkString(", ")

Expand Down Expand Up @@ -178,5 +180,8 @@ object QueryHints {
}
}
}

def isFlipAxisOrder: Boolean =
Option(hints.get(FLIP_AXIS_ORDER).asInstanceOf[java.lang.Boolean]).exists(Boolean.unbox)
}
}
Loading

0 comments on commit 21d797d

Please sign in to comment.