Skip to content

Commit

Permalink
Add z2 filter seeking
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz committed Dec 17, 2021
1 parent e163475 commit bafaed6
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,12 @@ class AccumuloIndexAdapter(ds: AccumuloDataStore) extends IndexAdapter[AccumuloD
val indexIter = if (index.name == Z3Index.name) {
strategy.values.toSeq.map { case v: Z3IndexValues =>
val offset = index.keySpace.sharding.length + index.keySpace.sharing.length
Z3Iterator.configure(v, offset, hints.getFilterCompatibility, ZIterPriority)
Z3Iterator.configure(v, offset, hints, ZIterPriority)
}
} else if (index.name == Z2Index.name) {
strategy.values.toSeq.map { case v: Z2IndexValues =>
Z2Iterator.configure(v, index.keySpace.sharding.length + index.keySpace.sharing.length, ZIterPriority)
val offset = index.keySpace.sharding.length + index.keySpace.sharing.length
Z2Iterator.configure(v, offset, hints, ZIterPriority)
}
} else if (index.name == S3Index.name) {
strategy.values.toSeq.map { case v: S3IndexValues =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,22 @@
package org.locationtech.geomesa.accumulo.iterators

import org.apache.accumulo.core.client.IteratorSetting
import org.geotools.util.factory.Hints
import org.locationtech.geomesa.index.filters.Z2Filter
import org.locationtech.geomesa.index.index.z2.Z2IndexValues

class Z2Iterator extends RowFilterIterator[Z2Filter](Z2Filter)

object Z2Iterator {
def configure(values: Z2IndexValues, offset: Int, priority: Int): IteratorSetting = {

@deprecated
def configure(values: Z2IndexValues, offset: Int, priority: Int): IteratorSetting =
configure(values, offset, new Hints(), priority)

def configure(values: Z2IndexValues, offset: Int, hints: Hints, priority: Int): IteratorSetting = {
val is = new IteratorSetting(priority, "z2", classOf[Z2Iterator])
// index space values for comparing in the iterator
Z2Filter.serializeToStrings(Z2Filter(values)).foreach { case (k, v) => is.addOption(k, v) }
Z2Filter.serializeToStrings(Z2Filter(values, hints)).foreach { case (k, v) => is.addOption(k, v) }
// account for shard and table sharing bytes
is.addOption(RowFilterIterator.RowOffsetKey, offset.toString)
is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
package org.locationtech.geomesa.accumulo.iterators

import org.apache.accumulo.core.client.IteratorSetting
import org.locationtech.geomesa.index.conf.FilterCompatibility
import org.geotools.util.factory.Hints
import org.locationtech.geomesa.index.conf.{FilterCompatibility, QueryHints}
import org.locationtech.geomesa.index.conf.FilterCompatibility.FilterCompatibility
import org.locationtech.geomesa.index.filters.Z3Filter
import org.locationtech.geomesa.index.index.z3.Z3IndexValues
Expand All @@ -18,24 +19,37 @@ class Z3Iterator extends RowFilterIterator[Z3Filter](Z3Filter)

object Z3Iterator {

import org.locationtech.geomesa.index.conf.QueryHints.RichHints

@deprecated
def configure(
values: Z3IndexValues,
offset: Int,
compatibility: Option[FilterCompatibility],
priority: Int): IteratorSetting = {
val hints = new Hints()
compatibility.foreach(c => hints.put(QueryHints.FILTER_COMPAT, c.toString))
configure(values, offset, hints, priority)
}

/**
* Configure the iterator
*
* @param values index values
* @param offset offset for z-value in each row
* @param compatibility compatibility mode
* @param hints query hints
* @param priority iterator priority
* @return
*/
def configure(
values: Z3IndexValues,
offset: Int,
compatibility: Option[FilterCompatibility],
hints: Hints,
priority: Int): IteratorSetting = {

val opts = compatibility match {
val opts = hints.getFilterCompatibility match {
case None =>
Z3Filter.serializeToStrings(Z3Filter(values)) + (RowFilterIterator.RowOffsetKey -> offset.toString)
Z3Filter.serializeToStrings(Z3Filter(values, hints)) + (RowFilterIterator.RowOffsetKey -> offset.toString)

case Some(FilterCompatibility.`1.3`) =>
val Z3IndexValues(sfc, _, bounds, _, times, _) = values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import org.apache.accumulo.core.data.{ByteSequence, Key, Range, Value}
import org.apache.accumulo.core.iterators.{IteratorEnvironment, SortedKeyValueIterator}
import org.apache.hadoop.io.Text
import org.geotools.filter.text.ecql.ECQL
import org.geotools.util.factory.Hints
import org.junit.runner.RunWith
import org.locationtech.geomesa.index.api.ShardStrategy.NoShardStrategy
import org.locationtech.geomesa.index.conf.FilterCompatibility
import org.locationtech.geomesa.index.conf.{FilterCompatibility, QueryHints}
import org.locationtech.geomesa.index.index.z3.Z3IndexKeySpace
import org.locationtech.geomesa.index.index.z3.legacy.Z3IndexV4.Z3IndexKeySpaceV4
import org.locationtech.geomesa.index.utils.ExplainNull
Expand Down Expand Up @@ -57,7 +58,7 @@ class Z3IteratorTest extends Specification {
}

val iter = new Z3Iterator
iter.init(srcIter, Z3Iterator.configure(indexValues, 0, None, 25).getOptions, null)
iter.init(srcIter, Z3Iterator.configure(indexValues, 0, new Hints(), 25).getOptions, null)
iter.seek(new org.apache.accumulo.core.data.Range(new Key(new Text(k)), null), null, inclusive = false)
iter
}
Expand Down Expand Up @@ -101,7 +102,9 @@ class Z3IteratorTest extends Specification {
val keySpace = new Z3IndexKeySpaceV4(sft, Array.empty, NoShardStrategy, "geom", "dtg")
val filter = "bbox(geom,0,-70,50,-50) and dtg during 2015-06-06T00:00:00.000Z/2015-06-08T00:00:00.000Z"
val values = keySpace.getIndexValues(ECQL.toFilter(filter), ExplainNull)
val config = Z3Iterator.configure(values, 2, Some(FilterCompatibility.`1.3`), 23)
val hints = new Hints()
hints.put(QueryHints.FILTER_COMPAT, "1.3")
val config = Z3Iterator.configure(values, 2, hints, 23)
config.getIteratorClass mustEqual classOf[Z3Iterator].getName
config.getPriority mustEqual 23
// expected values taken from a 1.3 install
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,68 @@

package org.locationtech.geomesa.index.filters

import com.typesafe.scalalogging.StrictLogging
import org.geotools.util.factory.Hints
import org.locationtech.geomesa.index.filters.RowFilter.RowFilterFactory
import org.locationtech.geomesa.index.filters.Z3Filter._
import org.locationtech.geomesa.index.filters.RowFilter.{FilterResult, RowFilterFactory}
import org.locationtech.geomesa.index.index.z2.Z2IndexValues
import org.locationtech.geomesa.utils.index.ByteArrays
import org.locationtech.sfcurve.zorder.Z2

import java.nio.ByteBuffer

class Z2Filter(val xy: Array[Array[Int]]) extends RowFilter {
class Z2Filter(val xy: Array[Array[Int]], val seek: Boolean) extends RowFilter with StrictLogging {

override def inBounds(buf: Array[Byte], offset: Int): Boolean = {
val z = ByteArrays.readLong(buf, offset)
private val zBounds: Array[Array[Long]] = Z2Filter.zBounds(xy)

override def filter(row: Array[Byte], offset: Int): FilterResult = {
val z = ByteArrays.readLong(row, offset)
val x = Z2(z).d0
val y = Z2(z).d1
var i = 0
while (i < xy.length) {
val xyi = xy(i)
if (x >= xyi(0) && x <= xyi(2) && y >= xyi(1) && y <= xyi(3)) {
return true
return FilterResult.InBounds
}
i += 1
}
false

if (seek) {
nextJumpIn(z)
} else {
FilterResult.OutOfBounds
}
}

// noinspection ScalaDeprecation
override def inBounds(row: Array[Byte], offset: Int): Boolean = {
filter(row, offset) match {
case FilterResult.InBounds => true
case _ => false
}
}

private def nextJumpIn(z: Long): FilterResult = {
var nextZ = Long.MaxValue

var i = 0
while (i < zBounds.length) {
val Array(zmin, zmax) = zBounds(i)
if (z < zmin) {
if (zmin < nextZ) {
nextZ = zmin
}
} else if (z < zmax) {
val next = Z2.zdivide(z, zmin, zmax)._2
if (next < nextZ) {
nextZ = next
}
}
i += 1
}

logger.trace(s"Seeking ahead from $z to $nextZ")
FilterResult.SkipAhead(ByteArrays.toBytes(nextZ))
}

override def toString: String = Z2Filter.serializeToStrings(this).toSeq.sortBy(_._1).mkString(",")
Expand All @@ -41,6 +79,8 @@ object Z2Filter extends RowFilterFactory[Z2Filter] {

import org.locationtech.geomesa.index.conf.QueryHints.RichHints

private val ZBoundsOrdering = Ordering.by[Array[Long], Long](_.head)

private val RangeSeparator = ":"
private val TermSeparator = ";"

Expand All @@ -54,33 +94,44 @@ object Z2Filter extends RowFilterFactory[Z2Filter] {
Array(sfc.lon.normalize(xmin), sfc.lat.normalize(ymin), sfc.lon.normalize(xmax), sfc.lat.normalize(ymax))
}.toArray

new Z2Filter(xy)
new Z2Filter(xy, seek)
}

override def serializeToBytes(filter: Z2Filter): Array[Byte] = {
// 4 bytes for length plus 16 bytes for each xy val (4 ints)
val xyLength = 4 + filter.xy.length * 16
// 4 bytes for length plus 1 byte for seek plus 16 bytes for each xy val (4 ints)
val xyLength = 5 + filter.xy.length * 16
val buffer = ByteBuffer.allocate(xyLength)

buffer.putInt(filter.xy.length)
filter.xy.foreach(bounds => bounds.foreach(buffer.putInt))
buffer.put(if (filter.seek) { 1.toByte } else { 0.toByte })

buffer.array()
}

override def deserializeFromBytes(serialized: Array[Byte]): Z2Filter = {
val buffer = ByteBuffer.wrap(serialized)
val xy = Array.fill(buffer.getInt())(Array.fill(4)(buffer.getInt))
new Z2Filter(xy)
val seek = if (buffer.hasRemaining) { buffer.get() > 0 } else { true }
new Z2Filter(xy, seek)
}

override def serializeToStrings(filter: Z2Filter): Map[String, String] = {
val xy = filter.xy.map(bounds => bounds.mkString(RangeSeparator)).mkString(TermSeparator)
Map(XYKey -> xy)
Map(Z3Filter.XYKey -> xy, Z3Filter.SeekKey -> java.lang.Boolean.toString(filter.seek))
}

override def deserializeFromStrings(serialized: scala.collection.Map[String, String]): Z2Filter = {
val xy = serialized(XYKey).split(TermSeparator).map(_.split(RangeSeparator).map(_.toInt))
new Z2Filter(xy)
val xy = serialized(Z3Filter.XYKey).split(TermSeparator).map(_.split(RangeSeparator).map(_.toInt))
new Z2Filter(xy, serialized.get(Z3Filter.SeekKey).forall(_.toBoolean))
}

/**
* Gets z-low and z-hi for a bounding box in normalized space
*/
private def zBounds(xy: Array[Array[Int]]): Array[Array[Long]] = {
val bounds = xy.map { case Array(xmin, ymin, xmax, ymax) => Array(Z2(xmin, ymin).z, Z2(xmax, ymax).z) }
java.util.Arrays.sort(bounds, ZBoundsOrdering)
bounds
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class Z3Filter(
val bounds = xy.map { case Array(xmin, ymin, xmax, ymax) =>
Z3Filter.zBound(xmin, ymin, 0, xmax, ymax, maxTime)
}
ZBounds(bounds, bounds.map(_.head).min)
java.util.Arrays.sort(bounds, Z3Filter.ZBoundsOrdering)
ZBounds(bounds, bounds.head.head)
}

private val zBounds: Array[ZBounds] = if (!seek) { null } else {
Expand All @@ -64,7 +65,8 @@ class Z3Filter(
Z3Filter.zBound(xmin, ymin, tmin, xmax, ymax, tmax)
}
}
ZBounds(bounds, bounds.map(_.head).min)
java.util.Arrays.sort(bounds, Z3Filter.ZBoundsOrdering)
ZBounds(bounds, bounds.head.head)
}
}

Expand Down Expand Up @@ -168,6 +170,8 @@ object Z3Filter extends RowFilterFactory[Z3Filter] {

import org.locationtech.geomesa.index.conf.QueryHints.RichHints

private val ZBoundsOrdering = Ordering.by[Array[Long], Long](_.head)

private val RangeSeparator = ":"
private val TermSeparator = ";"
private val EpochSeparator = ","
Expand Down

0 comments on commit bafaed6

Please sign in to comment.