diff --git a/coordinator/src/main/scala/filodb.coordinator/NamespaceCardinalityPublisher.scala b/coordinator/src/main/scala/filodb.coordinator/NamespaceCardinalityPublisher.scala deleted file mode 100644 index 9f9c229706..0000000000 --- a/coordinator/src/main/scala/filodb.coordinator/NamespaceCardinalityPublisher.scala +++ /dev/null @@ -1,103 +0,0 @@ -package filodb.coordinator - -import java.util.concurrent.TimeUnit - -import scala.concurrent.Future -import scala.concurrent.duration.FiniteDuration -import scala.util.Success - -import akka.actor.ActorRef -import kamon.Kamon -import kamon.tag.TagSet -import monix.execution.Scheduler.{global => scheduler} - -import filodb.coordinator.client.Client -import filodb.coordinator.client.QueryCommands.LogicalPlan2Query -import filodb.core.DatasetRef -import filodb.query.{QueryResult, TsCardinalities} - -/** - * Periodically queries a node for all namespace cardinalities. - * Kamon gauges are updated with the response data. - * - * The intent is to publish a low-cardinality metric such that namespace - * cardinality queries can be efficiently answered. - */ -case class NamespaceCardinalityPublisher(dsIterProducer: () => Iterator[DatasetRef], - coordProducer: () => ActorRef) { - // All "_TU" constants quantify units of TIME_UNIT. - private val TIME_UNIT = TimeUnit.SECONDS - private val SCHED_INIT_DELAY_TU = 10 - private val SCHED_DELAY_TU = 10 - private val ASK_TIMEOUT_TU = 30 - - private val METRIC_ACTIVE = "active" - private val METRIC_TOTAL = "total" - private val NS = "ns_agg" - private val WS = "ns_agg" - - // (future, dataset) pairs - private val futQueue_ = new java.util.concurrent.ConcurrentLinkedQueue[(Future[Any], String)]() - - def schedulePeriodicPublishJob() : Unit = { - scheduler.scheduleWithFixedDelay( - SCHED_INIT_DELAY_TU, - SCHED_DELAY_TU, - TIME_UNIT, - () => queryAndSchedulePublish()) - } - - /** - * Publishes Future results until the queue is exhausted or - * the Future at the front of the queue is incomplete. - */ - private def publishFutureData() : Unit = { - import filodb.query.exec.TsCardExec.RowData - - while (true) { - // Need to acquire the queue lock for the duration of the - // occupancy/isCompleted checks and poll() call. - var futOpt : Option[(Future[Any], String)] = None - futQueue_.synchronized{ - if (!futQueue_.isEmpty && futQueue_.peek()._1.isCompleted) { - futOpt = Some(futQueue_.poll()) - } - } - - if (futOpt.isEmpty) { - // We've exhausted the queue. - return - } - - futOpt.get._1.value match { - case Some(Success(QueryResult(_, _, rv, _, _, _))) => - rv.foreach(_.rows().foreach{ rr => - // publish a cardinality metric for each namespace - val data = RowData.fromRowReader(rr) - val dataset = futOpt.get._2 - val tags = Map("_ws_" -> WS, "_ns_" -> NS, "ds" -> dataset, "prefix" -> data.group) - Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble) - Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble) - }) - case _ => ??? - } - } - } - - /** - * For each dataset, ask a Coordinator with a TsCardinalities LogicalPlan. - * Schedules a job to publish the Coordinator's response. - */ - private def queryAndSchedulePublish() : Unit = { - val groupDepth = 1 - val prefix = Nil - dsIterProducer().foreach { dsRef => - val fut = Client.asyncAsk( - coordProducer(), - LogicalPlan2Query(dsRef, TsCardinalities(prefix, groupDepth)), - FiniteDuration(ASK_TIMEOUT_TU, TIME_UNIT)) - futQueue_.add((fut, dsRef.dataset)) - } - scheduler.scheduleOnce(ASK_TIMEOUT_TU, TIME_UNIT, () => publishFutureData) - } -} diff --git a/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala b/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala index 084e40817d..119ceea7d8 100644 --- a/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala +++ b/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala @@ -37,13 +37,15 @@ private[coordinator] final class ShardManager(settings: FilodbSettings, private val _coordinators = new mutable.LinkedHashMap[Address, ActorRef] private val _errorShardReassignedAt = new mutable.HashMap[DatasetRef, mutable.HashMap[Int, Long]] - private val _shardMetricAggregator = NamespaceCardinalityPublisher( - () => { _datasetInfo.map{ case (dsRef, _) => dsRef}.toIterator }, - () => { _coordinators.head._2 } - ) - if (settings.config.getBoolean("shard-key-level-ingestion-metrics-enabled")) { - _shardMetricAggregator.schedulePeriodicPublishJob() - } + private val _tenantIngestionMeteringOpt = + if (settings.config.getBoolean("shard-key-level-ingestion-metrics-enabled")) { + val inst = TenantIngestionMetering( + settings, + () => { _datasetInfo.map{ case (dsRef, _) => dsRef}.toIterator }, + () => { _coordinators.head._2 }) + inst.schedulePeriodicPublishJob() + Some(inst) + } else None val shardReassignmentMinInterval = settings.config.getDuration("shard-manager.reassignment-min-interval") diff --git a/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala b/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala new file mode 100644 index 0000000000..865e3cf125 --- /dev/null +++ b/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala @@ -0,0 +1,85 @@ +package filodb.coordinator + +import java.util.concurrent.TimeUnit + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.FiniteDuration +import scala.util.{Failure, Success} + +import akka.actor.ActorRef +import com.typesafe.scalalogging.StrictLogging +import kamon.Kamon +import kamon.tag.TagSet +import monix.execution.Scheduler.{global => scheduler} + +import filodb.coordinator.client.Client +import filodb.coordinator.client.QueryCommands.LogicalPlan2Query +import filodb.core.DatasetRef +import filodb.query.{QueryError, QueryResult, TsCardinalities} + +/** + * Periodically queries a node for all namespace cardinalities. + * Kamon gauges are updated with the response data. + * + * The intent is to publish a low-cardinality metric such that namespace + * cardinality queries can be efficiently answered. + * + * @param dsIterProducer produces an iterator to step through all datasets. + * @param coordActorProducer produces a single actor to ask a query. Actors are + * queried in the order they're returned from this function. + */ +case class TenantIngestionMetering(settings: FilodbSettings, + dsIterProducer: () => Iterator[DatasetRef], + coordActorProducer: () => ActorRef) extends StrictLogging{ + + private val ASK_TIMEOUT = FiniteDuration( + settings.config.getDuration("shard-manager.metering-query-interval").toSeconds, + TimeUnit.SECONDS) + private val SCHED_INIT_DELAY = ASK_TIMEOUT // time until first job is scheduled + private val SCHED_DELAY = ASK_TIMEOUT // time between all jobs after the first + + private val CLUSTER_TYPE = settings.config.getString("cluster-type") + + private val METRIC_ACTIVE = "active_timeseries_by_tenant" + private val METRIC_TOTAL = "total_timeseries_by_tenant" + + def schedulePeriodicPublishJob() : Unit = { + scheduler.scheduleWithFixedDelay( + SCHED_INIT_DELAY, + SCHED_DELAY)(() => queryAndSchedulePublish()) + } + + /** + * For each dataset, ask a Coordinator with a TsCardinalities LogicalPlan. + * Schedules a job to publish the Coordinator's response. + */ + private def queryAndSchedulePublish() : Unit = { + import filodb.query.exec.TsCardExec._ + val groupDepth = 1 // group cardinalities at the second level (i.e. ws & ns) + val prefix = Nil // query for cardinalities regardless of first-level name (i.e. ws name) + dsIterProducer().foreach { dsRef => + val fut = Client.asyncAsk( + coordActorProducer(), + LogicalPlan2Query(dsRef, TsCardinalities(prefix, groupDepth)), + ASK_TIMEOUT) + fut.onComplete { + case Success(QueryResult(_, _, rv, _, _, _)) => + rv.foreach(_.rows().foreach{ rr => + // publish a cardinality metric for each namespace + val data = RowData.fromRowReader(rr) + val prefix = data.group.toString.split(PREFIX_DELIM) + val tags = Map("metric_ws" -> prefix(0), + "metric_ns" -> prefix(1), + "dataset" -> dsRef.dataset, + "cluster_type" -> CLUSTER_TYPE) + Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble) + Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble) + }) + case Success(QueryError(_, _, t)) => logger.warn("QueryError: " + t.getMessage) + case Failure(t) => logger.warn("Failure: " + t.getMessage) + // required to compile + case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + fut) + } + } + } +} diff --git a/core/src/main/resources/filodb-defaults.conf b/core/src/main/resources/filodb-defaults.conf index 2b91fdcf60..8df5eb11d4 100644 --- a/core/src/main/resources/filodb-defaults.conf +++ b/core/src/main/resources/filodb-defaults.conf @@ -211,6 +211,9 @@ filodb { shard-manager { # Minimum time required between successive automatic shard reassignments done by ShardManager reassignment-min-interval = 2 hours + + # Time between each TenantIngestionMetering query/publish job + metering-query-interval = 15 minutes } cassandra {