diff --git a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/index/KafkaFeatureCacheImpl.scala b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/index/KafkaFeatureCacheImpl.scala index 5fb4117e978e..59ec4acaad3b 100644 --- a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/index/KafkaFeatureCacheImpl.scala +++ b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/index/KafkaFeatureCacheImpl.scala @@ -48,6 +48,10 @@ class KafkaFeatureCacheImpl(sft: SimpleFeatureType, config: IndexConfig, layerVi * due to kafka consumer partitioning */ override def put(feature: SimpleFeature): Unit = { + if (feature.getDefaultGeometry == null) { + logger.warn(s"Null geometry detected for feature ${feature.getID}. Skipping loading into cache.") + return + } val featureState = factory.createState(feature) logger.trace(s"${featureState.id} adding feature $featureState") val old = state.put(featureState.id, featureState) diff --git a/geomesa-kafka/geomesa-kafka-datastore/src/test/scala/org/locationtech/geomesa/kafka/index/KafkaFeatureCacheTest.scala b/geomesa-kafka/geomesa-kafka-datastore/src/test/scala/org/locationtech/geomesa/kafka/index/KafkaFeatureCacheTest.scala index 810859e6d6ca..b2561d09b2e9 100644 --- a/geomesa-kafka/geomesa-kafka-datastore/src/test/scala/org/locationtech/geomesa/kafka/index/KafkaFeatureCacheTest.scala +++ b/geomesa-kafka/geomesa-kafka-datastore/src/test/scala/org/locationtech/geomesa/kafka/index/KafkaFeatureCacheTest.scala @@ -47,6 +47,8 @@ class KafkaFeatureCacheTest extends Specification with Mockito { val track3v0 = track("track3", "POINT (0 60)") + val track4v0 = track("track4", "POINT (null null)") + def track(id: String, track: String): SimpleFeature = ScalaSimpleFeature.create(sft, id, id, track) def caches(expiry: Option[(Duration, ScheduledExecutorService, Ticker)] = None) = { @@ -188,6 +190,22 @@ class KafkaFeatureCacheTest extends Specification with Mockito { } } } + + "query with null geometries inserted into cache" >> { + foreach(caches()) { cache => + try { + cache.put(track0v0) + cache.put(track1v0) + cache.put(track4v0) + cache.size() mustEqual 2 + + val res = cache.query(wholeWorldFilter).toSeq + res must containTheSameElementsAs(Seq(track0v0, track1v0)) + } finally { + cache.close() + } + } + } } }