From add94b943824811dc804c658a042cca04841d757 Mon Sep 17 00:00:00 2001 From: Emilio Date: Wed, 23 Oct 2024 11:34:53 -0400 Subject: [PATCH] GEOMESA-3406 Postgis - support filtering on list-type attributes (#3221) --- geomesa-gt/geomesa-gt-partitioning/pom.xml | 6 ++ .../dialect/PartitionedPostgisPsDialect.scala | 87 ++++++++++++++++--- .../PartitionedPostgisDataStoreTest.scala | 66 +++++++++++++- 3 files changed, 147 insertions(+), 12 deletions(-) diff --git a/geomesa-gt/geomesa-gt-partitioning/pom.xml b/geomesa-gt/geomesa-gt-partitioning/pom.xml index 5828e402c617..7073290c7e8b 100644 --- a/geomesa-gt/geomesa-gt-partitioning/pom.xml +++ b/geomesa-gt/geomesa-gt-partitioning/pom.xml @@ -53,6 +53,12 @@ org.testcontainers testcontainers + + + javax.media + jai_core + test + diff --git a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisPsDialect.scala b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisPsDialect.scala index 7ff46d6c5ba3..d90e7f1d5a98 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisPsDialect.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisPsDialect.scala @@ -12,8 +12,13 @@ import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache} import org.geotools.api.feature.`type`.AttributeDescriptor import org.geotools.api.feature.simple.SimpleFeatureType import org.geotools.api.filter.Filter -import org.geotools.data.postgis.PostGISPSDialect -import org.geotools.jdbc.JDBCDataStore +import org.geotools.api.filter.expression.{Expression, PropertyName} +import org.geotools.data.postgis.{PostGISPSDialect, PostgisPSFilterToSql} +import org.geotools.feature.AttributeTypeBuilder +import org.geotools.feature.simple.SimpleFeatureTypeBuilder +import org.geotools.jdbc.{JDBCDataStore, PreparedFilterToSQL} +import org.geotools.util.Version +import org.locationtech.geomesa.gt.partition.postgis.dialect.PartitionedPostgisPsDialect.PartitionedPostgisPsFilterToSql import java.lang.invoke.{MethodHandle, MethodHandles, MethodType} import java.sql.{Connection, DatabaseMetaData, PreparedStatement, Types} @@ -47,6 +52,15 @@ class PartitionedPostgisPsDialect(store: JDBCDataStore, delegate: PartitionedPos MethodHandles.lookup.findSpecial(classOf[PostGISPSDialect], "setValue", methodType, classOf[PartitionedPostgisPsDialect]) } + override def createPreparedFilterToSQL: PreparedFilterToSQL = { + val fts = new PartitionedPostgisPsFilterToSql(this, delegate.getPostgreSQLVersion(null)) + fts.setFunctionEncodingEnabled(delegate.isFunctionEncodingEnabled) + fts.setLooseBBOXEnabled(delegate.isLooseBBOXEnabled) + fts.setEncodeBBOXFilterAsEnvelope(delegate.isEncodeBBOXFilterAsEnvelope) + fts.setEscapeBackslash(delegate.isEscapeBackslash) + fts + } + override def setValue( value: AnyRef, binding: Class[_], @@ -57,7 +71,7 @@ class PartitionedPostgisPsDialect(store: JDBCDataStore, delegate: PartitionedPos // json columns are string type in geotools, but we have to use setObject or else we get a binding error if (binding == classOf[String] && jsonColumns.get(new PreparedStatementKey(ps, column))) { ps.setObject(column, value, Types.OTHER) - } else if (binding == classOf[java.util.List[_]]) { + } else if (binding.isArray || binding == classOf[java.util.List[_]]) { // handle bug in jdbc store not calling setArrayValue in update statements value match { case null => @@ -77,9 +91,8 @@ class PartitionedPostgisPsDialect(store: JDBCDataStore, delegate: PartitionedPos setArray(array, ps, column, cx) } - case _ => - // this will almost certainly fail... - super.setValue(value, binding, att, ps, column, cx) + case singleton => + setArray(Array(singleton), ps, column, cx) } } else { super.setValue(value, binding, att, ps, column, cx) @@ -92,7 +105,7 @@ class PartitionedPostgisPsDialect(store: JDBCDataStore, delegate: PartitionedPos // json columns are string type in geotools, but we have to use setObject or else we get a binding error if (binding == classOf[String] && jsonColumns.get(new PreparedStatementKey(ps, column))) { ps.setObject(column, value, Types.OTHER) - } else if (binding == classOf[java.util.List[_]]) { + } else if (binding.isArray || binding == classOf[java.util.List[_]]) { // handle bug in jdbc store not calling setArrayValue in update statements value match { case null => @@ -112,9 +125,8 @@ class PartitionedPostgisPsDialect(store: JDBCDataStore, delegate: PartitionedPos setArray(array, ps, column, cx) } - case _ => - // this will almost certainly fail... - superSetValue.invoke(this, value, binding, ps, column, cx) + case singleton => + setArray(Array(singleton), ps, column, cx) } } else { superSetValue.invoke(this, value, binding, ps, column, cx) @@ -164,6 +176,61 @@ class PartitionedPostgisPsDialect(store: JDBCDataStore, delegate: PartitionedPos object PartitionedPostgisPsDialect { + class PartitionedPostgisPsFilterToSql(dialect: PartitionedPostgisPsDialect, pgVersion: Version) + extends PostgisPSFilterToSql(dialect, pgVersion) { + + import org.locationtech.geomesa.utils.geotools.RichAttributeDescriptors.RichAttributeDescriptor + + import scala.collection.JavaConverters._ + + override def setFeatureType(featureType: SimpleFeatureType): Unit = { + // convert List-type attributes to Array-types so that prepared statement bindings work correctly + if (featureType.getAttributeDescriptors.asScala.exists(_.getType.getBinding == classOf[java.util.List[_]])) { + val builder = new SimpleFeatureTypeBuilder() { + override def init(`type`: SimpleFeatureType): Unit = { + super.init(`type`) + attributes().clear() + } + } + builder.init(featureType) + featureType.getAttributeDescriptors.asScala.foreach { descriptor => + val ab = new AttributeTypeBuilder(builder.getFeatureTypeFactory) + ab.init(descriptor) + if (descriptor.getType.getBinding == classOf[java.util.List[_]]) { + ab.setBinding(java.lang.reflect.Array.newInstance(Option(descriptor.getListType()).getOrElse(classOf[String]), 0).getClass) + } + builder.add(ab.buildDescriptor(descriptor.getLocalName)) + } + this.featureType = builder.buildFeatureType() + this.featureType.getUserData.putAll(featureType.getUserData) + } else { + this.featureType = featureType + } + } + + // note: this would be a cleaner solution, but it doesn't get invoked due to explicit calls to + // super.getExpressionType in PostgisPSFilterToSql :/ + override def getExpressionType(expression: Expression): Class[_] = { + val result = Option(expression).collect { case p: PropertyName => p }.flatMap { p => + Option(p.evaluate(featureType).asInstanceOf[AttributeDescriptor]).map { descriptor => + val binding = descriptor.getType.getBinding + if (binding == classOf[java.util.List[_]]) { + val listType = descriptor.getListType() + if (listType == null) { + classOf[Array[String]] + } else { + java.lang.reflect.Array.newInstance(listType, 0).getClass + } + } else { + binding + } + } + } + + result.getOrElse(super.getExpressionType(expression)) + } + } + // uses eq on the prepared statement to ensure that we compute json fields exactly once per prepared statement/col private class PreparedStatementKey(val ps: PreparedStatement, val column: Int) { diff --git a/geomesa-gt/geomesa-gt-partitioning/src/test/scala/org/locationtech/geomesa/gt/partition/postgis/PartitionedPostgisDataStoreTest.scala b/geomesa-gt/geomesa-gt-partitioning/src/test/scala/org/locationtech/geomesa/gt/partition/postgis/PartitionedPostgisDataStoreTest.scala index 9a49c35cf4d6..bee8bdc20d8f 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/test/scala/org/locationtech/geomesa/gt/partition/postgis/PartitionedPostgisDataStoreTest.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/test/scala/org/locationtech/geomesa/gt/partition/postgis/PartitionedPostgisDataStoreTest.scala @@ -12,7 +12,9 @@ import com.typesafe.scalalogging.LazyLogging import org.geotools.api.data._ import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} import org.geotools.api.filter.Filter +import org.geotools.api.filter.MultiValuedFilter.MatchAction import org.geotools.data._ +import org.geotools.factory.CommonFactoryFinder import org.geotools.feature.simple.SimpleFeatureBuilder import org.geotools.filter.text.ecql.ECQL import org.geotools.jdbc.JDBCDataStore @@ -67,7 +69,7 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll lazy val features = Seq.tabulate(10) { i => val builder = new SimpleFeatureBuilder(sft) - builder.set("name", Collections.singletonList(s"name$i")) + builder.set("name", java.util.List.of(s"name$i", s"alt$i")) builder.set("age", i) builder.set("props", s"""["name$i"]""") builder.set("dtg", new java.util.Date(now - ((i + 1) * 20 * 60 * 1000))) // 20 minutes @@ -91,6 +93,8 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll lazy val host = Option(container).map(_.getHost).getOrElse("localhost") lazy val port = Option(container).map(_.getFirstMappedPort).getOrElse(5432).toString + lazy val fif = CommonFactoryFinder.getFilterFactory + override def beforeAll(): Unit = { val image = DockerImageName.parse("ghcr.io/geomesa/postgis-cron") @@ -109,6 +113,7 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll } "PartitionedPostgisDataStore" should { + "fail with a useful error message if type name is too long" in { val ds = DataStoreFinder.getDataStore(params.asJava) ds must not(beNull) @@ -201,7 +206,64 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll } finally { ds.dispose() } - ok + } + + "filter on list elements" in { + val ds = DataStoreFinder.getDataStore(params.asJava) + ds must not(beNull) + + try { + ds must beAnInstanceOf[JDBCDataStore] + + val sft = SimpleFeatureTypes.renameSft(this.sft, "list-filters") + ds.getTypeNames.toSeq must not(contain(sft.getTypeName)) + ds.createSchema(sft) + + val schema = Try(ds.getSchema(sft.getTypeName)).getOrElse(null) + schema must not(beNull) + schema.getUserData.asScala must containAllOf(sft.getUserData.asScala.toSeq) + logger.debug(s"Schema: ${SimpleFeatureTypes.encodeType(schema)}") + + // write some data + WithClose(new DefaultTransaction()) { tx => + WithClose(ds.getFeatureWriterAppend(sft.getTypeName, tx)) { writer => + features.foreach { feature => + FeatureUtils.write(writer, feature, useProvidedFid = true) + } + } + tx.commit() + } + + val filters = Seq( + fif.equals(fif.property("name"), fif.literal("name0")), + fif.equal(fif.property("name"), fif.literal("name0"), false, MatchAction.ANY), + fif.equals(fif.property("name"), fif.literal(Collections.singletonList("name0"))), + fif.equal(fif.property("name"), fif.literal(Collections.singletonList("name0")), false, MatchAction.ANY), + fif.equal(fif.property("name"), fif.literal(java.util.List.of("name0", "alt0")), false, MatchAction.ANY), + fif.equal(fif.property("name"), fif.literal(java.util.List.of("name0", "alt0")), false, MatchAction.ALL), + ECQL.toFilter("name = 'name0'"), + ) + foreach(filters) { filter => + WithClose(ds.getFeatureReader(new Query(sft.getTypeName, filter), Transaction.AUTO_COMMIT)) { reader => + val result = SelfClosingIterator(reader).toList + result must haveLength(1) + compFromDb(result.head) mustEqual compWithFid(features.head, sft) + } + } + + val nonMatchingFilters = Seq( + fif.equal(fif.property("name"), fif.literal("name0"), false, MatchAction.ALL), + fif.equal(fif.property("name"), fif.literal(Collections.singletonList("name0")), false, MatchAction.ALL), + ) + foreach(nonMatchingFilters) { filter => + WithClose(ds.getFeatureReader(new Query(sft.getTypeName, filter), Transaction.AUTO_COMMIT)) { reader => + val result = SelfClosingIterator(reader).toList + result must beEmpty + } + } + } finally { + ds.dispose() + } } "age-off" in {