Skip to content

Commit

Permalink
GEOMESA-3311 Add subsampling to GraduatedQueryGuard (#3010)
Browse files Browse the repository at this point in the history
* Configurable subsample percentage based on size
* Configurable subsample attribute name
* Mix duration and percentage/attribute guard modes

Signed-off-by: Austin Heyne <[email protected]>
  • Loading branch information
aheyne authored Nov 16, 2023
1 parent 57205bf commit 71537ca
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 47 deletions.
2 changes: 2 additions & 0 deletions docs/user/datastores/analytic_queries.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ GeoMesa provides advanced query capabilities through GeoTools query hints. You c
various aspects of query processing or to trigger distributed analytic processing. See :ref:`query_hints`
for details on setting query hints.

.. _feature_sampling:

Feature Sampling
----------------

Expand Down
79 changes: 69 additions & 10 deletions docs/user/datastores/query_interceptor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,12 @@ to ``true``, where ``<typeName>`` is the name of your feature type.
Graduated Query Guard
+++++++++++++++++++++

The graduated query guard applies different duration limits based on the spatial extent of the query.
As a query becomes larger in space, it can be limited to shorter and shorter time ranges.
A series of rules limit the duration for queries which are at most a given size in square degrees.
This guard also applies the full table scan guard.
The graduated query guard applies different duration limits or result subsampling based on the spatial extent of the
query. As a query becomes larger in space, it can be limited to shorter and shorter time ranges or smaller and smaller
percentages of data. A series of rules limit the duration and percentage for queries which are at most a given size
in square degrees. This guard also applies the full table scan guard.

Percentage subsampling in this guard uses the same statistical sampling as :ref:`feature_sampling`.

To enable the guard, add ``org.locationtech.geomesa.index.planning.guard.GraduatedQueryGuard``
to ``geomesa.query.interceptors`` as indicated above. Configuration is managed via
Expand All @@ -121,12 +123,15 @@ The configuration is under the key ``geomesa.guard.graduated``.

The configuration must satisfy a few conditions:

* there must be a limit on unbounded queries,
* as the size increases, the duration must decrease,
* and a given size limit may not be repeated.
* there must be a limit on unbounded queries
* as the size increases, the duration must decrease
* as the size increases, the percentage must decrease
* once a duration or percentage is defined, all subsequent rules must also define it
* a given size limit may not be repeated

If no size is provided, it is equivalent to an unbounded size.

If no size is provided, it is equivalent to an unbounded size. Durations can be given in a number of days,
hours, or minutes. For example:
Durations can be given in a number of days, hours, or minutes. For example:

.. code-block:: none
Expand All @@ -136,11 +141,65 @@ hours, or minutes. For example:
"sftName" = [
{ size = 1, duration = "60 days" }
{ size = 10, duration = "3 days" }
{ duration = "1 day" }
{ duration = "1 day" }
]
}
}
}
Sampling percentages can be defined in decimal form. e.g. .1 corresponds to 10%. Any query smaller than the first size will
return 100% of the records. For example:

.. code-block:: none
geomesa {
guard {
graduated {
"sftName" = [
{ size = 1, sampling-percentage = .8 }
{ size = 10, sampling-percentage = .5 }
{ sampling-percentage = .1 }
]
}
}
}
It is also possible to specify the sampling attribute to use for the threading key in subsampling:

.. code-block:: none
geomesa {
guard {
graduated {
"sftName" = [
{ size = 1, sampling-percentage = .8, sampling-attribute = "name" }
{ size = 10, sampling-percentage = .5, sampling-attribute = "name" }
{ sampling-percentage = .1, sampling-attribute = "name" }
]
}
}
}
Additionally, it's possible to combine duration and percentage limits, with or without specifying an attribute:

