Skip to content

Commit

Permalink
feat(query): add hierarchical cardinality query infrastructure (#1289)
Browse files Browse the repository at this point in the history
  • Loading branch information
alextheimer authored Dec 4, 2021
1 parent 943df27 commit a0558cc
Show file tree
Hide file tree
Showing 21 changed files with 601 additions and 472 deletions.
30 changes: 15 additions & 15 deletions cli/src/main/scala/filodb.cli/CliMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class Arguments(args: Seq[String]) extends ScallopConf(args) {
val shards = opt[List[String]]()
val spread = opt[Int]()
val k = opt[Int]()
val groupdepth = opt[Int]()
val active = opt[Boolean](default = Some(false))
val shardkeyprefix = opt[List[String]](default = Some(List()))
val queries = opt[List[String]](default = Some(List()))
Expand Down Expand Up @@ -109,7 +110,7 @@ object CliMain extends FilodbClusterNode {
println(" --host <hostname/IP> [--port ...] --command status --dataset <dataset>")
println(" --host <hostname/IP> [--port ...] --command labelvalues --labelnames <lable-names> --labelfilter <label-filter> --dataset <dataset>")
println(" --host <hostname/IP> [--port ...] --command labels --labelfilter <label-filter> -dataset <dataset>")
println(" --host <hostname/IP> [--port ...] --command topkcard --dataset <dataset> --k <k> [--active] --shardkeyprefix <shard-key-prefix> ")
println(" --host <hostname/IP> [--port ...] --command tscard --dataset <dataset> --shardkeyprefix <shard-key-prefix> --groupdepth {0,1,2}")
println(" --host <hostname/IP> [--port ...] --command topkcardlocal --dataset prometheus --k 2 --shardkeyprefix demo App-0")
println(" --host <hostname/IP> [--port ...] --command labelcardinality --labelfilter <label-filter> --dataset prometheus")
println(" --host <hostname/IP> [--port ...] --command findqueryshards --queries <query> --spread <spread>")
Expand Down Expand Up @@ -176,7 +177,6 @@ object CliMain extends FilodbClusterNode {
val (remote, ref) = getClientAndRef(args)
val values = remote.getIndexValues(ref, args.indexname(), args.shards().head.toInt, args.limit())
values.foreach { case (term, freq) => println(f"$term%40s\t$freq") }

case Some("topkcardlocal") =>
require(args.host.isDefined && args.dataset.isDefined && args.k.isDefined,
"--host, --dataset, --k must be defined")
Expand All @@ -190,9 +190,11 @@ object CliMain extends FilodbClusterNode {
printf("%40s %20s %20s %15s %15s\n", "Child", "TotalTimeSeries", "ActiveTimeSeries", "Children", "Children")
printf("%40s %20s %20s %15s %15s\n", "Name", "Count", "Count", "Count", "Quota")
println("==============================================================================================================================")
crs._2.sortBy(c => if (addInactive) c.tsCount else c.activeTsCount)(Ordering.Int.reverse).foreach { cr =>
printf("%40s %20d %20d %15d %15d\n", cr.childName, cr.tsCount, cr.activeTsCount, cr.childrenCount, cr.childrenQuota)
}
crs._2.sortBy(c => if (addInactive) c.tsCount else c.activeTsCount)(Ordering.Int.reverse)
.foreach { cr =>
printf("%40s %20d %20d %15d %15d\n", cr.prefix, cr.tsCount,
cr.activeTsCount, cr.childrenCount, cr.childrenQuota)
}
}

case Some("status") =>
Expand Down Expand Up @@ -254,12 +256,12 @@ object CliMain extends FilodbClusterNode {
parseLabelCardinalityQuery(remote, args.labelfilter(), args.dataset(),
getQueryRange(args), options)

case Some("topkcard") =>
case Some("tscard") =>
require(args.host.isDefined && args.dataset.isDefined && args.labelfilter.isDefined, "--host, --dataset and --labelfilter must be defined")
val remote = Client.standaloneClient(system, args.host(), args.port())
val options = QOptions(args.limit(), args.samplelimit(), args.everynseconds.map(_.toInt).toOption,
timeout, args.shards.map(_.map(_.toInt)).toOption, args.spread.toOption.map(Integer.valueOf))
parseTopkCardQuery(remote, args.k(), args.active(), args.shardkeyprefix(), args.dataset(), options)
parseTsCardQuery(remote, args.shardkeyprefix(), args.groupdepth(), args.dataset(), options)

case x: Any =>
// This will soon be deprecated
Expand Down Expand Up @@ -381,14 +383,12 @@ object CliMain extends FilodbClusterNode {
executeQuery2(client, dataset, logicalPlan, options, UnavailablePromQlQueryParams)
}

def parseTopkCardQuery(client: LocalClient,
k: Int,
active: Boolean,
shardKeyPrefix: Seq[String],
dataset: String,
options: QOptions): Unit = {
val addInactive = !active
val logicalPlan = TopkCardinalities(shardKeyPrefix, k, addInactive)
/**
 * Builds a TsCardinalities logical plan from the given shard-key prefix and
 * group depth, then executes it against the given dataset.
 *
 * @param client          client used to dispatch the query
 * @param shardKeyPrefix  shard-key labels restricting which cardinality groups are scanned
 * @param groupDepth      depth of the cardinality hierarchy to scan to
 * @param dataset         name of the dataset to query
 * @param options         query options (limits, timeout, shards, spread)
 */
def parseTsCardQuery(client: LocalClient,
                     shardKeyPrefix: Seq[String],
                     groupDepth: Int,
                     dataset: String,
                     options: QOptions): Unit =
  // No time parameters apply to a cardinality scan, hence UnavailablePromQlQueryParams.
  executeQuery2(client, dataset, TsCardinalities(shardKeyPrefix, groupDepth),
                options, UnavailablePromQlQueryParams)

Expand Down
17 changes: 15 additions & 2 deletions coordinator/src/main/scala/filodb.coordinator/QueryActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package filodb.coordinator
import java.lang.Thread.UncaughtExceptionHandler
import java.util.concurrent.{ForkJoinPool, ForkJoinWorkerThread}

import scala.collection.mutable
import scala.util.control.NonFatal

import akka.actor.{ActorRef, Props}
Expand All @@ -18,6 +19,7 @@ import net.ceedubs.ficus.readers.ValueReader
import filodb.coordinator.queryplanner.SingleClusterPlanner
import filodb.core._
import filodb.core.memstore.{FiloSchedulers, MemStore, TermInfo}
import filodb.core.memstore.ratelimit.CardinalityRecord
import filodb.core.metadata.{Dataset, Schemas}
import filodb.core.query.{QueryConfig, QueryContext, QuerySession, QueryStats}
import filodb.core.store.CorruptVectorException
Expand Down Expand Up @@ -212,9 +214,20 @@ final class QueryActor(memStore: MemStore,
}

private def execTopkCardinalityQuery(q: GetTopkCardinality, sender: ActorRef): Unit = {
implicit val ord = new Ordering[CardinalityRecord]() {
override def compare(x: CardinalityRecord, y: CardinalityRecord): Int = {
if (q.addInactive) x.tsCount - y.tsCount
else x.activeTsCount - y.activeTsCount
}
}.reverse
try {
val ret = memStore.topKCardinality(q.dataset, q.shards, q.shardKeyPrefix, q.k, q.addInactive)
sender ! ret
val cards = memStore.scanTsCardinalities(q.dataset, q.shards, q.shardKeyPrefix, q.depth)
val heap = mutable.PriorityQueue[CardinalityRecord]()
cards.foreach { card =>
heap.enqueue(card)
if (heap.size > q.k) heap.dequeue()
}
sender ! heap.toSeq
} catch { case e: Exception =>
sender ! QueryError(s"Error Occurred", QueryStats(), e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ object QueryCommands {
// Query command requesting the k highest-cardinality records of a dataset.
//   shards         - restrict the scan to these shards; an empty Seq scans all shards
//   shardKeyPrefix - only consider records under this shard-key prefix
//   depth          - depth of the cardinality hierarchy to scan to; callers appear to
//                    pass shardKeyPrefix.size + 1 (immediate children) -- TODO confirm
//   k              - number of top records to return
//   addInactive    - when true, rank by total time-series count; otherwise by active count
final case class GetTopkCardinality(dataset: DatasetRef,
                                    shards: Seq[Int],
                                    shardKeyPrefix: Seq[String],
                                    depth: Int,
                                    k: Int,
                                    addInactive: Boolean,
                                    submitTime: Long = System.currentTimeMillis()) extends QueryCommand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ trait QueryOps extends ClientBase with StrictLogging {
k: Int,
addInactive: Boolean,
timeout: FiniteDuration = 15.seconds): Seq[CardinalityRecord] =
askCoordinator(GetTopkCardinality(dataset, shards, shardKeyPrefix, k, addInactive), timeout) {
askCoordinator(
GetTopkCardinality(dataset, shards, shardKeyPrefix,
shardKeyPrefix.size + 1, k, addInactive),
timeout) {
case s: Seq[CardinalityRecord] @unchecked => s
case e: QueryError => throw e.t
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ object LogicalPlanUtils extends StrictLogging {
case lp: LabelValues => TimeRange(lp.startMs, lp.endMs)
case lp: LabelCardinality => TimeRange(lp.startMs, lp.endMs)
case lp: LabelNames => TimeRange(lp.startMs, lp.endMs)
case lp: TopkCardinalities => throw new IllegalArgumentException(
"no time params for TopKCardinalitites")
case lp: TsCardinalities => throw new IllegalArgumentException("no time params for TsCardinalitites")
case lp: SeriesKeysByFilters => TimeRange(lp.startMs, lp.endMs)
case lp: ApplyInstantFunctionRaw => getTimeFromLogicalPlan(lp.vectors)
case lp: ScalarBinaryOperation => TimeRange(lp.rangeParams.startSecs * 1000, lp.rangeParams.endSecs * 1000)
Expand All @@ -93,7 +92,7 @@ object LogicalPlanUtils extends StrictLogging {
case lp: LabelValues => lp.copy(startMs = timeRange.startMs, endMs = timeRange.endMs)
case lp: LabelNames => lp.copy(startMs = timeRange.startMs, endMs = timeRange.endMs)
case lp: LabelCardinality => lp.copy(startMs = timeRange.startMs, endMs = timeRange.endMs)
case lp: TopkCardinalities => lp.copy()
case lp: TsCardinalities => lp // immutable & no members need to be updated
case lp: SeriesKeysByFilters => lp.copy(startMs = timeRange.startMs, endMs = timeRange.endMs)
}
}
Expand Down Expand Up @@ -344,7 +343,7 @@ object LogicalPlanUtils extends StrictLogging {
case lp: RawChunkMeta => None
case sq: SubqueryWithWindowing => getPeriodicSeriesPlan(sq.innerPeriodicSeries)
case tlsq: TopLevelSubquery => getPeriodicSeriesPlan(tlsq.innerPeriodicSeries)
case lp: TopkCardinalities => None
case lp: TsCardinalities => None
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,7 @@ import filodb.query.exec._
_: ApplyInstantFunctionRaw |
_: RawSeries |
_: LabelNames |
_: TopkCardinalities => rawClusterMaterialize(qContext, logicalPlan)

_: TsCardinalities => rawClusterMaterialize(qContext, logicalPlan)
}
}
else logicalPlan match {
Expand All @@ -185,7 +184,7 @@ import filodb.query.exec._
case lp: BinaryJoin => materializeBinaryJoin(qContext, lp)
case lp: ScalarVectorBinaryOperation => materializeScalarVectorBinOp(qContext, lp)
case lp: LabelValues => rawClusterMaterialize(qContext, lp)
case lp: TopkCardinalities => rawClusterMaterialize(qContext, lp)
case lp: TsCardinalities => rawClusterMaterialize(qContext, lp)
case lp: SeriesKeysByFilters => rawClusterMaterialize(qContext, lp)
case lp: ApplyMiscellaneousFunction => materializeApplyMiscellaneousFunction(qContext, lp)
case lp: ApplySortFunction => materializeApplySortFunction(qContext, lp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
case lp: ApplyAbsentFunction => super.materializeAbsentFunction(qContext, lp)
case lp: ScalarBinaryOperation => super.materializeScalarBinaryOperation(qContext, lp)
case lp: ApplyLimitFunction => super.materializeLimitFunction(qContext, lp)
case _: TopkCardinalities => throw new IllegalArgumentException("TopkCardinalities unexpected here")
case _: TsCardinalities => throw new IllegalArgumentException("TsCardinalities unexpected here")

// Imp: At the moment, these two cases for subquery will not get executed, materialize is already
// Checking if the plan is a TopLevelSubQuery or any of the descendant is a SubqueryWithWindowing and
Expand Down Expand Up @@ -429,31 +429,6 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
PlanResult(execPlan::Nil)
}

// Materializes a TopkCardinalities plan across authorized partitions:
// one leaf plan per partition, reduced by TopkCardReduceExec with a
// TopkCardPresenter transformer that trims the merged result to k entries.
private def materializeTopkCardinalities(qContext: QueryContext, lp: TopkCardinalities) : PlanResult = {
  // This code is nearly identical to materializeMetadataQueryPlan, but it prevents some
  // boilerplate code clutter that results when TopkCardinalities extends MetadataQueryPlan.
  // If this code's maintenance isn't worth some extra stand-in lines of code (i.e. at matchers that
  // take MetadataQueryPlan instances), then just extend TopkCardinalities from MetadataQueryPlan.
  val queryParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams]
  // Partition lookup uses the query's time range (seconds -> millis).
  val partitions = partitionLocationProvider.getAuthorizedPartitions(
    TimeRange(queryParams.startSecs * 1000, queryParams.endSecs * 1000))
  val execPlan = if (partitions.isEmpty) {
    // No partitions resolved: fall back to materializing locally so the query still runs.
    logger.warn(s"No partitions found for ${queryParams.startSecs}, ${queryParams.endSecs}")
    localPartitionPlanner.materialize(lp, qContext)
  } else {
    val leafPlans = partitions.map { p =>
      logger.debug(s"partitionInfo=$p; queryParams=$queryParams")
      if (p.partitionName.equals(localPartitionName))
        localPartitionPlanner.materialize(lp.copy(), qContext)
      else ??? // TODO: handle remote  -- remote-partition execution is not implemented yet
    }.toSeq
    // Reduce the per-partition results and keep only the top k.
    val reducer = TopkCardReduceExec(qContext, inProcessPlanDispatcher, leafPlans, lp.k)
    reducer.addRangeVectorTransformer(TopkCardPresenter(lp.k))
    reducer
  }
  PlanResult(execPlan::Nil)
}

private def createMetadataRemoteExec(qContext: QueryContext, partitionAssignment: PartitionAssignment,
urlParams: Map[String, String]) = {
val finalQueryContext = generateRemoteExecParams(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class SingleClusterPlanner(val dataset: Dataset,
many.head match {
case _: LabelValuesExec => LabelValuesDistConcatExec(qContext, targetActor, many)
case _: LabelNamesExec => LabelNamesDistConcatExec(qContext, targetActor, many)
case _: TsCardExec => TsCardReduceExec(qContext, targetActor, many)
case _: LabelCardinalityExec => {
val reduceExec = LabelCardinalityReduceExec(qContext, targetActor, many)
// Presenter here is added separately which use the bytes representing the sketch to get an estimate
Expand All @@ -146,11 +147,6 @@ class SingleClusterPlanner(val dataset: Dataset,
}
reduceExec
}
case lce: TopkCardExec => {
val reducer = TopkCardReduceExec(qContext, targetActor, many, lce.k)
reducer.addRangeVectorTransformer(TopkCardPresenter(lce.k))
reducer
}
case ske: PartKeysExec => PartKeysDistConcatExec(qContext, targetActor, many)
case ep: ExecPlan =>
val topPlan = LocalPartitionDistConcatExec(qContext, targetActor, many)
Expand Down Expand Up @@ -240,7 +236,7 @@ class SingleClusterPlanner(val dataset: Dataset,
case lp: ScalarVectorBinaryOperation => materializeScalarVectorBinOp(qContext, lp)
case lp: LabelValues => materializeLabelValues(qContext, lp)
case lp: LabelNames => materializeLabelNames(qContext, lp)
case lp: TopkCardinalities => materializeTopkCardinalities(qContext, lp)
case lp: TsCardinalities => materializeTsCardinalities(qContext, lp)
case lp: SeriesKeysByFilters => materializeSeriesKeysByFilters(qContext, lp)
case lp: ApplyMiscellaneousFunction => materializeApplyMiscellaneousFunction(qContext, lp)
case lp: ApplySortFunction => materializeApplySortFunction(qContext, lp)
Expand Down Expand Up @@ -509,11 +505,11 @@ class SingleClusterPlanner(val dataset: Dataset,
PlanResult(metaExec, false)
}

/**
 * Materializes a TsCardinalities logical plan: one TsCardExec leaf per shard
 * assigned to this node, each scanning cardinality records under
 * lp.shardKeyPrefix down to lp.groupDepth.
 *
 * NOTE(review): the diff rendering left the pre-change materializeTopkCardinalities
 * signature and TopkCardExec call interleaved here; this is the clean post-change method.
 */
private def materializeTsCardinalities(qContext: QueryContext,
                                       lp: TsCardinalities): PlanResult = {
  val metaExec = shardMapperFunc.assignedShards.map { shard =>
    // Each leaf is dispatched to the actor owning its shard.
    val dispatcher = dispatcherForShard(shard)
    exec.TsCardExec(qContext, dispatcher, dsRef, shard, lp.shardKeyPrefix, lp.groupDepth)
  }
  PlanResult(metaExec, false)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -807,25 +807,21 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture
validatePlan(execPlan, expected)
}

// Verifies a TsCardinalities plan materializes to a TsCardReduceExec with one
// TsCardExec child per shard, each carrying the prefix and group depth.
// NOTE(review): the diff rendering interleaved the deleted TopkCardExec test
// lines here; this is the clean post-change test.
it ("should correctly materialize TsCardExec") {
  val shardKeyPrefix = Seq("foo", "bar")
  val groupDepth = 2

  val lp = TsCardinalities(shardKeyPrefix, groupDepth)
  val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams))
  execPlan.isInstanceOf[TsCardReduceExec] shouldEqual true

  val reducer = execPlan.asInstanceOf[TsCardReduceExec]
  // One leaf per shard in the shard mapper.
  reducer.children.size shouldEqual mapper.numShards
  reducer.children.foreach{ child =>
    child.isInstanceOf[TsCardExec] shouldEqual true
    val leaf = child.asInstanceOf[TsCardExec]
    leaf.shardKeyPrefix shouldEqual shardKeyPrefix
    leaf.groupDepth shouldEqual groupDepth
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,6 @@ extends MemStore with StrictLogging {
chunkMethod: ChunkScanMethod): Observable[RawPartData] = ???

// TODO we need breakdown for downsample store too, but in a less memory intensive way
// Cardinality scans are not implemented for the downsample store (see TODO above).
// NOTE(review): the diff rendering left the deleted topKCardinality stub interleaved
// here; this is the clean post-change stub.
override def scanTsCardinalities(ref: DatasetRef, shards: Seq[Int],
                                 shardKeyPrefix: Seq[String], depth: Int): scala.Seq[CardinalityRecord] = ???
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,16 @@ extends MemStore with StrictLogging {
}
}

/**
 * Scans cardinality records for the dataset across the given shards
 * (an empty Seq means all shards), restricted to time series under
 * shardKeyPrefix, down to the given depth. Returns empty when the
 * dataset is unknown (datasets.get(ref) is None).
 *
 * NOTE(review): the diff rendering left the deleted topKCardinality signature and
 * call interleaved here; this is the clean post-change method.
 */
def scanTsCardinalities(ref: DatasetRef, shards: Seq[Int],
                        shardKeyPrefix: Seq[String], depth: Int): Seq[CardinalityRecord] = {
  datasets.get(ref).toSeq
    .flatMap { ts =>
      ts.values().asScala
        .filter(s => shards.isEmpty || shards.contains(s.shardNum))
        .flatMap(_.scanTsCardinalities(shardKeyPrefix, depth))
    }
}

/**
* WARNING: use only for testing. Not performant
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,9 +598,12 @@ class TimeSeriesShard(val ref: DatasetRef,
_offset
}

/**
 * Scans this shard's time-series cardinality records under shardKeyPrefix
 * down to the given depth, delegating to the cardinality tracker.
 *
 * @throws IllegalArgumentException if cardinality metering is disabled in the store config
 *
 * NOTE(review): the diff rendering left the deleted topKCardinality body interleaved
 * here; this is the clean post-change method.
 */
def scanTsCardinalities(shardKeyPrefix: Seq[String], depth: Int): Seq[CardinalityRecord] = {
  if (storeConfig.meteringEnabled) {
    cardTracker.scan(shardKeyPrefix, depth)
  } else {
    throw new IllegalArgumentException("Metering is not enabled")
  }
}

def startFlushingIndex(): Unit =
Expand Down
Loading

0 comments on commit a0558cc

Please sign in to comment.