Skip to content

Commit

Permalink
uncomment and fix alignment test, fix logger debug on parquet pushdown filter
Browse files Browse the repository at this point in the history

Signed-off-by: Emilio Lahr-Vivaz <[email protected]>
  • Loading branch information
elahrvivaz committed Dec 11, 2020
1 parent c9c35bf commit 56fa1eb
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import java.nio.file.{Files, Path}

import com.typesafe.scalalogging.LazyLogging
import org.apache.commons.io.FileUtils
import org.apache.hadoop.hdfs.{HdfsConfiguration, MiniDFSCluster}
import org.apache.spark.sql.{SQLContext, SQLTypes, SparkSession}
import org.geotools.data.simple.SimpleFeatureSource
import org.geotools.data.{DataStore, DataStoreFinder, Query, Transaction}
Expand All @@ -24,6 +23,7 @@ import org.locationtech.geomesa.spark.SparkSQLTestUtils
import org.locationtech.geomesa.utils.collection.CloseableIterator
import org.locationtech.geomesa.utils.geotools.{FeatureUtils, SimpleFeatureTypes}
import org.locationtech.geomesa.utils.io.WithClose
import org.opengis.feature.simple.SimpleFeature
import org.specs2.mutable.Specification
import org.specs2.runner.JUnitRunner

