diff --git a/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala b/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala
index 8d9dc29422..119ceea7d8 100644
--- a/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala
@@ -37,6 +37,16 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
   private val _coordinators = new mutable.LinkedHashMap[Address, ActorRef]
   private val _errorShardReassignedAt = new mutable.HashMap[DatasetRef, mutable.HashMap[Int, Long]]
 
+  private val _tenantIngestionMeteringOpt =
+    if (settings.config.getBoolean("shard-key-level-ingestion-metrics-enabled")) {
+      val inst = TenantIngestionMetering(
+        settings,
+        () => { _datasetInfo.map{ case (dsRef, _) => dsRef}.toIterator },
+        () => { _coordinators.head._2 })
+      inst.schedulePeriodicPublishJob()
+      Some(inst)
+    } else None
+
   val shardReassignmentMinInterval = settings.config.getDuration("shard-manager.reassignment-min-interval")
 
   /* These workloads were in an actor and exist now in an unprotected class.
diff --git a/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala b/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala
new file mode 100644
index 0000000000..a78181fc19
--- /dev/null
+++ b/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala
@@ -0,0 +1,89 @@
+package filodb.coordinator
+
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration.FiniteDuration
+import scala.util.{Failure, Success}
+
+import akka.actor.ActorRef
+import com.typesafe.scalalogging.StrictLogging
+import kamon.Kamon
+import kamon.tag.TagSet
+import monix.execution.Scheduler.Implicits.{global => scheduler}
+
+import filodb.coordinator.client.Client
+import filodb.coordinator.client.QueryCommands.LogicalPlan2Query
+import filodb.core.DatasetRef
+import filodb.query.{QueryError, QueryResult, TsCardinalities}
+
+/**
+ * Periodically queries a node for all namespace cardinalities.
+ * Kamon gauges are updated with the response data.
+ *
+ * The intent is to publish a low-cardinality metric such that namespace
+ * cardinality queries can be efficiently answered.
+ *
+ * @param dsIterProducer produces an iterator to step through all datasets.
+ * @param coordActorProducer produces a single actor to ask a query. Actors are
+ *          queried in the order they're returned from this function.
+ */
+case class TenantIngestionMetering(settings: FilodbSettings,
+                                   dsIterProducer: () => Iterator[DatasetRef],
+                                   coordActorProducer: () => ActorRef) extends StrictLogging{
+
+  private val ASK_TIMEOUT = FiniteDuration(
+    settings.config.getDuration("metering-query-interval").toSeconds,
+    TimeUnit.SECONDS)
+  private val SCHED_INIT_DELAY = ASK_TIMEOUT  // time until first job is scheduled
+  private val SCHED_DELAY = ASK_TIMEOUT  // time between all jobs after the first
+
+  private val CLUSTER_TYPE = settings.config.getString("cluster-type")
+
+  private val METRIC_ACTIVE = "active_timeseries_by_tenant"
+  private val METRIC_TOTAL = "total_timeseries_by_tenant"
+
+  def schedulePeriodicPublishJob() : Unit = {
+    // NOTE: the FiniteDuration overload of scheduleWithFixedDelay
+    //   does not work. Unsure why, but that's why these FiniteDurations are
+    //   awkwardly parsed into seconds.
+    scheduler.scheduleWithFixedDelay(
+      SCHED_INIT_DELAY.toSeconds,
+      SCHED_DELAY.toSeconds,
+      TimeUnit.SECONDS,
+      () => queryAndSchedulePublish())
+  }
+
+  /**
+   * For each dataset, ask a Coordinator with a TsCardinalities LogicalPlan.
+   * Schedules a job to publish the Coordinator's response.
+   */
+  private def queryAndSchedulePublish() : Unit = {
+    import filodb.query.exec.TsCardExec._
+    val groupDepth = 1  // group cardinalities at the second level (i.e. ws & ns)
+    val prefix = Nil  // query for cardinalities regardless of first-level name (i.e. ws name)
+    dsIterProducer().foreach { dsRef =>
+      val fut = Client.asyncAsk(
+        coordActorProducer(),
+        LogicalPlan2Query(dsRef, TsCardinalities(prefix, groupDepth)),
+        ASK_TIMEOUT)
+      fut.onComplete {
+        case Success(QueryResult(_, _, rv, _, _, _)) =>
+          rv.foreach(_.rows().foreach{ rr =>
+            // publish a cardinality metric for each namespace
+            val data = RowData.fromRowReader(rr)
+            val prefix = data.group.toString.split(PREFIX_DELIM)
+            val tags = Map("metric_ws" -> prefix(0),
+                           "metric_ns" -> prefix(1),
+                           "dataset" -> dsRef.dataset,
+                           "cluster_type" -> CLUSTER_TYPE)
+            Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
+            Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
+          })
+        case Success(QueryError(_, _, t)) => logger.warn("QueryError: " + t.getMessage)
+        case Failure(t) => logger.warn("Failure: " + t.getMessage)
+        // required to compile
+        case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + fut)
+      }
+    }
+  }
+}
diff --git a/core/src/main/resources/filodb-defaults.conf b/core/src/main/resources/filodb-defaults.conf
index 2b91fdcf60..7f8fe39a47 100644
--- a/core/src/main/resources/filodb-defaults.conf
+++ b/core/src/main/resources/filodb-defaults.conf
@@ -161,6 +161,8 @@ filodb {
   spread-assignment = []
 
   shard-key-level-ingestion-metrics-enabled = true
+  # Time between each TenantIngestionMetering query/publish job
+  metering-query-interval = 15 minutes
   # info config used for metric breakdown
   cluster-type = "raw" // possible values: downsample, raw, recRule, aggregates etc.
   # Name of the deployment partition that this FiloDB cluster belongs to
diff --git a/core/src/test/scala/filodb.core/memstore/ratelimit/RocksDbCardinalityStoreSpec.scala b/core/src/test/scala/filodb.core/memstore/ratelimit/RocksDbCardinalityStoreSpec.scala
new file mode 100644
index 0000000000..37e7c84e11
--- /dev/null
+++ b/core/src/test/scala/filodb.core/memstore/ratelimit/RocksDbCardinalityStoreSpec.scala
@@ -0,0 +1,29 @@
+package filodb.core.memstore.ratelimit
+
+import org.scalatest.funspec.AnyFunSpec
+import org.scalatest.matchers.should.Matchers
+
+import filodb.core.memstore.ratelimit.CardinalityStore._
+import filodb.core.MetricsTestData
+
+class RocksDbCardinalityStoreSpec extends AnyFunSpec with Matchers {
+  it ("should correctly return overflow record") {
+    val dset = MetricsTestData.timeseriesDatasetMultipleShardKeys
+    val shard = 0
+    val numOverflow = 123
+
+    val db = new RocksDbCardinalityStore(dset.ref, shard)
+
+    (0 until MAX_RESULT_SIZE + numOverflow).foreach{ i =>
+      val prefix = Seq("ws", "ns", s"metric-$i")
+      db.store(CardinalityRecord(shard, prefix, 1, 1, 1, 1))
+    }
+
+    Seq(Nil, Seq("ws"), Seq("ws", "ns")).foreach{ prefix =>
+      val res = db.scanChildren(prefix, 3)
+      res.size shouldEqual MAX_RESULT_SIZE + 1  // one extra for the overflow CardinalityRecord
+      res.contains(CardinalityRecord(shard, OVERFLOW_PREFIX,
+        numOverflow, numOverflow, numOverflow, numOverflow)) shouldEqual true
+    }
+  }
+}
diff --git a/query/src/main/scala/filodb/query/exec/MetadataExecPlan.scala b/query/src/main/scala/filodb/query/exec/MetadataExecPlan.scala
index b77b965611..99c0d0680f 100644
--- a/query/src/main/scala/filodb/query/exec/MetadataExecPlan.scala
+++ b/query/src/main/scala/filodb/query/exec/MetadataExecPlan.scala
@@ -361,12 +361,14 @@ final case object TsCardExec {
   // row name assigned to overflow counts
   val OVERFLOW_GROUP = prefixToGroup(CardinalityStore.OVERFLOW_PREFIX)
 
+  val PREFIX_DELIM = ","
+
   /**
    * Convert a shard key prefix to a row's group name.
    */
  def prefixToGroup(prefix: Seq[String]): ZeroCopyUTF8String = {
    // just concat the prefix together with a single char delimiter
-    prefix.mkString(",").utf8
+    prefix.mkString(PREFIX_DELIM).utf8
  }
 
  case class CardCounts(active: Int, total: Int) {
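
The PREFIX_DELIM constant exists because TenantIngestionMetering splits the group string that TsCardExec.prefixToGroup builds, so producer and consumer must agree on the delimiter. Below is a minimal, self-contained Scala sketch of that round trip (not part of the patch; the object name and example values are illustrative only):

object PrefixRoundTripSketch extends App {
  // Same single-character delimiter the patch factors out as PREFIX_DELIM.
  val PREFIX_DELIM = ","

  // Mirrors TsCardExec.prefixToGroup: concatenate the shard-key prefix.
  def prefixToGroup(prefix: Seq[String]): String = prefix.mkString(PREFIX_DELIM)

  // Mirrors TenantIngestionMetering: split the group back into ws/ns gauge tags.
  val group = prefixToGroup(Seq("demo-ws", "demo-ns"))
  val parts = group.split(PREFIX_DELIM)
  val tags = Map("metric_ws" -> parts(0), "metric_ns" -> parts(1))
  println(tags)  // Map(metric_ws -> demo-ws, metric_ns -> demo-ns)
}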
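
The NOTE in schedulePeriodicPublishJob refers to passing the delays as plain seconds with an explicit TimeUnit rather than using the FiniteDuration overload. A rough sketch of that pattern with monix's global scheduler follows (assumptions: the 15-minute value stands in for the "metering-query-interval" setting, and the object name and println body are illustrative, not the production wiring):

import java.util.concurrent.TimeUnit

import scala.concurrent.duration._

import monix.execution.Scheduler.Implicits.{global => scheduler}

object ScheduleSketch extends App {
  // Stand-in for settings.config.getDuration("metering-query-interval").
  val interval: FiniteDuration = 15.minutes

  // Delays are converted to seconds and paired with an explicit TimeUnit,
  // matching the workaround described in the NOTE comment of the patch.
  scheduler.scheduleWithFixedDelay(
    interval.toSeconds, interval.toSeconds, TimeUnit.SECONDS,
    new Runnable { def run(): Unit = println("query cardinalities and publish gauges") })

  // In FiloDB this runs for the life of the coordinator process; a short-lived
  // App like this one exits before the first tick fires.
}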