Merge branch 'develop' into integration-0.9.8 -take2
jackson-paul committed Jul 9, 2020
2 parents 5e91a44 + 3c04bcf commit 22c1da6
Showing 13 changed files with 269 additions and 47 deletions.
@@ -187,33 +187,35 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
* handle this case.
*/
// scalastyle:off parameter.number
def getChunksByIngestionTimeRange(datasetRef: DatasetRef,
splits: Iterator[ScanSplit],
ingestionTimeStart: Long,
ingestionTimeEnd: Long,
userTimeStart: Long,
endTimeExclusive: Long,
maxChunkTime: Long,
batchSize: Int,
batchTime: FiniteDuration): Observable[Seq[RawPartData]] = {
val partKeys = Observable.fromIterator(splits).flatMap {
def getChunksByIngestionTimeRangeNoAsync(datasetRef: DatasetRef,
splits: Iterator[ScanSplit],
ingestionTimeStart: Long,
ingestionTimeEnd: Long,
userTimeStart: Long,
endTimeExclusive: Long,
maxChunkTime: Long,
batchSize: Int): Iterator[Seq[RawPartData]] = {
val partKeys = splits.flatMap {
case split: CassandraTokenRangeSplit =>
val indexTable = getOrCreateIngestionTimeIndexTable(datasetRef)
logger.debug(s"Querying cassandra for partKeys for split=$split ingestionTimeStart=$ingestionTimeStart " +
s"ingestionTimeEnd=$ingestionTimeEnd")
indexTable.scanPartKeysByIngestionTime(split.tokens, ingestionTimeStart, ingestionTimeEnd)
indexTable.scanPartKeysByIngestionTimeNoAsync(split.tokens, ingestionTimeStart, ingestionTimeEnd)
case split => throw new UnsupportedOperationException(s"Unknown split type $split seen")
}

import filodb.core.Iterators._

val chunksTable = getOrCreateChunkTable(datasetRef)
partKeys.bufferTimedAndCounted(batchTime, batchSize).map { parts =>
partKeys.sliding(batchSize, batchSize).map { parts =>
logger.debug(s"Querying cassandra for chunks from ${parts.size} partitions userTimeStart=$userTimeStart " +
s"endTimeExclusive=$endTimeExclusive maxChunkTime=$maxChunkTime")
// TODO evaluate if we can increase parallelism here. This needs to be tuneable
// based on how much faster downsampling should run, and how much additional read load cassandra can take.
chunksTable.readRawPartitionRangeBB(parts, userTimeStart - maxChunkTime, endTimeExclusive).toIterator().toSeq
// This could be more parallel, but the decision was made to control parallelism in one place: in Spark (via its
// parallelism configuration). Revisit if needed later.
val batchReadSpan = Kamon.spanBuilder("cassandra-per-batch-data-read-latency").start()
try {
chunksTable.readRawPartitionRangeBBNoAsync(parts, userTimeStart - maxChunkTime, endTimeExclusive)
} finally {
batchReadSpan.finish()
}
}
}
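The new method drops the Observable-based bufferTimedAndCounted batching in favor of a plain Iterator pipeline: partition keys are grouped with sliding(batchSize, batchSize) and each group becomes one synchronous Cassandra read. A minimal, self-contained sketch of that batching pattern (stand-in types and a fake read function, not FiloDB code):

object BatchedReadSketch extends App {
  type PartKey = String
  type RawPart = String

  // Stand-in for chunksTable.readRawPartitionRangeBBNoAsync: one blocking read per batch.
  def readBatch(parts: Seq[PartKey]): Seq[RawPart] = parts.map(p => s"data-for-$p")

  val partKeys: Iterator[PartKey] = (1 to 1050).iterator.map(i => s"part-$i")
  val batchSize = 100

  // sliding(batchSize, batchSize) yields non-overlapping groups of at most batchSize keys,
  // so only the last group may be smaller (here: 10 groups of 100, then one group of 50).
  val batches: Iterator[Seq[RawPart]] =
    partKeys.sliding(batchSize, batchSize).map(parts => readBatch(parts))

  val sizes = batches.map(_.size).toList
  println(sizes.size)  // 11
  println(sizes.sum)   // 1050
}

Because the whole pipeline is an Iterator, no batch is read from Cassandra until the caller pulls it, which is what lets parallelism be controlled in one place (Spark), as the comment above notes.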

@@ -5,7 +5,6 @@ import java.nio.ByteBuffer
import scala.concurrent.{ExecutionContext, Future}

import com.datastax.driver.core.{ConsistencyLevel, ResultSet, Row}
import monix.reactive.Observable

import filodb.cassandra.FiloCassandraConnector
import filodb.core._
@@ -101,10 +100,10 @@ sealed class IngestionTimeIndexTable(val dataset: DatasetRef,
}
}

def scanPartKeysByIngestionTime(tokens: Seq[(String, String)],
ingestionTimeStart: Long,
ingestionTimeEnd: Long): Observable[ByteBuffer] = {
val it = tokens.iterator.flatMap { case (start, end) =>
def scanPartKeysByIngestionTimeNoAsync(tokens: Seq[(String, String)],
ingestionTimeStart: Long,
ingestionTimeEnd: Long): Iterator[ByteBuffer] = {
tokens.iterator.flatMap { case (start, end) =>
/*
* FIXME conversion of tokens to Long works only for Murmur3Partitioner because it generates
* Long based tokens. If other partitioners are used, this can potentially break.
@@ -114,10 +113,8 @@ sealed class IngestionTimeIndexTable(val dataset: DatasetRef,
end.toLong: java.lang.Long,
ingestionTimeStart: java.lang.Long,
ingestionTimeEnd: java.lang.Long)
session.execute(stmt).iterator.asScala
.map { row => row.getBytes("partition") }
session.execute(stmt).asScala.map { row => row.getBytes("partition") }.toSet.iterator
}
Observable.fromIterator(it).handleObservableErrors
}

/**
@@ -205,6 +205,23 @@ sealed class TimeSeriesChunksTable(val dataset: DatasetRef,
} yield rpd
}

/**
* Not an async call - callers should limit the number of partitions queried at a time
*/
def readRawPartitionRangeBBNoAsync(partitions: Seq[ByteBuffer],
startTime: Long,
endTimeExclusive: Long): Seq[RawPartData] = {
val query = readChunkRangeCql.bind().setList(0, partitions.asJava, classOf[ByteBuffer])
.setLong(1, chunkID(startTime, 0))
.setLong(2, chunkID(endTimeExclusive, 0))
session.execute(query).iterator().asScala
.map { row => (row.getBytes(0), chunkSetFromRow(row, 1)) }
.sortedGroupBy(_._1)
.map { case (partKeyBuffer, chunkSetIt) =>
RawPartData(partKeyBuffer.array, chunkSetIt.map(_._2).toBuffer)
}.toSeq
}
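A small sketch of the grouping step used by readRawPartitionRangeBBNoAsync above, assuming sortedGroupBy behaves like the hypothetical groupConsecutiveBy below, i.e. it merges consecutive rows that share a partition key without buffering the whole result set (plain Scala, not FiloDB code):

object GroupConsecutiveSketch extends App {
  // Hypothetical helper: groups adjacent elements that share a key, preserving encounter order.
  def groupConsecutiveBy[A, K](it: Iterator[A])(key: A => K): Iterator[(K, List[A])] =
    new Iterator[(K, List[A])] {
      private val buffered = it.buffered
      def hasNext: Boolean = buffered.hasNext
      def next(): (K, List[A]) = {
        val k = key(buffered.head)
        val group = scala.collection.mutable.ListBuffer[A]()
        while (buffered.hasNext && key(buffered.head) == k) group += buffered.next()
        (k, group.toList)
      }
    }

  // Rows as (partitionKey, chunk) pairs, in the order they come back from the query.
  val rows = Iterator(("p1", 1), ("p1", 2), ("p2", 3), ("p3", 4), ("p3", 5))
  val grouped = groupConsecutiveBy(rows)(_._1).map { case (pk, cs) => (pk, cs.map(_._2)) }
  grouped.foreach(println)  // (p1,List(1, 2)), (p2,List(3)), (p3,List(4, 5))
}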

def scanPartitionsBySplit(tokens: Seq[(String, String)]): Observable[RawPartData] = {

val res: Observable[Future[Iterator[RawPartData]]] = Observable.fromIterable(tokens).map { case (start, end) =>
@@ -16,8 +16,8 @@ import filodb.core._
import filodb.core.binaryrecord2.RecordBuilder
import filodb.core.metadata.{Dataset, Schemas}
import filodb.core.store.{ChunkSet, ChunkSetInfo, ColumnStoreSpec, PartKeyRecord}
import filodb.memory.BinaryRegionLarge
import filodb.memory.format.UnsafeUtils
import filodb.memory.{BinaryRegionLarge, NativeMemoryManager}
import filodb.memory.format.{TupleRowReader, UnsafeUtils}
import filodb.memory.format.ZeroCopyUTF8String._

class CassandraColumnStoreSpec extends ColumnStoreSpec {
@@ -27,6 +27,21 @@ class CassandraColumnStoreSpec extends ColumnStoreSpec {
lazy val colStore = new CassandraColumnStore(config, s, session)
lazy val metaStore = new CassandraMetaStore(config.getConfig("cassandra"), session)

val nativeMemoryManager = new NativeMemoryManager(100000000L, Map.empty)
val promDataset = Dataset("prometheus", Schemas.gauge)

// First create the tables in C*
override def beforeAll(): Unit = {
super.beforeAll()
colStore.initialize(promDataset.ref, 1).futureValue
colStore.truncate(promDataset.ref, 1).futureValue
}

override def afterAll(): Unit = {
super.afterAll()
nativeMemoryManager.shutdown()
}

"getScanSplits" should "return splits from Cassandra" in {
// Single split, token_start should equal token_end
val singleSplits = colStore.getScanSplits(dataset.ref).asInstanceOf[Seq[CassandraTokenRangeSplit]]
@@ -224,4 +239,39 @@ class CassandraColumnStoreSpec extends ColumnStoreSpec {
parts(0).chunkSetsTimeOrdered should have length (1)
parts(0).chunkSetsTimeOrdered(0).vectors.toSeq shouldEqual sourceChunks.head.chunks
}

"getChunksByIngestionTimeRangeNoAsync" should "batch partitions properly" in {

val gaugeName = "my_gauge"
val seriesTags = Map("_ws_".utf8 -> "my_ws".utf8)
val firstSampleTime = 74373042000L
val partBuilder = new RecordBuilder(nativeMemoryManager)
val ingestTime = 1594130687316L
val chunksets = for {
i <- 0 until 1050
partKey = partBuilder.partKeyFromObjects(Schemas.gauge, gaugeName + i, seriesTags)
c <- 0 until 3
} yield {
val rows = Seq(TupleRowReader((Some(firstSampleTime + c), Some(0.0d))))
ChunkSet(Schemas.gauge.data, partKey, ingestTime, rows, nativeMemoryManager)
}
colStore.write(promDataset.ref, Observable.fromIterable(chunksets)).futureValue

val batches = colStore.getChunksByIngestionTimeRangeNoAsync(
promDataset.ref,
colStore.getScanSplits(promDataset.ref).iterator,
ingestTime - 1,
ingestTime + 1,
firstSampleTime - 1,
firstSampleTime + 5,
10L,
100
).toList

batches.size shouldEqual 11 // 100 partitions per batch, 1050 partitions => 11 batches
batches.zipWithIndex.foreach { case (b, i) =>
b.size shouldEqual (if (i == 10) 50 else 100)
b.foreach(_.chunkSetsTimeOrdered.size shouldEqual 3)
}
}
}
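The expected batch count in the test above is plain arithmetic: 1050 partitions read in batches of 100 give 10 full batches plus one final batch of 50. A quick check of the figures asserted in the test:

object BatchCountSketch extends App {
  val partitions = 1050
  val batchSize  = 100
  val numBatches = (partitions + batchSize - 1) / batchSize   // ceiling division = 11
  val lastBatch  = partitions - (numBatches - 1) * batchSize  // 1050 - 10 * 100 = 50
  println(s"numBatches=$numBatches lastBatch=$lastBatch")     // numBatches=11 lastBatch=50
}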
11 changes: 7 additions & 4 deletions core/src/main/resources/filodb-defaults.conf
@@ -150,6 +150,12 @@ filodb {
# Maximum number of samples to return in a query
sample-limit = 1000000

# Binary join cardinality limit
join-cardinality-limit = 25000

# Group-by cardinality limit
group-by-cardinality-limit = 1000

# Minimum step required for a query
min-step = 5 seconds

@@ -233,10 +239,7 @@ filodb {
# cass-session-provider-fqcn = fqcn

# Number of time series to operate on at one time. Reduce if there is much less memory available
cass-write-batch-size = 10000

# Maximum time to wait during cassandra reads to form a batch of partitions to downsample
cass-write-batch-time = 3s
cass-write-batch-size = 250

# amount of parallelism to introduce in the spark job. This controls number of spark partitions
# increase if the number of splits seen in cassandra reads is low and spark jobs are slow.
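The two new query limits land under filodb.query and are read through Typesafe Config (HttpSettings below picks them up as queryBinaryJoinCardLimit and queryGroupByCardLimit). A small sketch of reading them; the keys and defaults mirror the additions above, while the surrounding object is illustrative only:

import com.typesafe.config.ConfigFactory

object QueryLimitsSketch extends App {
  val config = ConfigFactory.parseString(
    """
      |filodb.query {
      |  join-cardinality-limit = 25000
      |  group-by-cardinality-limit = 1000
      |}
      |""".stripMargin)

  val joinLimit    = config.getInt("filodb.query.join-cardinality-limit")
  val groupByLimit = config.getInt("filodb.query.group-by-cardinality-limit")
  println(s"joinLimit=$joinLimit groupByLimit=$groupByLimit")  // joinLimit=25000 groupByLimit=1000
}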
2 changes: 2 additions & 0 deletions core/src/main/scala/filodb.core/query/QueryContext.scala
@@ -23,6 +23,8 @@ final case class QueryContext(origQueryParams: TsdbQueryParams = UnavailableProm
spreadOverride: Option[SpreadProvider] = None,
queryTimeoutMillis: Int = 30000,
sampleLimit: Int = 1000000,
groupByCardLimit: Int = 100000,
joinQueryCardLimit: Int = 100000,
shardOverrides: Option[Seq[Int]] = None,
queryId: String = UUID.randomUUID().toString,
submitTime: Long = System.currentTimeMillis())
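Both new fields default to 100000, so existing call sites are unaffected; a caller tightens them per query with named arguments, exactly as BinaryJoinExecSpec below does with joinQueryCardLimit = 1. A minimal usage sketch with illustrative values:

import filodb.core.query.QueryContext

object QueryContextLimitsSketch extends App {
  val ctx = QueryContext(groupByCardLimit = 1000, joinQueryCardLimit = 25000)
  println(s"groupBy=${ctx.groupByCardLimit} join=${ctx.joinQueryCardLimit}")
}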
2 changes: 2 additions & 0 deletions http/src/main/scala/filodb/http/HttpSettings.scala
@@ -15,4 +15,6 @@ class HttpSettings(config: Config, val filoSettings: FilodbSettings) {
lazy val queryDefaultSpread = config.getInt("filodb.spread-default")
lazy val querySampleLimit = config.getInt("filodb.query.sample-limit")
lazy val queryAskTimeout = config.as[FiniteDuration]("filodb.query.ask-timeout")
lazy val queryBinaryJoinCardLimit = config.getInt("filodb.query.join-cardinality-limit")
lazy val queryGroupByCardLimit = config.getInt("filodb.query.group-by-cardinality-limit")
}
17 changes: 13 additions & 4 deletions query/src/main/scala/filodb/query/exec/AggrOverRangeVectors.scala
@@ -35,7 +35,8 @@ final case class ReduceAggregateExec(queryContext: QueryContext,
val task = for { schema <- firstSchema }
yield {
val aggregator = RowAggregator(aggrOp, aggrParams, schema)
RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = true, results, rv => rv.key)
RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = true, results, rv => rv.key,
querySession.qContext.groupByCardLimit)
}
Observable.fromTask(task).flatten
}
@@ -76,10 +77,12 @@ final case class AggregateMapReduce(aggrOp: AggregationOperator,
sourceSchema.fixedVectorLen.filter(_ <= querySession.queryConfig.fastReduceMaxWindows).map { numWindows =>
RangeVectorAggregator.fastReduce(aggregator, false, source, numWindows)
}.getOrElse {
RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = false, source, grouping)
RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = false, source, grouping,
querySession.qContext.groupByCardLimit)
}
} else {
RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = false, source, grouping)
RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = false, source, grouping,
querySession.qContext.groupByCardLimit)
}
}

@@ -132,11 +135,17 @@ object RangeVectorAggregator extends StrictLogging {
def mapReduce(rowAgg: RowAggregator,
skipMapPhase: Boolean,
source: Observable[RangeVector],
grouping: RangeVector => RangeVectorKey): Observable[RangeVector] = {
grouping: RangeVector => RangeVectorKey,
cardinalityLimit: Int = Int.MaxValue): Observable[RangeVector] = {
// reduce the range vectors using the foldLeft construct. This results in one aggregate per group.
val task = source.toListL.map { rvs =>
// now reduce each group and create one result range vector per group
val groupedResult = mapReduceInternal(rvs, rowAgg, skipMapPhase, grouping)

// if group-by cardinality breaches the limit, throw exception
if (groupedResult.size > cardinalityLimit)
throw new BadQueryException(s"This query results in more than $cardinalityLimit group-by cardinality limit. " +
s"Try applying more filters")
groupedResult.map { case (rvk, aggHolder) =>
val rowIterator = new CustomCloseCursor(aggHolder.map(_.toRowReader))(aggHolder.close())
IteratorBackedRangeVector(rvk, rowIterator)
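The new cardinalityLimit parameter makes mapReduce fail fast: the number of groups is checked after grouping but before any per-group result range vector is built. A stripped-down sketch of that guard on plain Scala collections (the exception class here is a stand-in, not filodb.query.BadQueryException):

object GroupByLimitSketch extends App {
  final case class BadQueryException(msg: String) extends Exception(msg)

  def aggregateWithLimit[K, V](rows: Seq[(K, V)], limit: Int)(reduce: Seq[V] => V): Map[K, V] = {
    val grouped = rows.groupBy(_._1)
    // Fail fast if the group-by cardinality breaches the limit, before building any output.
    if (grouped.size > limit)
      throw BadQueryException(s"group-by cardinality ${grouped.size} exceeds limit $limit")
    grouped.map { case (k, kvs) => k -> reduce(kvs.map(_._2)) }
  }

  val rows = Seq("a" -> 1.0, "a" -> 2.0, "b" -> 3.0, "c" -> 4.0)
  println(aggregateWithLimit(rows, limit = 10)(_.sum))  // sums per key: a -> 3.0, b -> 3.0, c -> 4.0
  // aggregateWithLimit(rows, limit = 2)(_.sum)         // would throw BadQueryException
}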
14 changes: 11 additions & 3 deletions query/src/main/scala/filodb/query/exec/BinaryJoinExec.scala
@@ -61,10 +61,15 @@ final case class BinaryJoinExec(queryContext: QueryContext,

protected def args: String = s"binaryOp=$binaryOp, on=$on, ignoring=$ignoring"

//scalastyle:off method.length
protected[exec] def compose(childResponses: Observable[(QueryResponse, Int)],
firstSchema: Task[ResultSchema],
querySession: QuerySession): Observable[RangeVector] = {
val taskOfResults = childResponses.map {
case (QueryResult(_, _, result), _)
if (result.size > queryContext.joinQueryCardLimit && cardinality == Cardinality.OneToOne) =>
throw new BadQueryException(s"This query results in more than ${queryContext.joinQueryCardLimit} " +
s"join cardinality. Try applying more filters.")
case (QueryResult(_, _, result), i) => (result, i)
case (QueryError(_, ex), _) => throw ex
}.toListL.map { resp =>
@@ -73,12 +78,10 @@ final case class BinaryJoinExec(queryContext: QueryContext,
// require(resp.size == lhs.size + rhs.size, "Did not get sufficient responses for LHS and RHS")
val lhsRvs = resp.filter(_._2 < lhs.size).flatMap(_._1)
val rhsRvs = resp.filter(_._2 >= lhs.size).flatMap(_._1)

// figure out which side is the "one" side
val (oneSide, otherSide, lhsIsOneSide) =
if (cardinality == Cardinality.OneToMany) (lhsRvs, rhsRvs, true)
else (rhsRvs, lhsRvs, false)

// load "one" side keys in a hashmap
val oneSideMap = new mutable.HashMap[Map[Utf8Str, Utf8Str], RangeVector]()
oneSide.foreach { rv =>
@@ -90,7 +93,6 @@ final case class BinaryJoinExec(queryContext: QueryContext,
}
oneSideMap.put(jk, rv)
}

// keep a hashset of result range vector keys to help ensure uniqueness of result range vectors
val resultKeySet = new mutable.HashSet[RangeVectorKey]()
// iterate across the the "other" side which could be one or many and perform the binary operation
@@ -102,6 +104,12 @@ final case class BinaryJoinExec(queryContext: QueryContext,
throw new BadQueryException(s"Non-unique result vectors found for $resKey. " +
s"Use grouping to create unique matching")
resultKeySet.add(resKey)

// The OneToOne cardinality case is already handled above; this condition handles the OneToMany case
if (resultKeySet.size > queryContext.joinQueryCardLimit)
throw new BadQueryException(s"This query results in more than ${queryContext.joinQueryCardLimit} " +
s"join cardinality. Try applying more filters.")

val res = if (lhsIsOneSide) binOp(rvOne.rows, rvOther.rows) else binOp(rvOther.rows, rvOne.rows)
IteratorBackedRangeVector(resKey, res)
}
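The two guards added above sit in different places: for OneToOne joins the limit is enforced up front on the size of each child result as responses arrive, while for OneToMany joins the result-key set is checked as it grows, since the output can be larger than either input. A simplified sketch of the incremental variant on plain Scala collections (stand-in types and exception, not FiloDB code):

object JoinLimitSketch extends App {
  final case class BadQueryException(msg: String) extends Exception(msg)

  def joinWithLimit(oneSide: Map[String, Double],
                    otherSide: Seq[(String, Double)],
                    limit: Int): Seq[(String, Double)] = {
    val resultKeys = scala.collection.mutable.HashSet[String]()
    otherSide.flatMap { case (key, v) =>
      oneSide.get(key).map { u =>
        // Mirror of the resultKeySet checks: enforce uniqueness, then the cardinality limit.
        if (!resultKeys.add(key))
          throw BadQueryException(s"non-unique result key $key")
        if (resultKeys.size > limit)
          throw BadQueryException(s"join cardinality exceeds limit $limit")
        key -> (u + v)
      }
    }
  }

  val one   = Map("a" -> 1.0, "b" -> 2.0)
  val other = Seq("a" -> 10.0, "b" -> 20.0, "c" -> 30.0)  // "c" has no match on the one side
  println(joinWithLimit(one, other, limit = 10))          // List((a,11.0), (b,21.0))
}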
50 changes: 50 additions & 0 deletions query/src/test/scala/filodb/query/exec/BinaryJoinExecSpec.scala
@@ -9,6 +9,7 @@ import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import org.scalatest.{FunSpec, Matchers}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.exceptions.TestFailedException

import filodb.core.metadata.Column.ColumnType
import filodb.core.query._
@@ -358,4 +359,53 @@ class BinaryJoinExecSpec extends FunSpec with Matchers with ScalaFutures {

result.map(_.key).toSet.size shouldEqual 200
}

it("should throw BadQueryException - one-to-one with ignoring - cardinality limit 1") {
val queryContext = QueryContext(joinQueryCardLimit = 1) // set join card limit to 1
val execPlan = BinaryJoinExec(queryContext, dummyDispatcher,
Array(dummyPlan), // cannot be empty as some compose implementations rely on the schema
new Array[ExecPlan](1), // empty since we test compose, not execute or doExecute
BinaryOperator.ADD,
Cardinality.OneToOne,
Nil, Seq("tag2"), Nil, "__name__")

// scalastyle:off
val lhs = QueryResult("someId", null, samplesLhsGrouping.map(rv => SerializedRangeVector(rv, schema)))
// val lhs = QueryResult("someId", null, samplesLhs.filter(rv => rv.key.labelValues.get(ZeroCopyUTF8String("tag2")).get.equals("tag1-1")).map(rv => SerializedRangeVector(rv, schema)))
val rhs = QueryResult("someId", null, samplesRhsGrouping.map(rv => SerializedRangeVector(rv, schema)))
// scalastyle:on

// the actual query produces 2 result rows; since the limit is 1, this results in BadQueryException
val thrown = intercept[TestFailedException] {
execPlan.compose(Observable.fromIterable(Seq((rhs, 1), (lhs, 0))), tvSchemaTask, querySession)
.toListL.runAsync.futureValue
}
thrown.getCause.getClass shouldEqual classOf[BadQueryException]
thrown.getCause.getMessage shouldEqual "This query results in more than 1 join cardinality." +
" Try applying more filters."
}

it("should throw BadQueryException - one-to-one with on - cardinality limit 1") {
val queryContext = QueryContext(joinQueryCardLimit = 1) // set join card limit to 1
val execPlan = BinaryJoinExec(queryContext, dummyDispatcher,
Array(dummyPlan), // cannot be empty as some compose implementations rely on the schema
new Array[ExecPlan](1), // empty since we test compose, not execute or doExecute
BinaryOperator.ADD,
Cardinality.OneToOne,
Seq("tag1", "job"), Nil, Nil, "__name__")

// scalastyle:off
val lhs = QueryResult("someId", null, samplesLhsGrouping.map(rv => SerializedRangeVector(rv, schema)))
val rhs = QueryResult("someId", null, samplesRhsGrouping.map(rv => SerializedRangeVector(rv, schema)))
// scalastyle:on

// the actual query produces 2 result rows; since the limit is 1, this results in BadQueryException
val thrown = intercept[TestFailedException] {
execPlan.compose(Observable.fromIterable(Seq((rhs, 1), (lhs, 0))), tvSchemaTask, querySession)
.toListL.runAsync.futureValue
}
thrown.getCause.getClass shouldEqual classOf[BadQueryException]
thrown.getCause.getMessage shouldEqual "This query results in more than 1 join cardinality." +
" Try applying more filters."
}
}