Skip to content

Commit

Permalink
GEOMESA-3385 Kafka - Check for features with null geometries (#3146)
Browse files Browse the repository at this point in the history
  • Loading branch information
malinsinbigler authored Aug 8, 2024
1 parent bcb1cd5 commit 50c82af
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class KafkaFeatureCacheImpl(sft: SimpleFeatureType, config: IndexConfig, layerVi
* due to kafka consumer partitioning
*/
override def put(feature: SimpleFeature): Unit = {
if (feature.getDefaultGeometry == null) {
logger.warn(s"Null geometry detected for feature ${feature.getID}. Skipping loading into cache.")
return
}
val featureState = factory.createState(feature)
logger.trace(s"${featureState.id} adding feature $featureState")
val old = state.put(featureState.id, featureState)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class KafkaFeatureCacheTest extends Specification with Mockito {

val track3v0 = track("track3", "POINT (0 60)")

val track4v0 = track("track4", "POINT (null null)")

def track(id: String, track: String): SimpleFeature = ScalaSimpleFeature.create(sft, id, id, track)

def caches(expiry: Option[(Duration, ScheduledExecutorService, Ticker)] = None) = {
Expand Down Expand Up @@ -188,6 +190,22 @@ class KafkaFeatureCacheTest extends Specification with Mockito {
}
}
}

"query with null geometries inserted into cache" >> {
foreach(caches()) { cache =>
try {
cache.put(track0v0)
cache.put(track1v0)
cache.put(track4v0)
cache.size() mustEqual 2

val res = cache.query(wholeWorldFilter).toSeq
res must containTheSameElementsAs(Seq(track0v0, track1v0))
} finally {
cache.close()
}
}
}
}
}

0 comments on commit 50c82af

Please sign in to comment.