.. code-block:: none
geomesa {
guard {
graduated {
"sftName" = [
{ size = 1, duration = "60 days" }
{ size = 10, duration = "3 days", sampling-percentage = .5 }
{ duration = "1 day", sampling-percentage = .1, sampling-attribute = "name" }
]
}
}
}
In the above example, Any query with area less than 1 square degree will return all results and allow for any time
range (given the time range doesn't trigger a full table scan). A query larger than 1 but less than 10 will block any
query longer than 3 days AND only 50% of the results will be returned. Finally, any query larger than 10 must be less
than 1 day in length AND only 10% of the results for any "name" will be returned.

To disable the guard on a per-environment basis, set the system property ``geomesa.guard.graduated.<typeName>.disable``
to ``true``, where ``<typeName>`` is the name of your feature type.
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ package org.locationtech.geomesa.index.planning.guard
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.LazyLogging
import org.geotools.data.{DataStore, Query}
import org.geotools.filter.visitor.ExtractBoundsFilterVisitor
import org.locationtech.geomesa.index.api.QueryStrategy
import org.locationtech.geomesa.index.conf.QueryHints
import org.locationtech.geomesa.index.index.{SpatialIndexValues, SpatioTemporalIndex, TemporalIndexValues}
import org.locationtech.geomesa.index.planning.QueryInterceptor
import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemProperty
import org.locationtech.jts.geom.Envelope
import org.opengis.feature.simple.SimpleFeatureType

import scala.concurrent.duration.Duration
Expand All @@ -23,7 +26,7 @@ class GraduatedQueryGuard extends QueryInterceptor with LazyLogging {

import org.locationtech.geomesa.index.planning.guard.GraduatedQueryGuard._

private var guardLimits: Seq[SizeAndDuration] = _
private var guardLimits: Seq[SizeAndLimits] = _
private var disabled: Boolean = false

override def init(ds: DataStore, sft: SimpleFeatureType): Unit = {
Expand All @@ -32,12 +35,25 @@ class GraduatedQueryGuard extends QueryInterceptor with LazyLogging {
logger.info(s"This guard is disabled for schema '${sft.getTypeName}' via system property")
}
// let any errors bubble up and disable this guard
guardLimits = buildLimits(ConfigFactory.load().getConfigList(s"$ConfigPath.${sft.getTypeName}"))
guardLimits = buildLimits(ConfigFactory.load().getConfigList(s"$ConfigPath.${sft.getTypeName}"), sft)
// this should be ensured during the loading of limits, but just double check so we can use `.last` safely below
require(guardLimits.nonEmpty)
}

override def rewrite(query: Query): Unit = {}
override def rewrite(query: Query): Unit = {
if (!disabled) {
val bounds = query.getFilter.accept(ExtractBoundsFilterVisitor.BOUNDS_VISITOR, null).asInstanceOf[Envelope]
val spatialExtent = (bounds.getMaxX - bounds.getMinX) * (bounds.getMaxY - bounds.getMinY)
val limit = guardLimits.find(_.sizeLimit >= spatialExtent).getOrElse{
logger.warn(s"Invalid extents/limits: ${query.getFilter.toString} / ${guardLimits.mkString(", ")}")
guardLimits.last
}
limit.percentageLimit.map{ percentage =>
query.getHints.put(QueryHints.SAMPLING, percentage.toFloat)
limit.sampleAttribute.map(attr => query.getHints.put(QueryHints.SAMPLE_BY, attr))
}
}
}

override def guard(strategy: QueryStrategy): Option[IllegalArgumentException] = {
val msg = if (disabled || !strategy.index.isInstanceOf[SpatioTemporalIndex[_, _]]) { None } else {
Expand All @@ -56,8 +72,9 @@ class GraduatedQueryGuard extends QueryInterceptor with LazyLogging {
logger.warn(s"Invalid extents/limits: ${s.mkString(", ")} / ${guardLimits.mkString(", ")}")
guardLimits.last
}
if (validate(i, limit.durationLimit)) { None } else {
Some(s"Query exceeds maximum allowed filter duration of ${limit.durationLimit} at ${limit.sizeLimit} degrees")
limit.durationLimit.flatMap { l =>
if (!validate(i, l)) Some(s"Query exceeds maximum allowed filter duration of ${limit.durationLimit} " +
s"at ${limit.sizeLimit} degrees") else None
}
}
}
Expand All @@ -75,8 +92,18 @@ object GraduatedQueryGuard extends LazyLogging {
*
* @param sizeLimit in square degrees
* @param durationLimit Maximum duration for a query at or below the spatial size
* @param samplePercent Percentage of total records to return
* @param sampleAttribute Attribute name to use a threading key for subsampling
*/
case class SizeAndDuration(sizeLimit: Int, durationLimit: Duration)
class SizeAndLimits(val sizeLimit: Int, val durationLimit: Option[Duration],
samplePercent: Option[Double], val sampleAttribute: Option[String]) {
require(samplePercent match {
case Some(p) => 0 < p && p <= 1
case None => true
}, "Graduated query guard percentages must be in range (0,1]")

val percentageLimit: Option[Double] = samplePercent
}

def disabled(typeName: String): Boolean =
SystemProperty(s"geomesa.guard.graduated.$typeName.disable").toBoolean.contains(true)
Expand All @@ -85,19 +112,52 @@ object GraduatedQueryGuard extends LazyLogging {
* This function checks conditions on the limits.
* 1. Sizes must not be repeated.
* 2. Smaller sizes must allow for longer durations.
* 3. If a percentage is included it must decrease with larger sizes.
* @param limits Sequence of limits
* @return Sorted list of limits or throws an IllegalArgumentException
*/
private def evaluateLimits(limits: Seq[SizeAndDuration]): Seq[SizeAndDuration] = {
private def evaluateLimits(limits: Seq[SizeAndLimits], sft: SimpleFeatureType): Seq[SizeAndLimits] = {
val candidate = limits.sortBy(_.sizeLimit)
if (candidate.size > 1) {
// Once a duration or percentage is specified, all following configurations must specify it.
var hasDuration = false
var hasPercentage = false
candidate.sliding(2).foreach { case Seq(first, second) =>
if (first.sizeLimit == second.sizeLimit) {
throw new IllegalArgumentException(s"Graduated query guard configuration " +
s"has repeated size: ${first.sizeLimit}")
} else if (first.durationLimit.compareTo(second.durationLimit) <= 0) {
throw new IllegalArgumentException(s"Graduated query guard configuration " +
s"has durations out of order: ${first.durationLimit} is less than ${second.durationLimit}")
}

if (first.durationLimit.isDefined || hasDuration) {
hasDuration = true
if (second.durationLimit.isEmpty) {
throw new IllegalArgumentException(s"Graduated query guard configuration " +
s"has missing duration in size = ${second.sizeLimit}")
}
if (first.durationLimit.get.compareTo(second.durationLimit.get) <= 0) {
throw new IllegalArgumentException(s"Graduated query guard configuration " +
s"has durations out of order: ${first.durationLimit.get} is less than ${second.durationLimit.get}")
}
}

if (first.percentageLimit.isDefined || hasPercentage) {
hasPercentage = true
if (second.percentageLimit.isEmpty) {
throw new IllegalArgumentException(s"Graduated query guard configuration " +
s"has missing percentage in size = ${second.sizeLimit}")
}
if (first.percentageLimit.get < second.percentageLimit.get) {
throw new IllegalArgumentException(s"Graduated query guard configuration " +
s"has percentages out of order or missing: " +
s"${first.percentageLimit.get} is less than ${second.durationLimit.get}")
}
}

Seq(first.sampleAttribute, second.sampleAttribute).foreach{
case Some(attr) if sft.indexOf(attr) == -1 =>
throw new IllegalArgumentException(s"Graduated query guard configuration " +
s"has invalid attribute name for filter. ${first.sampleAttribute.get} not in sft ${sft.getTypeName}")
case _ =>
}
}
} else if (candidate.isEmpty) {
Expand All @@ -110,17 +170,29 @@ object GraduatedQueryGuard extends LazyLogging {
candidate
}

def buildLimits(guardConfig: java.util.List[_ <: Config]): Seq[SizeAndDuration] = {
def buildLimits(guardConfig: java.util.List[_ <: Config], sft: SimpleFeatureType): Seq[SizeAndLimits] = {
import scala.collection.JavaConverters._
val confs = guardConfig.asScala.map { durationConfig =>
val size: Int = if (durationConfig.hasPath("size")) {
durationConfig.getInt("size")
val confs = guardConfig.asScala.map { limitsConfig =>
val size: Int = if (limitsConfig.hasPath("size")) {
limitsConfig.getInt("size")
} else {
Int.MaxValue
}
val duration = Duration(durationConfig.getString("duration"))
SizeAndDuration(size, duration)

val duration: Option[Duration] = if (limitsConfig.hasPath("duration")) {
Some(Duration(limitsConfig.getString("duration")))
} else None

val percentage: Option[Double] = if (limitsConfig.hasPath("sampling-percentage")) {
Some(limitsConfig.getDouble("sampling-percentage"))
} else None

val percentageAttr: Option[String] = if (limitsConfig.hasPath("sampling-attribute")) {
Some(limitsConfig.getString("sampling-attribute"))
} else None

new SizeAndLimits(size, duration, percentage, percentageAttr)
}
evaluateLimits(confs.toSeq)
evaluateLimits(confs.toSeq, sft)
}
}
6 changes: 3 additions & 3 deletions geomesa-index-api/src/test/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ geomesa {
guard {
graduated {
"cea650aea6284b5281ee84c784cb56a7" = [
{ size = 1, duration = "60 days" }
{ size = 10, duration = "3 days" }
{ duration = "1 day" }
{ size = 1, duration = "60 days" }
{ size = 10, duration = "3 days", sampling-percentage = .5 }
{ duration = "1 day", sampling-percentage = .1, sampling-attribute = "name" }
]
}
}
Expand Down
Loading

0 comments on commit 71537ca

Please sign in to comment.