From 09a0a508e1b5792da598b4dc4ba94a3848c253a4 Mon Sep 17 00:00:00 2001
From: 5iFish <35797277+5iFish@users.noreply.github.com>
Date: Tue, 16 Nov 2021 01:27:52 +0800
Subject: [PATCH] GEOMESA-3147 Fixed None push-down filter applied in
 GeoMesaRelation (#2810)

* Fixed None push-down GeoTools filter applied to the relation
---
 .../geomesa/spark/GeoMesaRelation.scala      |  4 +-
 .../geomesa/spark/SparkSQLDataTest.scala     | 49 +++++++++++++++++++
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/geomesa-spark/geomesa-spark-sql/src/main/scala/org/locationtech/geomesa/spark/GeoMesaRelation.scala b/geomesa-spark/geomesa-spark-sql/src/main/scala/org/locationtech/geomesa/spark/GeoMesaRelation.scala
index 5b1231966d68..b09e416328cc 100644
--- a/geomesa-spark/geomesa-spark-sql/src/main/scala/org/locationtech/geomesa/spark/GeoMesaRelation.scala
+++ b/geomesa-spark/geomesa-spark-sql/src/main/scala/org/locationtech/geomesa/spark/GeoMesaRelation.scala
@@ -279,8 +279,10 @@ object GeoMesaRelation extends LazyLogging {
         Some(IndexedRDD(rdd))
       }
     }
+
+    val filter = Option(ECQL.toFilter(params.getOrElse("query", "INCLUDE")))
 
-    GeoMesaRelation(sqlContext, sft, schema, params, None, cached, partitioned)
+    GeoMesaRelation(sqlContext, sft, schema, params, filter, cached, partitioned)
   }
 
   /**
diff --git a/geomesa-spark/geomesa-spark-sql/src/test/scala/org/locationtech/geomesa/spark/SparkSQLDataTest.scala b/geomesa-spark/geomesa-spark-sql/src/test/scala/org/locationtech/geomesa/spark/SparkSQLDataTest.scala
index 828547ea727a..401769ea9520 100644
--- a/geomesa-spark/geomesa-spark-sql/src/test/scala/org/locationtech/geomesa/spark/SparkSQLDataTest.scala
+++ b/geomesa-spark/geomesa-spark-sql/src/test/scala/org/locationtech/geomesa/spark/SparkSQLDataTest.scala
@@ -79,6 +79,55 @@ class SparkSQLDataTest extends Specification with LazyLogging {
       dfIndexed.collect.length mustEqual 3
     }
+
+
+    "create spatially partitioned relation with date query option" >> {
+      dfPartitioned = spark.read
+          .format("geomesa")
+          .options(dsParams)
+          .option("geomesa.feature", "chicago")
+          .option("spatial", "true")
+          .option("query", "dtg AFTER 2016-01-01T10:00:00.000Z")
+          .load()
+      logger.debug(df.schema.treeString)
+
+      dfPartitioned.createOrReplaceTempView("chicagoPartitionedWithQuery")
+
+      spark.sql("select * from chicagoPartitionedWithQuery")
+          .collect().map{ r=> r.get(0) } mustEqual Array("2", "3")
+    }
+
+    "create spatially partitioned relation with attribute query option" >> {
+      dfPartitioned = spark.read
+          .format("geomesa")
+          .options(dsParams)
+          .option("geomesa.feature", "chicago")
+          .option("spatial", "true")
+          .option("query", "case_number < 3")
+          .load()
+      logger.debug(df.schema.treeString)
+
+      dfPartitioned.createOrReplaceTempView("chicagoPartitionedWithQuery")
+
+      spark.sql("select * from chicagoPartitionedWithQuery")
+          .collect().map{ r=> r.get(0) } mustEqual Array("1", "2")
+    }
+
+    "create spatially partitioned relation with spatial query option" >> {
+      dfPartitioned = spark.read
+          .format("geomesa")
+          .options(dsParams)
+          .option("geomesa.feature", "chicago")
+          .option("spatial", "true")
+          .option("query", "BBOX(geom, -76.7, 38.2, -76.2, 38.7)")
+          .load()
+      logger.debug(df.schema.treeString)
+
+      dfPartitioned.createOrReplaceTempView("chicagoPartitionedWithQuery")
+
+      spark.sql("select * from chicagoPartitionedWithQuery")
+          .collect().map{ r=> r.get(0) } mustEqual Array("1")
+    }
 
     "create spatially partitioned relation" >> {
       dfPartitioned = spark.read
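
The heart of the change is the two added lines in GeoMesaRelation: the "query" data
source option is now parsed as ECQL and handed to the relation as its push-down
filter, where previously the relation was always constructed with None and the
option was silently ignored. A minimal standalone sketch of that code path, assuming
a 2021-era GeoTools on the classpath (where Filter still lives in org.opengis); the
params map and the QueryOptionSketch object below are illustrative stand-ins, not
GeoMesa's actual types:

    import org.geotools.filter.text.ecql.ECQL
    import org.opengis.filter.Filter

    object QueryOptionSketch {
      def main(args: Array[String]): Unit = {
        // Stand-in for the options passed to spark.read (hypothetical values)
        val params: Map[String, String] =
          Map("query" -> "dtg AFTER 2016-01-01T10:00:00.000Z")

        // The fixed behavior: parse the "query" option as an ECQL filter,
        // defaulting to "INCLUDE" (match everything) when no option is given
        val filter: Option[Filter] =
          Option(ECQL.toFilter(params.getOrElse("query", "INCLUDE")))

        // Prints the parsed GeoTools filter wrapped in Some(...)
        println(filter)
      }
    }

The three new specs exercise exactly this path with a temporal, an attribute, and a
spatial predicate, checking that only the matching chicago rows come back through
the spatially partitioned relation.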