diff --git a/geomesa-metrics/geomesa-metrics-micrometer/src/main/scala/org/locationtech/geomesa/metrics/micrometer/MicrometerSetup.scala b/geomesa-metrics/geomesa-metrics-micrometer/src/main/scala/org/locationtech/geomesa/metrics/micrometer/MicrometerSetup.scala index 3b31150b9143..c5c2d754baf3 100644 --- a/geomesa-metrics/geomesa-metrics-micrometer/src/main/scala/org/locationtech/geomesa/metrics/micrometer/MicrometerSetup.scala +++ b/geomesa-metrics/geomesa-metrics-micrometer/src/main/scala/org/locationtech/geomesa/metrics/micrometer/MicrometerSetup.scala @@ -9,27 +9,19 @@ package org.locationtech.geomesa.metrics.micrometer import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions} -import io.micrometer.cloudwatch2.CloudWatchMeterRegistry import io.micrometer.core.instrument.binder.jvm.{ClassLoaderMetrics, JvmGcMetrics, JvmMemoryMetrics, JvmThreadMetrics} import io.micrometer.core.instrument.binder.system.ProcessorMetrics -import io.micrometer.core.instrument.{Clock, MeterRegistry, Metrics, Tag} -import io.micrometer.prometheusmetrics.{PrometheusMeterRegistry, PrometheusRenameFilter} -import io.prometheus.metrics.exporter.httpserver.HTTPServer -import io.prometheus.metrics.exporter.pushgateway.{Format, PushGateway, Scheme} -import org.locationtech.geomesa.utils.io.CloseWithLogging +import io.micrometer.core.instrument.{MeterRegistry, Metrics} +import org.locationtech.geomesa.metrics.micrometer.cloudwatch.CloudwatchFactory +import org.locationtech.geomesa.metrics.micrometer.prometheus.PrometheusFactory import pureconfig.{ConfigReader, ConfigSource} -import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient -import java.io.Closeable import java.util.Locale -import java.util.concurrent.atomic.AtomicReference object MicrometerSetup { import pureconfig.generic.semiauto._ - import scala.collection.JavaConverters._ - private val registries = scala.collection.mutable.Map.empty[String, String] private val bindings = scala.collection.mutable.Set.empty[String] @@ -95,58 +87,12 @@ object MicrometerSetup { implicit val reader: ConfigReader[RegistryConfig] = deriveReader[RegistryConfig] val config = ConfigSource.fromConfig(conf).loadOrThrow[RegistryConfig] config.`type`.toLowerCase(Locale.US) match { - case "prometheus" => createPrometheusRegistry(conf) - case "cloudwatch" => createCloudwatchRegistry(conf) + case "prometheus" => PrometheusFactory(conf) + case "cloudwatch" => CloudwatchFactory(conf) case t => throw new IllegalArgumentException(s"No registry type defined for '$t' - valid values are: prometheus, cloudwatch") } } - private def createPrometheusRegistry(conf: Config): PrometheusMeterRegistry = { - // noinspection ScalaUnusedSymbol - implicit val gatewayReader: ConfigReader[PushGatewayConfig] = deriveReader[PushGatewayConfig] - implicit val prometheusReader: ConfigReader[PrometheusConfig] = deriveReader[PrometheusConfig] - val config = ConfigSource.fromConfig(conf).loadOrThrow[PrometheusConfig] - val dependentClose = new AtomicReference[Closeable]() - val registry = new PrometheusMeterRegistry(k => config.properties.getOrElse(k, null)) { - override def close(): Unit = { - CloseWithLogging(Option(dependentClose.get())) - super.close() - } - } - registry.throwExceptionOnRegistrationFailure() - if (config.rename) { - registry.config().meterFilter(new PrometheusRenameFilter()) - } - if (config.commonTags.nonEmpty) { - val tags = config.commonTags.map { case (k, v) => Tag.of(k, v) } - registry.config.commonTags(tags.asJava) - } - config.pushGateway match { - case None => - val server = - HTTPServer.builder() - .port(config.port) - .registry(registry.getPrometheusRegistry) - .buildAndStart() - dependentClose.set(server) - - case Some(pg) => - val builder = PushGateway.builder().registry(registry.getPrometheusRegistry).address(pg.host) - pg.job.foreach(builder.job) - pg.format.foreach(v => builder.format(Format.valueOf(v.toUpperCase(Locale.US)))) - pg.scheme.foreach(v => builder.scheme(Scheme.fromString(v.toLowerCase(Locale.US)))) - val pushGateway = builder.build() - dependentClose.set(() => pushGateway.pushAdd()) - } - registry - } - - private def createCloudwatchRegistry(conf: Config): CloudWatchMeterRegistry = { - implicit val reader: ConfigReader[CloudwatchConfig] = deriveReader[CloudwatchConfig] - val config = ConfigSource.fromConfig(conf).loadOrThrow[CloudwatchConfig] - new CloudWatchMeterRegistry(k => config.properties.getOrElse(k, null), Clock.SYSTEM, CloudWatchAsyncClient.create()) - } - private case class MetricsConfig( registries: Seq[Config], bindings: MetricsBindings @@ -164,25 +110,4 @@ object MicrometerSetup { `type`: String, enabled: Boolean = true, ) - - private case class PrometheusConfig( - rename: Boolean = false, - commonTags: Map[String, String] = Map.empty, - port: Int = 9090, - // additional config can also be done via sys props - see https://prometheus.github.io/client_java/config/config/ - properties: Map[String, String] = Map.empty, - pushGateway: Option[PushGatewayConfig], - ) - - private case class PushGatewayConfig( - host: String, - scheme: Option[String], - job: Option[String], - format: Option[String], - ) - - private case class CloudwatchConfig( - namespace: String = "geomesa", - properties: Map[String, String] = Map.empty - ) } diff --git a/geomesa-metrics/geomesa-metrics-micrometer/src/main/scala/org/locationtech/geomesa/metrics/micrometer/RegistryFactory.scala b/geomesa-metrics/geomesa-metrics-micrometer/src/main/scala/org/locationtech/geomesa/metrics/micrometer/RegistryFactory.scala new file mode 100644 index 000000000000..8af600e853fb --- /dev/null +++ b/geomesa-metrics/geomesa-metrics-micrometer/src/main/scala/org/locationtech/geomesa/metrics/micrometer/RegistryFactory.scala @@ -0,0 +1,16 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.metrics.micrometer + +import com.typesafe.config.Config +import io.micrometer.core.instrument.MeterRegistry + +trait RegistryFactory { + def apply(conf: Config): MeterRegistry +} diff --git a/geomesa-metrics/geomesa-metrics-micrometer/src/main/scala/org/locationtech/geomesa/metrics/micrometer/cloudwatch/CloudwatchFactory.scala b/geomesa-metrics/geomesa-metrics-micrometer/src/main/scala/org/locationtech/geomesa/metrics/micrometer/cloudwatch/CloudwatchFactory.scala new file mode 100644 index 000000000000..28c8bf613bf8 --- /dev/null +++ b/geomesa-metrics/geomesa-metrics-micrometer/src/main/scala/org/locationtech/geomesa/metrics/micrometer/cloudwatch/CloudwatchFactory.scala @@ -0,0 +1,30 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.metrics.micrometer +package cloudwatch +import com.typesafe.config.Config +import io.micrometer.cloudwatch2.CloudWatchMeterRegistry +import io.micrometer.core.instrument.{Clock, MeterRegistry} +import pureconfig.generic.semiauto.deriveReader +import pureconfig.{ConfigReader, ConfigSource} +import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient + +object CloudwatchFactory extends RegistryFactory { + + override def apply(conf: Config): MeterRegistry = { + implicit val reader: ConfigReader[CloudwatchConfig] = deriveReader[CloudwatchConfig] + val config = ConfigSource.fromConfig(conf).loadOrThrow[CloudwatchConfig] + new CloudWatchMeterRegistry(k => config.properties.getOrElse(k, null), Clock.SYSTEM, CloudWatchAsyncClient.create()) + } + + private case class CloudwatchConfig( + namespace: String = "geomesa", + properties: Map[String, String] = Map.empty + ) +} diff --git a/geomesa-metrics/geomesa-metrics-micrometer/src/main/scala/org/locationtech/geomesa/metrics/micrometer/prometheus/PrometheusFactory.scala b/geomesa-metrics/geomesa-metrics-micrometer/src/main/scala/org/locationtech/geomesa/metrics/micrometer/prometheus/PrometheusFactory.scala new file mode 100644 index 000000000000..0f16cb38e6b9 --- /dev/null +++ b/geomesa-metrics/geomesa-metrics-micrometer/src/main/scala/org/locationtech/geomesa/metrics/micrometer/prometheus/PrometheusFactory.scala @@ -0,0 +1,83 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.metrics.micrometer +package prometheus +import com.typesafe.config.Config +import io.micrometer.core.instrument.{MeterRegistry, Tag} +import io.micrometer.prometheusmetrics.{PrometheusMeterRegistry, PrometheusRenameFilter} +import io.prometheus.metrics.exporter.httpserver.HTTPServer +import io.prometheus.metrics.exporter.pushgateway.{Format, PushGateway, Scheme} +import org.locationtech.geomesa.utils.io.CloseWithLogging +import pureconfig.{ConfigReader, ConfigSource} +import pureconfig.generic.semiauto.deriveReader + +import java.io.Closeable +import java.util.Locale +import java.util.concurrent.atomic.AtomicReference + +object PrometheusFactory extends RegistryFactory { + + import scala.collection.JavaConverters._ + + override def apply(conf: Config): MeterRegistry = { + // noinspection ScalaUnusedSymbol + implicit val gatewayReader: ConfigReader[PushGatewayConfig] = deriveReader[PushGatewayConfig] + implicit val prometheusReader: ConfigReader[PrometheusConfig] = deriveReader[PrometheusConfig] + val config = ConfigSource.fromConfig(conf).loadOrThrow[PrometheusConfig] + val dependentClose = new AtomicReference[Closeable]() + val registry = new PrometheusMeterRegistry(k => config.properties.getOrElse(k, null)) { + override def close(): Unit = { + CloseWithLogging(Option(dependentClose.get())) + super.close() + } + } + registry.throwExceptionOnRegistrationFailure() + if (config.rename) { + registry.config().meterFilter(new PrometheusRenameFilter()) + } + if (config.commonTags.nonEmpty) { + val tags = config.commonTags.map { case (k, v) => Tag.of(k, v) } + registry.config.commonTags(tags.asJava) + } + config.pushGateway match { + case None => + val server = + HTTPServer.builder() + .port(config.port) + .registry(registry.getPrometheusRegistry) + .buildAndStart() + dependentClose.set(server) + + case Some(pg) => + val builder = PushGateway.builder().registry(registry.getPrometheusRegistry).address(pg.host) + pg.job.foreach(builder.job) + pg.format.foreach(v => builder.format(Format.valueOf(v.toUpperCase(Locale.US)))) + pg.scheme.foreach(v => builder.scheme(Scheme.fromString(v.toLowerCase(Locale.US)))) + val pushGateway = builder.build() + dependentClose.set(() => pushGateway.pushAdd()) + } + registry + } + + private case class PrometheusConfig( + rename: Boolean = false, + commonTags: Map[String, String] = Map.empty, + port: Int = 9090, + // additional config can also be done via sys props - see https://prometheus.github.io/client_java/config/config/ + properties: Map[String, String] = Map.empty, + pushGateway: Option[PushGatewayConfig], + ) + + private case class PushGatewayConfig( + host: String, + scheme: Option[String], + job: Option[String], + format: Option[String], + ) +}