Skip to content

Commit

Permalink
onComplete schedules next ask
Browse files Browse the repository at this point in the history
  • Loading branch information
alextheimer committed Dec 22, 2021
1 parent 4dd70ae commit 49abfae
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,10 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,

private val _tenantIngestionMeteringOpt =
if (settings.config.getBoolean("shard-key-level-ingestion-metrics-enabled")) {
val inst = new TenantIngestionMetering(
Some(new TenantIngestionMetering(
settings,
() => { _datasetInfo.map{ case (dsRef, _) => dsRef}.toIterator },
() => { _coordinators.head._2 })
inst.schedulePeriodicPublishJob()
Some(inst)
() => { _coordinators.head._2 }))
} else None

val shardReassignmentMinInterval = settings.config.getDuration("shard-manager.reassignment-min-interval")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package filodb.coordinator

import java.util.concurrent.{TimeoutException, TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock

import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success}
Expand All @@ -18,24 +17,28 @@ import filodb.core.DatasetRef
import filodb.query.{QueryError, QueryResult, TsCardinalities}

object QueryThrottle {
// currently just add this diff to the delay if the timeout rate exceeds THRESHOLD
val DELAY_DIFF = FiniteDuration(5L, TimeUnit.MINUTES)
// currently just add this diff to the interval if the timeout rate exceeds THRESHOLD
protected val INTERVAL_DIFF = FiniteDuration(5L, TimeUnit.MINUTES)
// number of past query timeouts/non-timeouts to consider
val LOOKBACK = 10
// {non-timeouts-in-lookback-window} / LOOKBACK < THRESHOLD will adjust the delay
val THRESHOLD = 0.85
protected val LOOKBACK = 10
// {non-timeouts-in-lookback-window} / LOOKBACK < THRESHOLD will adjust the interval
protected val THRESHOLD = 0.85
}

object TenantIngestionMetering {
protected val METRIC_ACTIVE = "active_timeseries_by_tenant"
protected val METRIC_TOTAL = "total_timeseries_by_tenant"
}

