Skip to content

Commit

Permalink
GEOMESA-2973 Fix indexing for yearly z3 epoch (#2673)
Browse files Browse the repository at this point in the history
* GEOMESA-2973 Fix indexing for yearly z3 epoch

* Does not fix z3 index versions < 5 (created with GeoMesa versions < 2.0.0)

Signed-off-by: Emilio Lahr-Vivaz <[email protected]>
  • Loading branch information
elahrvivaz authored Jan 7, 2021
1 parent b444b29 commit 4b3669f
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import java.util.Date
import com.typesafe.scalalogging.LazyLogging
import org.geotools.util.factory.Hints
import org.locationtech.geomesa.curve.BinnedTime.TimeToBinnedTime
import org.locationtech.geomesa.curve.{BinnedTime, XZ3SFC}
import org.locationtech.geomesa.curve.{BinnedTime, TimePeriod, XZ3SFC}
import org.locationtech.geomesa.filter.FilterValues
import org.locationtech.geomesa.index.api.IndexKeySpace.IndexKeySpaceFactory
import org.locationtech.geomesa.index.api.ShardStrategy.{NoShardStrategy, ZShardStrategy}
Expand Down Expand Up @@ -45,7 +45,11 @@ class XZ3IndexKeySpace(val sft: SimpleFeatureType, val sharding: ShardStrategy,
protected val geomIndex: Int = sft.indexOf(geomField)
protected val dtgIndex: Int = sft.indexOf(dtgField)

protected val sfc = XZ3SFC(sft.getXZPrecision, sft.getZ3Interval)
// noinspection ScalaDeprecation
protected val sfc: XZ3SFC = sft.getZ3Interval match {
case TimePeriod.Year => new org.locationtech.geomesa.curve.LegacyYearXZ3SFC(sft.getXZPrecision)
case p => XZ3SFC(sft.getXZPrecision, p)
}
protected val timeToIndex: TimeToBinnedTime = BinnedTime.timeToBinnedTime(sft.getZ3Interval)

private val dateToIndex = BinnedTime.dateToBinnedTime(sft.getZ3Interval)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import java.util.Date
import com.typesafe.scalalogging.LazyLogging
import org.geotools.util.factory.Hints
import org.locationtech.geomesa.curve.BinnedTime.TimeToBinnedTime
import org.locationtech.geomesa.curve.{BinnedTime, Z3SFC}
import org.locationtech.geomesa.curve.{BinnedTime, TimePeriod, Z3SFC}
import org.locationtech.geomesa.filter.FilterValues
import org.locationtech.geomesa.index.api.IndexKeySpace.IndexKeySpaceFactory
import org.locationtech.geomesa.index.api.ShardStrategy.{NoShardStrategy, ZShardStrategy}
Expand Down Expand Up @@ -45,7 +45,11 @@ class Z3IndexKeySpace(val sft: SimpleFeatureType,
s"Expected field $dtgField to have a date binding, but instead it has: " +
sft.getDescriptor(dtgField).getType.getBinding.getSimpleName)

protected val sfc = Z3SFC(sft.getZ3Interval)
// noinspection ScalaDeprecation
protected val sfc: Z3SFC = sft.getZ3Interval match {
case TimePeriod.Year => new org.locationtech.geomesa.curve.LegacyYearZ3SFC()
case p => Z3SFC(p)
}

protected val geomIndex: Int = sft.indexOf(geomField)
protected val dtgIndex: Int = sft.indexOf(dtgField)
Expand Down Expand Up @@ -245,7 +249,16 @@ class Z3IndexKeySpace(val sft: SimpleFeatureType,
val looseBBox = Option(hints.get(LOOSE_BBOX)).map(Boolean.unbox).getOrElse(config.forall(_.looseBBox))
def unboundedDates: Boolean = values.exists(_.temporalUnbounded.nonEmpty)
def complexGeoms: Boolean = values.exists(_.geometries.values.exists(g => !GeometryUtils.isRectangular(g)))
!looseBBox || unboundedDates || complexGeoms
val base = !looseBBox || unboundedDates || complexGeoms

base || values.exists { v =>
// fix to handle incorrect yearly z values - use full filter if querying the collapsed days
sft.getZ3Interval == TimePeriod.Year && v.intervals.exists { bounds =>
(bounds.lower.value.toSeq ++ bounds.upper.value).exists { date =>
dateToIndex(date).offset.toDouble >= sfc.time.max
}
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/***********************************************************************
* Copyright (c) 2013-2020 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.index.index

import com.typesafe.scalalogging.LazyLogging
import org.geotools.data.{Query, Transaction}
import org.geotools.filter.text.ecql.ECQL
import org.junit.runner.RunWith
import org.locationtech.geomesa.features.ScalaSimpleFeature
import org.locationtech.geomesa.index.TestGeoMesaDataStore
import org.locationtech.geomesa.utils.collection.SelfClosingIterator
import org.locationtech.geomesa.utils.geotools.{FeatureUtils, SimpleFeatureTypes}
import org.locationtech.geomesa.utils.io.WithClose
import org.specs2.mutable.Specification
import org.specs2.runner.JUnitRunner

@RunWith(classOf[JUnitRunner])
class XZ3IndexTest extends Specification with LazyLogging {

"XZ3Index" should {
"index and query yearly epochs correctly" in {
val spec =
"name:String,track:String,dtg:Date,*geom:LineString:srid=4326;" +
"geomesa.z3.interval=year,geomesa.indices.enabled=xz3:geom:dtg"

val sft = SimpleFeatureTypes.createType("test", spec)

val ds = new TestGeoMesaDataStore(false) // requires strict bbox...

// note: 2020 was a leap year
val features =
(0 until 10).map { i =>
ScalaSimpleFeature.create(sft, s"$i", s"name$i", "track1", s"2020-12-07T0$i:00:00.000Z", s"LINESTRING(4$i 60, 4$i 61)")
} ++ (10 until 20).map { i =>
ScalaSimpleFeature.create(sft, s"$i", s"name$i", "track2", s"2020-12-${i}T$i:00:00.000Z", s"LINESTRING(4${i - 10} 60, 4${i - 10} 61)")
} ++ (20 until 30).map { i =>
ScalaSimpleFeature.create(sft, s"$i", s"name$i", "track3", s"2020-12-${i}T${i-10}:00:00.000Z", s"LINESTRING(6${i - 20} 60, 6${i - 20} 61)")
} ++ (30 until 32).map { i =>
ScalaSimpleFeature.create(sft, s"$i", s"name$i", "track4", s"2020-12-${i}T${i-10}:00:00.000Z", s"LINESTRING(${i - 20} 60, ${i - 20} 61)")
}

ds.createSchema(sft)
WithClose(ds.getFeatureWriterAppend(sft.getTypeName, Transaction.AUTO_COMMIT)) { writer =>
features.foreach { f =>
FeatureUtils.copyToWriter(writer, f, useProvidedFid = true)
writer.write()
}
}

val filter = ECQL.toFilter("bbox(geom,0,55,70,65) AND dtg during 2020-12-01T00:00:00.000Z/2020-12-31T23:59:59.999Z")

SelfClosingIterator(ds.getFeatureReader(new Query("test", filter), Transaction.AUTO_COMMIT)).toList must
containTheSameElementsAs(features)

val lastDayFilter = ECQL.toFilter("bbox(geom,0,55,70,65) AND dtg during 2020-12-31T00:00:00.000Z/2020-12-31T23:59:59.999Z")

val lastDayResults =
SelfClosingIterator(ds.getFeatureReader(new Query("test", lastDayFilter), Transaction.AUTO_COMMIT)).toList

lastDayResults mustEqual Seq(features.last)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/***********************************************************************
* Copyright (c) 2013-2020 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.index.index

import com.typesafe.scalalogging.LazyLogging
import org.geotools.data.{Query, Transaction}
import org.geotools.filter.text.ecql.ECQL
import org.junit.runner.RunWith
import org.locationtech.geomesa.features.ScalaSimpleFeature
import org.locationtech.geomesa.index.TestGeoMesaDataStore
import org.locationtech.geomesa.utils.collection.SelfClosingIterator
import org.locationtech.geomesa.utils.geotools.{FeatureUtils, SimpleFeatureTypes}
import org.locationtech.geomesa.utils.io.WithClose
import org.specs2.mutable.Specification
import org.specs2.runner.JUnitRunner

@RunWith(classOf[JUnitRunner])
class Z3IndexTest extends Specification with LazyLogging {

"Z3Index" should {
"index and query yearly epochs correctly" in {
val spec =
"name:String,track:String,dtg:Date,*geom:Point:srid=4326;" +
"geomesa.z3.interval=year,geomesa.indices.enabled=z3:geom:dtg"

val sft = SimpleFeatureTypes.createType("test", spec)

val ds = new TestGeoMesaDataStore(false) // requires strict bbox...

// note: 2020 was a leap year
val features =
(0 until 10).map { i =>
ScalaSimpleFeature.create(sft, s"$i", s"name$i", "track1", s"2020-12-07T0$i:00:00.000Z", s"POINT(4$i 60)")
} ++ (10 until 20).map { i =>
ScalaSimpleFeature.create(sft, s"$i", s"name$i", "track2", s"2020-12-${i}T$i:00:00.000Z", s"POINT(4${i - 10} 60)")
} ++ (20 until 30).map { i =>
ScalaSimpleFeature.create(sft, s"$i", s"name$i", "track3", s"2020-12-${i}T${i-10}:00:00.000Z", s"POINT(6${i - 20} 60)")
} ++ (30 until 32).map { i =>
ScalaSimpleFeature.create(sft, s"$i", s"name$i", "track4", s"2020-12-${i}T${i-10}:00:00.000Z", s"POINT(${i - 20} 60)")
}

ds.createSchema(sft)
WithClose(ds.getFeatureWriterAppend(sft.getTypeName, Transaction.AUTO_COMMIT)) { writer =>
features.foreach { f =>
FeatureUtils.copyToWriter(writer, f, useProvidedFid = true)
writer.write()
}
}

val filter = ECQL.toFilter("bbox(geom,0,55,70,65) AND dtg during 2020-12-01T00:00:00.000Z/2020-12-31T23:59:59.999Z")

SelfClosingIterator(ds.getFeatureReader(new Query("test", filter), Transaction.AUTO_COMMIT)).toList must
containTheSameElementsAs(features)

val lastDayFilter = ECQL.toFilter("bbox(geom,9,59,12,61) AND dtg during 2020-12-31T00:00:00.000Z/2020-12-31T23:59:59.999Z")

val lastDayResults =
SelfClosingIterator(ds.getFeatureReader(new Query("test", lastDayFilter), Transaction.AUTO_COMMIT)).toList

lastDayResults mustEqual Seq(features.last)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ object BinnedTime {
case TimePeriod.Day => ChronoUnit.DAYS.getDuration.toMillis
case TimePeriod.Week => ChronoUnit.WEEKS.getDuration.toMillis / 1000L
case TimePeriod.Month => (ChronoUnit.DAYS.getDuration.toMillis / 1000L) * 31L
case TimePeriod.Year => ChronoUnit.WEEKS.getDuration.toMinutes * 52L
// based on 365 days + 1 leap day, with a fudge factor of 10 minutes to account for leap seconds added each year
case TimePeriod.Year => (ChronoUnit.DAYS.getDuration.toMinutes * 366L) + 10L
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/***********************************************************************
* Copyright (c) 2013-2020 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.curve

import java.time.temporal.ChronoUnit

/**
* XZ3SFC with a legacy, incorrect max time value of 52 weeks. The max value is kept the same to ensure that
* index keys and query ranges are consistent. Any dates that exceed the original max time will be dropped into
* the last time bin, potentially degrading results for the last day or two of the year.
*
* @param g resolution level of the curve - i.e. how many times the space will be recursively quartered
*/
@deprecated("XZ3SFC", "3.2.0")
class LegacyYearXZ3SFC(g: Short)
extends XZ3SFC(g, (-180.0, 180.0), (-90.0, 90.0), (0.0, ChronoUnit.WEEKS.getDuration.toMinutes * 52d)) {

// the correct max time duration
private val maxTime = BinnedTime.maxOffset(TimePeriod.Year).toDouble
// the incorrect max time duration
private val zHi = zBounds._2

override protected def normalize(
xmin: Double,
ymin: Double,
zmin: Double,
xmax: Double,
ymax: Double,
zmax: Double,
lenient: Boolean): (Double, Double, Double, Double, Double, Double) = {
if (zmax > zHi && zmax <= maxTime) {
super.normalize(xmin, ymin, math.min(zmin, zHi), xmax, ymax, zHi, lenient)
} else {
super.normalize(xmin, ymin, zmin, xmax, ymax, zmax, lenient)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/***********************************************************************
* Copyright (c) 2013-2020 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.curve

import java.time.temporal.ChronoUnit

import org.locationtech.geomesa.curve.NormalizedDimension.NormalizedTime
import org.locationtech.sfcurve.zorder.Z3

/**
* Z3SFC with a legacy, incorrect max time value of 52 weeks. The max value is kept the same to ensure that
* index keys and query ranges are consistent. Any dates that exceed the original max time will be dropped into
* the last time bin, potentially degrading results for the last day or two of the year.
*
* @param precision bits used per dimension - note all precisions must sum to less than 64
*/
@deprecated("Z3SFC", "3.2.0")
class LegacyYearZ3SFC(precision: Int = 21) extends {
// need to use early instantiation here to prevent errors in creating parent class
// legacy incorrect time max duration
override val time: NormalizedDimension =
NormalizedTime(precision, ChronoUnit.WEEKS.getDuration.toMinutes * 52d)
} with Z3SFC(TimePeriod.Year, precision) {

// the correct max time duration
private val maxTime = BinnedTime.maxOffset(TimePeriod.Year)

override def index(x: Double, y: Double, t: Long, lenient: Boolean = false): Z3 = {
if (t > time.max && t <= maxTime) {
super.index(x, y, time.max.toLong, lenient)
} else {
super.index(x, y, t, lenient)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -335,13 +335,14 @@ class XZ3SFC(val g: Short, val xBounds: (Double, Double), val yBounds: (Double,
* @param lenient standardize boundaries to valid values, or raise an exception
* @return
*/
private def normalize(xmin: Double,
ymin: Double,
zmin: Double,
xmax: Double,
ymax: Double,
zmax: Double,
lenient: Boolean): (Double, Double, Double, Double, Double, Double) = {
protected def normalize(
xmin: Double,
ymin: Double,
zmin: Double,
xmax: Double,
ymax: Double,
zmax: Double,
lenient: Boolean): (Double, Double, Double, Double, Double, Double) = {
require(xmin <= xmax && ymin <= ymax && zmin <= zmax,
s"Bounds must be ordered: [$xmin $xmax] [$ymin $ymax] [$zmin $zmax]")

Expand Down

0 comments on commit 4b3669f

Please sign in to comment.