misc(query): add cardinality limits to binary-join, group-by queries (#…

sherali42 authored Jul 8, 2020
1 parent 961cfbc commit 3c04bcf

Showing 7 changed files with 171 additions and 8 deletions.
6 changes: 6 additions & 0 deletions core/src/main/resources/filodb-defaults.conf
@@ -150,6 +150,12 @@ filodb {
# Maximum number of samples to return in a query
sample-limit = 1000000

# Binary Join Cardinality limit
join-cardinality-limit = 25000

# Group by Cardinality limit
group-by-cardinality-limit = 1000

# Minimum step required for a query
min-step = 5 seconds

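As a usage illustration (not part of the commit), a minimal sketch of reading these two new keys with Typesafe Config and layering a hypothetical deployment override on top of the shipped defaults:

import com.typesafe.config.ConfigFactory

object CardinalityLimits extends App {
  // Hypothetical deployment override, layered over the shipped defaults
  val overrides = ConfigFactory.parseString(
    """filodb.query.join-cardinality-limit = 50000
      |filodb.query.group-by-cardinality-limit = 2000""".stripMargin)
  val config = overrides.withFallback(ConfigFactory.parseResources("filodb-defaults.conf"))

  // The same keys HttpSettings reads below
  val joinLimit = config.getInt("filodb.query.join-cardinality-limit")
  val groupByLimit = config.getInt("filodb.query.group-by-cardinality-limit")
  println(s"join limit=$joinLimit, group-by limit=$groupByLimit")
}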
2 changes: 2 additions & 0 deletions core/src/main/scala/filodb.core/query/QueryContext.scala
@@ -23,6 +23,8 @@ final case class QueryContext(origQueryParams: TsdbQueryParams = UnavailableProm
spreadOverride: Option[SpreadProvider] = None,
queryTimeoutMillis: Int = 30000,
sampleLimit: Int = 1000000,
groupByCardLimit: Int = 100000,
joinQueryCardLimit: Int = 100000,
shardOverrides: Option[Seq[Int]] = None,
queryId: String = UUID.randomUUID().toString,
submitTime: Long = System.currentTimeMillis())
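A minimal sketch of how a caller can tighten the new limits through named arguments, the same pattern the specs below use (the values here are hypothetical):

import filodb.core.query.QueryContext

// Tighter-than-default limits for a single expensive query (hypothetical values)
val ctx = QueryContext(groupByCardLimit = 500, joinQueryCardLimit = 10000)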
2 changes: 2 additions & 0 deletions http/src/main/scala/filodb/http/HttpSettings.scala
@@ -15,4 +15,6 @@ class HttpSettings(config: Config, val filoSettings: FilodbSettings) {
lazy val queryDefaultSpread = config.getInt("filodb.spread-default")
lazy val querySampleLimit = config.getInt("filodb.query.sample-limit")
lazy val queryAskTimeout = config.as[FiniteDuration]("filodb.query.ask-timeout")
lazy val queryBinaryJoinCardLimit = config.getInt("filodb.query.join-cardinality-limit")
lazy val queryGroupByCardLimit = config.getInt("filodb.query.group-by-cardinality-limit")
}
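The diff adds these settings but does not show their call site; a hedged sketch of how an HTTP route layer might thread them into each per-query context:

// Hypothetical wiring; the actual call site is not part of this commit
def queryContextFor(settings: HttpSettings): QueryContext =
  QueryContext(
    sampleLimit = settings.querySampleLimit,
    groupByCardLimit = settings.queryGroupByCardLimit,
    joinQueryCardLimit = settings.queryBinaryJoinCardLimit)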
17 changes: 13 additions & 4 deletions query/src/main/scala/filodb/query/exec/AggrOverRangeVectors.scala
@@ -35,7 +35,8 @@ final case class ReduceAggregateExec(queryContext: QueryContext,
val task = for { schema <- firstSchema }
yield {
val aggregator = RowAggregator(aggrOp, aggrParams, schema)
RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = true, results, rv => rv.key)
RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = true, results, rv => rv.key,
querySession.qContext.groupByCardLimit)
}
Observable.fromTask(task).flatten
}
@@ -76,10 +77,12 @@ final case class AggregateMapReduce(aggrOp: AggregationOperator,
sourceSchema.fixedVectorLen.filter(_ <= querySession.queryConfig.fastReduceMaxWindows).map { numWindows =>
RangeVectorAggregator.fastReduce(aggregator, false, source, numWindows)
}.getOrElse {
RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = false, source, grouping)
RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = false, source, grouping,
querySession.qContext.groupByCardLimit)
}
} else {
RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = false, source, grouping)
RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = false, source, grouping,
querySession.qContext.groupByCardLimit)
}
}

@@ -132,11 +135,17 @@ object RangeVectorAggregator extends StrictLogging {
def mapReduce(rowAgg: RowAggregator,
skipMapPhase: Boolean,
source: Observable[RangeVector],
grouping: RangeVector => RangeVectorKey): Observable[RangeVector] = {
grouping: RangeVector => RangeVectorKey,
cardinalityLimit: Int = Int.MaxValue): Observable[RangeVector] = {
// reduce the range vectors using the foldLeft construct. This results in one aggregate per group.
val task = source.toListL.map { rvs =>
// now reduce each group and create one result range vector per group
val groupedResult = mapReduceInternal(rvs, rowAgg, skipMapPhase, grouping)

// if the group-by cardinality breaches the limit, fail the query
if (groupedResult.size > cardinalityLimit)
throw new BadQueryException(s"This query results in more than $cardinalityLimit group-by cardinality. " +
s"Try applying more filters.")
groupedResult.map { case (rvk, aggHolder) =>
val rowIterator = new CustomCloseCursor(aggHolder.map(_.toRowReader))(aggHolder.close())
IteratorBackedRangeVector(rvk, rowIterator)
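Note that the check fires only after mapReduceInternal has materialized every group, so memory is spent before the limit trips. A self-contained sketch of the same group-then-check pattern (all names hypothetical):

// Minimal sketch of the group-then-check pattern (hypothetical names)
def groupWithLimit[K, V](rows: Seq[V], key: V => K, cardinalityLimit: Int): Map[K, Seq[V]] = {
  val grouped = rows.groupBy(key)  // every group is built before the check, as in mapReduce
  if (grouped.size > cardinalityLimit)
    throw new IllegalArgumentException(
      s"Query exceeds group-by cardinality limit of $cardinalityLimit; try applying more filters")
  grouped
}

// e.g. groupWithLimit(Seq(1, 2, 3, 4, 5), (n: Int) => n % 3, cardinalityLimit = 2) throws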
14 changes: 11 additions & 3 deletions query/src/main/scala/filodb/query/exec/BinaryJoinExec.scala
@@ -61,10 +61,15 @@ final case class BinaryJoinExec(queryContext: QueryContext,

protected def args: String = s"binaryOp=$binaryOp, on=$on, ignoring=$ignoring"

//scalastyle:off method.length
protected[exec] def compose(childResponses: Observable[(QueryResponse, Int)],
firstSchema: Task[ResultSchema],
querySession: QuerySession): Observable[RangeVector] = {
val taskOfResults = childResponses.map {
case (QueryResult(_, _, result), _)
if (result.size > queryContext.joinQueryCardLimit && cardinality == Cardinality.OneToOne) =>
throw new BadQueryException(s"This query results in more than ${queryContext.joinQueryCardLimit} " +
s"join cardinality. Try applying more filters.")
case (QueryResult(_, _, result), i) => (result, i)
case (QueryError(_, ex), _) => throw ex
}.toListL.map { resp =>
@@ -73,12 +78,10 @@ final case class BinaryJoinExec(queryContext: QueryContext,
// require(resp.size == lhs.size + rhs.size, "Did not get sufficient responses for LHS and RHS")
val lhsRvs = resp.filter(_._2 < lhs.size).flatMap(_._1)
val rhsRvs = resp.filter(_._2 >= lhs.size).flatMap(_._1)

// figure out which side is the "one" side
val (oneSide, otherSide, lhsIsOneSide) =
if (cardinality == Cardinality.OneToMany) (lhsRvs, rhsRvs, true)
else (rhsRvs, lhsRvs, false)

// load "one" side keys in a hashmap
val oneSideMap = new mutable.HashMap[Map[Utf8Str, Utf8Str], RangeVector]()
oneSide.foreach { rv =>
Expand All @@ -90,7 +93,6 @@ final case class BinaryJoinExec(queryContext: QueryContext,
}
oneSideMap.put(jk, rv)
}

// keep a hashset of result range vector keys to help ensure uniqueness of result range vectors
val resultKeySet = new mutable.HashSet[RangeVectorKey]()
// iterate across the "other" side which could be one or many and perform the binary operation
Expand All @@ -102,6 +104,12 @@ final case class BinaryJoinExec(queryContext: QueryContext,
throw new BadQueryException(s"Non-unique result vectors found for $resKey. " +
s"Use grouping to create unique matching")
resultKeySet.add(resKey)

// The OneToOne cardinality case is already handled above; this check covers the OneToMany/ManyToOne cases
if (resultKeySet.size > queryContext.joinQueryCardLimit)
throw new BadQueryException(s"This query results in more than ${queryContext.joinQueryCardLimit} " +
s"join cardinality. Try applying more filters.")

val res = if (lhsIsOneSide) binOp(rvOne.rows, rvOther.rows) else binOp(rvOther.rows, rvOne.rows)
IteratorBackedRangeVector(resKey, res)
}
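Unlike the group-by path, this check runs incrementally as result keys accumulate, so an over-limit join aborts before the full result is built. A stand-alone sketch of that pattern (names hypothetical):

import scala.collection.mutable

// Minimal sketch of incremental join-cardinality enforcement (hypothetical names)
def collectJoinKeys[K](otherSideKeys: Seq[K], limit: Int): mutable.HashSet[K] = {
  val resultKeySet = new mutable.HashSet[K]()
  otherSideKeys.foreach { k =>
    resultKeySet.add(k)
    if (resultKeySet.size > limit)  // checked on every addition, as in compose() above
      throw new IllegalArgumentException(
        s"Query exceeds join cardinality limit of $limit; try applying more filters")
  }
  resultKeySet
}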
50 changes: 50 additions & 0 deletions query/src/test/scala/filodb/query/exec/BinaryJoinExecSpec.scala
@@ -9,6 +9,7 @@ import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import org.scalatest.{FunSpec, Matchers}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.exceptions.TestFailedException

import filodb.core.metadata.Column.ColumnType
import filodb.core.query._
@@ -358,4 +359,53 @@

result.map(_.key).toSet.size shouldEqual 200
}

it("should throw BadQueryException - one-to-one with ignoring - cardinality limit 1") {
val queryContext = QueryContext(joinQueryCardLimit = 1) // set join card limit to 1
val execPlan = BinaryJoinExec(queryContext, dummyDispatcher,
Array(dummyPlan), // cannot be empty as some compose's rely on the schema
new Array[ExecPlan](1), // empty since we test compose, not execute or doExecute
BinaryOperator.ADD,
Cardinality.OneToOne,
Nil, Seq("tag2"), Nil, "__name__")

// scalastyle:off
val lhs = QueryResult("someId", null, samplesLhsGrouping.map(rv => SerializedRangeVector(rv, schema)))
// val lhs = QueryResult("someId", null, samplesLhs.filter(rv => rv.key.labelValues.get(ZeroCopyUTF8String("tag2")).get.equals("tag1-1")).map(rv => SerializedRangeVector(rv, schema)))
val rhs = QueryResult("someId", null, samplesRhsGrouping.map(rv => SerializedRangeVector(rv, schema)))
// scalastyle:on

// the query produces 2 result rows; since the limit is 1, this throws BadQueryException
val thrown = intercept[TestFailedException] {
execPlan.compose(Observable.fromIterable(Seq((rhs, 1), (lhs, 0))), tvSchemaTask, querySession)
.toListL.runAsync.futureValue
}
thrown.getCause.getClass shouldEqual classOf[BadQueryException]
thrown.getCause.getMessage shouldEqual "This query results in more than 1 join cardinality." +
" Try applying more filters."
}

it("should throw BadQueryException - one-to-one with on - cardinality limit 1") {
val queryContext = QueryContext(joinQueryCardLimit = 1) // set join card limit to 1
val execPlan = BinaryJoinExec(queryContext, dummyDispatcher,
Array(dummyPlan), // cannot be empty as some compose's rely on the schema
new Array[ExecPlan](1), // empty since we test compose, not execute or doExecute
BinaryOperator.ADD,
Cardinality.OneToOne,
Seq("tag1", "job"), Nil, Nil, "__name__")

// scalastyle:off
val lhs = QueryResult("someId", null, samplesLhsGrouping.map(rv => SerializedRangeVector(rv, schema)))
val rhs = QueryResult("someId", null, samplesRhsGrouping.map(rv => SerializedRangeVector(rv, schema)))
// scalastyle:on

// the query produces 2 result rows; since the limit is 1, this throws BadQueryException
val thrown = intercept[TestFailedException] {
execPlan.compose(Observable.fromIterable(Seq((rhs, 1), (lhs, 0))), tvSchemaTask, querySession)
.toListL.runAsync.futureValue
}
thrown.getCause.getClass shouldEqual classOf[BadQueryException]
thrown.getCause.getMessage shouldEqual "This query results in more than 1 join cardinality." +
" Try applying more filters."
}
}
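These specs rely on a ScalaFutures detail: futureValue wraps the underlying failure in a TestFailedException, so the assertions inspect getCause rather than catching BadQueryException directly. A condensed sketch of that pattern:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import org.scalatest.Assertions._
import org.scalatest.concurrent.ScalaFutures._
import org.scalatest.exceptions.TestFailedException

// futureValue rethrows failures wrapped in a TestFailedException
val failed = Future.failed(new IllegalStateException("boom"))
val thrown = intercept[TestFailedException] { failed.futureValue }
assert(thrown.getCause.isInstanceOf[IllegalStateException])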
88 changes: 87 additions & 1 deletion query/src/test/scala/filodb/query/exec/BinaryJoinGroupingSpec.scala
@@ -9,6 +9,7 @@ import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import org.scalatest.{FunSpec, Matchers}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.exceptions.TestFailedException

import filodb.core.metadata.Column.ColumnType
import filodb.core.query._
@@ -118,7 +119,6 @@ class BinaryJoinGroupingSpec extends FunSpec with Matchers with ScalaFutures {
)

it("should join many-to-one with on ") {

val samplesRhs2 = scala.util.Random.shuffle(sampleNodeRole.toList) // they may come out of order

val execPlan = BinaryJoinExec(QueryContext(), dummyDispatcher,
@@ -388,4 +388,90 @@
result.size shouldEqual 2
result.map(_.key.labelValues) sameElements(expectedLabels) shouldEqual true
}

it("should throw BadQueryException - many-to-one with on - cardinality limit 1") {
val queryContext = QueryContext(joinQueryCardLimit = 1) // set join card limit to 1
val samplesRhs2 = scala.util.Random.shuffle(sampleNodeRole.toList) // they may come out of order

val execPlan = BinaryJoinExec(queryContext, dummyDispatcher,
Array(dummyPlan), // cannot be empty as some compose's rely on the schema
new Array[ExecPlan](1), // empty since we test compose, not execute or doExecute
BinaryOperator.MUL,
Cardinality.ManyToOne,
Seq("instance"), Nil, Seq("role"), "__name__")

// scalastyle:off
val lhs = QueryResult("someId", null, sampleNodeCpu.map(rv => SerializedRangeVector(rv, schema)))
val rhs = QueryResult("someId", null, samplesRhs2.map(rv => SerializedRangeVector(rv, schema)))
// scalastyle:on

// the query produces 2 result rows; since the limit is 1, this throws BadQueryException
val thrown = intercept[TestFailedException] {
execPlan.compose(Observable.fromIterable(Seq((rhs, 1), (lhs, 0))), tvSchemaTask, querySession)
.toListL.runAsync.futureValue
}

thrown.getCause.getClass shouldEqual classOf[BadQueryException]
thrown.getCause.getMessage shouldEqual "This query results in more than 1 join cardinality." +
" Try applying more filters."
}

it("should throw BadQueryException - many-to-one with ignoring - cardinality limit 1") {
val queryContext = QueryContext(joinQueryCardLimit = 1) // set join card limit to 1
val samplesRhs2 = scala.util.Random.shuffle(sampleNodeRole.toList) // they may come out of order

val execPlan = BinaryJoinExec(queryContext, dummyDispatcher,
Array(dummyPlan),
new Array[ExecPlan](1),
BinaryOperator.MUL,
Cardinality.ManyToOne,
Nil, Seq("role", "mode"), Seq("role"), "__name__")

// scalastyle:off
val lhs = QueryResult("someId", null, sampleNodeCpu.map(rv => SerializedRangeVector(rv, schema)))
val rhs = QueryResult("someId", null, samplesRhs2.map(rv => SerializedRangeVector(rv, schema)))
// scalastyle:on

// the query produces 2 result rows; since the limit is 1, this throws BadQueryException
val thrown = intercept[TestFailedException] {
execPlan.compose(Observable.fromIterable(Seq((rhs, 1), (lhs, 0))), tvSchemaTask, querySession)
.toListL.runAsync.futureValue
}

thrown.getCause.getClass shouldEqual classOf[BadQueryException]
thrown.getCause.getMessage shouldEqual "This query results in more than 1 join cardinality." +
" Try applying more filters."
}

it("should throw BadQueryException - many-to-one with by and grouping without arguments - cardinality limit 1") {
val queryContext = QueryContext(joinQueryCardLimit = 3) // set join card limit to 3
val agg = RowAggregator(AggregationOperator.Sum, Nil, tvSchema)
val aggMR = AggregateMapReduce(AggregationOperator.Sum, Nil, Nil, Seq("instance", "job"))
val mapped = aggMR(Observable.fromIterable(sampleNodeCpu), querySession, 1000, tvSchema)

val resultObs4 = RangeVectorAggregator.mapReduce(agg, true, mapped, rv => rv.key)
val samplesRhs = resultObs4.toListL.runAsync.futureValue

val execPlan = BinaryJoinExec(queryContext, dummyDispatcher,
Array(dummyPlan),
new Array[ExecPlan](1),
BinaryOperator.DIV,
Cardinality.ManyToOne,
Seq("instance"), Nil, Nil, "__name__")

// scalastyle:off
val lhs = QueryResult("someId", null, sampleNodeCpu.map(rv => SerializedRangeVector(rv, schema)))
val rhs = QueryResult("someId", null, samplesRhs.map(rv => SerializedRangeVector(rv, schema)))
// scalastyle:on

// the query produces 4 result rows; since the limit is 3, this throws BadQueryException
val thrown = intercept[TestFailedException] {
execPlan.compose(Observable.fromIterable(Seq((rhs, 1), (lhs, 0))), tvSchemaTask, querySession)
.toListL.runAsync.futureValue
}

thrown.getCause.getClass shouldEqual classOf[BadQueryException]
thrown.getCause.getMessage shouldEqual "This query results in more than 3 join cardinality." +
" Try applying more filters."
}
}
