diff --git a/geomesa-kafka/geomesa-kafka-datastore/pom.xml b/geomesa-kafka/geomesa-kafka-datastore/pom.xml
index 7b18c3b415cb..e951c7e2c692 100644
--- a/geomesa-kafka/geomesa-kafka-datastore/pom.xml
+++ b/geomesa-kafka/geomesa-kafka-datastore/pom.xml
@@ -105,6 +105,11 @@
org.specs2
specs2-mock_${scala.binary.version}
+
+ org.locationtech.geomesa
+ geomesa-index-api_${scala.binary.version}
+ tests
+
org.apache.kafka
kafka_${scala.binary.version}
diff --git a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala
index 110691bede89..fc75c6cd4098 100644
--- a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala
+++ b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala
@@ -315,6 +315,7 @@ class KafkaDataStore(
CloseWithLogging(defaultProducer)
CloseWithLogging(partitionedProducer)
CloseWithLogging(caches.asMap.asScala.values)
+ CloseWithLogging(config.metrics)
caches.invalidateAll()
super.dispose()
}
diff --git a/geomesa-kafka/geomesa-kafka-datastore/src/test/scala/org/locationtech/geomesa/kafka/data/KafkaDataStoreTest.scala b/geomesa-kafka/geomesa-kafka-datastore/src/test/scala/org/locationtech/geomesa/kafka/data/KafkaDataStoreTest.scala
index e17604973582..1dace101a084 100644
--- a/geomesa-kafka/geomesa-kafka-datastore/src/test/scala/org/locationtech/geomesa/kafka/data/KafkaDataStoreTest.scala
+++ b/geomesa-kafka/geomesa-kafka-datastore/src/test/scala/org/locationtech/geomesa/kafka/data/KafkaDataStoreTest.scala
@@ -8,6 +8,7 @@
package org.locationtech.geomesa.kafka.data
+import com.codahale.metrics.{MetricRegistry, ScheduledReporter}
import kafka.admin.ConfigCommand.{ConfigEntity, Entity}
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.curator.framework.CuratorFrameworkFactory
@@ -20,21 +21,24 @@ import org.geotools.filter.text.ecql.ECQL
import org.geotools.geometry.jts.JTSFactoryFinder
import org.geotools.util.factory.Hints
import org.junit.runner.RunWith
-import org.locationtech.geomesa.features.ScalaSimpleFeature
+import org.locationtech.geomesa.features.{ScalaSimpleFeature, SerializationType}
+import org.locationtech.geomesa.index.InMemoryMetadata
import org.locationtech.geomesa.index.conf.QueryHints
import org.locationtech.geomesa.index.metadata.TableBasedMetadata
import org.locationtech.geomesa.kafka.ExpirationMocking.{MockTicker, ScheduledExpiry, WrappedRunnable}
import org.locationtech.geomesa.kafka.KafkaContainerTest
import org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult
import org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult.BatchResult
+import org.locationtech.geomesa.kafka.utils.GeoMessageSerializer.GeoMessageSerializerFactory
import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent.{KafkaFeatureChanged, KafkaFeatureCleared, KafkaFeatureRemoved}
import org.locationtech.geomesa.kafka.utils.{GeoMessage, GeoMessageProcessor}
+import org.locationtech.geomesa.metrics.core.GeoMesaMetrics
import org.locationtech.geomesa.security.{AuthorizationsProvider, SecurityUtils}
import org.locationtech.geomesa.utils.collection.SelfClosingIterator
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.Configs
import org.locationtech.geomesa.utils.geotools.{FeatureUtils, SimpleFeatureTypes}
import org.locationtech.geomesa.utils.index.SizeSeparatedBucketIndex
-import org.locationtech.geomesa.utils.io.WithClose
+import org.locationtech.geomesa.utils.io.{CloseQuietly, WithClose}
import org.locationtech.jts.geom.Point
import org.mockito.ArgumentMatchers
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
@@ -123,6 +127,20 @@ class KafkaDataStoreTest extends KafkaContainerTest with Mockito {
}
}
+ "clean up metrics" >> {
+ val reporter = mock[ScheduledReporter]
+ val metrics = new GeoMesaMetrics(new MetricRegistry(), "", Seq(reporter))
+ val config = {
+ val orig = KafkaDataStoreFactory.buildConfig(baseParams.asJava)
+ CloseQuietly(orig.metrics)
+ orig.copy(metrics = Some(metrics))
+ }
+ val serializer = new GeoMessageSerializerFactory(SerializationType.KRYO)
+ new KafkaDataStore(config, new InMemoryMetadata[String](), serializer).dispose()
+
+ there was one(reporter).close()
+ }
+
"use namespaces" >> {
import org.locationtech.geomesa.kafka.data.KafkaDataStoreParams._
val path = s"geomesa/namespace/test/${paths.getAndIncrement()}"