Skip to content

Commit

Permalink
GEOMESA-3405 Postgis - support query interceptors (#3219)
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz authored Oct 18, 2024
1 parent ff83fb0 commit fe52183
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 11 deletions.
4 changes: 4 additions & 0 deletions geomesa-gt/geomesa-gt-partitioning/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-filter_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-index-api_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.geotools.jdbc</groupId>
<artifactId>gt-jdbc-postgis</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.locationtech.geomesa.gt.partition.postgis.dialect

import com.typesafe.scalalogging.StrictLogging
import org.geotools.api.data.Query
import org.geotools.api.feature.`type`.{AttributeDescriptor, GeometryDescriptor}
import org.geotools.api.feature.simple.SimpleFeatureType
import org.geotools.api.filter.Filter
Expand All @@ -22,8 +23,9 @@ import org.locationtech.geomesa.gt.partition.postgis.dialect.functions.{LogClean
import org.locationtech.geomesa.gt.partition.postgis.dialect.procedures._
import org.locationtech.geomesa.gt.partition.postgis.dialect.tables._
import org.locationtech.geomesa.gt.partition.postgis.dialect.triggers.{DeleteTrigger, InsertTrigger, UpdateTrigger, WriteAheadTrigger}
import org.locationtech.geomesa.index.planning.QueryInterceptor.QueryInterceptorFactory
import org.locationtech.geomesa.utils.geotools.{Conversions, SimpleFeatureTypes}
import org.locationtech.geomesa.utils.io.WithClose
import org.locationtech.geomesa.utils.io.{CloseWithLogging, WithClose}
import org.locationtech.jts.geom._

import java.sql.{Connection, DatabaseMetaData, ResultSet, Types}
Expand Down Expand Up @@ -55,6 +57,12 @@ class PartitionedPostgisDialect(store: JDBCDataStore) extends PostGISDialect(sto
override def initialValue(): Boolean = false
}

private val interceptors = {
val factory = QueryInterceptorFactory(store)
sys.addShutdownHook(CloseWithLogging(factory)) // we don't have any API hooks to dispose of things...
factory
}

/**
* Re-create the PLPG/SQL procedures associated with a feature type. This can be used
* to 'upgrade in place' if the code is changed.
Expand Down Expand Up @@ -240,7 +248,10 @@ class PartitionedPostgisDialect(store: JDBCDataStore) extends PostGISDialect(sto

override def splitFilter(filter: Filter, schema: SimpleFeatureType): Array[Filter] = {
import PartitionedPostgisDialect.Config.ConfigConversions
super.splitFilter(SplitFilterVisitor(filter, schema.isFilterWholeWorld), schema)
val simplified = SplitFilterVisitor(filter, schema.isFilterWholeWorld)
val query = new Query(schema.getTypeName, simplified)
interceptors(schema).foreach(_.rewrite(query))
super.splitFilter(query.getFilter, schema)
}

override def registerClassToSqlMappings(mappings: java.util.Map[Class[_], Integer]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,17 @@ class UserDataTable extends Sql {
s"INSERT INTO ${table.quoted} (type_name, key, value) VALUES (?, ?, ?) " +
s"ON CONFLICT (type_name, key) DO UPDATE SET value = EXCLUDED.value;"

def insert(config: String, value: Option[String]): Unit =
value.foreach(v => ex.executeUpdate(insertSql, Seq(info.typeName, config, v)))
def insert(config: String, value: String): Unit =
ex.executeUpdate(insertSql, Seq(info.typeName, config, value))

insert(SimpleFeatureTypes.Configs.DefaultDtgField, Some(info.cols.dtg.raw))
insert(Config.IntervalHours, Some(Integer.toString(info.partitions.hoursPerPartition)))
insert(Config.PagesPerRange, Some(Integer.toString(info.partitions.pagesPerRange)))
insert(Config.MaxPartitions, info.partitions.maxPartitions.map(Integer.toString))
insert(Config.CronMinute, info.partitions.cronMinute.map(Integer.toString))
insert(Config.FilterWholeWorld, info.userData.get(Config.FilterWholeWorld))
insert(SimpleFeatureTypes.Configs.DefaultDtgField, info.cols.dtg.raw)
insert(Config.IntervalHours, Integer.toString(info.partitions.hoursPerPartition))
insert(Config.PagesPerRange, Integer.toString(info.partitions.pagesPerRange))
info.partitions.maxPartitions.map(Integer.toString).foreach(insert(Config.MaxPartitions, _))
info.partitions.cronMinute.map(Integer.toString).foreach(insert(Config.CronMinute, _))
Seq(Config.FilterWholeWorld, SimpleFeatureTypes.Configs.QueryInterceptors).foreach { key =>
info.userData.get(key).foreach(insert(key, _))
}
}

override def drop(info: TypeInfo)(implicit ex: ExecutionContext): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,48 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll
}
}

"support query interceptors" in {
val sft = SimpleFeatureTypes.renameSft(this.sft, "interceptor")
sft.getUserData.put(SimpleFeatureTypes.Configs.QueryInterceptors, classOf[TestQueryInterceptor].getName)

val ds = DataStoreFinder.getDataStore(params.asJava)
ds must not(beNull)

try {
ds must beAnInstanceOf[JDBCDataStore]

ds.getTypeNames.toSeq must not(contain(sft.getTypeName))
ds.createSchema(sft)

val schema = Try(ds.getSchema(sft.getTypeName)).getOrElse(null)
schema must not(beNull)
schema.getUserData.asScala must containAllOf(sft.getUserData.asScala.toSeq)
logger.debug(s"Schema: ${SimpleFeatureTypes.encodeType(schema)}")

val Array(left, right) = ds.asInstanceOf[JDBCDataStore].getSQLDialect.splitFilter(Filter.EXCLUDE, schema)
left mustEqual Filter.INCLUDE
right mustEqual Filter.INCLUDE

// write some data
WithClose(new DefaultTransaction()) { tx =>
WithClose(ds.getFeatureWriterAppend(sft.getTypeName, tx)) { writer =>
features.foreach { feature =>
FeatureUtils.write(writer, feature, useProvidedFid = true)
}
}
tx.commit()
}

// verify that filter is re-written to be Filter.INCLUDE
WithClose(ds.getFeatureReader(new Query(sft.getTypeName, ECQL.toFilter("IN('1')")), Transaction.AUTO_COMMIT)) { reader =>
val result = SelfClosingIterator(reader).toList
result.map(compFromDb) must containTheSameElementsAs(features.map(compWithFid(_, sft)))
}
} finally {
ds.dispose()
}
}

"support idle_in_transaction_session_timeout" in {
val sft = SimpleFeatureTypes.renameSft(this.sft, "timeout")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.gt.partition.postgis

import org.geotools.api.data.{DataStore, Query}
import org.geotools.api.feature.simple.SimpleFeatureType
import org.geotools.api.filter.Filter
import org.locationtech.geomesa.index.planning.QueryInterceptor

class TestQueryInterceptor extends QueryInterceptor {

var sft: SimpleFeatureType = _

override def init(ds: DataStore, sft: SimpleFeatureType): Unit = this.sft = sft

override def rewrite(query: Query): Unit = query.setFilter(Filter.INCLUDE)

override def close(): Unit = {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ object QueryInterceptor extends LazyLogging {
classes.split(",").toSeq.flatMap { c =>
var interceptor: QueryInterceptor = null
try {
interceptor = Class.forName(c).newInstance().asInstanceOf[QueryInterceptor]
interceptor = Class.forName(c).getDeclaredConstructor().newInstance().asInstanceOf[QueryInterceptor]
interceptor.init(ds, sft)
Seq(interceptor)
} catch {
Expand Down

0 comments on commit fe52183

Please sign in to comment.