Expand All @@ -33,6 +33,9 @@ import scala.collection.JavaConverters._
class FileSystemDSAlignmentTest extends Specification with LazyLogging {
sequential

// TODO count is based on metadata which we don't currently calculate
val countWithMetadata = false // if true, use stored metadata for count checks; otherwise run the query and count the results

val tempDir: Path = Files.createTempDirectory("fsAlignmentTest")

lazy val directory1: String = tempDir + "/data/first"
Expand All @@ -56,11 +59,16 @@ class FileSystemDSAlignmentTest extends Specification with LazyLogging {
// val formats = Seq("orc", "parquet")
val formats = Seq("parquet")

/**
 * Executes the query against the feature source and materializes all results.
 *
 * The feature iterator is wrapped in a CloseableIterator so the underlying
 * reader is released once the results have been drained into the list.
 *
 * @param fs feature source to query
 * @param q query to execute
 * @return all matching features, fully materialized
 */
def runQuery(fs: SimpleFeatureSource, q: Query): List[SimpleFeature] = {
  val results = fs.getFeatures(q).features()
  CloseableIterator(results).toList
}

/**
 * Counts the features matching a query.
 *
 * When `countWithMetadata` is enabled, delegates to the data store's
 * metadata-based count; otherwise falls back to running the query and
 * counting the materialized results (metadata counts are not currently
 * calculated — see the TODO on `countWithMetadata`).
 *
 * @param fs feature source to query
 * @param q query to count matches for
 * @return number of matching features
 */
def getCount(fs: SimpleFeatureSource, q: Query): Int =
  if (countWithMetadata) {
    fs.getCount(q)
  } else {
    runQuery(fs, q).length
  }

"FileSystem DataStore" should {
"Write data to directory1 using the GT FSDS" >> {
println(s" ***: Temporary directory is ${tempDir}. Directory is $directory1.")
println(s" ***: Temporary directory is $tempDir. Directory is $directory1.")

// TODO: This writing approach writes out the column data as "case_5fnumber"
formats.foreach { format =>
val sft = SimpleFeatureTypes.createType(format,
"arrest:String,case_number:Int:index=full:cardinality=high,dtg:Date,*geom:Point:srid=4326")
Expand All @@ -81,19 +89,17 @@ class FileSystemDSAlignmentTest extends Specification with LazyLogging {
ok
}

// Commented out since this works!
// "Query directory2 with GeoMesa's Spark integration" >> {
// foreach(formats) { format =>
// queryWithSpark(format, directory1)
// }
// }
"Query directory2 with GeoMesa's Spark integration" >> {
foreach(formats) { format =>
queryWithSpark(format, directory1)
}
}

// Commented out since this works!
// "Query directory1 with the GM FSDS DS" >> {
// foreach(formats) { format =>
// queryWithGeoTools(format, directory1)
// }
// }
"Query directory1 with the GM FSDS DS" >> {
foreach(formats) { format =>
queryWithGeoTools(format, directory1)
}
}

"write data to directory2 using Spark" >> {
foreach(formats) { format =>
Expand Down Expand Up @@ -121,24 +127,17 @@ class FileSystemDSAlignmentTest extends Specification with LazyLogging {
val format = "parquet"
val fs: SimpleFeatureSource = getFeatureSource(format, location)
val q = new Query(format)
// TODO count is based on metadata which we don't currently calculate
// val count = fs.getCount(q)
// count mustEqual(3)
//fs.getCount(new Query(format, ECQL.toFilter("case_number = 1"))) mustEqual(1)

val feats = CloseableIterator(fs.getFeatures(new Query(format)).features()).toList
feats.foreach{println}
feats.size mustEqual(3)

getCount(fs, q) mustEqual 3
getCount(fs, new Query(format, ECQL.toFilter("case_number = 1"))) mustEqual 1

// TODO: This is failing since the DTG columns are mis-aligned: int64 vs int96
val matches = CloseableIterator(fs.getFeatures(new Query(format, ECQL.toFilter("bbox(geom,-80,35,-75,45) " +
"AND dtg > '2016-01-01T12:00:00Z' AND dtg < '2016-01-03T12:00:00Z'"))).features()).toList
val feats = runQuery(fs, new Query(format))
feats must haveSize(3)

matches.foreach{println}
matches.size mustEqual(2)
val matches = runQuery(fs, new Query(format, ECQL.toFilter("bbox(geom,-80,35,-75,45) " +
"AND dtg > '2016-01-01T12:00:00Z' AND dtg < '2016-01-03T12:00:00Z'")))
matches must haveSize(2)

ok
foreach(formats) { format =>
queryWithGeoTools(format, directory2)
}
Expand All @@ -150,51 +149,42 @@ class FileSystemDSAlignmentTest extends Specification with LazyLogging {
}
}
"Functions which are failing for Spark output being re-read" >> {
println(s"Trying Count with Dates for $directory2")
getCountDateFilterGeoTools("parquet", directory2)
}
"Functions which are failing for Spark output being re-read 2" >> {
println(s"Trying case_number filter with $directory2")
case_numberFilterWithGeoTools("parquet", directory2)
}
}

private def testGeoToolsFilters(format: String, location: String) = {
// TODO counts don't work b/c they read the metadata, which we haven't populated
// println(s"Trying count with FILTER.INCLUDE for $location")
// getCountIncludeGeoTools(format, location)
println(s"Trying case_number filter with $location")
getCountIncludeGeoTools(format, location)
case_numberFilterWithGeoTools(format, location)
println(s"Trying spatial temporal filter with $location")
queryWithGeoTools(format, location)
// println(s"Trying Count with Dates for $location")
// getCountDateFilterGeoTools(format, location)
getCountDateFilterGeoTools(format, location)
}

private def getCountIncludeGeoTools(format: String, location: String) = {
val fs: SimpleFeatureSource = getFeatureSource(format, location)
fs.getCount(new Query(format)) mustEqual (3)
getCount(fs, new Query(format)) mustEqual 3
}

private def getCountDateFilterGeoTools(format: String, location: String) = {
val fs: SimpleFeatureSource = getFeatureSource(format, location)
fs.getCount(new Query(format, ECQL.toFilter("bbox(geom,-80,35,-75,45) " +
"AND dtg > '2016-01-01T12:00:00Z' AND dtg < '2016-01-02T12:00:00Z'"))) mustEqual(2)
getCount(fs, new Query(format, ECQL.toFilter("bbox(geom,-80,35,-75,45) " +
"AND dtg > '2016-01-01T12:00:00Z' AND dtg < '2016-01-02T12:00:00Z'"))) mustEqual 1
}

private def case_numberFilterWithGeoTools(format: String, location: String) = {
val fs: SimpleFeatureSource = getFeatureSource(format, location)
val feats = CloseableIterator(fs.getFeatures(new Query(format, ECQL.toFilter("case_number = 1"))).features()).toList
println(s"feat.size : ${feats.size}")
val feats = runQuery(fs, new Query(format, ECQL.toFilter("case_number = 1")))
feats must haveSize(1)
fs.getCount(new Query(format, ECQL.toFilter("case_number = 1"))) mustEqual(1)
getCount(fs, new Query(format, ECQL.toFilter("case_number = 1"))) mustEqual 1
}

private def queryWithGeoTools(format: String, location: String) = {
val fs: SimpleFeatureSource = getFeatureSource(format, location)

CloseableIterator(fs.getFeatures(new Query(format, ECQL.toFilter("bbox(geom,-80,35,-75,45) " +
"AND dtg > '2016-01-01T12:00:00Z' AND dtg < '2016-01-03T12:00:00Z'"))).features()).toList must haveSize(2)
runQuery(fs, new Query(format, ECQL.toFilter("bbox(geom,-80,35,-75,45) " +
"AND dtg > '2016-01-01T12:00:00Z' AND dtg < '2016-01-03T12:00:00Z'"))) must haveSize(2)
}

private def getFeatureSource(format: String, location: String) = {
Expand Down Expand Up @@ -226,7 +216,7 @@ class FileSystemDSAlignmentTest extends Specification with LazyLogging {
val stQuery = sc.sql(s"select * from $format where st_intersects(geom, st_makeBbox(-80,35,-75,45)) AND " +
"dtg > '2016-01-01T12:00:00Z' AND dtg < '2016-01-02T12:00:00Z'").collect()
stQuery must haveLength(1)
stQuery.head.get(0) mustEqual "2"
stQuery.head.get(stQuery.head.fieldIndex("__fid__")) mustEqual "2"

// "select by secondary indexed attribute, using dataframe API"
val cases = df.select("case_number").where("case_number = 1").collect().map(_.getInt(0))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ParquetFileSystemStorage(context: FileSystemContext, metadata: StorageMeta
val parquetFilter = fc.map(FilterCompat.get).getOrElse(FilterCompat.NOOP)
val gtFilter = residualFilter.map(FastFilterFactory.optimize(readSft, _))

logger.debug(s"Parquet filter: $parquetFilter and modified gt filter: ${gtFilter.getOrElse(Filter.INCLUDE)}")
logger.debug(s"Parquet filter: ${fc.getOrElse("none")} and modified gt filter: ${gtFilter.getOrElse(Filter.INCLUDE)}")

// WARNING it is important to create a new conf per query
// because we communicate the transform SFT set here
Expand Down

0 comments on commit 56fa1eb

Please sign in to comment.