/**
* Throttles the TenantIngestionMetering query rate according to the ratio of timeouts to non-timeouts.
*
* @param queryDelay the initial delay between each query. This is the duration to be adjusted.
* @param queryInterval the initial delay between each query. This is the duration to be adjusted.
*/
class QueryThrottle(queryDelay: FiniteDuration) extends StrictLogging {
class QueryThrottle(queryInterval: FiniteDuration) extends StrictLogging {
import QueryThrottle._

private var delay: FiniteDuration = queryDelay.copy()
private val lock = new ReentrantReadWriteLock()
private var interval: FiniteDuration = queryInterval.copy()

// these track timeouts for the past LOOKBACK queries
private var bits = (1 << LOOKBACK) - 1
Expand All @@ -55,13 +58,13 @@ class QueryThrottle(queryDelay: FiniteDuration) extends StrictLogging {
}

/**
* Updates the delay according to the timeout:non-timeoout ratio.
* Updates the interval according to the timeout:non-timeout ratio.
*/
private def updateDelay(): Unit = {
private def updateInterval(): Unit = {
val successRate = Integer.bitCount(bits).toDouble / LOOKBACK
if (successRate < THRESHOLD) {
delay = delay + DELAY_DIFF
logger.info("too many timeouts; query delay extended to " + delay.toString())
interval = interval + INTERVAL_DIFF
logger.info("too many timeouts; query interval extended to " + interval.toString())
// reset the bits
bits = (1 << LOOKBACK) - 1
}
Expand All @@ -71,30 +74,23 @@ class QueryThrottle(queryDelay: FiniteDuration) extends StrictLogging {
* Record a query timeout.
*/
def recordTimeout(): Unit = {
lock.writeLock().lock()
setNextBit(false)
updateDelay()
lock.writeLock().unlock()
updateInterval()
}

/**
* Record a query non-timeout.
*/
def recordOnTime(): Unit = {
lock.writeLock().lock()
setNextBit(true)
updateDelay()
lock.writeLock().unlock()
updateInterval()
}

/**
* Returns the current query delay.
* Returns the current query interval.
*/
def getDelay(): FiniteDuration = {
lock.readLock().lock()
val currDelay = delay.copy()
lock.readLock().unlock()
return currDelay
def getInterval(): FiniteDuration = {
interval.copy()
}
}

Expand All @@ -110,31 +106,21 @@ class QueryThrottle(queryDelay: FiniteDuration) extends StrictLogging {
* queried in the order they're returned from this function.
*/
class TenantIngestionMetering(settings: FilodbSettings,
dsIterProducer: () => Iterator[DatasetRef],
coordActorProducer: () => ActorRef) extends StrictLogging{

// time until first query executes
private val SCHED_INIT_DELAY = FiniteDuration(
dsIterProducer: () => Iterator[DatasetRef],
coordActorProducer: () => ActorRef) extends StrictLogging{
import TenantIngestionMetering._

private val clusterType = settings.config.getString("cluster-type")
private var queryAskTimeSec = -1L // unix time of the most recent query ask
private var isStarted = false; // true if startPeriodicQueryAndPublishJob has been called
private val queryThrottle = new QueryThrottle(FiniteDuration(
settings.config.getDuration("metering-query-interval").toSeconds,
TimeUnit.SECONDS)
TimeUnit.SECONDS))

private val CLUSTER_TYPE = settings.config.getString("cluster-type")

private val METRIC_ACTIVE = "active_timeseries_by_tenant"
private val METRIC_TOTAL = "total_timeseries_by_tenant"

private val queryLimiter = new QueryThrottle(SCHED_INIT_DELAY)

def schedulePeriodicPublishJob() : Unit = {
// NOTE: the FiniteDuration overload of scheduleWithFixedDelay
// does not work. Unsure why, but that's why these FiniteDurations are
// awkwardly parsed into seconds.
scheduler.scheduleOnce(
SCHED_INIT_DELAY.toSeconds,
TimeUnit.SECONDS,
() => queryAndSchedule())
}
// immediately begin periodically querying for / publishing cardinality data
queryAndSchedule()

// scalastyle:off method.length
/**
* For each dataset, ask a Coordinator with a TsCardinalities LogicalPlan.
* Schedules:
Expand All @@ -143,47 +129,58 @@ class TenantIngestionMetering(settings: FilodbSettings,
*/
private def queryAndSchedule() : Unit = {
import filodb.query.exec.TsCardExec._

val groupDepth = 1 // group cardinalities at the second level (i.e. ws & ns)
val prefix = Nil // query for cardinalities regardless of first-level name (i.e. ws name)

// use this later to find total elapsed time
queryAskTimeSec = java.time.Clock.systemUTC().instant().getEpochSecond

dsIterProducer().foreach { dsRef =>
val fut = Client.asyncAsk(
coordActorProducer(),
LogicalPlan2Query(dsRef, TsCardinalities(prefix, groupDepth)),
queryLimiter.getDelay())
fut.onComplete {
case Success(qresp) =>
queryLimiter.recordOnTime()
qresp match {
case QueryResult(_, _, rv, _, _, _) =>
rv.foreach(_.rows().foreach{ rr =>
// publish a cardinality metric for each namespace
val data = RowData.fromRowReader(rr)
val prefix = data.group.toString.split(PREFIX_DELIM)
val tags = Map("metric_ws" -> prefix(0),
"metric_ns" -> prefix(1),
"dataset" -> dsRef.dataset,
"cluster_type" -> CLUSTER_TYPE)
Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
})
case QueryError(_, _, t) => logger.warn("QueryError: " + t.getMessage)
}
case Failure(t) =>
logger.warn("Failure: " + t.getMessage)
if (t.isInstanceOf[TimeoutException]) {
queryLimiter.recordTimeout()
} else {
queryLimiter.recordOnTime()
}
// required to compile
case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + fut)
queryThrottle.getInterval())

fut.onComplete { tryRes =>
tryRes match {
case Success(qresp) =>
queryThrottle.recordOnTime()
qresp match {
case QueryResult(_, _, rv, _, _, _) =>
rv.foreach(_.rows().foreach{ rr =>
// publish a cardinality metric for each namespace
val data = RowData.fromRowReader(rr)
val prefix = data.group.toString.split(PREFIX_DELIM)
val tags = Map(
"metric_ws" -> prefix(0),
"metric_ns" -> prefix(1),
"dataset" -> dsRef.dataset,
"cluster_type" -> clusterType)
Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
})
case QueryError(_, _, t) => logger.warn("QueryError: " + t.getMessage)
}
case Failure(t) =>
logger.warn("Failure: " + t.getMessage)
if (t.isInstanceOf[TimeoutException]) {
queryThrottle.recordTimeout()
} else {
queryThrottle.recordOnTime()
}
// required to compile
case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + fut)
}

// Delay the next query until the beginning of the next interval.
val elapsedSec = java.time.Clock.systemUTC().instant().getEpochSecond - queryAskTimeSec
scheduler.scheduleOnce(
math.max(0, queryThrottle.getInterval().toSeconds - elapsedSec),
TimeUnit.SECONDS,
() => queryAndSchedule())
}
}

// schedule the next query
scheduler.scheduleOnce(
queryLimiter.getDelay().toSeconds,
TimeUnit.SECONDS,
() => queryAndSchedule())
}
// scalastyle:on method.length
}

0 comments on commit 49abfae

Please sign in to comment.