From 2600daa896127459214dad0b98db4b1ca854f333 Mon Sep 17 00:00:00 2001 From: Vish Ramachandran Date: Tue, 7 Jul 2020 13:16:56 -0700 Subject: [PATCH 1/3] perf(sparkJobs): Reducing cass batch size to make c* queries smaller (#809) * Reduce downsample batch size from 10k to 500 to reduce cass query size - more queries instead of few very big queries. * Changed spark job to use sync cass calls since they are single threaded anyway - I was encountering unit test bugs in the observable.toIterator conversion. * Remove timed batches since batch sizes were not consistent. --- .../columnstore/CassandraColumnStore.scala | 34 +++++++------ .../columnstore/IngestionTimeIndexTable.scala | 10 ++-- .../columnstore/TimeSeriesChunksTable.scala | 17 +++++++ .../CassandraColumnStoreSpec.scala | 50 ++++++++++++++++++- core/src/main/resources/filodb-defaults.conf | 5 +- .../downsampler/chunk/DownsamplerMain.scala | 10 ++-- .../chunk/DownsamplerSettings.scala | 2 - 7 files changed, 93 insertions(+), 35 deletions(-) diff --git a/cassandra/src/main/scala/filodb.cassandra/columnstore/CassandraColumnStore.scala b/cassandra/src/main/scala/filodb.cassandra/columnstore/CassandraColumnStore.scala index 46e8b23285..012cca0d99 100644 --- a/cassandra/src/main/scala/filodb.cassandra/columnstore/CassandraColumnStore.scala +++ b/cassandra/src/main/scala/filodb.cassandra/columnstore/CassandraColumnStore.scala @@ -187,33 +187,37 @@ extends ColumnStore with CassandraChunkSource with StrictLogging { * handle this case. */ // scalastyle:off parameter.number - def getChunksByIngestionTimeRange(datasetRef: DatasetRef, - splits: Iterator[ScanSplit], - ingestionTimeStart: Long, - ingestionTimeEnd: Long, - userTimeStart: Long, - endTimeExclusive: Long, - maxChunkTime: Long, - batchSize: Int, - batchTime: FiniteDuration): Observable[Seq[RawPartData]] = { - val partKeys = Observable.fromIterator(splits).flatMap { + def getChunksByIngestionTimeRangeNoAsync(datasetRef: DatasetRef, + splits: Iterator[ScanSplit], + ingestionTimeStart: Long, + ingestionTimeEnd: Long, + userTimeStart: Long, + endTimeExclusive: Long, + maxChunkTime: Long, + batchSize: Int): Iterator[Seq[RawPartData]] = { + val partKeys = splits.flatMap { case split: CassandraTokenRangeSplit => val indexTable = getOrCreateIngestionTimeIndexTable(datasetRef) logger.debug(s"Querying cassandra for partKeys for split=$split ingestionTimeStart=$ingestionTimeStart " + s"ingestionTimeEnd=$ingestionTimeEnd") - indexTable.scanPartKeysByIngestionTime(split.tokens, ingestionTimeStart, ingestionTimeEnd) + indexTable.scanPartKeysByIngestionTimeNoAsync(split.tokens, ingestionTimeStart, ingestionTimeEnd) case split => throw new UnsupportedOperationException(s"Unknown split type $split seen") } import filodb.core.Iterators._ val chunksTable = getOrCreateChunkTable(datasetRef) - partKeys.bufferTimedAndCounted(batchTime, batchSize).map { parts => + partKeys.sliding(batchSize, batchSize).map { parts => logger.debug(s"Querying cassandra for chunks from ${parts.size} partitions userTimeStart=$userTimeStart " + s"endTimeExclusive=$endTimeExclusive maxChunkTime=$maxChunkTime") - // TODO evaluate if we can increase parallelism here. This needs to be tuneable - // based on how much faster downsampling should run, and how much additional read load cassandra can take. 
- chunksTable.readRawPartitionRangeBB(parts, userTimeStart - maxChunkTime, endTimeExclusive).toIterator().toSeq + // This could be more parallel, but decision was made to control parallelism at one place: In spark (via its + // parallelism configuration. Revisit if needed later. + val batchReadSpan = Kamon.spanBuilder("cassandra-per-batch-data-read-latency").start() + try { + chunksTable.readRawPartitionRangeBBNoAsync(parts, userTimeStart - maxChunkTime, endTimeExclusive) + } finally { + batchReadSpan.finish() + } } } diff --git a/cassandra/src/main/scala/filodb.cassandra/columnstore/IngestionTimeIndexTable.scala b/cassandra/src/main/scala/filodb.cassandra/columnstore/IngestionTimeIndexTable.scala index 318b1fce7c..4ae4bdfb03 100644 --- a/cassandra/src/main/scala/filodb.cassandra/columnstore/IngestionTimeIndexTable.scala +++ b/cassandra/src/main/scala/filodb.cassandra/columnstore/IngestionTimeIndexTable.scala @@ -5,7 +5,6 @@ import java.nio.ByteBuffer import scala.concurrent.{ExecutionContext, Future} import com.datastax.driver.core.{ConsistencyLevel, ResultSet, Row} -import monix.reactive.Observable import filodb.cassandra.FiloCassandraConnector import filodb.core._ @@ -101,10 +100,10 @@ sealed class IngestionTimeIndexTable(val dataset: DatasetRef, } } - def scanPartKeysByIngestionTime(tokens: Seq[(String, String)], - ingestionTimeStart: Long, - ingestionTimeEnd: Long): Observable[ByteBuffer] = { - val it = tokens.iterator.flatMap { case (start, end) => + def scanPartKeysByIngestionTimeNoAsync(tokens: Seq[(String, String)], + ingestionTimeStart: Long, + ingestionTimeEnd: Long): Iterator[ByteBuffer] = { + tokens.iterator.flatMap { case (start, end) => /* * FIXME conversion of tokens to Long works only for Murmur3Partitioner because it generates * Long based tokens. If other partitioners are used, this can potentially break. 
@@ -117,7 +116,6 @@ sealed class IngestionTimeIndexTable(val dataset: DatasetRef, session.execute(stmt).iterator.asScala .map { row => row.getBytes("partition") } } - Observable.fromIterator(it).handleObservableErrors } /** diff --git a/cassandra/src/main/scala/filodb.cassandra/columnstore/TimeSeriesChunksTable.scala b/cassandra/src/main/scala/filodb.cassandra/columnstore/TimeSeriesChunksTable.scala index cf21cea524..56c44768ca 100644 --- a/cassandra/src/main/scala/filodb.cassandra/columnstore/TimeSeriesChunksTable.scala +++ b/cassandra/src/main/scala/filodb.cassandra/columnstore/TimeSeriesChunksTable.scala @@ -205,6 +205,23 @@ sealed class TimeSeriesChunksTable(val dataset: DatasetRef, } yield rpd } + /** + * Not an async call - limit number of partitions queried at a time + */ + def readRawPartitionRangeBBNoAsync(partitions: Seq[ByteBuffer], + startTime: Long, + endTimeExclusive: Long): Seq[RawPartData] = { + val query = readChunkRangeCql.bind().setList(0, partitions.asJava, classOf[ByteBuffer]) + .setLong(1, chunkID(startTime, 0)) + .setLong(2, chunkID(endTimeExclusive, 0)) + session.execute(query).iterator().asScala + .map { row => (row.getBytes(0), chunkSetFromRow(row, 1)) } + .sortedGroupBy(_._1) + .map { case (partKeyBuffer, chunkSetIt) => + RawPartData(partKeyBuffer.array, chunkSetIt.map(_._2).toBuffer) + }.toSeq + } + def scanPartitionsBySplit(tokens: Seq[(String, String)]): Observable[RawPartData] = { val res: Observable[Future[Iterator[RawPartData]]] = Observable.fromIterable(tokens).map { case (start, end) => diff --git a/cassandra/src/test/scala/filodb.cassandra/columnstore/CassandraColumnStoreSpec.scala b/cassandra/src/test/scala/filodb.cassandra/columnstore/CassandraColumnStoreSpec.scala index f1466f3031..99c60fb42e 100644 --- a/cassandra/src/test/scala/filodb.cassandra/columnstore/CassandraColumnStoreSpec.scala +++ b/cassandra/src/test/scala/filodb.cassandra/columnstore/CassandraColumnStoreSpec.scala @@ -16,8 +16,8 @@ import filodb.core._ import filodb.core.binaryrecord2.RecordBuilder import filodb.core.metadata.{Dataset, Schemas} import filodb.core.store.{ChunkSet, ChunkSetInfo, ColumnStoreSpec, PartKeyRecord} -import filodb.memory.BinaryRegionLarge -import filodb.memory.format.UnsafeUtils +import filodb.memory.{BinaryRegionLarge, NativeMemoryManager} +import filodb.memory.format.{TupleRowReader, UnsafeUtils} import filodb.memory.format.ZeroCopyUTF8String._ class CassandraColumnStoreSpec extends ColumnStoreSpec { @@ -27,6 +27,21 @@ class CassandraColumnStoreSpec extends ColumnStoreSpec { lazy val colStore = new CassandraColumnStore(config, s, session) lazy val metaStore = new CassandraMetaStore(config.getConfig("cassandra"), session) + val nativeMemoryManager = new NativeMemoryManager(100000000L, Map.empty) + val promDataset = Dataset("prometheus", Schemas.gauge) + + // First create the tables in C* + override def beforeAll(): Unit = { + super.beforeAll() + colStore.initialize(promDataset.ref, 1).futureValue + colStore.truncate(promDataset.ref, 1).futureValue + } + + override def afterAll(): Unit = { + super.afterAll() + nativeMemoryManager.shutdown() + } + "getScanSplits" should "return splits from Cassandra" in { // Single split, token_start should equal token_end val singleSplits = colStore.getScanSplits(dataset.ref).asInstanceOf[Seq[CassandraTokenRangeSplit]] @@ -224,4 +239,35 @@ class CassandraColumnStoreSpec extends ColumnStoreSpec { parts(0).chunkSetsTimeOrdered should have length (1) parts(0).chunkSetsTimeOrdered(0).vectors.toSeq shouldEqual 
sourceChunks.head.chunks } + + "getChunksByIngestionTimeRangeNoAsync" should "batch partitions properly" in { + + val gaugeName = "my_gauge" + val seriesTags = Map("_ws_".utf8 -> "my_ws".utf8) + val firstSampleTime = 74373042000L + val partBuilder = new RecordBuilder(nativeMemoryManager) + val ingestTime = 1594130687316L + val rows = Seq(TupleRowReader((Some(firstSampleTime), Some(0.0d)))) + val chunksets = for { i <- 0 until 1050 } yield { + val partKey = partBuilder.partKeyFromObjects(Schemas.gauge, gaugeName + i, seriesTags) + ChunkSet(Schemas.gauge.data, partKey, ingestTime, rows, nativeMemoryManager) + } + colStore.write(promDataset.ref, Observable.fromIterable(chunksets)).futureValue + + val batches = colStore.getChunksByIngestionTimeRangeNoAsync( + promDataset.ref, + colStore.getScanSplits(promDataset.ref).iterator, + ingestTime - 1, + ingestTime + 1, + firstSampleTime - 1, + firstSampleTime + 1, + 10L, + 100 + ).toList + + batches.size shouldEqual 11 // 100 rows per batch, 1050 rows => 11 batches + batches.zipWithIndex.foreach { case (b, i) => + b.size shouldEqual (if (i == 10) 50 else 100) + } + } } diff --git a/core/src/main/resources/filodb-defaults.conf b/core/src/main/resources/filodb-defaults.conf index b74fc6e635..c7d1a31a4b 100644 --- a/core/src/main/resources/filodb-defaults.conf +++ b/core/src/main/resources/filodb-defaults.conf @@ -233,10 +233,7 @@ filodb { # cass-session-provider-fqcn = fqcn # Number of time series to operate on at one time. Reduce if there is much less memory available - cass-write-batch-size = 10000 - - # Maximum time to wait during cassandra reads to form a batch of partitions to downsample - cass-write-batch-time = 3s + cass-write-batch-size = 500 # amount of parallelism to introduce in the spark job. This controls number of spark partitions # increase if the number of splits seen in cassandra reads is low and spark jobs are slow. 
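The batching change in CassandraColumnStore above boils down to this pattern: take the lazy iterator of partition keys, group it into fixed-size batches, and issue one synchronous Cassandra query per batch, instead of buffering an Observable by time and count. A minimal self-contained sketch of that pattern follows; readBatch is a hypothetical stand-in for the chunks-table read, not a FiloDB API.

    import java.nio.ByteBuffer

    object BatchedReadSketch {
      // Hypothetical stand-in for one blocking Cassandra read covering a batch of partitions.
      def readBatch(partKeys: Seq[ByteBuffer]): Seq[String] =
        partKeys.map(pk => s"raw-partition-with-${pk.remaining()}-byte-key")

      // Group part keys into fixed-size batches lazily; one synchronous query per batch.
      // With batchSize = 500 (cass-write-batch-size), N part keys become ceil(N / 500) queries.
      def readInBatches(partKeys: Iterator[ByteBuffer], batchSize: Int): Iterator[Seq[String]] =
        partKeys.sliding(batchSize, batchSize).map(batch => readBatch(batch))
    }

Smaller batches mean more, smaller C* queries, which is exactly the trade the commit message describes.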
diff --git a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala index f6e8632d49..66df533946 100644 --- a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala +++ b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala @@ -108,17 +108,15 @@ class Downsampler(settings: DownsamplerSettings, batchDownsampler: BatchDownsamp .mapPartitions { splitIter => Kamon.init() KamonShutdownHook.registerShutdownHook() - import filodb.core.Iterators._ val rawDataSource = batchDownsampler.rawCassandraColStore - val batchReadSpan = Kamon.spanBuilder("cassandra-raw-data-read-latency").start() - val batchIter = rawDataSource.getChunksByIngestionTimeRange(datasetRef = batchDownsampler.rawDatasetRef, + val batchIter = rawDataSource.getChunksByIngestionTimeRangeNoAsync( + datasetRef = batchDownsampler.rawDatasetRef, splits = splitIter, ingestionTimeStart = ingestionTimeStart, ingestionTimeEnd = ingestionTimeEnd, userTimeStart = userTimeStart, endTimeExclusive = userTimeEndExclusive, maxChunkTime = settings.rawDatasetIngestionConfig.storeConfig.maxChunkTime.toMillis, - batchSize = settings.batchSize, batchTime = settings.batchTime).toIterator() - batchReadSpan.finish() - batchIter // iterator of batches + batchSize = settings.batchSize) + batchIter } .foreach { rawPartsBatch => Kamon.init() diff --git a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerSettings.scala b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerSettings.scala index 4ee4f055cf..bb0b564159 100644 --- a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerSettings.scala +++ b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerSettings.scala @@ -54,8 +54,6 @@ class DownsamplerSettings(conf: Config = ConfigFactory.empty()) extends Serializ @transient lazy val batchSize = downsamplerConfig.getInt("cass-write-batch-size") - @transient lazy val batchTime = downsamplerConfig.as[FiniteDuration]("cass-write-batch-time") - @transient lazy val splitsPerNode = downsamplerConfig.getInt("splits-per-node") @transient lazy val cassWriteTimeout = downsamplerConfig.as[FiniteDuration]("cassandra-write-timeout") From 961cfbcce8c90cdf745eee933668925f3de682f7 Mon Sep 17 00:00:00 2001 From: Vish Ramachandran Date: Tue, 7 Jul 2020 23:22:43 -0700 Subject: [PATCH 2/3] bug(cassandra): Read from ingestion index table had duplicates (#812) Scan on ingestion index table results in duplicates since there is a row per matching chunk. Remove duplicates from the query result before batching. 
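In miniature, the fix is to collapse the per-chunk index rows down to distinct partition keys before handing them to the batcher. A sketch of that step; the object and method names are illustrative only, the real change is the .toSet.iterator in scanPartKeysByIngestionTimeNoAsync below.

    import java.nio.ByteBuffer

    object DedupSketch {
      // The ingestion-time index has one row per matching chunk, so the same partition key
      // can appear many times. ByteBuffer.equals compares the remaining bytes, so a Set
      // collapses equal keys; the cost is holding one token range's keys in memory.
      def distinctPartKeys(rows: Iterator[ByteBuffer]): Iterator[ByteBuffer] =
        rows.toSet.iterator
    }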
--- .../columnstore/CassandraColumnStore.scala | 2 -- .../columnstore/IngestionTimeIndexTable.scala | 3 +-- .../columnstore/CassandraColumnStoreSpec.scala | 12 ++++++++---- core/src/main/resources/filodb-defaults.conf | 2 +- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/cassandra/src/main/scala/filodb.cassandra/columnstore/CassandraColumnStore.scala b/cassandra/src/main/scala/filodb.cassandra/columnstore/CassandraColumnStore.scala index 012cca0d99..ec2449967a 100644 --- a/cassandra/src/main/scala/filodb.cassandra/columnstore/CassandraColumnStore.scala +++ b/cassandra/src/main/scala/filodb.cassandra/columnstore/CassandraColumnStore.scala @@ -204,8 +204,6 @@ extends ColumnStore with CassandraChunkSource with StrictLogging { case split => throw new UnsupportedOperationException(s"Unknown split type $split seen") } - import filodb.core.Iterators._ - val chunksTable = getOrCreateChunkTable(datasetRef) partKeys.sliding(batchSize, batchSize).map { parts => logger.debug(s"Querying cassandra for chunks from ${parts.size} partitions userTimeStart=$userTimeStart " + diff --git a/cassandra/src/main/scala/filodb.cassandra/columnstore/IngestionTimeIndexTable.scala b/cassandra/src/main/scala/filodb.cassandra/columnstore/IngestionTimeIndexTable.scala index 4ae4bdfb03..704640df3e 100644 --- a/cassandra/src/main/scala/filodb.cassandra/columnstore/IngestionTimeIndexTable.scala +++ b/cassandra/src/main/scala/filodb.cassandra/columnstore/IngestionTimeIndexTable.scala @@ -113,8 +113,7 @@ sealed class IngestionTimeIndexTable(val dataset: DatasetRef, end.toLong: java.lang.Long, ingestionTimeStart: java.lang.Long, ingestionTimeEnd: java.lang.Long) - session.execute(stmt).iterator.asScala - .map { row => row.getBytes("partition") } + session.execute(stmt).asScala.map { row => row.getBytes("partition") }.toSet.iterator } } diff --git a/cassandra/src/test/scala/filodb.cassandra/columnstore/CassandraColumnStoreSpec.scala b/cassandra/src/test/scala/filodb.cassandra/columnstore/CassandraColumnStoreSpec.scala index 99c60fb42e..c1318173bb 100644 --- a/cassandra/src/test/scala/filodb.cassandra/columnstore/CassandraColumnStoreSpec.scala +++ b/cassandra/src/test/scala/filodb.cassandra/columnstore/CassandraColumnStoreSpec.scala @@ -247,9 +247,12 @@ class CassandraColumnStoreSpec extends ColumnStoreSpec { val firstSampleTime = 74373042000L val partBuilder = new RecordBuilder(nativeMemoryManager) val ingestTime = 1594130687316L - val rows = Seq(TupleRowReader((Some(firstSampleTime), Some(0.0d)))) - val chunksets = for { i <- 0 until 1050 } yield { - val partKey = partBuilder.partKeyFromObjects(Schemas.gauge, gaugeName + i, seriesTags) + val chunksets = for { + i <- 0 until 1050 + partKey = partBuilder.partKeyFromObjects(Schemas.gauge, gaugeName + i, seriesTags) + c <- 0 until 3 + } yield { + val rows = Seq(TupleRowReader((Some(firstSampleTime + c), Some(0.0d)))) ChunkSet(Schemas.gauge.data, partKey, ingestTime, rows, nativeMemoryManager) } colStore.write(promDataset.ref, Observable.fromIterable(chunksets)).futureValue @@ -260,7 +263,7 @@ class CassandraColumnStoreSpec extends ColumnStoreSpec { ingestTime - 1, ingestTime + 1, firstSampleTime - 1, - firstSampleTime + 1, + firstSampleTime + 5, 10L, 100 ).toList @@ -268,6 +271,7 @@ class CassandraColumnStoreSpec extends ColumnStoreSpec { batches.size shouldEqual 11 // 100 rows per batch, 1050 rows => 11 batches batches.zipWithIndex.foreach { case (b, i) => b.size shouldEqual (if (i == 10) 50 else 100) + b.foreach(_.chunkSetsTimeOrdered.size shouldEqual 3) } } } 
diff --git a/core/src/main/resources/filodb-defaults.conf b/core/src/main/resources/filodb-defaults.conf index c7d1a31a4b..3c313e8d94 100644 --- a/core/src/main/resources/filodb-defaults.conf +++ b/core/src/main/resources/filodb-defaults.conf @@ -233,7 +233,7 @@ filodb { # cass-session-provider-fqcn = fqcn # Number of time series to operate on at one time. Reduce if there is much less memory available - cass-write-batch-size = 500 + cass-write-batch-size = 250 # amount of parallelism to introduce in the spark job. This controls number of spark partitions # increase if the number of splits seen in cassandra reads is low and spark jobs are slow. From 3c04bcf99890657e92af23714e9c597dfa11b022 Mon Sep 17 00:00:00 2001 From: sherali42 <43357447+sherali42@users.noreply.github.com> Date: Wed, 8 Jul 2020 11:12:00 -0700 Subject: [PATCH 3/3] misc(query): add cardinality limits to binary-join, group-by queries (#788) --- core/src/main/resources/filodb-defaults.conf | 6 ++ .../filodb.core/query/QueryContext.scala | 2 + .../main/scala/filodb/http/HttpSettings.scala | 2 + .../query/exec/AggrOverRangeVectors.scala | 17 +++- .../filodb/query/exec/BinaryJoinExec.scala | 14 ++- .../query/exec/BinaryJoinExecSpec.scala | 50 +++++++++++ .../query/exec/BinaryJoinGroupingSpec.scala | 88 ++++++++++++++++++- 7 files changed, 171 insertions(+), 8 deletions(-) diff --git a/core/src/main/resources/filodb-defaults.conf b/core/src/main/resources/filodb-defaults.conf index 3c313e8d94..6d5a7eaca3 100644 --- a/core/src/main/resources/filodb-defaults.conf +++ b/core/src/main/resources/filodb-defaults.conf @@ -150,6 +150,12 @@ filodb { # Maximum number of samples to return in a query sample-limit = 1000000 + # Binary Join Cardinality limit + join-cardinality-limit = 25000 + + # Group by Cardinality limit + group-by-cardinality-limit = 1000 + # Minimum step required for a query min-step = 5 seconds diff --git a/core/src/main/scala/filodb.core/query/QueryContext.scala b/core/src/main/scala/filodb.core/query/QueryContext.scala index bc2dc5ebd2..f15babbf3d 100644 --- a/core/src/main/scala/filodb.core/query/QueryContext.scala +++ b/core/src/main/scala/filodb.core/query/QueryContext.scala @@ -23,6 +23,8 @@ final case class QueryContext(origQueryParams: TsdbQueryParams = UnavailableProm spreadOverride: Option[SpreadProvider] = None, queryTimeoutMillis: Int = 30000, sampleLimit: Int = 1000000, + groupByCardLimit: Int = 100000, + joinQueryCardLimit: Int = 100000, shardOverrides: Option[Seq[Int]] = None, queryId: String = UUID.randomUUID().toString, submitTime: Long = System.currentTimeMillis()) diff --git a/http/src/main/scala/filodb/http/HttpSettings.scala b/http/src/main/scala/filodb/http/HttpSettings.scala index 4ac8fad6b8..39bacc438f 100644 --- a/http/src/main/scala/filodb/http/HttpSettings.scala +++ b/http/src/main/scala/filodb/http/HttpSettings.scala @@ -15,4 +15,6 @@ class HttpSettings(config: Config, val filoSettings: FilodbSettings) { lazy val queryDefaultSpread = config.getInt("filodb.spread-default") lazy val querySampleLimit = config.getInt("filodb.query.sample-limit") lazy val queryAskTimeout = config.as[FiniteDuration]("filodb.query.ask-timeout") + lazy val queryBinaryJoinCardLimit = config.getInt("filodb.query.join-cardinality-limit") + lazy val queryGroupByCardLimit = config.getInt("filodb.query.group-by-cardinality-limit") } diff --git a/query/src/main/scala/filodb/query/exec/AggrOverRangeVectors.scala b/query/src/main/scala/filodb/query/exec/AggrOverRangeVectors.scala index 6638c7ce22..fb204c266b 100644 --- 
a/query/src/main/scala/filodb/query/exec/AggrOverRangeVectors.scala +++ b/query/src/main/scala/filodb/query/exec/AggrOverRangeVectors.scala @@ -35,7 +35,8 @@ final case class ReduceAggregateExec(queryContext: QueryContext, val task = for { schema <- firstSchema } yield { val aggregator = RowAggregator(aggrOp, aggrParams, schema) - RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = true, results, rv => rv.key) + RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = true, results, rv => rv.key, + querySession.qContext.groupByCardLimit) } Observable.fromTask(task).flatten } @@ -76,10 +77,12 @@ final case class AggregateMapReduce(aggrOp: AggregationOperator, sourceSchema.fixedVectorLen.filter(_ <= querySession.queryConfig.fastReduceMaxWindows).map { numWindows => RangeVectorAggregator.fastReduce(aggregator, false, source, numWindows) }.getOrElse { - RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = false, source, grouping) + RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = false, source, grouping, + querySession.qContext.groupByCardLimit) } } else { - RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = false, source, grouping) + RangeVectorAggregator.mapReduce(aggregator, skipMapPhase = false, source, grouping, + querySession.qContext.groupByCardLimit) } } @@ -132,11 +135,17 @@ object RangeVectorAggregator extends StrictLogging { def mapReduce(rowAgg: RowAggregator, skipMapPhase: Boolean, source: Observable[RangeVector], - grouping: RangeVector => RangeVectorKey): Observable[RangeVector] = { + grouping: RangeVector => RangeVectorKey, + cardinalityLimit: Int = Int.MaxValue): Observable[RangeVector] = { // reduce the range vectors using the foldLeft construct. This results in one aggregate per group. val task = source.toListL.map { rvs => // now reduce each group and create one result range vector per group val groupedResult = mapReduceInternal(rvs, rowAgg, skipMapPhase, grouping) + + // if group-by cardinality breaches the limit, throw exception + if (groupedResult.size > cardinalityLimit) + throw new BadQueryException(s"This query results in more than $cardinalityLimit group-by cardinality limit. " + + s"Try applying more filters") groupedResult.map { case (rvk, aggHolder) => val rowIterator = new CustomCloseCursor(aggHolder.map(_.toRowReader))(aggHolder.close()) IteratorBackedRangeVector(rvk, rowIterator) diff --git a/query/src/main/scala/filodb/query/exec/BinaryJoinExec.scala b/query/src/main/scala/filodb/query/exec/BinaryJoinExec.scala index 11efb47024..ed2404649b 100644 --- a/query/src/main/scala/filodb/query/exec/BinaryJoinExec.scala +++ b/query/src/main/scala/filodb/query/exec/BinaryJoinExec.scala @@ -61,10 +61,15 @@ final case class BinaryJoinExec(queryContext: QueryContext, protected def args: String = s"binaryOp=$binaryOp, on=$on, ignoring=$ignoring" + //scalastyle:off method.length protected[exec] def compose(childResponses: Observable[(QueryResponse, Int)], firstSchema: Task[ResultSchema], querySession: QuerySession): Observable[RangeVector] = { val taskOfResults = childResponses.map { + case (QueryResult(_, _, result), _) + if (result.size > queryContext.joinQueryCardLimit && cardinality == Cardinality.OneToOne) => + throw new BadQueryException(s"This query results in more than ${queryContext.joinQueryCardLimit} " + + s"join cardinality. 
Try applying more filters.") case (QueryResult(_, _, result), i) => (result, i) case (QueryError(_, ex), _) => throw ex }.toListL.map { resp => @@ -73,12 +78,10 @@ final case class BinaryJoinExec(queryContext: QueryContext, // require(resp.size == lhs.size + rhs.size, "Did not get sufficient responses for LHS and RHS") val lhsRvs = resp.filter(_._2 < lhs.size).flatMap(_._1) val rhsRvs = resp.filter(_._2 >= lhs.size).flatMap(_._1) - // figure out which side is the "one" side val (oneSide, otherSide, lhsIsOneSide) = if (cardinality == Cardinality.OneToMany) (lhsRvs, rhsRvs, true) else (rhsRvs, lhsRvs, false) - // load "one" side keys in a hashmap val oneSideMap = new mutable.HashMap[Map[Utf8Str, Utf8Str], RangeVector]() oneSide.foreach { rv => @@ -90,7 +93,6 @@ final case class BinaryJoinExec(queryContext: QueryContext, } oneSideMap.put(jk, rv) } - // keep a hashset of result range vector keys to help ensure uniqueness of result range vectors val resultKeySet = new mutable.HashSet[RangeVectorKey]() // iterate across the the "other" side which could be one or many and perform the binary operation @@ -102,6 +104,12 @@ final case class BinaryJoinExec(queryContext: QueryContext, throw new BadQueryException(s"Non-unique result vectors found for $resKey. " + s"Use grouping to create unique matching") resultKeySet.add(resKey) + + // OneToOne cardinality case is already handled. this condition handles OneToMany case + if (resultKeySet.size > queryContext.joinQueryCardLimit) + throw new BadQueryException(s"This query results in more than ${queryContext.joinQueryCardLimit} " + + s"join cardinality. Try applying more filters.") + val res = if (lhsIsOneSide) binOp(rvOne.rows, rvOther.rows) else binOp(rvOther.rows, rvOne.rows) IteratorBackedRangeVector(resKey, res) } diff --git a/query/src/test/scala/filodb/query/exec/BinaryJoinExecSpec.scala b/query/src/test/scala/filodb/query/exec/BinaryJoinExecSpec.scala index ce328bb810..e6d3005b7c 100644 --- a/query/src/test/scala/filodb/query/exec/BinaryJoinExecSpec.scala +++ b/query/src/test/scala/filodb/query/exec/BinaryJoinExecSpec.scala @@ -9,6 +9,7 @@ import monix.execution.Scheduler.Implicits.global import monix.reactive.Observable import org.scalatest.{FunSpec, Matchers} import org.scalatest.concurrent.ScalaFutures +import org.scalatest.exceptions.TestFailedException import filodb.core.metadata.Column.ColumnType import filodb.core.query._ @@ -358,4 +359,53 @@ class BinaryJoinExecSpec extends FunSpec with Matchers with ScalaFutures { result.map(_.key).toSet.size shouldEqual 200 } + + it("should throw BadQueryException - one-to-one with ignoring - cardinality limit 1") { + val queryContext = QueryContext(joinQueryCardLimit = 1) // set join card limit to 1 + val execPlan = BinaryJoinExec(queryContext, dummyDispatcher, + Array(dummyPlan), // cannot be empty as some compose's rely on the schema + new Array[ExecPlan](1), // empty since we test compose, not execute or doExecute + BinaryOperator.ADD, + Cardinality.OneToOne, + Nil, Seq("tag2"), Nil, "__name__") + + // scalastyle:off + val lhs = QueryResult("someId", null, samplesLhsGrouping.map(rv => SerializedRangeVector(rv, schema))) + // val lhs = QueryResult("someId", null, samplesLhs.filter(rv => rv.key.labelValues.get(ZeroCopyUTF8String("tag2")).get.equals("tag1-1")).map(rv => SerializedRangeVector(rv, schema))) + val rhs = QueryResult("someId", null, samplesRhsGrouping.map(rv => SerializedRangeVector(rv, schema))) + // scalastyle:on + + // actual query results into 2 rows. 
since limit is 1, this results in BadQueryException + val thrown = intercept[TestFailedException] { + execPlan.compose(Observable.fromIterable(Seq((rhs, 1), (lhs, 0))), tvSchemaTask, querySession) + .toListL.runAsync.futureValue + } + thrown.getCause.getClass shouldEqual classOf[BadQueryException] + thrown.getCause.getMessage shouldEqual "This query results in more than 1 join cardinality." + + " Try applying more filters." + } + + it("should throw BadQueryException - one-to-one with on - cardinality limit 1") { + val queryContext = QueryContext(joinQueryCardLimit = 1) // set join card limit to 1 + val execPlan = BinaryJoinExec(queryContext, dummyDispatcher, + Array(dummyPlan), // cannot be empty as some compose's rely on the schema + new Array[ExecPlan](1), // empty since we test compose, not execute or doExecute + BinaryOperator.ADD, + Cardinality.OneToOne, + Seq("tag1", "job"), Nil, Nil, "__name__") + + // scalastyle:off + val lhs = QueryResult("someId", null, samplesLhsGrouping.map(rv => SerializedRangeVector(rv, schema))) + val rhs = QueryResult("someId", null, samplesRhsGrouping.map(rv => SerializedRangeVector(rv, schema))) + // scalastyle:on + + // actual query results into 2 rows. since limit is 1, this results in BadQueryException + val thrown = intercept[TestFailedException] { + execPlan.compose(Observable.fromIterable(Seq((rhs, 1), (lhs, 0))), tvSchemaTask, querySession) + .toListL.runAsync.futureValue + } + thrown.getCause.getClass shouldEqual classOf[BadQueryException] + thrown.getCause.getMessage shouldEqual "This query results in more than 1 join cardinality." + + " Try applying more filters." + } } diff --git a/query/src/test/scala/filodb/query/exec/BinaryJoinGroupingSpec.scala b/query/src/test/scala/filodb/query/exec/BinaryJoinGroupingSpec.scala index 3573dc11ad..59a746fc03 100644 --- a/query/src/test/scala/filodb/query/exec/BinaryJoinGroupingSpec.scala +++ b/query/src/test/scala/filodb/query/exec/BinaryJoinGroupingSpec.scala @@ -9,6 +9,7 @@ import monix.execution.Scheduler.Implicits.global import monix.reactive.Observable import org.scalatest.{FunSpec, Matchers} import org.scalatest.concurrent.ScalaFutures +import org.scalatest.exceptions.TestFailedException import filodb.core.metadata.Column.ColumnType import filodb.core.query._ @@ -118,7 +119,6 @@ class BinaryJoinGroupingSpec extends FunSpec with Matchers with ScalaFutures { ) it("should join many-to-one with on ") { - val samplesRhs2 = scala.util.Random.shuffle(sampleNodeRole.toList) // they may come out of order val execPlan = BinaryJoinExec(QueryContext(), dummyDispatcher, @@ -388,4 +388,90 @@ class BinaryJoinGroupingSpec extends FunSpec with Matchers with ScalaFutures { result.size shouldEqual 2 result.map(_.key.labelValues) sameElements(expectedLabels) shouldEqual true } + + it("should throw BadQueryException - many-to-one with on - cardinality limit 1") { + val queryContext = QueryContext(joinQueryCardLimit = 1) // set join card limit to 1 + val samplesRhs2 = scala.util.Random.shuffle(sampleNodeRole.toList) // they may come out of order + + val execPlan = BinaryJoinExec(queryContext, dummyDispatcher, + Array(dummyPlan), // cannot be empty as some compose's rely on the schema + new Array[ExecPlan](1), // empty since we test compose, not execute or doExecute + BinaryOperator.MUL, + Cardinality.ManyToOne, + Seq("instance"), Nil, Seq("role"), "__name__") + + // scalastyle:off + val lhs = QueryResult("someId", null, sampleNodeCpu.map(rv => SerializedRangeVector(rv, schema))) + val rhs = QueryResult("someId", null, 
samplesRhs2.map(rv => SerializedRangeVector(rv, schema))) + // scalastyle:on + + // actual query results into 2 rows. since limit is 1, this results in BadQueryException + val thrown = intercept[TestFailedException] { + execPlan.compose(Observable.fromIterable(Seq((rhs, 1), (lhs, 0))), tvSchemaTask, querySession) + .toListL.runAsync.futureValue + } + + thrown.getCause.getClass shouldEqual classOf[BadQueryException] + thrown.getCause.getMessage shouldEqual "This query results in more than 1 join cardinality." + + " Try applying more filters." + } + + it("should throw BadQueryException - many-to-one with ignoring - cardinality limit 1") { + val queryContext = QueryContext(joinQueryCardLimit = 1) // set join card limit to 1 + val samplesRhs2 = scala.util.Random.shuffle(sampleNodeRole.toList) // they may come out of order + + val execPlan = BinaryJoinExec(queryContext, dummyDispatcher, + Array(dummyPlan), + new Array[ExecPlan](1), + BinaryOperator.MUL, + Cardinality.ManyToOne, + Nil, Seq("role", "mode"), Seq("role"), "__name__") + + // scalastyle:off + val lhs = QueryResult("someId", null, sampleNodeCpu.map(rv => SerializedRangeVector(rv, schema))) + val rhs = QueryResult("someId", null, samplesRhs2.map(rv => SerializedRangeVector(rv, schema))) + // scalastyle:on + + // actual query results into 2 rows. since limit is 1, this results in BadQueryException + val thrown = intercept[TestFailedException] { + execPlan.compose(Observable.fromIterable(Seq((rhs, 1), (lhs, 0))), tvSchemaTask, querySession) + .toListL.runAsync.futureValue + } + + thrown.getCause.getClass shouldEqual classOf[BadQueryException] + thrown.getCause.getMessage shouldEqual "This query results in more than 1 join cardinality." + + " Try applying more filters." + } + + it("should throw BadQueryException - many-to-one with by and grouping without arguments - cardinality limit 1") { + val queryContext = QueryContext(joinQueryCardLimit = 3) // set join card limit to 3 + val agg = RowAggregator(AggregationOperator.Sum, Nil, tvSchema) + val aggMR = AggregateMapReduce(AggregationOperator.Sum, Nil, Nil, Seq("instance", "job")) + val mapped = aggMR(Observable.fromIterable(sampleNodeCpu), querySession, 1000, tvSchema) + + val resultObs4 = RangeVectorAggregator.mapReduce(agg, true, mapped, rv=>rv.key) + val samplesRhs = resultObs4.toListL.runAsync.futureValue + + val execPlan = BinaryJoinExec(queryContext, dummyDispatcher, + Array(dummyPlan), + new Array[ExecPlan](1), + BinaryOperator.DIV, + Cardinality.ManyToOne, + Seq("instance"), Nil, Nil, "__name__") + + // scalastyle:off + val lhs = QueryResult("someId", null, sampleNodeCpu.map(rv => SerializedRangeVector(rv, schema))) + val rhs = QueryResult("someId", null, samplesRhs.map(rv => SerializedRangeVector(rv, schema))) + // scalastyle:on + + // actual query results into 4 rows. since limit is 3, this results in BadQueryException + val thrown = intercept[TestFailedException] { + execPlan.compose(Observable.fromIterable(Seq((rhs, 1), (lhs, 0))), tvSchemaTask, querySession) + .toListL.runAsync.futureValue + } + + thrown.getCause.getClass shouldEqual classOf[BadQueryException] + thrown.getCause.getMessage shouldEqual "This query results in more than 3 join cardinality." + + " Try applying more filters." + } }
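For this last patch, the diff adds the two config keys, the HttpSettings accessors and the QueryContext fields, but not the plumbing between them. A hedged sketch of how the limits are presumably threaded through; the Limits case class and limitsFromConfig helper are assumptions for illustration, not FiloDB code.

    import com.typesafe.config.Config

    object CardinalityLimitSketch {
      // Hypothetical wiring: read the limits added to filodb-defaults.conf and carry them on a
      // per-query context, the way BinaryJoinExec (joinQueryCardLimit) and
      // RangeVectorAggregator.mapReduce (groupByCardLimit) consume them.
      final case class Limits(joinQueryCardLimit: Int, groupByCardLimit: Int)

      def limitsFromConfig(config: Config): Limits =
        Limits(
          joinQueryCardLimit = config.getInt("filodb.query.join-cardinality-limit"),    // 25000 by default
          groupByCardLimit   = config.getInt("filodb.query.group-by-cardinality-limit") // 1000 by default
        )
    }

Exceeding either limit surfaces as a BadQueryException asking the user to apply more filters, as the new BinaryJoin tests assert.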