feat(query): namespace cardinality publisher #1294

Merged: 18 commits, Dec 7, 2021
10 changes: 10 additions & 0 deletions coordinator/src/main/scala/filodb.coordinator/ShardManager.scala
@@ -37,6 +37,16 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
private val _coordinators = new mutable.LinkedHashMap[Address, ActorRef]
private val _errorShardReassignedAt = new mutable.HashMap[DatasetRef, mutable.HashMap[Int, Long]]

private val _tenantIngestionMeteringOpt =
if (settings.config.getBoolean("shard-key-level-ingestion-metrics-enabled")) {
val inst = TenantIngestionMetering(
settings,
() => { _datasetInfo.map{ case (dsRef, _) => dsRef}.toIterator },
() => { _coordinators.head._2 })
inst.schedulePeriodicPublishJob()
Some(inst)
} else None

val shardReassignmentMinInterval = settings.config.getDuration("shard-manager.reassignment-min-interval")

/* These workloads were in an actor and exist now in an unprotected class.
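The two constructor arguments above are deliberately passed as thunks rather than as captured values. A minimal sketch (hypothetical names, not part of the PR) of why this matters: each scheduled run re-invokes the producer, so the metering job always sees the ShardManager's current datasets and coordinators instead of a snapshot taken at construction time.

```scala
import scala.collection.mutable
import filodb.core.DatasetRef

// Stand-in for ShardManager's mutable _datasetInfo map (hypothetical value type).
val datasetInfo = new mutable.LinkedHashMap[DatasetRef, String]

// Same closure shape as the dsIterProducer passed to TenantIngestionMetering above.
val dsIterProducer: () => Iterator[DatasetRef] = () => datasetInfo.keysIterator

datasetInfo += DatasetRef("prometheus") -> "registered after construction"

// The closure re-reads the map on every call, so the new dataset is visible.
dsIterProducer().foreach(ref => println(ref.dataset))  // prints "prometheus"
```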
89 changes: 89 additions & 0 deletions coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala
@@ -0,0 +1,89 @@
package filodb.coordinator

import java.util.concurrent.TimeUnit

import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success}

import akka.actor.ActorRef
import com.typesafe.scalalogging.StrictLogging
import kamon.Kamon
import kamon.tag.TagSet
import monix.execution.Scheduler.Implicits.{global => scheduler}

import filodb.coordinator.client.Client
import filodb.coordinator.client.QueryCommands.LogicalPlan2Query
import filodb.core.DatasetRef
import filodb.query.{QueryError, QueryResult, TsCardinalities}

/**
* Periodically queries a node for all namespace cardinalities.
* Kamon gauges are updated with the response data.
*
* The intent is to publish a low-cardinality metric such that namespace
* cardinality queries can be efficiently answered.
*
* @param dsIterProducer produces an iterator to step through all datasets.
* @param coordActorProducer produces a single actor to ask a query. Actors are
* queried in the order they're returned from this function.
*/
case class TenantIngestionMetering(settings: FilodbSettings,
dsIterProducer: () => Iterator[DatasetRef],
coordActorProducer: () => ActorRef) extends StrictLogging{

private val ASK_TIMEOUT = FiniteDuration(
settings.config.getDuration("metering-query-interval").toSeconds,
TimeUnit.SECONDS)
private val SCHED_INIT_DELAY = ASK_TIMEOUT // time until first job is scheduled
private val SCHED_DELAY = ASK_TIMEOUT // time between all jobs after the first

private val CLUSTER_TYPE = settings.config.getString("cluster-type")

private val METRIC_ACTIVE = "active_timeseries_by_tenant"
private val METRIC_TOTAL = "total_timeseries_by_tenant"

def schedulePeriodicPublishJob() : Unit = {
// NOTE: the FiniteDuration overload of scheduleWithFixedDelay
// does not work. Unsure why, but that's why these FiniteDurations are
// awkwardly parsed into seconds.
scheduler.scheduleWithFixedDelay(
SCHED_INIT_DELAY.toSeconds,
SCHED_DELAY.toSeconds,
TimeUnit.SECONDS,
() => queryAndSchedulePublish())
}

/**
* For each dataset, ask a Coordinator with a TsCardinalities LogicalPlan.
* Schedules a job to publish the Coordinator's response.
*/
private def queryAndSchedulePublish() : Unit = {
import filodb.query.exec.TsCardExec._
val groupDepth = 1 // group cardinalities at the second level (i.e. ws & ns)
val prefix = Nil // query for cardinalities regardless of first-level name (i.e. ws name)
dsIterProducer().foreach { dsRef =>
val fut = Client.asyncAsk(
coordActorProducer(),
LogicalPlan2Query(dsRef, TsCardinalities(prefix, groupDepth)),
ASK_TIMEOUT)
fut.onComplete {
case Success(QueryResult(_, _, rv, _, _, _)) =>
rv.foreach(_.rows().foreach{ rr =>
// publish a cardinality metric for each namespace
val data = RowData.fromRowReader(rr)
val prefix = data.group.toString.split(PREFIX_DELIM)
val tags = Map("metric_ws" -> prefix(0),
"metric_ns" -> prefix(1),
"dataset" -> dsRef.dataset,
"cluster_type" -> CLUSTER_TYPE)
Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
})
case Success(QueryError(_, _, t)) => logger.warn("QueryError: " + t.getMessage)
case Failure(t) => logger.warn("Failure: " + t.getMessage)
// required to compile
case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + fut)
}
}
}
}
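For reference, the gauge-update pattern used in queryAndSchedulePublish can be exercised on its own. A minimal sketch, assuming Kamon is initialized and using hypothetical tenant names (the real values come from the TsCardinalities query response):

```scala
import kamon.Kamon
import kamon.tag.TagSet

// Hypothetical workspace/namespace; one tagged series per (ws, ns, dataset).
val tags = TagSet.from(Map(
  "metric_ws"    -> "demo_ws",
  "metric_ns"    -> "demo_ns",
  "dataset"      -> "prometheus",
  "cluster_type" -> "raw"))

// One gauge instrument per metric name, updated with the latest cardinality counts.
Kamon.gauge("active_timeseries_by_tenant").withTags(tags).update(12345d)
Kamon.gauge("total_timeseries_by_tenant").withTags(tags).update(67890d)
```

Because the tag set only carries workspace, namespace, dataset, and cluster type, the published series stays low-cardinality even when the underlying datasets contain millions of time series.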
2 changes: 2 additions & 0 deletions core/src/main/resources/filodb-defaults.conf
@@ -161,6 +161,8 @@ filodb {
spread-assignment = []

shard-key-level-ingestion-metrics-enabled = true
# Time between each TenantIngestionMetering query/publish job
metering-query-interval = 15 minutes
# info config used for metric breakdown
cluster-type = "raw" // possible values: downsample, raw, recRule, aggregates etc.
# Name of the deployment partition that this FiloDB cluster belongs to
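As a quick check of the new setting, the duration can be parsed the same way TenantIngestionMetering does. A minimal sketch using Typesafe Config; the parseString input stands in for the filodb block of filodb-defaults.conf shown above:

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
import com.typesafe.config.ConfigFactory

// Stand-in for the config value added in this PR.
val conf = ConfigFactory.parseString("metering-query-interval = 15 minutes")

// Same conversion TenantIngestionMetering uses for ASK_TIMEOUT and the scheduler delays.
val interval = FiniteDuration(conf.getDuration("metering-query-interval").toSeconds, TimeUnit.SECONDS)

println(interval)  // 900 seconds
```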
29 changes: 29 additions & 0 deletions core/src/test/scala/filodb.core/memstore/ratelimit/RocksDbCardinalityStoreSpec.scala
@@ -0,0 +1,29 @@
package filodb.core.memstore.ratelimit

import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers

import filodb.core.memstore.ratelimit.CardinalityStore._
import filodb.core.MetricsTestData

class RocksDbCardinalityStoreSpec extends AnyFunSpec with Matchers {
it ("should correctly return overflow record") {
val dset = MetricsTestData.timeseriesDatasetMultipleShardKeys
val shard = 0
val numOverflow = 123

val db = new RocksDbCardinalityStore(dset.ref, shard)

(0 until MAX_RESULT_SIZE + numOverflow).foreach{ i =>
val prefix = Seq("ws", "ns", s"metric-$i")
db.store(CardinalityRecord(shard, prefix, 1, 1, 1, 1))
}

Seq(Nil, Seq("ws"), Seq("ws", "ns")).foreach{ prefix =>
val res = db.scanChildren(prefix, 3)
res.size shouldEqual MAX_RESULT_SIZE + 1 // one extra for the overflow CardinalityRecord
res.contains(CardinalityRecord(shard, OVERFLOW_PREFIX,
numOverflow, numOverflow, numOverflow, numOverflow)) shouldEqual true
}
}
}
@@ -361,12 +361,14 @@ final case object TsCardExec {
// row name assigned to overflow counts
val OVERFLOW_GROUP = prefixToGroup(CardinalityStore.OVERFLOW_PREFIX)

val PREFIX_DELIM = ","

/**
* Convert a shard key prefix to a row's group name.
*/
def prefixToGroup(prefix: Seq[String]): ZeroCopyUTF8String = {
// just concat the prefix together with a single char delimiter
prefix.mkString(",").utf8
prefix.mkString(PREFIX_DELIM).utf8
}

case class CardCounts(active: Int, total: Int) {
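The shared PREFIX_DELIM is what lets TenantIngestionMetering recover the workspace and namespace from a result row's group string. A minimal round-trip sketch with hypothetical names:

```scala
// Group string built the same way prefixToGroup does (single-char delimiter ",").
val PREFIX_DELIM = ","
val group = Seq("demo_ws", "demo_ns").mkString(PREFIX_DELIM)   // "demo_ws,demo_ns"

// TenantIngestionMetering splits it back into the gauge tag values.
val parts = group.split(PREFIX_DELIM)
val tags = Map("metric_ws" -> parts(0), "metric_ns" -> parts(1))
// tags == Map("metric_ws" -> "demo_ws", "metric_ns" -> "demo_ns")
```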