diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloIndexAdapter.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloIndexAdapter.scala index 9e3541268a01..734694200cf3 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloIndexAdapter.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloIndexAdapter.scala @@ -239,11 +239,12 @@ class AccumuloIndexAdapter(ds: AccumuloDataStore) extends IndexAdapter[AccumuloD val indexIter = if (index.name == Z3Index.name) { strategy.values.toSeq.map { case v: Z3IndexValues => val offset = index.keySpace.sharding.length + index.keySpace.sharing.length - Z3Iterator.configure(v, offset, hints.getFilterCompatibility, ZIterPriority) + Z3Iterator.configure(v, offset, hints, ZIterPriority) } } else if (index.name == Z2Index.name) { strategy.values.toSeq.map { case v: Z2IndexValues => - Z2Iterator.configure(v, index.keySpace.sharding.length + index.keySpace.sharing.length, ZIterPriority) + val offset = index.keySpace.sharding.length + index.keySpace.sharing.length + Z2Iterator.configure(v, offset, hints, ZIterPriority) } } else if (index.name == S3Index.name) { strategy.values.toSeq.map { case v: S3IndexValues => diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/iterators/Z2Iterator.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/iterators/Z2Iterator.scala index f2517b90c52e..13b8bf677954 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/iterators/Z2Iterator.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/iterators/Z2Iterator.scala @@ -9,16 +9,22 @@ package org.locationtech.geomesa.accumulo.iterators import org.apache.accumulo.core.client.IteratorSetting +import org.geotools.util.factory.Hints import org.locationtech.geomesa.index.filters.Z2Filter import org.locationtech.geomesa.index.index.z2.Z2IndexValues class Z2Iterator extends RowFilterIterator[Z2Filter](Z2Filter) object Z2Iterator { - def configure(values: Z2IndexValues, offset: Int, priority: Int): IteratorSetting = { + + @deprecated + def configure(values: Z2IndexValues, offset: Int, priority: Int): IteratorSetting = + configure(values, offset, new Hints(), priority) + + def configure(values: Z2IndexValues, offset: Int, hints: Hints, priority: Int): IteratorSetting = { val is = new IteratorSetting(priority, "z2", classOf[Z2Iterator]) // index space values for comparing in the iterator - Z2Filter.serializeToStrings(Z2Filter(values)).foreach { case (k, v) => is.addOption(k, v) } + Z2Filter.serializeToStrings(Z2Filter(values, hints)).foreach { case (k, v) => is.addOption(k, v) } // account for shard and table sharing bytes is.addOption(RowFilterIterator.RowOffsetKey, offset.toString) is diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/iterators/Z3Iterator.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/iterators/Z3Iterator.scala index ab68b809dde2..664379c00d65 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/iterators/Z3Iterator.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/iterators/Z3Iterator.scala @@ -9,7 +9,8 @@ package org.locationtech.geomesa.accumulo.iterators import org.apache.accumulo.core.client.IteratorSetting -import org.locationtech.geomesa.index.conf.FilterCompatibility +import org.geotools.util.factory.Hints +import org.locationtech.geomesa.index.conf.{FilterCompatibility, QueryHints} import org.locationtech.geomesa.index.conf.FilterCompatibility.FilterCompatibility import org.locationtech.geomesa.index.filters.Z3Filter import org.locationtech.geomesa.index.index.z3.Z3IndexValues @@ -18,24 +19,37 @@ class Z3Iterator extends RowFilterIterator[Z3Filter](Z3Filter) object Z3Iterator { + import org.locationtech.geomesa.index.conf.QueryHints.RichHints + + @deprecated + def configure( + values: Z3IndexValues, + offset: Int, + compatibility: Option[FilterCompatibility], + priority: Int): IteratorSetting = { + val hints = new Hints() + compatibility.foreach(c => hints.put(QueryHints.FILTER_COMPAT, c.toString)) + configure(values, offset, hints, priority) + } + /** * Configure the iterator * * @param values index values * @param offset offset for z-value in each row - * @param compatibility compatibility mode + * @param hints query hints * @param priority iterator priority * @return */ def configure( values: Z3IndexValues, offset: Int, - compatibility: Option[FilterCompatibility], + hints: Hints, priority: Int): IteratorSetting = { - val opts = compatibility match { + val opts = hints.getFilterCompatibility match { case None => - Z3Filter.serializeToStrings(Z3Filter(values)) + (RowFilterIterator.RowOffsetKey -> offset.toString) + Z3Filter.serializeToStrings(Z3Filter(values, hints)) + (RowFilterIterator.RowOffsetKey -> offset.toString) case Some(FilterCompatibility.`1.3`) => val Z3IndexValues(sfc, _, bounds, _, times, _) = values diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/iterators/Z3IteratorTest.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/iterators/Z3IteratorTest.scala index 7032843b7be9..90e5dbd39f01 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/iterators/Z3IteratorTest.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/iterators/Z3IteratorTest.scala @@ -12,9 +12,10 @@ import org.apache.accumulo.core.data.{ByteSequence, Key, Range, Value} import org.apache.accumulo.core.iterators.{IteratorEnvironment, SortedKeyValueIterator} import org.apache.hadoop.io.Text import org.geotools.filter.text.ecql.ECQL +import org.geotools.util.factory.Hints import org.junit.runner.RunWith import org.locationtech.geomesa.index.api.ShardStrategy.NoShardStrategy -import org.locationtech.geomesa.index.conf.FilterCompatibility +import org.locationtech.geomesa.index.conf.{FilterCompatibility, QueryHints} import org.locationtech.geomesa.index.index.z3.Z3IndexKeySpace import org.locationtech.geomesa.index.index.z3.legacy.Z3IndexV4.Z3IndexKeySpaceV4 import org.locationtech.geomesa.index.utils.ExplainNull @@ -57,7 +58,7 @@ class Z3IteratorTest extends Specification { } val iter = new Z3Iterator - iter.init(srcIter, Z3Iterator.configure(indexValues, 0, None, 25).getOptions, null) + iter.init(srcIter, Z3Iterator.configure(indexValues, 0, new Hints(), 25).getOptions, null) iter.seek(new org.apache.accumulo.core.data.Range(new Key(new Text(k)), null), null, inclusive = false) iter } @@ -101,7 +102,9 @@ class Z3IteratorTest extends Specification { val keySpace = new Z3IndexKeySpaceV4(sft, Array.empty, NoShardStrategy, "geom", "dtg") val filter = "bbox(geom,0,-70,50,-50) and dtg during 2015-06-06T00:00:00.000Z/2015-06-08T00:00:00.000Z" val values = keySpace.getIndexValues(ECQL.toFilter(filter), ExplainNull) - val config = Z3Iterator.configure(values, 2, Some(FilterCompatibility.`1.3`), 23) + val hints = new Hints() + hints.put(QueryHints.FILTER_COMPAT, "1.3") + val config = Z3Iterator.configure(values, 2, hints, 23) config.getIteratorClass mustEqual classOf[Z3Iterator].getName config.getPriority mustEqual 23 // expected values taken from a 1.3 install diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/filters/Z2Filter.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/filters/Z2Filter.scala index 75b14ad59a95..324998463da8 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/filters/Z2Filter.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/filters/Z2Filter.scala @@ -8,30 +8,68 @@ package org.locationtech.geomesa.index.filters +import com.typesafe.scalalogging.StrictLogging import org.geotools.util.factory.Hints -import org.locationtech.geomesa.index.filters.RowFilter.RowFilterFactory -import org.locationtech.geomesa.index.filters.Z3Filter._ +import org.locationtech.geomesa.index.filters.RowFilter.{FilterResult, RowFilterFactory} import org.locationtech.geomesa.index.index.z2.Z2IndexValues import org.locationtech.geomesa.utils.index.ByteArrays import org.locationtech.sfcurve.zorder.Z2 import java.nio.ByteBuffer -class Z2Filter(val xy: Array[Array[Int]]) extends RowFilter { +class Z2Filter(val xy: Array[Array[Int]], val seek: Boolean) extends RowFilter with StrictLogging { - override def inBounds(buf: Array[Byte], offset: Int): Boolean = { - val z = ByteArrays.readLong(buf, offset) + private val zBounds: Array[Array[Long]] = Z2Filter.zBounds(xy) + + override def filter(row: Array[Byte], offset: Int): FilterResult = { + val z = ByteArrays.readLong(row, offset) val x = Z2(z).d0 val y = Z2(z).d1 var i = 0 while (i < xy.length) { val xyi = xy(i) if (x >= xyi(0) && x <= xyi(2) && y >= xyi(1) && y <= xyi(3)) { - return true + return FilterResult.InBounds } i += 1 } - false + + if (seek) { + nextJumpIn(z) + } else { + FilterResult.OutOfBounds + } + } + + // noinspection ScalaDeprecation + override def inBounds(row: Array[Byte], offset: Int): Boolean = { + filter(row, offset) match { + case FilterResult.InBounds => true + case _ => false + } + } + + private def nextJumpIn(z: Long): FilterResult = { + var nextZ = Long.MaxValue + + var i = 0 + while (i < zBounds.length) { + val Array(zmin, zmax) = zBounds(i) + if (z < zmin) { + if (zmin < nextZ) { + nextZ = zmin + } + } else if (z < zmax) { + val next = Z2.zdivide(z, zmin, zmax)._2 + if (next < nextZ) { + nextZ = next + } + } + i += 1 + } + + logger.trace(s"Seeking ahead from $z to $nextZ") + FilterResult.SkipAhead(ByteArrays.toBytes(nextZ)) } override def toString: String = Z2Filter.serializeToStrings(this).toSeq.sortBy(_._1).mkString(",") @@ -41,6 +79,8 @@ object Z2Filter extends RowFilterFactory[Z2Filter] { import org.locationtech.geomesa.index.conf.QueryHints.RichHints + private val ZBoundsOrdering = Ordering.by[Array[Long], Long](_.head) + private val RangeSeparator = ":" private val TermSeparator = ";" @@ -54,16 +94,17 @@ object Z2Filter extends RowFilterFactory[Z2Filter] { Array(sfc.lon.normalize(xmin), sfc.lat.normalize(ymin), sfc.lon.normalize(xmax), sfc.lat.normalize(ymax)) }.toArray - new Z2Filter(xy) + new Z2Filter(xy, seek) } override def serializeToBytes(filter: Z2Filter): Array[Byte] = { - // 4 bytes for length plus 16 bytes for each xy val (4 ints) - val xyLength = 4 + filter.xy.length * 16 + // 4 bytes for length plus 1 byte for seek plus 16 bytes for each xy val (4 ints) + val xyLength = 5 + filter.xy.length * 16 val buffer = ByteBuffer.allocate(xyLength) buffer.putInt(filter.xy.length) filter.xy.foreach(bounds => bounds.foreach(buffer.putInt)) + buffer.put(if (filter.seek) { 1.toByte } else { 0.toByte }) buffer.array() } @@ -71,16 +112,26 @@ object Z2Filter extends RowFilterFactory[Z2Filter] { override def deserializeFromBytes(serialized: Array[Byte]): Z2Filter = { val buffer = ByteBuffer.wrap(serialized) val xy = Array.fill(buffer.getInt())(Array.fill(4)(buffer.getInt)) - new Z2Filter(xy) + val seek = if (buffer.hasRemaining) { buffer.get() > 0 } else { true } + new Z2Filter(xy, seek) } override def serializeToStrings(filter: Z2Filter): Map[String, String] = { val xy = filter.xy.map(bounds => bounds.mkString(RangeSeparator)).mkString(TermSeparator) - Map(XYKey -> xy) + Map(Z3Filter.XYKey -> xy, Z3Filter.SeekKey -> java.lang.Boolean.toString(filter.seek)) } override def deserializeFromStrings(serialized: scala.collection.Map[String, String]): Z2Filter = { - val xy = serialized(XYKey).split(TermSeparator).map(_.split(RangeSeparator).map(_.toInt)) - new Z2Filter(xy) + val xy = serialized(Z3Filter.XYKey).split(TermSeparator).map(_.split(RangeSeparator).map(_.toInt)) + new Z2Filter(xy, serialized.get(Z3Filter.SeekKey).forall(_.toBoolean)) + } + + /** + * Gets z-low and z-hi for a bounding box in normalized space + */ + private def zBounds(xy: Array[Array[Int]]): Array[Array[Long]] = { + val bounds = xy.map { case Array(xmin, ymin, xmax, ymax) => Array(Z2(xmin, ymin).z, Z2(xmax, ymax).z) } + java.util.Arrays.sort(bounds, ZBoundsOrdering) + bounds } } \ No newline at end of file diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/filters/Z3Filter.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/filters/Z3Filter.scala index 4211f48f07f1..a2f990bb166e 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/filters/Z3Filter.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/filters/Z3Filter.scala @@ -52,7 +52,8 @@ class Z3Filter( val bounds = xy.map { case Array(xmin, ymin, xmax, ymax) => Z3Filter.zBound(xmin, ymin, 0, xmax, ymax, maxTime) } - ZBounds(bounds, bounds.map(_.head).min) + java.util.Arrays.sort(bounds, Z3Filter.ZBoundsOrdering) + ZBounds(bounds, bounds.head.head) } private val zBounds: Array[ZBounds] = if (!seek) { null } else { @@ -64,7 +65,8 @@ class Z3Filter( Z3Filter.zBound(xmin, ymin, tmin, xmax, ymax, tmax) } } - ZBounds(bounds, bounds.map(_.head).min) + java.util.Arrays.sort(bounds, Z3Filter.ZBoundsOrdering) + ZBounds(bounds, bounds.head.head) } } @@ -168,6 +170,8 @@ object Z3Filter extends RowFilterFactory[Z3Filter] { import org.locationtech.geomesa.index.conf.QueryHints.RichHints + private val ZBoundsOrdering = Ordering.by[Array[Long], Long](_.head) + private val RangeSeparator = ":" private val TermSeparator = ";" private val EpochSeparator = ","