Skip to content

Commit

Permalink
GEOMESA-3312 Kafka - close metrics on DataStore.dipose()
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz committed Nov 16, 2023
1 parent 71537ca commit 0c83d50
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 2 deletions.
5 changes: 5 additions & 0 deletions geomesa-kafka/geomesa-kafka-datastore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@
<groupId>org.specs2</groupId>
<artifactId>specs2-mock_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-index-api_${scala.binary.version}</artifactId>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ class KafkaDataStore(
CloseWithLogging(defaultProducer)
CloseWithLogging(partitionedProducer)
CloseWithLogging(caches.asMap.asScala.values)
CloseWithLogging(config.metrics)
caches.invalidateAll()
super.dispose()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.locationtech.geomesa.kafka.data

import com.codahale.metrics.{MetricRegistry, ScheduledReporter}
import kafka.admin.ConfigCommand.{ConfigEntity, Entity}
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.curator.framework.CuratorFrameworkFactory
Expand All @@ -20,21 +21,24 @@ import org.geotools.filter.text.ecql.ECQL
import org.geotools.geometry.jts.JTSFactoryFinder
import org.geotools.util.factory.Hints
import org.junit.runner.RunWith
import org.locationtech.geomesa.features.ScalaSimpleFeature
import org.locationtech.geomesa.features.{ScalaSimpleFeature, SerializationType}
import org.locationtech.geomesa.index.InMemoryMetadata
import org.locationtech.geomesa.index.conf.QueryHints
import org.locationtech.geomesa.index.metadata.TableBasedMetadata
import org.locationtech.geomesa.kafka.ExpirationMocking.{MockTicker, ScheduledExpiry, WrappedRunnable}
import org.locationtech.geomesa.kafka.KafkaContainerTest
import org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult
import org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult.BatchResult
import org.locationtech.geomesa.kafka.utils.GeoMessageSerializer.GeoMessageSerializerFactory
import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent.{KafkaFeatureChanged, KafkaFeatureCleared, KafkaFeatureRemoved}
import org.locationtech.geomesa.kafka.utils.{GeoMessage, GeoMessageProcessor}
import org.locationtech.geomesa.metrics.core.GeoMesaMetrics
import org.locationtech.geomesa.security.{AuthorizationsProvider, SecurityUtils}
import org.locationtech.geomesa.utils.collection.SelfClosingIterator
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.Configs
import org.locationtech.geomesa.utils.geotools.{FeatureUtils, SimpleFeatureTypes}
import org.locationtech.geomesa.utils.index.SizeSeparatedBucketIndex
import org.locationtech.geomesa.utils.io.WithClose
import org.locationtech.geomesa.utils.io.{CloseQuietly, WithClose}
import org.locationtech.jts.geom.Point
import org.mockito.ArgumentMatchers
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
Expand Down Expand Up @@ -123,6 +127,20 @@ class KafkaDataStoreTest extends KafkaContainerTest with Mockito {
}
}

"clean up metrics" >> {
val reporter = mock[ScheduledReporter]
val metrics = new GeoMesaMetrics(new MetricRegistry(), "", Seq(reporter))
val config = {
val orig = KafkaDataStoreFactory.buildConfig(baseParams.asJava)
CloseQuietly(orig.metrics)
orig.copy(metrics = Some(metrics))
}
val serializer = new GeoMessageSerializerFactory(SerializationType.KRYO)
new KafkaDataStore(config, new InMemoryMetadata[String](), serializer).dispose()

there was one(reporter).close()
}

"use namespaces" >> {
import org.locationtech.geomesa.kafka.data.KafkaDataStoreParams._
val path = s"geomesa/namespace/test/${paths.getAndIncrement()}"
Expand Down

0 comments on commit 0c83d50

Please sign in to comment.