diff --git a/geomesa-fs/geomesa-fs-spark-runtime/src/test/scala/org/locationtech/geomesa/fs/spark/FileSystemDSAlignmentTest.scala b/geomesa-fs/geomesa-fs-spark-runtime/src/test/scala/org/locationtech/geomesa/fs/spark/FileSystemDSAlignmentTest.scala index fb89b9bda950..b4b12bb3e37f 100644 --- a/geomesa-fs/geomesa-fs-spark-runtime/src/test/scala/org/locationtech/geomesa/fs/spark/FileSystemDSAlignmentTest.scala +++ b/geomesa-fs/geomesa-fs-spark-runtime/src/test/scala/org/locationtech/geomesa/fs/spark/FileSystemDSAlignmentTest.scala @@ -12,7 +12,6 @@ import java.nio.file.{Files, Path} import com.typesafe.scalalogging.LazyLogging import org.apache.commons.io.FileUtils -import org.apache.hadoop.hdfs.{HdfsConfiguration, MiniDFSCluster} import org.apache.spark.sql.{SQLContext, SQLTypes, SparkSession} import org.geotools.data.simple.SimpleFeatureSource import org.geotools.data.{DataStore, DataStoreFinder, Query, Transaction} @@ -24,6 +23,7 @@ import org.locationtech.geomesa.spark.SparkSQLTestUtils import org.locationtech.geomesa.utils.collection.CloseableIterator import org.locationtech.geomesa.utils.geotools.{FeatureUtils, SimpleFeatureTypes} import org.locationtech.geomesa.utils.io.WithClose +import org.opengis.feature.simple.SimpleFeature import org.specs2.mutable.Specification import org.specs2.runner.JUnitRunner @@ -33,6 +33,9 @@ import scala.collection.JavaConverters._ class FileSystemDSAlignmentTest extends Specification with LazyLogging { sequential + // TODO count is based on metadata which we don't currently calculate + val countWithMetadata = false // use metadata for count checks or count results + val tempDir: Path = Files.createTempDirectory("fsAlignmentTest") lazy val directory1: String = tempDir + "/data/first" @@ -56,11 +59,16 @@ class FileSystemDSAlignmentTest extends Specification with LazyLogging { // val formats = Seq("orc", "parquet") val formats = Seq("parquet") + def runQuery(fs: SimpleFeatureSource, q: Query): List[SimpleFeature] = + 
CloseableIterator(fs.getFeatures(q).features()).toList + + def getCount(fs: SimpleFeatureSource, q: Query): Int = + if (countWithMetadata) { fs.getCount(q) } else { runQuery(fs, q).size } + "FileSystem DataStore" should { "Write data to directory1 using the GT FSDS" >> { - println(s" ***: Temporary directory is ${tempDir}. Directory is $directory1.") + println(s" ***: Temporary directory is $tempDir. Directory is $directory1.") - // TODO: This writing approach writes out the column data as "case_5fnumber" formats.foreach { format => val sft = SimpleFeatureTypes.createType(format, "arrest:String,case_number:Int:index=full:cardinality=high,dtg:Date,*geom:Point:srid=4326") @@ -81,19 +89,17 @@ class FileSystemDSAlignmentTest extends Specification with LazyLogging { ok } - // Commented out since this works! -// "Query directory2 with GeoMesa's Spark integration" >> { -// foreach(formats) { format => -// queryWithSpark(format, directory1) -// } -// } + "Query directory1 with GeoMesa's Spark integration" >> { + foreach(formats) { format => + queryWithSpark(format, directory1) + } + } - // Commented out since this works! 
-// "Query directory1 with the GM FSDS DS" >> { -// foreach(formats) { format => -// queryWithGeoTools(format, directory1) -// } -// } + "Query directory1 with the GM FSDS DS" >> { + foreach(formats) { format => + queryWithGeoTools(format, directory1) + } + } "write data to directory2 using Spark" >> { foreach(formats) { format => @@ -121,24 +127,17 @@ class FileSystemDSAlignmentTest extends Specification with LazyLogging { val format = "parquet" val fs: SimpleFeatureSource = getFeatureSource(format, location) val q = new Query(format) - // TODO count is based on metadata which we don't currently calculate -// val count = fs.getCount(q) -// count mustEqual(3) - //fs.getCount(new Query(format, ECQL.toFilter("case_number = 1"))) mustEqual(1) - - val feats = CloseableIterator(fs.getFeatures(new Query(format)).features()).toList - feats.foreach{println} - feats.size mustEqual(3) + getCount(fs, q) mustEqual 3 + getCount(fs, new Query(format, ECQL.toFilter("case_number = 1"))) mustEqual 1 - // TODO: This is failing since the DTG columns are mis-aligned: int64 vs int96 - val matches = CloseableIterator(fs.getFeatures(new Query(format, ECQL.toFilter("bbox(geom,-80,35,-75,45) " + - "AND dtg > '2016-01-01T12:00:00Z' AND dtg < '2016-01-03T12:00:00Z'"))).features()).toList + val feats = runQuery(fs, new Query(format)) + feats must haveSize(3) - matches.foreach{println} - matches.size mustEqual(2) + val matches = runQuery(fs, new Query(format, ECQL.toFilter("bbox(geom,-80,35,-75,45) " + + "AND dtg > '2016-01-01T12:00:00Z' AND dtg < '2016-01-03T12:00:00Z'"))) + matches must haveSize(2) - ok foreach(formats) { format => queryWithGeoTools(format, directory2) } @@ -150,51 +149,42 @@ class FileSystemDSAlignmentTest extends Specification with LazyLogging { } } "Functions which are failing for Spark output being re-read" >> { - println(s"Trying Count with Dates for $directory2") getCountDateFilterGeoTools("parquet", directory2) } "Functions which are failing for Spark output being 
re-read 2" >> { - println(s"Trying case_number filter with $directory2") case_numberFilterWithGeoTools("parquet", directory2) } } private def testGeoToolsFilters(format: String, location: String) = { - // TODO counts don't work b/c they read the metadata, which we haven't populated -// println(s"Trying count with FILTER.INCLUDE for $location") -// getCountIncludeGeoTools(format, location) - println(s"Trying case_number filter with $location") + getCountIncludeGeoTools(format, location) case_numberFilterWithGeoTools(format, location) - println(s"Trying spatial temporal filter with $location") queryWithGeoTools(format, location) -// println(s"Trying Count with Dates for $location") -// getCountDateFilterGeoTools(format, location) + getCountDateFilterGeoTools(format, location) } private def getCountIncludeGeoTools(format: String, location: String) = { val fs: SimpleFeatureSource = getFeatureSource(format, location) - fs.getCount(new Query(format)) mustEqual (3) + getCount(fs, new Query(format)) mustEqual 3 } private def getCountDateFilterGeoTools(format: String, location: String) = { val fs: SimpleFeatureSource = getFeatureSource(format, location) - fs.getCount(new Query(format, ECQL.toFilter("bbox(geom,-80,35,-75,45) " + - "AND dtg > '2016-01-01T12:00:00Z' AND dtg < '2016-01-02T12:00:00Z'"))) mustEqual(2) + getCount(fs, new Query(format, ECQL.toFilter("bbox(geom,-80,35,-75,45) " + + "AND dtg > '2016-01-01T12:00:00Z' AND dtg < '2016-01-02T12:00:00Z'"))) mustEqual 1 } private def case_numberFilterWithGeoTools(format: String, location: String) = { val fs: SimpleFeatureSource = getFeatureSource(format, location) - val feats = CloseableIterator(fs.getFeatures(new Query(format, ECQL.toFilter("case_number = 1"))).features()).toList - println(s"feat.size : ${feats.size}") + val feats = runQuery(fs, new Query(format, ECQL.toFilter("case_number = 1"))) feats must haveSize(1) - fs.getCount(new Query(format, ECQL.toFilter("case_number = 1"))) mustEqual(1) + getCount(fs, new 
Query(format, ECQL.toFilter("case_number = 1"))) mustEqual 1 } private def queryWithGeoTools(format: String, location: String) = { val fs: SimpleFeatureSource = getFeatureSource(format, location) - - CloseableIterator(fs.getFeatures(new Query(format, ECQL.toFilter("bbox(geom,-80,35,-75,45) " + - "AND dtg > '2016-01-01T12:00:00Z' AND dtg < '2016-01-03T12:00:00Z'"))).features()).toList must haveSize(2) + runQuery(fs, new Query(format, ECQL.toFilter("bbox(geom,-80,35,-75,45) " + + "AND dtg > '2016-01-01T12:00:00Z' AND dtg < '2016-01-03T12:00:00Z'"))) must haveSize(2) } private def getFeatureSource(format: String, location: String) = { @@ -226,7 +216,7 @@ class FileSystemDSAlignmentTest extends Specification with LazyLogging { val stQuery = sc.sql(s"select * from $format where st_intersects(geom, st_makeBbox(-80,35,-75,45)) AND " + "dtg > '2016-01-01T12:00:00Z' AND dtg < '2016-01-02T12:00:00Z'").collect() stQuery must haveLength(1) - stQuery.head.get(0) mustEqual "2" + stQuery.head.get(stQuery.head.fieldIndex("__fid__")) mustEqual "2" // "select by secondary indexed attribute, using dataframe API" val cases = df.select("case_number").where("case_number = 1").collect().map(_.getInt(0)) diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/parquet/ParquetFileSystemStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/parquet/ParquetFileSystemStorage.scala index 33e1cc9ee55e..e1e6fc11b96c 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/parquet/ParquetFileSystemStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/parquet/ParquetFileSystemStorage.scala @@ -48,7 +48,7 @@ class ParquetFileSystemStorage(context: FileSystemContext, metadata: StorageMeta val parquetFilter = fc.map(FilterCompat.get).getOrElse(FilterCompat.NOOP) val gtFilter = 
residualFilter.map(FastFilterFactory.optimize(readSft, _)) - logger.debug(s"Parquet filter: $parquetFilter and modified gt filter: ${gtFilter.getOrElse(Filter.INCLUDE)}") + logger.debug(s"Parquet filter: ${fc.getOrElse("none")} and modified gt filter: ${gtFilter.getOrElse(Filter.INCLUDE)}") // WARNING it is important to create a new conf per query // because we communicate the transform SFT set here