feat(query): namespace cardinality publisher #1294

Merged: 18 commits, Dec 7, 2021

Changes from 1 commit
remove concurrent asks
alextheimer committed Dec 4, 2021
commit a04c4ac875bf11c53da421b53144d78077904c4e
91 changes: 47 additions & 44 deletions coordinator/src/main/scala/filodb.coordinator/ShardManager.scala
@@ -3,16 +3,15 @@ package filodb.coordinator
 import scala.collection.mutable
 import scala.util.{Failure, Success}
 import akka.actor.{ActorRef, Address, AddressFromURIString}
+import akka.pattern.AskTimeoutException
 import com.typesafe.scalalogging.StrictLogging
 import org.scalactic._
 import filodb.coordinator.NodeClusterActor._
 import filodb.core.{DatasetRef, ErrorResponse, Response, Success => SuccessResponse}
 import filodb.core.downsample.DownsampleConfig
 import filodb.core.metadata.Dataset
 import filodb.core.store.{AssignShardConfig, IngestionConfig, StoreConfig}
-
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.{Await, Future}
+import filodb.query.QueryResponse
 
 /**
  * NodeClusterActor delegates shard management business logic to this class.
@@ -32,8 +31,8 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
   import ShardManager._
 
   /**
-   * Periodically queries randomly-chosen nodes for each dataset's namespace
-   * cardinalities. Kamon gauges are updated with the response data.
+   * Periodically queries a node for all namespace cardinalities.
+   * Kamon gauges are updated with the response data.
    *
    * The intent is to publish a low-cardinality metric such that namespace
    * cardinality queries can be efficiently answered.
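
Note: the gauge publication this class performs relies on Kamon's tagged-metric API, which appears in the hunk below. A minimal standalone sketch of that call pattern (kamon-core 2.x; the metric name and tag values here are illustrative placeholders, not the PR's METRIC_NAME/NS_VALUE constants, which are defined outside the visible hunks):

import kamon.Kamon
import kamon.tag.TagSet

object GaugeSketch extends App {
  Kamon.init()  // assumes kamon-core 2.x with a reporter configured
  // Placeholder tags; the PR derives these from TopkCardinalities query results.
  val tags = Map(
    "_ws_" -> "shard", "_ns_" -> "demo-ns-value",
    "ds" -> "demo-dataset", "ws" -> "demo-ws", "ns" -> "demo-ns",
    "addInactive" -> "false")
  // Kamon keeps one time series per distinct tag combination;
  // update() overwrites the gauge's current value.
  Kamon.gauge("demo-cardinality-metric").withTags(TagSet.from(tags)).update(12345d)
}

Because each gauge is keyed by its full tag set, keeping the tag values low-cardinality (top-k namespaces only) is what keeps the published metric cheap to store and query.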
@@ -62,60 +61,64 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
     private val WS_VALUE = "shard"
     private val K: Int = 250
 
+    // TODO(a_theimer): can this be static in a function? Weird to have here...
+    private val ASK_EXTRACTOR: PartialFunction[Any, QueryResponse] = {
+      case qres: QueryResponse => qres
+    }
+
     // This will be scheduled when an instance is created.
     scheduler.scheduleWithFixedDelay(
       SCHED_INIT_DELAY_TU, SCHED_DELAY_TU, TIME_UNIT,
       () => {
-        // Evenly pair LogicalPlan2Query instances to nodes.
-        // TODO(a_theimer): more efficient (or scala-thonic) way to do this?
-        val datasetRefs = _datasetInfo.map{case (dsRef, _) => dsRef}.toSeq
-        val shuffledCoordinators = util.Random.shuffle(_coordinators.map{case (_, actorRef) => actorRef}.toSeq)
-        val numPairs = 2*datasetInfo.size // one pair for every dataSet/addInactive combo
-        val lp2qNodePairs = new mutable.ArrayBuffer[(LogicalPlan2Query, ActorRef)](numPairs)
-        for (i <- 0 until numPairs) {
-          // Dataset increments every 2; one TopkCardinalities for each addInactive
-          val dsRef = datasetRefs(i >> 1)
-          val addInactive: Boolean = if ((i & 1) == 0) false else true // TODO(a_theimer): avoid the 'if'?
-          val actorRef = shuffledCoordinators(i % shuffledCoordinators.size)
-          lp2qNodePairs.append((
-            LogicalPlan2Query(dsRef, TopkCardinalities(Nil, K, addInactive)),
-            actorRef))
-        }
-
-        // Send a LogicalPlan to each node and store a Future for each.
-        val futures = new mutable.ArrayBuffer[Future[Any]](numPairs)
-        lp2qNodePairs.foreach{ case (lp2q, actorRef) =>
-          futures.append(Client.asyncAsk(
-            actorRef, lp2q, FiniteDuration(ASK_TIMEOUT_TU, TIME_UNIT)))
-        }
-
-        // Await all futures until ASK_TIMEOUT_TU, then process each.
-        val futSeq = Future.sequence(
-          futures.map(_.map(Success(_)).recover{ case t => Failure(t)}))
-        val readyTrys = Await.result(futSeq, FiniteDuration(ASK_TIMEOUT_TU, TIME_UNIT))
-        (0 until numPairs).foreach { i =>
-          // The Try/Dataset below correspond to a pair in lp2qNodePairs
-          val readyTry = readyTrys(i)
-          val dsString = lp2qNodePairs(i)._1.dataset.dataset
-          val addInactive = lp2qNodePairs(i)._1.logicalPlan.asInstanceOf[TopkCardinalities].addInactive
-          readyTry match {
-            case s: Success[Any] => {
-              s.value match {
-                case qres: QueryResult => {
-                  qres.result.foreach(_.rows().foreach { rr =>
-                    val ns = rr.getString(0) // TODO(a_theimer): might need getAny/toString here
-                    val count = rr.getLong(1)
-                    val tags = Map("_ws_" -> WS_VALUE, "_ns_" -> NS_VALUE,
-                      "ds" -> dsString, "ns" -> ns, "addInactive" -> addInactive.toString)
-                    Kamon.gauge(METRIC_NAME).withTags(TagSet.from(tags)).update(count.toDouble)
-                  })
-                }
-                case qerr: QueryError => ???
-              }
-            }
-            case f: Failure[Any] => ???
-          }
-        }
+        // foreach addInactive / dataset pair...
+        Seq(true, false).foreach{ addInactive =>
+          _datasetInfo.foreach { case (dsRef, _) =>
+            // get the topk workspace values
+            val wsResponse = askTopkBlocking(dsRef, Nil, addInactive)
+            val wsValues = new mutable.ArrayBuffer[String]
+            wsResponse match {
+              case qres: QueryResult => {
+                qres.result.foreach(_.rows().foreach{ rr =>
+                  wsValues.append(rr.getString(0))
+                })
+              }
+              case qerr: QueryError => ???
+            }
+            // for each workspace, get the topk namespaces
+            wsValues.foreach{ ws =>
+              val nsRes = askTopkBlocking(dsRef, Seq(ws), addInactive)
+              nsRes match {
+                case qres: QueryResult => {
+                  qres.result.foreach(_.rows().foreach{ rr =>
+                    // publish a cardinality metric for each namespace
+                    val ns = rr.getString(0)
+                    val count = rr.getLong(1)
+                    val tags = Map("_ws_" -> WS_VALUE, "_ns_" -> NS_VALUE,
+                      "ds" -> dsRef.dataset, "ws" -> ws, "ns" -> ns,
+                      "addInactive" -> addInactive.toString)
+                    Kamon.gauge(METRIC_NAME).withTags(TagSet.from(tags)).update(count.toDouble)
+                  })
+                }
+                case qerr: QueryError => ???
+              }
+            }
+          }
+        }
       })
+
+    private def askTopkBlocking(dsRef: DatasetRef,
+                                shardKeyPrefix: Seq[String],
+                                addInactive: Boolean): QueryResponse = {
+      try {
+        // asks the first node in _coordinators
+        Client.actorAsk(_coordinators.head._2,
+          LogicalPlan2Query(dsRef, TopkCardinalities(shardKeyPrefix, K, addInactive)),
+          FiniteDuration(ASK_TIMEOUT_TU, TIME_UNIT))(ASK_EXTRACTOR)
+      } catch {
+        case e: AskTimeoutException => ???
+      }
+    }
   }
 
   private val _shardMetricAggregator = new ShardMetricAggregator
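
For readers skimming the diff, the new control flow is easier to see in isolation. Below is a self-contained sketch of the same two-phase drill-down (top-k workspaces first, then top-k namespaces per workspace); the blocking actor ask is stubbed out, since Client, LogicalPlan2Query, and TopkCardinalities are FiloDB internals, and all names and values here are placeholders:

object DrillDownSketch extends App {
  // Stand-in for askTopkBlocking: returns (value, count) rows for a
  // top-k cardinality query under the given shard-key prefix.
  def topk(dataset: String, prefix: Seq[String], addInactive: Boolean): Seq[(String, Long)] =
    if (prefix.isEmpty) Seq(("demo-ws", 0L))   // phase 1: workspace values
    else Seq(("demo-ns", 12345L))              // phase 2: namespaces in one workspace

  val datasets = Seq("demo-dataset")
  for (addInactive <- Seq(true, false); ds <- datasets) {
    // Phase 1: one blocking query for the dataset's top-k workspaces.
    val wsValues = topk(ds, Nil, addInactive).map(_._1)
    // Phase 2: one blocking query per workspace for its top-k namespaces,
    // yielding one gauge sample per (ds, ws, ns, addInactive) row.
    for (ws <- wsValues; (ns, count) <- topk(ds, Seq(ws), addInactive))
      println(s"gauge ds=$ds ws=$ws ns=$ns addInactive=$addInactive count=$count")
  }
}

Relative to the removed version, the queries now run sequentially against a single coordinator (_coordinators.head) rather than fanning out futures to shuffled nodes, trading per-cycle latency for simpler control flow; the ??? placeholders for QueryError, Failure, and AskTimeoutException cases remain to be filled in.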