feat(query): add TenantIngestionMetering query throttle #1310

Open · wants to merge 4 commits into develop
Changes from 3 commits
ShardManager.scala

@@ -39,12 +39,10 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,

  private val _tenantIngestionMeteringOpt =
    if (settings.config.getBoolean("shard-key-level-ingestion-metrics-enabled")) {
-     val inst = TenantIngestionMetering(
+     Some(new TenantIngestionMetering(
        settings,
        () => { _datasetInfo.map{ case (dsRef, _) => dsRef}.toIterator },
-       () => { _coordinators.head._2 })
-     inst.schedulePeriodicPublishJob()

[Contributor Author] No real reason to require this method to be called after initialization.

-     Some(inst)
+       () => { _coordinators.head._2 }))
    } else None

val shardReassignmentMinInterval = settings.config.getDuration("shard-manager.reassignment-min-interval")
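The upshot of the reviewer comment above, sketched with the producer lambdas abbreviated (these snippets are illustrations, not code from the PR): construction alone now starts the periodic job, because the constructor body calls queryAndSchedule() itself (see TenantIngestionMetering.scala below).

  // before: two steps, where forgetting the second silently disabled metering
  val inst = TenantIngestionMetering(settings, dsIterProducer, coordActorProducer)
  inst.schedulePeriodicPublishJob()

  // after: constructing the instance kicks off the first query batch
  val metering = new TenantIngestionMetering(settings, dsIterProducer, coordActorProducer)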
TenantIngestionMetering.scala
@@ -1,21 +1,102 @@
package filodb.coordinator

-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{TimeoutException, TimeUnit}

import scala.concurrent.duration.FiniteDuration
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}

import akka.actor.ActorRef
import com.typesafe.scalalogging.StrictLogging
import kamon.Kamon
import kamon.tag.TagSet
+import monix.eval.Task
import monix.execution.Scheduler.Implicits.{global => scheduler}
+import monix.reactive.Observable

import filodb.coordinator.client.Client
import filodb.coordinator.client.QueryCommands.LogicalPlan2Query
import filodb.core.DatasetRef
import filodb.query.{QueryError, QueryResult, TsCardinalities}

+object QueryThrottle {
+  // currently just added to the interval when the non-timeout rate drops below THRESHOLD
+  protected val INTERVAL_DIFF = FiniteDuration(5L, TimeUnit.MINUTES)
+  // number of past query timeouts/non-timeouts to consider
+  protected val LOOKBACK = 10
+  // {non-timeouts-in-lookback-window} / LOOKBACK < THRESHOLD will adjust the interval
+  protected val THRESHOLD = 0.85
+}

+object TenantIngestionMetering {
+  protected val METRIC_ACTIVE = "active_timeseries_by_tenant"
+  protected val METRIC_TOTAL = "total_timeseries_by_tenant"
+  protected val PARALLELISM = 8 // number of datasets queried in parallel

[Contributor Author] Unsure what this should be -- should I make it configurable? Or get the number of cores from the JRE?
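One possible direction, sketched below: default to the JVM's reported core count with a cap, and allow a config override. Both the cap and the "metering-query-parallelism" key are hypothetical assumptions here, not existing FiloDB settings.

  // hypothetical sketch -- "metering-query-parallelism" is an assumed config key
  val parallelism: Int =
    if (settings.config.hasPath("metering-query-parallelism")) {
      settings.config.getInt("metering-query-parallelism")
    } else {
      // fall back to the JRE-reported core count, capped to avoid flooding coordinators
      math.min(8, Runtime.getRuntime.availableProcessors())
    }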

+}

+/**
+ * Throttles the TenantIngestionMetering query rate according to the ratio of timeouts to non-timeouts.
+ *
+ * @param queryInterval the initial delay between each query. This is the duration to be adjusted.
+ */
+class QueryThrottle(queryInterval: FiniteDuration) extends StrictLogging {
+  import QueryThrottle._
+
+  private var interval: FiniteDuration = queryInterval.copy()
+
+  // these track timeouts for the past LOOKBACK queries
+  private var bits = (1 << LOOKBACK) - 1
+  private var ibit = 0
+
+  /**
+   * Sets the next lookback bit and increments ibit.
+   */
+  private def setNextBit(bit: Boolean): Unit = {
+    val bitVal = if (bit) 1 else 0
+    bits = bits & ~(1 << ibit)      // zero the bit
+    bits = bits | (bitVal << ibit)  // 'or' in the new bit
+    ibit = ibit + 1
+    if (ibit == LOOKBACK) {
+      ibit = 0
+    }
+  }
+
+  /**
+   * Updates the interval according to the timeout:non-timeout ratio.
+   */
+  private def updateInterval(): Unit = {
+    val successRate = Integer.bitCount(bits).toDouble / LOOKBACK
+    if (successRate < THRESHOLD) {
+      interval = interval + INTERVAL_DIFF
+      logger.info("too many timeouts; query interval extended to " + interval.toString())
+      // reset the bits
+      bits = (1 << LOOKBACK) - 1
+    }
+  }
+
+  /**
+   * Record a query timeout.
+   */
+  def recordTimeout(): Unit = {
+    setNextBit(false)
+    updateInterval()
+  }
+
+  /**
+   * Record a query non-timeout.
+   */
+  def recordOnTime(): Unit = {
+    setNextBit(true)
+    updateInterval()
+  }
+
+  /**
+   * Returns the current query interval.
+   */
+  def getInterval(): FiniteDuration = {
+    interval.copy()
+  }
+}
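To make the throttle's behavior concrete, a short usage trace against the class above (the 15-minute starting interval is an arbitrary illustration, not a FiloDB default):

  val throttle = new QueryThrottle(FiniteDuration(15, TimeUnit.MINUTES))

  // bits starts as ten 1s, so one timeout still leaves the success rate above THRESHOLD
  throttle.recordTimeout()   // success rate 9/10 = 0.9 >= 0.85 -> interval unchanged
  throttle.getInterval()     // 15 minutes

  // a second timeout inside the lookback window drops the rate below THRESHOLD
  throttle.recordTimeout()   // success rate 8/10 = 0.8 < 0.85 -> interval extended
  throttle.getInterval()     // 20 minutes; bits is also reset to all 1s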

/**
* Periodically queries a node for all namespace cardinalities.
* Kamon gauges are updated with the response data.
@@ -27,63 +108,88 @@ import filodb.query.{QueryError, QueryResult, TsCardinalities}
* @param coordActorProducer produces a single actor to ask a query. Actors are
* queried in the order they're returned from this function.
*/
-case class TenantIngestionMetering(settings: FilodbSettings,
-                                   dsIterProducer: () => Iterator[DatasetRef],
-                                   coordActorProducer: () => ActorRef) extends StrictLogging{
+class TenantIngestionMetering(settings: FilodbSettings,
+                              dsIterProducer: () => Iterator[DatasetRef],
+                              coordActorProducer: () => ActorRef) extends StrictLogging {
+  import TenantIngestionMetering._

-  private val ASK_TIMEOUT = FiniteDuration(
+  private val clusterType = settings.config.getString("cluster-type")
+  private var queryAskTimeSec = -1L // unix time of the most recent query ask
+  private val queryThrottle = new QueryThrottle(FiniteDuration(
    settings.config.getDuration("metering-query-interval").toSeconds,
-    TimeUnit.SECONDS)
-  private val SCHED_INIT_DELAY = ASK_TIMEOUT // time until first job is scheduled
-  private val SCHED_DELAY = ASK_TIMEOUT // time between all jobs after the first
-
-  private val CLUSTER_TYPE = settings.config.getString("cluster-type")
-
-  private val METRIC_ACTIVE = "active_timeseries_by_tenant"
-  private val METRIC_TOTAL = "total_timeseries_by_tenant"

[Contributor Author] Moved to companion object.

-  def schedulePeriodicPublishJob() : Unit = {
-    // NOTE: the FiniteDuration overload of scheduleWithFixedDelay
-    //   does not work. Unsure why, but that's why these FiniteDurations are
-    //   awkwardly parsed into seconds.
-    scheduler.scheduleWithFixedDelay(
-      SCHED_INIT_DELAY.toSeconds,
-      SCHED_DELAY.toSeconds,
-      TimeUnit.SECONDS,
-      () => queryAndSchedulePublish())
-  }
+    TimeUnit.SECONDS))

+  // immediately begin periodically querying for / publishing cardinality data
+  queryAndSchedule()
+
+  // scalastyle:off method.length
/**
-   * For each dataset, ask a Coordinator with a TsCardinalities LogicalPlan.
-   * Schedules a job to publish the Coordinator's response.
+   * For each dataset, asks a Coordinator with a TsCardinalities LogicalPlan.
+   * A publish job is scheduled for each response, and the next batch of
+   * queries is scheduled after all responses are processed/published.
    */
-  private def queryAndSchedulePublish() : Unit = {
+  private def queryAndSchedule() : Unit = {
import filodb.query.exec.TsCardExec._
-    val numGroupByFields = 2 // group cardinalities at the second level (i.e. ws & ns)
-    val prefix = Nil // query for cardinalities regardless of first-level name (i.e. ws name)
-    dsIterProducer().foreach { dsRef =>
-      val fut = Client.asyncAsk(
-        coordActorProducer(),
-        LogicalPlan2Query(dsRef, TsCardinalities(prefix, numGroupByFields)),
-        ASK_TIMEOUT)
-      fut.onComplete {
-        case Success(QueryResult(_, _, rv, _, _, _)) =>
-          rv.foreach(_.rows().foreach{ rr =>
-            // publish a cardinality metric for each namespace
-            val data = RowData.fromRowReader(rr)
-            val prefix = data.group.toString.split(PREFIX_DELIM)
-            val tags = Map("metric_ws" -> prefix(0),
-                           "metric_ns" -> prefix(1),
-                           "dataset" -> dsRef.dataset,
-                           "cluster_type" -> CLUSTER_TYPE)
-            Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
-            Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
-          })
-        case Success(QueryError(_, _, t)) => logger.warn("QueryError: " + t.getMessage)
-        case Failure(t) => logger.warn("Failure: " + t.getMessage)
-        // required to compile
-        case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + fut)

+    // Nil prefix in order to query all client-owned workspaces;
+    //   numGroupByFields = 2 to group by (ws, ns)
+    val tsCardQuery = TsCardinalities(Nil, 2)
+
+    // use this later to find total elapsed time
+    queryAskTimeSec = java.time.Clock.systemUTC().instant().getEpochSecond
+
+    Observable.fromIterator(dsIterProducer()).mapAsync(PARALLELISM){ dsRef =>
+      // Asynchronously query each dataset; store (dsRef, queryResult) pairs
+      Task{
+        val qres = Client.actorAsk(
+          coordActorProducer(),
+          LogicalPlan2Query(dsRef, tsCardQuery),
+          queryThrottle.getInterval()){
+            case t: Try[Any] => t
+          }
+        (dsRef, qres)
+      }
+    }.foreach { case (dsRef, qres) => qres match {
+      // process the query results one-at-a-time (prevents the need for locks in QueryThrottle)
+      case Success(qresp) =>
+        queryThrottle.recordOnTime()
+        qresp match {
+          case QueryResult(_, _, rv, _, _, _) =>
+            rv.foreach(_.rows().foreach { rr =>
+              // publish a cardinality metric for each namespace
+              val data = RowData.fromRowReader(rr)
+              val prefix = data.group.toString.split(PREFIX_DELIM)
+              val tags = Map(
+                "metric_ws" -> prefix(0),
+                "metric_ns" -> prefix(1),
+                "dataset" -> dsRef.dataset,
+                "cluster_type" -> clusterType)
+              Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
+              Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
+            })
+          case QueryError(_, _, t) => logger.warn("QueryError: " + t.getMessage)
+        }
+      case Failure(t) =>
+        logger.warn("Failure: " + t.getMessage)
+        if (t.isInstanceOf[TimeoutException]) {
+          queryThrottle.recordTimeout()
+        } else {
+          queryThrottle.recordOnTime()
+        }
+      // required to compile
+      case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + qres)
+      }
+    }.onComplete { _ =>
+      // Schedule the next query batch at the beginning of the next interval.
+      // Note: this "batch delay" setup is intended to keep the TIM config simple; only
+      //   metering-query-interval needs to be configured. But it assumes the time required to
+      //   sequentially process each response is negligible with respect to metering-query-interval.
+      val elapsedSec = java.time.Clock.systemUTC().instant().getEpochSecond - queryAskTimeSec
+      scheduler.scheduleOnce(
+        math.max(0, queryThrottle.getInterval().toSeconds - elapsedSec),
+        TimeUnit.SECONDS,
+        () => queryAndSchedule())
+    }
+  }
+  // scalastyle:on method.length
}
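To spell out the rescheduling arithmetic in the onComplete block above (numbers assumed purely for illustration):

  // with a 15-minute throttle interval and a batch that took 40s end-to-end,
  // the next batch fires 860 seconds after this one completes
  val intervalSec = 15 * 60              // queryThrottle.getInterval().toSeconds
  val elapsedSec  = 40                   // now - queryAskTimeSec
  math.max(0, intervalSec - elapsedSec)  // == 860; a batch slower than the interval clamps to 0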