diff --git a/cassandra/src/main/scala/filodb.cassandra/columnstore/PartitionIndexTable.scala b/cassandra/src/main/scala/filodb.cassandra/columnstore/PartitionIndexTable.scala index db607b15e3..2d028686b9 100644 --- a/cassandra/src/main/scala/filodb.cassandra/columnstore/PartitionIndexTable.scala +++ b/cassandra/src/main/scala/filodb.cassandra/columnstore/PartitionIndexTable.scala @@ -40,7 +40,7 @@ sealed class PartitionIndexTable(val dataset: DatasetRef, |) WITH compression = { 'sstable_compression': '$sstableCompression'}""".stripMargin - lazy val readCql =s"SELECT segmentid, segment " + + lazy val readCql = s"SELECT segmentid, segment " + s"FROM $tableString WHERE shard = ? AND timebucket = ? order by segmentid asc" lazy val writePartitionCql = session.prepare( @@ -51,7 +51,7 @@ sealed class PartitionIndexTable(val dataset: DatasetRef, // fetch size should be low since each row is about an MB. Default fetchSize can result in ReadTimeouts at server val it = session.execute(new SimpleStatement(readCql, shard: JInt, timeBucket: JInt).setFetchSize(15)) .asScala.toIterator.map(row => { - PartKeyTimeBucketSegment(row.getInt("segmentid"), row.getBytes("segment")) + PartKeyTimeBucketSegment(row.getInt("segmentid"), row.getBytes("segment")) }) Observable.fromIterator(it) } diff --git a/cassandra/src/main/scala/filodb.cassandra/metastore/CassandraMetaStore.scala b/cassandra/src/main/scala/filodb.cassandra/metastore/CassandraMetaStore.scala index c49ee84d98..6038e26ecf 100644 --- a/cassandra/src/main/scala/filodb.cassandra/metastore/CassandraMetaStore.scala +++ b/cassandra/src/main/scala/filodb.cassandra/metastore/CassandraMetaStore.scala @@ -59,12 +59,12 @@ class CassandraMetaStore(config: Config, filoSessionProvider: Option[FiloSession checkpointTable.writeCheckpoint(dataset, shardNum, groupNum, offset) } - def readCheckpoints(dataset: DatasetRef, shardNum: Int): Future[Map[Int,Long]] = { + def readCheckpoints(dataset: DatasetRef, shardNum: Int): Future[Map[Int, Long]] = { checkpointTable.readCheckpoints(dataset, shardNum) } def readEarliestCheckpoint(dataset: DatasetRef, shardNum: Int): Future[Long] = { - readCheckpoints(dataset,shardNum) map { m => + readCheckpoints(dataset, shardNum) map { m => if (m.values.isEmpty) Long.MinValue else m.values.min } } diff --git a/cassandra/src/main/scala/filodb.cassandra/metastore/CheckpointTable.scala b/cassandra/src/main/scala/filodb.cassandra/metastore/CheckpointTable.scala index f71eddb328..807a8b00f3 100644 --- a/cassandra/src/main/scala/filodb.cassandra/metastore/CheckpointTable.scala +++ b/cassandra/src/main/scala/filodb.cassandra/metastore/CheckpointTable.scala @@ -76,7 +76,7 @@ sealed class CheckpointTable(val config: Config, dataset.dataset, shardNum: JInt, groupNum: JInt, offset: JLong)) } - def readCheckpoints(dataset: DatasetRef, shardNum: Int): Future[Map[Int,Long]] = { + def readCheckpoints(dataset: DatasetRef, shardNum: Int): Future[Map[Int, Long]] = { session.executeAsync(readCheckpointCql.bind(dataset.database.getOrElse(""), dataset.dataset, shardNum: JInt)) .toIterator // future of Iterator diff --git a/cli/src/main/scala/filodb.cli/CliMain.scala b/cli/src/main/scala/filodb.cli/CliMain.scala index 47b153b2f7..85930008dc 100644 --- a/cli/src/main/scala/filodb.cli/CliMain.scala +++ b/cli/src/main/scala/filodb.cli/CliMain.scala @@ -13,6 +13,7 @@ import monix.reactive.Observable import filodb.coordinator._ import filodb.coordinator.client._ +import filodb.coordinator.client.QueryCommands.{SpreadChange, SpreadProvider, StaticSpreadProvider} 
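A minimal sketch of the spread wiring these CLI changes introduce, using only the types imported above; the helper name `toSpreadProvider` is illustrative, and the same `Option`-to-provider pattern reappears later in `executeQuery2` and in `PrometheusApiRoute`:

```scala
import filodb.coordinator.client.QueryCommands.{SpreadChange, SpreadProvider, StaticSpreadProvider}

// None means "no override": downstream code falls back to the configured spread provider.
def toSpreadProvider(spread: Option[Int]): Option[SpreadProvider] =
  spread.map(s => StaticSpreadProvider(SpreadChange(0, s)))
```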
import filodb.core._ import filodb.core.metadata.{Column, Dataset, DatasetOptions} import filodb.core.store._ @@ -60,6 +61,7 @@ class Arguments extends FieldArgs { var ignoreTagsOnPartitionKeyHash: Seq[String] = Nil var everyNSeconds: Option[String] = None var shards: Option[Seq[String]] = None + var spread: Option[Integer] = None } object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbClusterNode { @@ -116,7 +118,6 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste } def main(args: Arguments): Unit = { - val spread = config.getInt("default-spread") try { val timeout = args.timeoutSeconds.seconds args.command match { @@ -198,7 +199,7 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste require(args.host.nonEmpty && args.dataset.nonEmpty && args.matcher.nonEmpty, "--host, --dataset and --matcher must be defined") val remote = Client.standaloneClient(system, args.host.get, args.port) val options = QOptions(args.limit, args.sampleLimit, args.everyNSeconds.map(_.toInt), - timeout, args.shards.map(_.map(_.toInt)), spread) + timeout, args.shards.map(_.map(_.toInt)), args.spread) parseTimeSeriesMetadataQuery(remote, args.matcher.get, args.dataset.get, getQueryRange(args), options) @@ -206,7 +207,7 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste require(args.host.nonEmpty && args.dataset.nonEmpty && args.labelNames.nonEmpty, "--host, --dataset and --labelName must be defined") val remote = Client.standaloneClient(system, args.host.get, args.port) val options = QOptions(args.limit, args.sampleLimit, args.everyNSeconds.map(_.toInt), - timeout, args.shards.map(_.map(_.toInt)), spread) + timeout, args.shards.map(_.map(_.toInt)), args.spread) parseLabelValuesQuery(remote, args.labelNames, args.labelFilter, args.dataset.get, getQueryRange(args), options) @@ -216,7 +217,7 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste require(args.host.nonEmpty && args.dataset.nonEmpty, "--host and --dataset must be defined") val remote = Client.standaloneClient(system, args.host.get, args.port) val options = QOptions(args.limit, args.sampleLimit, args.everyNSeconds.map(_.toInt), - timeout, args.shards.map(_.map(_.toInt)), spread) + timeout, args.shards.map(_.map(_.toInt)), args.spread) parsePromQuery2(remote, query, args.dataset.get, getQueryRange(args), options) }.orElse { args.select.map { selectCols => @@ -267,13 +268,13 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste ignoreTagsOnPartitionKeyHash: Seq[String], timeout: FiniteDuration): Unit = { try { - val datasetObj = Dataset(dataset.dataset, partitionColumns, dataColumns, rowKeys, downsamplers) val options = DatasetOptions.DefaultOptions.copy(metricColumn = metricColumn, shardKeyColumns = shardKeyColumns, ignoreShardKeyColumnSuffixes = ignoreShardKeyColumnSuffixes, ignoreTagsOnPartitionKeyHash = ignoreTagsOnPartitionKeyHash) + val datasetObj = Dataset(dataset.dataset, partitionColumns, dataColumns, rowKeys, downsamplers, options) println(s"Creating dataset $dataset with options $options...") - client.createNewDataset(datasetObj.copy(options = options), dataset.database) + client.createNewDataset(datasetObj, dataset.database) exitCode = 0 } catch { case b: Dataset.BadSchemaError => @@ -339,7 +340,7 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste final case class QOptions(limit: Int, sampleLimit: Int, everyN: Option[Int], timeout: 
FiniteDuration, shardOverrides: Option[Seq[Int]], - spread: Int) + spread: Option[Integer]) def parseTimeSeriesMetadataQuery(client: LocalClient, query: String, dataset: String, timeParams: TimeRangeParams, @@ -364,7 +365,8 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste def executeQuery2(client: LocalClient, dataset: String, plan: LogicalPlan, options: QOptions): Unit = { val ref = DatasetRef(dataset) - val qOpts = QueryCommands.QueryOptions(options.spread, options.sampleLimit) + val spreadProvider: Option[SpreadProvider] = options.spread.map(s => StaticSpreadProvider(SpreadChange(0, s))) + val qOpts = QueryCommands.QueryOptions(spreadProvider, options.sampleLimit) .copy(queryTimeoutSecs = options.timeout.toSeconds.toInt, shardOverrides = options.shardOverrides) println(s"Sending query command to server for $ref with options $qOpts...") diff --git a/conf/timeseries-filodb-server.conf b/conf/timeseries-filodb-server.conf index 17b40d666c..83a1a4afbe 100644 --- a/conf/timeseries-filodb-server.conf +++ b/conf/timeseries-filodb-server.conf @@ -5,6 +5,19 @@ filodb { port = 9042 partition-list-num-groups = 1 } + spread-default = 1 + + # Override default spread for application using override block which will have non metric shard keys and spread. + spread-assignment = [ + { + _ns = App-0, + _spread_ = 2 + }, + { + _ns = App-5, + _spread_ = 0 + } + ] } kamon { diff --git a/coordinator/src/main/scala/filodb.coordinator/KamonLogger.scala b/coordinator/src/main/scala/filodb.coordinator/KamonLogger.scala index 7c69d77040..67665a07f0 100644 --- a/coordinator/src/main/scala/filodb.coordinator/KamonLogger.scala +++ b/coordinator/src/main/scala/filodb.coordinator/KamonLogger.scala @@ -46,7 +46,7 @@ class KamonMetricsLogReporter extends MetricReporter with StrictLogging { } } - private def formatTags(tags: Map[String, String]) = tags.view.map { case (k,v) => s"$k=$v" }.mkString(" ") + private def formatTags(tags: Map[String, String]) = tags.view.map { case (k, v) => s"$k=$v" }.mkString(" ") private def normalizeLabelName(label: String): String = label.map(charOrUnderscore) diff --git a/coordinator/src/main/scala/filodb.coordinator/QueryActor.scala b/coordinator/src/main/scala/filodb.coordinator/QueryActor.scala index 95efcfed47..5fad8e3350 100644 --- a/coordinator/src/main/scala/filodb.coordinator/QueryActor.scala +++ b/coordinator/src/main/scala/filodb.coordinator/QueryActor.scala @@ -2,13 +2,14 @@ package filodb.coordinator import java.util.concurrent.atomic.AtomicLong -import scala.util.control.NonFatal - import akka.actor.{ActorRef, ActorSystem, Props} import akka.dispatch.{Envelope, UnboundedStablePriorityMailbox} import com.typesafe.config.Config import kamon.Kamon import monix.execution.Scheduler +import net.ceedubs.ficus.Ficus._ +import net.ceedubs.ficus.readers.ValueReader +import scala.util.control.NonFatal import filodb.coordinator.queryengine2.QueryEngine import filodb.core._ @@ -55,6 +56,23 @@ final class QueryActor(memStore: MemStore, val config = context.system.settings.config + var filodbSpreadMap = new collection.mutable.HashMap[collection.Map[String, String], Int] + val applicationShardKeyName = dataset.options.nonMetricShardColumns(0) + val defaultSpread = config.getInt("filodb.spread-default") + + implicit val spreadOverrideReader: ValueReader[SpreadAssignment] = ValueReader.relative { spreadAssignmentConfig => + SpreadAssignment( + shardKeysMap = dataset.options.nonMetricShardColumns.map(x => + (x, spreadAssignmentConfig.getString(x))).toMap[String, 
String], + spread = spreadAssignmentConfig.getInt("_spread_") + ) + } + val spreadAssignment : List[SpreadAssignment]= config.as[List[SpreadAssignment]]("filodb.spread-assignment") + spreadAssignment.foreach{ x => filodbSpreadMap.put(x.shardKeysMap, x.spread)} + + val spreadFunc = QueryOptions.simpleMapSpreadFunc(applicationShardKeyName, filodbSpreadMap, defaultSpread) + val functionalSpreadProvider = FunctionalSpreadProvider(spreadFunc) + val queryEngine2 = new QueryEngine(dataset, shardMapFunc) val queryConfig = new QueryConfig(config.getConfig("filodb.query")) val numSchedThreads = Math.ceil(config.getDouble("filodb.query.threads-factor") * sys.runtime.availableProcessors) @@ -91,11 +109,15 @@ final class QueryActor(memStore: MemStore, } } + private def getSpreadProvider(queryOptions: QueryOptions): SpreadProvider = { + return queryOptions.spreadProvider.getOrElse(functionalSpreadProvider) + } + private def processLogicalPlan2Query(q: LogicalPlan2Query, replyTo: ActorRef) = { // This is for CLI use only. Always prefer clients to materialize logical plan lpRequests.increment try { - val execPlan = queryEngine2.materialize(q.logicalPlan, q.queryOptions) + val execPlan = queryEngine2.materialize(q.logicalPlan, q.queryOptions, getSpreadProvider(q.queryOptions)) self forward execPlan } catch { case NonFatal(ex) => @@ -106,7 +128,7 @@ final class QueryActor(memStore: MemStore, private def processExplainPlanQuery(q: ExplainPlan2Query, replyTo: ActorRef) = { try { - val execPlan = queryEngine2.materialize(q.logicalPlan, q.queryOptions) + val execPlan = queryEngine2.materialize(q.logicalPlan, q.queryOptions, getSpreadProvider(q.queryOptions)) replyTo ! execPlan } catch { case NonFatal(ex) => diff --git a/coordinator/src/main/scala/filodb.coordinator/client/QueryCommands.scala b/coordinator/src/main/scala/filodb.coordinator/client/QueryCommands.scala index 142e4ad34c..0df6600d1c 100644 --- a/coordinator/src/main/scala/filodb.coordinator/client/QueryCommands.scala +++ b/coordinator/src/main/scala/filodb.coordinator/client/QueryCommands.scala @@ -44,6 +44,7 @@ object QueryCommands { Seq(spreadChange) } } + case class SpreadAssignment(shardKeysMap: collection.Map[String, String], spread: Int) /** * Serialize with care! would be based on the provided function. 
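For concreteness, a self-contained sketch of how one `spread-assignment` entry ends up in the tag-map-to-spread lookup that `QueryActor` builds above. The config literal and the hard-wired `_ns` key are assumptions for illustration; the actual reader derives the keys from `dataset.options.nonMetricShardColumns`:

```scala
import com.typesafe.config.ConfigFactory
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ValueReader

import filodb.coordinator.client.QueryCommands.SpreadAssignment

// Config shaped like the spread-assignment block in timeseries-filodb-server.conf (values illustrative).
val conf = ConfigFactory.parseString("""spread-assignment = [ { _ns = App-0, _spread_ = 2 } ]""")

// Same idea as the ValueReader in QueryActor, but hard-wired to a single "_ns" shard key.
implicit val reader: ValueReader[SpreadAssignment] = ValueReader.relative { c =>
  SpreadAssignment(shardKeysMap = Map("_ns" -> c.getString("_ns")), spread = c.getInt("_spread_"))
}

val spreadMap = new collection.mutable.HashMap[collection.Map[String, String], Int]
conf.as[List[SpreadAssignment]]("spread-assignment").foreach(a => spreadMap.put(a.shardKeysMap, a.spread))
// spreadMap now holds Map(_ns -> App-0) -> 2, ready for QueryOptions.simpleMapSpreadFunc.
```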
@@ -60,37 +61,42 @@ object QueryCommands { * This class provides general query processing parameters * @param spreadFunc a function that returns chronologically ordered spread changes for the filter */ - final case class QueryOptions(spreadProvider: SpreadProvider = StaticSpreadProvider(), + final case class QueryOptions(spreadProvider: Option[SpreadProvider] = None, parallelism: Int = 16, queryTimeoutSecs: Int = 30, sampleLimit: Int = 1000000, shardOverrides: Option[Seq[Int]] = None) object QueryOptions { - def apply(constSpread: Int, sampleLimit: Int): QueryOptions = - QueryOptions(spreadProvider = StaticSpreadProvider(SpreadChange(0, constSpread)), sampleLimit = sampleLimit) + def apply(constSpread: Option[SpreadProvider], sampleLimit: Int): QueryOptions = + QueryOptions(spreadProvider = constSpread, sampleLimit = sampleLimit) /** * Creates a spreadFunc that looks for a particular filter with keyName Equals a value, and then maps values * present in the spreadMap to specific spread values, with a default if the filter/value not present in the map */ def simpleMapSpreadFunc(keyName: String, - spreadMap: collection.Map[String, Int], + spreadMap: collection.mutable.Map[collection.Map[String, String], Int], defaultSpread: Int): Seq[ColumnFilter] => Seq[SpreadChange] = { filters: Seq[ColumnFilter] => filters.collectFirst { case ColumnFilter(key, Filter.Equals(filtVal: String)) if key == keyName => filtVal }.map { tagValue => - Seq(SpreadChange(spread = spreadMap.getOrElse(tagValue, defaultSpread))) + Seq(SpreadChange(spread = spreadMap.getOrElse(collection.mutable.Map(keyName->tagValue), defaultSpread))) }.getOrElse(Seq(SpreadChange(defaultSpread))) } import collection.JavaConverters._ def simpleMapSpreadFunc(keyName: String, - spreadMap: java.util.Map[String, Int], - defaultSpread: Int): Seq[ColumnFilter] => Seq[SpreadChange] = - simpleMapSpreadFunc(keyName, spreadMap.asScala, defaultSpread) + spreadMap: java.util.Map[java.util.Map[String, String], Integer], + defaultSpread: Int): Seq[ColumnFilter] => Seq[SpreadChange] = { + val spreadAssignment: collection.mutable.Map[collection.Map[String, String], Int]= spreadMap.asScala.map { + case (d, v) => d.asScala -> v.toInt + } + + simpleMapSpreadFunc(keyName, spreadAssignment, defaultSpread) + } } /** diff --git a/coordinator/src/main/scala/filodb.coordinator/queryengine/Utils.scala b/coordinator/src/main/scala/filodb.coordinator/queryengine/Utils.scala index bb48d1ef92..5687cde7e3 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryengine/Utils.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryengine/Utils.scala @@ -28,6 +28,7 @@ final case class ChildErrorResponse(source: ActorRef, resp: ErrorResponse) exten object Utils extends StrictLogging { import filodb.coordinator.client.QueryCommands._ import TrySugar._ + import filodb.coordinator.client.QueryCommands._ /** * Convert column name strings into columnIDs. 
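A hedged sketch of query-time resolution with the reworked `simpleMapSpreadFunc`, patterned on the `QueryEngineSpec` change further down; the `_ns`/`App-0` values are illustrative and the `ColumnFilter`/`Filter` import path is assumed:

```scala
import filodb.coordinator.client.QueryCommands.{FunctionalSpreadProvider, QueryOptions}
import filodb.core.query.{ColumnFilter, Filter}

val spreadMap = new collection.mutable.HashMap[collection.Map[String, String], Int]
spreadMap.put(collection.Map("_ns" -> "App-0"), 2)   // override: spread 2 => 2^2 = 4 shards

val spreadFunc = QueryOptions.simpleMapSpreadFunc("_ns", spreadMap, 1)   // default spread 1
val provider = FunctionalSpreadProvider(spreadFunc)

// A filter on _ns="App-0" picks up the override; any other _ns falls back to the default.
provider.spreadFunc(Seq(ColumnFilter("_ns", Filter.Equals("App-0")))).last.spread  // 2
provider.spreadFunc(Seq(ColumnFilter("_ns", Filter.Equals("App-1")))).last.spread  // 1
```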
NOTE: column names should not include row key columns @@ -49,7 +50,8 @@ object Utils extends StrictLogging { */ def validatePartQuery(dataset: Dataset, shardMap: ShardMapper, partQuery: PartitionQuery, - options: QueryOptions): Seq[PartitionScanMethod] Or ErrorResponse = + options: QueryOptions, spreadProvider: SpreadProvider): + Seq[PartitionScanMethod] Or ErrorResponse = Try(partQuery match { case SinglePartitionQuery(keyParts) => val partKey = dataset.partKey(keyParts: _*) @@ -75,7 +77,7 @@ object Utils extends StrictLogging { if (shardCols.length > 0) { shardHashFromFilters(filters, shardCols, dataset) match { case Some(shardHash) => shardMap.queryShards(shardHash, - options.spreadProvider.spreadFunc(filters).last.spread) + spreadProvider.spreadFunc(filters).last.spread) case None => throw new IllegalArgumentException(s"Must specify filters for $shardCols") } } else { diff --git a/coordinator/src/main/scala/filodb.coordinator/queryengine2/QueryEngine.scala b/coordinator/src/main/scala/filodb.coordinator/queryengine2/QueryEngine.scala index 58422da884..ba0743be6a 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryengine2/QueryEngine.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryengine2/QueryEngine.scala @@ -12,7 +12,7 @@ import kamon.Kamon import monix.eval.Task import filodb.coordinator.ShardMapper -import filodb.coordinator.client.QueryCommands.QueryOptions +import filodb.coordinator.client.QueryCommands.{QueryOptions, SpreadProvider, StaticSpreadProvider} import filodb.core.Types import filodb.core.binaryrecord2.RecordBuilder import filodb.core.metadata.Dataset @@ -58,16 +58,19 @@ class QueryEngine(dataset: Dataset, * Converts a LogicalPlan to the ExecPlan */ def materialize(rootLogicalPlan: LogicalPlan, - options: QueryOptions): ExecPlan = { + options: QueryOptions, spreadProvider: SpreadProvider = StaticSpreadProvider()): ExecPlan = { val queryId = UUID.randomUUID().toString - val materialized = walkLogicalPlanTree(rootLogicalPlan, queryId, System.currentTimeMillis(), options) match { + + val materialized = walkLogicalPlanTree(rootLogicalPlan, queryId, System.currentTimeMillis(), + options, spreadProvider) + match { case PlanResult(Seq(justOne), stitch) => if (stitch) justOne.addRangeVectorTransformer(new StitchRvsMapper()) justOne case PlanResult(many, stitch) => val targetActor = pickDispatcher(many) many(0) match { - case lve: LabelValuesExec =>LabelValuesDistConcatExec(queryId, targetActor, many) + case lve: LabelValuesExec => LabelValuesDistConcatExec(queryId, targetActor, many) case ske: PartKeysExec => PartKeysDistConcatExec(queryId, targetActor, many) case ep: ExecPlan => val topPlan = DistConcatExec(queryId, targetActor, many) @@ -83,7 +86,7 @@ class QueryEngine(dataset: Dataset, val shardColumns = dataset.options.shardKeyColumns.sorted private def shardsFromFilters(filters: Seq[ColumnFilter], - options: QueryOptions): Seq[Int] = { + options: QueryOptions, spreadProvider : SpreadProvider): Seq[Int] = { require(shardColumns.nonEmpty || options.shardOverrides.nonEmpty, s"Dataset ${dataset.ref} does not have shard columns defined, and shard overrides were not mentioned") @@ -107,7 +110,7 @@ class QueryEngine(dataset: Dataset, val shardValues = shardVals.filterNot(_._1 == dataset.options.metricColumn).map(_._2) logger.debug(s"For shardColumns $shardColumns, extracted metric $metric and shard values $shardValues") val shardHash = RecordBuilder.shardKeyHash(shardValues, metric) - shardMapperFunc.queryShards(shardHash, 
options.spreadProvider.spreadFunc(filters).last.spread) + shardMapperFunc.queryShards(shardHash, spreadProvider.spreadFunc(filters).last.spread) } } @@ -128,27 +131,35 @@ class QueryEngine(dataset: Dataset, private def walkLogicalPlanTree(logicalPlan: LogicalPlan, queryId: String, submitTime: Long, - options: QueryOptions): PlanResult = { + options: QueryOptions, spreadProvider: SpreadProvider): PlanResult = { + logicalPlan match { - case lp: RawSeries => materializeRawSeries(queryId, submitTime, options, lp) - case lp: RawChunkMeta => materializeRawChunkMeta(queryId, submitTime, options, lp) - case lp: PeriodicSeries => materializePeriodicSeries(queryId, submitTime, options, lp) - case lp: PeriodicSeriesWithWindowing => materializePeriodicSeriesWithWindowing(queryId, submitTime, options, lp) - case lp: ApplyInstantFunction => materializeApplyInstantFunction(queryId, submitTime, options, lp) - case lp: Aggregate => materializeAggregate(queryId, submitTime, options, lp) - case lp: BinaryJoin => materializeBinaryJoin(queryId, submitTime, options, lp) - case lp: ScalarVectorBinaryOperation => materializeScalarVectorBinOp(queryId, submitTime, options, lp) - case lp: LabelValues => materializeLabelValues(queryId, submitTime, options, lp) - case lp: SeriesKeysByFilters => materializeSeriesKeysByFilters(queryId, submitTime, options, lp) - case lp: ApplyMiscellaneousFunction => materializeApplyMiscellaneousFunction(queryId, submitTime, options, lp) + case lp: RawSeries => materializeRawSeries(queryId, submitTime, options, lp, spreadProvider) + case lp: RawChunkMeta => materializeRawChunkMeta(queryId, submitTime, options, lp, spreadProvider) + case lp: PeriodicSeries => materializePeriodicSeries(queryId, submitTime, options, lp, + spreadProvider) + case lp: PeriodicSeriesWithWindowing => materializePeriodicSeriesWithWindowing(queryId, submitTime, options, lp, + spreadProvider) + case lp: ApplyInstantFunction => materializeApplyInstantFunction(queryId, submitTime, options, lp, + spreadProvider) + case lp: Aggregate => materializeAggregate(queryId, submitTime, options, lp, spreadProvider) + case lp: BinaryJoin => materializeBinaryJoin(queryId, submitTime, options, lp, spreadProvider) + case lp: ScalarVectorBinaryOperation => materializeScalarVectorBinOp(queryId, submitTime, options, lp, + spreadProvider) + case lp: LabelValues => materializeLabelValues(queryId, submitTime, options, lp, spreadProvider) + case lp: SeriesKeysByFilters => materializeSeriesKeysByFilters(queryId, submitTime, options, lp, + spreadProvider) + case lp: ApplyMiscellaneousFunction => materializeApplyMiscellaneousFunction(queryId, submitTime, options, lp, + spreadProvider) } } private def materializeScalarVectorBinOp(queryId: String, submitTime: Long, options: QueryOptions, - lp: ScalarVectorBinaryOperation): PlanResult = { - val vectors = walkLogicalPlanTree(lp.vector, queryId, submitTime, options) + lp: ScalarVectorBinaryOperation, + spreadProvider : SpreadProvider): PlanResult = { + val vectors = walkLogicalPlanTree(lp.vector, queryId, submitTime, options, spreadProvider) vectors.plans.foreach(_.addRangeVectorTransformer(ScalarOperationMapper(lp.operator, lp.scalar, lp.scalarIsLhs))) vectors } @@ -156,11 +167,11 @@ class QueryEngine(dataset: Dataset, private def materializeBinaryJoin(queryId: String, submitTime: Long, options: QueryOptions, - lp: BinaryJoin): PlanResult = { - val lhs = walkLogicalPlanTree(lp.lhs, queryId, submitTime, options) + lp: BinaryJoin, spreadProvider : SpreadProvider): PlanResult = { + val lhs = 
walkLogicalPlanTree(lp.lhs, queryId, submitTime, options, spreadProvider) val stitchedLhs = if (lhs.needsStitch) Seq(StitchRvsExec(queryId, pickDispatcher(lhs.plans), lhs.plans)) else lhs.plans - val rhs = walkLogicalPlanTree(lp.rhs, queryId, submitTime, options) + val rhs = walkLogicalPlanTree(lp.rhs, queryId, submitTime, options, spreadProvider) val stitchedRhs = if (rhs.needsStitch) Seq(StitchRvsExec(queryId, pickDispatcher(rhs.plans), rhs.plans)) else rhs.plans // TODO Currently we create separate exec plan node for stitching. @@ -178,8 +189,8 @@ class QueryEngine(dataset: Dataset, private def materializeAggregate(queryId: String, submitTime: Long, options: QueryOptions, - lp: Aggregate): PlanResult = { - val toReduce = walkLogicalPlanTree(lp.vectors, queryId, submitTime, options) + lp: Aggregate, spreadProvider : SpreadProvider): PlanResult = { + val toReduce = walkLogicalPlanTree(lp.vectors, queryId, submitTime, options, spreadProvider ) // Now we have one exec plan per shard /* * Note that in order for same overlapping RVs to not be double counted when spread is increased, @@ -203,8 +214,8 @@ class QueryEngine(dataset: Dataset, private def materializeApplyInstantFunction(queryId: String, submitTime: Long, options: QueryOptions, - lp: ApplyInstantFunction): PlanResult = { - val vectors = walkLogicalPlanTree(lp.vectors, queryId, submitTime, options) + lp: ApplyInstantFunction, spreadProvider : SpreadProvider): PlanResult = { + val vectors = walkLogicalPlanTree(lp.vectors, queryId, submitTime, options, spreadProvider) vectors.plans.foreach(_.addRangeVectorTransformer(InstantVectorFunctionMapper(lp.function, lp.functionArgs))) vectors } @@ -212,8 +223,9 @@ class QueryEngine(dataset: Dataset, private def materializePeriodicSeriesWithWindowing(queryId: String, submitTime: Long, options: QueryOptions, - lp: PeriodicSeriesWithWindowing): PlanResult ={ - val rawSeries = walkLogicalPlanTree(lp.rawSeries, queryId, submitTime, options) + lp: PeriodicSeriesWithWindowing, + spreadProvider: SpreadProvider): PlanResult = { + val rawSeries = walkLogicalPlanTree(lp.rawSeries, queryId, submitTime, options, spreadProvider) rawSeries.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(lp.start, lp.step, lp.end, Some(lp.window), Some(lp.function), lp.functionArgs))) rawSeries @@ -222,8 +234,8 @@ class QueryEngine(dataset: Dataset, private def materializePeriodicSeries(queryId: String, submitTime: Long, options: QueryOptions, - lp: PeriodicSeries): PlanResult = { - val rawSeries = walkLogicalPlanTree(lp.rawSeries, queryId, submitTime, options) + lp: PeriodicSeries, spreadProvider : SpreadProvider): PlanResult = { + val rawSeries = walkLogicalPlanTree(lp.rawSeries, queryId, submitTime, options, spreadProvider) rawSeries.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(lp.start, lp.step, lp.end, None, None, Nil))) rawSeries @@ -232,15 +244,15 @@ class QueryEngine(dataset: Dataset, private def materializeRawSeries(queryId: String, submitTime: Long, options: QueryOptions, - lp: RawSeries): PlanResult = { + lp: RawSeries, spreadProvider : SpreadProvider): PlanResult = { val colIDs = getColumnIDs(dataset, lp.columns) val renamedFilters = renameMetricFilter(lp.filters) - val spreadChanges = options.spreadProvider.spreadFunc(renamedFilters) + val spreadChanges = spreadProvider.spreadFunc(renamedFilters) val needsStitch = lp.rangeSelector match { case IntervalSelector(from, to) => spreadChanges.exists(c => c.time >= from && c.time <= to) case _ => false } - val execPlans = 
shardsFromFilters(renamedFilters, options).map { shard => + val execPlans = shardsFromFilters(renamedFilters, options, spreadProvider).map { shard => val dispatcher = dispatcherForShard(shard) SelectRawPartitionsExec(queryId, submitTime, options.sampleLimit, dispatcher, dataset.ref, shard, renamedFilters, toChunkScanMethod(lp.rangeSelector), colIDs) @@ -251,7 +263,7 @@ class QueryEngine(dataset: Dataset, private def materializeLabelValues(queryId: String, submitTime: Long, options: QueryOptions, - lp: LabelValues): PlanResult = { + lp: LabelValues, spreadProvider : SpreadProvider): PlanResult = { val filters = lp.labelConstraints.map { case (k, v) => new ColumnFilter(k, Filter.Equals(v)) }.toSeq @@ -262,7 +274,7 @@ class QueryEngine(dataset: Dataset, lp.labelNames.updated(metricLabelIndex, dataset.options.metricColumn) else lp.labelNames val shardsToHit = if (shardColumns.toSet.subsetOf(lp.labelConstraints.keySet)) { - shardsFromFilters(filters, options) + shardsFromFilters(filters, options, spreadProvider) } else { mdNoShardKeyFilterRequests.increment() shardMapperFunc.assignedShards @@ -278,11 +290,11 @@ class QueryEngine(dataset: Dataset, private def materializeSeriesKeysByFilters(queryId: String, submitTime: Long, options: QueryOptions, - lp: SeriesKeysByFilters): PlanResult = { + lp: SeriesKeysByFilters, spreadProvider : SpreadProvider): PlanResult = { val renamedFilters = renameMetricFilter(lp.filters) val filterCols = lp.filters.map(_.column).toSet val shardsToHit = if (shardColumns.toSet.subsetOf(filterCols)) { - shardsFromFilters(lp.filters, options) + shardsFromFilters(lp.filters, options, spreadProvider) } else { mdNoShardKeyFilterRequests.increment() shardMapperFunc.assignedShards @@ -298,12 +310,12 @@ class QueryEngine(dataset: Dataset, private def materializeRawChunkMeta(queryId: String, submitTime: Long, options: QueryOptions, - lp: RawChunkMeta): PlanResult = { + lp: RawChunkMeta, spreadProvider : SpreadProvider): PlanResult = { // Translate column name to ID and validate here val colName = if (lp.column.isEmpty) dataset.options.valueColumn else lp.column val colID = dataset.colIDs(colName).get.head val renamedFilters = renameMetricFilter(lp.filters) - val metaExec = shardsFromFilters(renamedFilters, options).map { shard => + val metaExec = shardsFromFilters(renamedFilters, options, spreadProvider).map { shard => val dispatcher = dispatcherForShard(shard) SelectChunkInfosExec(queryId, submitTime, options.sampleLimit, dispatcher, dataset.ref, shard, renamedFilters, toChunkScanMethod(lp.rangeSelector), colID) @@ -312,10 +324,11 @@ class QueryEngine(dataset: Dataset, } private def materializeApplyMiscellaneousFunction(queryId: String, - submitTime: Long, - options: QueryOptions, - lp: ApplyMiscellaneousFunction): PlanResult = { - val vectors = walkLogicalPlanTree(lp.vectors, queryId, submitTime, options) + submitTime: Long, + options: QueryOptions, + lp: ApplyMiscellaneousFunction, + spreadProvider: SpreadProvider): PlanResult = { + val vectors = walkLogicalPlanTree(lp.vectors, queryId, submitTime, options, spreadProvider) vectors.plans.foreach(_.addRangeVectorTransformer(MiscellaneousFunctionMapper(lp.function, lp.functionArgs))) vectors } diff --git a/coordinator/src/test/scala/filodb.coordinator/client/SerializationSpec.scala b/coordinator/src/test/scala/filodb.coordinator/client/SerializationSpec.scala index 482b1bf206..6ec7a90921 100644 --- a/coordinator/src/test/scala/filodb.coordinator/client/SerializationSpec.scala +++ 
b/coordinator/src/test/scala/filodb.coordinator/client/SerializationSpec.scala @@ -201,7 +201,8 @@ class SerializationSpec extends ActorTest(SerializationSpecConfig.getNewSystem) val windowed2 = PeriodicSeriesWithWindowing(raw2, from, 1000, to, 5000, RangeFunctionId.Rate) val summed2 = Aggregate(AggregationOperator.Sum, windowed2, Nil, Seq("job")) val logicalPlan = BinaryJoin(summed1, BinaryOperator.DIV, Cardinality.OneToOne, summed2) - val execPlan = engine.materialize(logicalPlan, QueryOptions(0, 100)) + val execPlan = engine.materialize(logicalPlan, QueryOptions(Some(StaticSpreadProvider(SpreadChange(0, 0))), + 100), new StaticSpreadProvider(SpreadChange(0, 0))) roundTrip(execPlan) shouldEqual execPlan } @@ -219,7 +220,8 @@ class SerializationSpec extends ActorTest(SerializationSpecConfig.getNewSystem) val logicalPlan1 = Parser.queryRangeToLogicalPlan( "sum(rate(http_request_duration_seconds_bucket{job=\"prometheus\"}[20s])) by (handler)", qParams) - val execPlan1 = engine.materialize(logicalPlan1, QueryOptions(0, 100)) + val execPlan1 = engine.materialize(logicalPlan1, QueryOptions(Some(new StaticSpreadProvider(SpreadChange(0, 0))), + 100), new StaticSpreadProvider(SpreadChange(0, 0))) roundTrip(execPlan1) shouldEqual execPlan1 // scalastyle:off @@ -227,7 +229,7 @@ class SerializationSpec extends ActorTest(SerializationSpecConfig.getNewSystem) "sum(rate(http_request_duration_microseconds_sum{job=\"prometheus\"}[5m])) by (handler) / sum(rate(http_request_duration_microseconds_count{job=\"prometheus\"}[5m])) by (handler)", qParams) // scalastyle:on - val execPlan2 = engine.materialize(logicalPlan2, QueryOptions(0, 100)) + val execPlan2 = engine.materialize(logicalPlan2, QueryOptions(Some(new StaticSpreadProvider(SpreadChange(0, 0))), 100), new StaticSpreadProvider(SpreadChange(0, 0))) roundTrip(execPlan2) shouldEqual execPlan2 } @@ -249,7 +251,8 @@ class SerializationSpec extends ActorTest(SerializationSpecConfig.getNewSystem) val logicalPlan1 = Parser.metadataQueryToLogicalPlan( "http_request_duration_seconds_bucket{job=\"prometheus\"}", qParams) - val execPlan1 = engine.materialize(logicalPlan1, QueryOptions(0, 100)) + val execPlan1 = engine.materialize(logicalPlan1, QueryOptions(Some( + new StaticSpreadProvider(SpreadChange(0, 0))), 100), new StaticSpreadProvider(SpreadChange(0, 0))) val partKeysExec = execPlan1.asInstanceOf[PartKeysExec] // will be dispatched to single shard roundTrip(partKeysExec) shouldEqual partKeysExec @@ -257,7 +260,8 @@ class SerializationSpec extends ActorTest(SerializationSpecConfig.getNewSystem) val logicalPlan2 = Parser.metadataQueryToLogicalPlan( "http_request_duration_seconds_bucket", qParams) - val execPlan2 = engine.materialize(logicalPlan2, QueryOptions(0, 100)) + val execPlan2 = engine.materialize(logicalPlan2, QueryOptions( + Some(new StaticSpreadProvider(SpreadChange(0, 0))), 100)) val partKeysDistConcatExec = execPlan2.asInstanceOf[PartKeysDistConcatExec] // will be dispatched to all active shards since no shard column filters in the query diff --git a/coordinator/src/test/scala/filodb.coordinator/queryengine2/QueryEngineSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryengine2/QueryEngineSpec.scala index 72ee6b9761..9ef329c53b 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryengine2/QueryEngineSpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryengine2/QueryEngineSpec.scala @@ -3,7 +3,6 @@ package filodb.coordinator.queryengine2 import akka.actor.ActorSystem import akka.testkit.TestProbe import 
org.scalatest.{FunSpec, Matchers} - import filodb.coordinator.ShardMapper import filodb.coordinator.client.QueryCommands.{FunctionalSpreadProvider, QueryOptions, SpreadChange} import filodb.core.MetricsTestData @@ -117,7 +116,6 @@ class QueryEngineSpec extends FunSpec with Matchers { // materialized exec plan val execPlan = engine2.materialize(raw2, QueryOptions()) - // println(execPlan.printTree()) execPlan.isInstanceOf[DistConcatExec] shouldEqual true execPlan.children.foreach { l1 => l1.isInstanceOf[SelectRawPartitionsExec] shouldEqual true @@ -127,13 +125,17 @@ class QueryEngineSpec extends FunSpec with Matchers { } it("should use spread function to change/override spread and generate ExecPlan with appropriate shards") { - val spreadFunc = QueryOptions.simpleMapSpreadFunc("job", Map("myService" -> 2), 1) + var filodbSpreadMap = new collection.mutable.HashMap[collection.Map[String, String], Int] + filodbSpreadMap.put(collection.Map(("job" -> "myService")), 2) + + val spreadFunc = QueryOptions.simpleMapSpreadFunc("job", filodbSpreadMap, 1) // final logical plan val logicalPlan = BinaryJoin(summed1, BinaryOperator.DIV, Cardinality.OneToOne, summed2) // materialized exec plan - val execPlan = engine.materialize(logicalPlan, QueryOptions(FunctionalSpreadProvider(spreadFunc))) + val execPlan = engine.materialize(logicalPlan, QueryOptions(), FunctionalSpreadProvider(spreadFunc)) + execPlan.printTree() execPlan.isInstanceOf[BinaryJoinExec] shouldEqual true execPlan.children should have length (2) @@ -148,7 +150,7 @@ class QueryEngineSpec extends FunSpec with Matchers { def spread(filter: Seq[ColumnFilter]): Seq[SpreadChange] = { Seq(SpreadChange(0, 1), SpreadChange(25000000, 2)) // spread change time is in ms } - val execPlan = engine.materialize(lp, QueryOptions(FunctionalSpreadProvider(spread))) + val execPlan = engine.materialize(lp, QueryOptions(), FunctionalSpreadProvider(spread)) execPlan.rangeVectorTransformers.head.isInstanceOf[StitchRvsMapper] shouldEqual true } @@ -157,7 +159,7 @@ class QueryEngineSpec extends FunSpec with Matchers { def spread(filter: Seq[ColumnFilter]): Seq[SpreadChange] = { Seq(SpreadChange(0, 1), SpreadChange(35000000, 2)) } - val execPlan = engine.materialize(lp, QueryOptions(FunctionalSpreadProvider(spread))) + val execPlan = engine.materialize(lp, QueryOptions(), FunctionalSpreadProvider(spread)) execPlan.rangeVectorTransformers.isEmpty shouldEqual true } @@ -167,7 +169,7 @@ class QueryEngineSpec extends FunSpec with Matchers { def spread(filter: Seq[ColumnFilter]): Seq[SpreadChange] = { Seq(SpreadChange(0, 1), SpreadChange(25000000, 2)) } - val execPlan = engine.materialize(lp, QueryOptions(FunctionalSpreadProvider(spread))) + val execPlan = engine.materialize(lp, QueryOptions(), FunctionalSpreadProvider(spread)) val binaryJoinNode = execPlan.children(0) binaryJoinNode.isInstanceOf[BinaryJoinExec] shouldEqual true binaryJoinNode.children.size shouldEqual 2 @@ -180,7 +182,7 @@ class QueryEngineSpec extends FunSpec with Matchers { def spread(filter: Seq[ColumnFilter]): Seq[SpreadChange] = { Seq(SpreadChange(0, 1), SpreadChange(35000000, 2)) } - val execPlan = engine.materialize(lp, QueryOptions(FunctionalSpreadProvider(spread))) + val execPlan = engine.materialize(lp, QueryOptions(), FunctionalSpreadProvider(spread)) val binaryJoinNode = execPlan.children(0) binaryJoinNode.isInstanceOf[BinaryJoinExec] shouldEqual true binaryJoinNode.children.foreach(_.isInstanceOf[StitchRvsExec] should not equal true) diff --git a/core/src/main/resources/filodb-defaults.conf 
b/core/src/main/resources/filodb-defaults.conf index 465878137c..0acdfdfa04 100644 --- a/core/src/main/resources/filodb-defaults.conf +++ b/core/src/main/resources/filodb-defaults.conf @@ -25,7 +25,11 @@ filodb { } # # of shards each application/metric is spread out to = 2^spread - default-spread = 1 + spread-default = 1 + # default spread can be overriden for a specific sharding key combination. + # Eg: If "__name__, _ns" are your sharding key, for a _ns "App-Test001" the spread can be overriden as follows: + # spread-assignment = [ { _ns = App-Test001, _spread_ = 5 } ] + spread-assignment = [] query { # Timeout for query engine subtree/ExecPlans for requests to sub nodes diff --git a/core/src/main/scala/filodb.core/Iterators.scala b/core/src/main/scala/filodb.core/Iterators.scala index 2523c2d64a..d7fb8ad6b3 100644 --- a/core/src/main/scala/filodb.core/Iterators.scala +++ b/core/src/main/scala/filodb.core/Iterators.scala @@ -30,7 +30,7 @@ object Iterators extends StrictLogging { def next: (B, Iterator[T]) = { val first = iter.next() val firstValue = func(first) - val (i1,i2) = iter.span(el => func(el) == firstValue) + val (i1, i2) = iter.span(el => func(el) == firstValue) iter = i2 (firstValue, Iterator.single(first) ++ i1) } diff --git a/core/src/main/scala/filodb.core/binaryrecord2/RecordContainer.scala b/core/src/main/scala/filodb.core/binaryrecord2/RecordContainer.scala index 95e6ce77e5..5732610db2 100644 --- a/core/src/main/scala/filodb.core/binaryrecord2/RecordContainer.scala +++ b/core/src/main/scala/filodb.core/binaryrecord2/RecordContainer.scala @@ -27,7 +27,7 @@ final class RecordContainer(val base: Any, val offset: Long, maxLength: Int, var numRecords: Int = 0) { import RecordBuilder._ - @inline final def numBytes: Int = UnsafeUtils.getInt(base, offset) + @inline final def numBytes: Int = UnsafeUtils.getInt(base, offset) @inline final def isEmpty: Boolean = numBytes <= 4 /** diff --git a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala index 5ba58974c3..1039d1cead 100644 --- a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala +++ b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala @@ -32,10 +32,10 @@ import filodb.memory.{BinaryRegionLarge, UTF8StringMedium} import filodb.memory.format.{UnsafeUtils, ZeroCopyUTF8String => UTF8Str} object PartKeyLuceneIndex { - final val PART_ID = "__partId__" + final val PART_ID = "__partId__" final val START_TIME = "__startTime__" - final val END_TIME = "__endTime__" - final val PART_KEY = "__partKey__" + final val END_TIME = "__endTime__" + final val PART_KEY = "__partKey__" final val ignoreIndexNames = HashSet(START_TIME, PART_KEY, END_TIME, PART_ID) diff --git a/core/src/main/scala/filodb.core/memstore/TimeSeriesPartition.scala b/core/src/main/scala/filodb.core/memstore/TimeSeriesPartition.scala index 0790a25d54..24c2fc23b4 100644 --- a/core/src/main/scala/filodb.core/memstore/TimeSeriesPartition.scala +++ b/core/src/main/scala/filodb.core/memstore/TimeSeriesPartition.scala @@ -134,7 +134,7 @@ extends ChunkMap(memFactory, initMapSize) with ReadablePartition { // vectors fills up. This is possible if one vector fills up but the other one does not for some reason. // So we do not call ingest again unless switcing buffers succeeds. 
// re-ingest every element, allocating new WriteBuffers - if (switchBuffers(blockHolder, encode=true)) { ingest(row, blockHolder) } + if (switchBuffers(blockHolder, encode = true)) { ingest(row, blockHolder) } else { _log.warn("EMPTY WRITEBUFFERS when switchBuffers called! Likely a severe bug!!! " + s"Part=$stringPartition ts=$ts col=$col numRows=${currentInfo.numRows}") } return diff --git a/core/src/main/scala/filodb.core/metadata/Dataset.scala b/core/src/main/scala/filodb.core/metadata/Dataset.scala index decc5629d2..7424d1a895 100644 --- a/core/src/main/scala/filodb.core/metadata/Dataset.scala +++ b/core/src/main/scala/filodb.core/metadata/Dataset.scala @@ -232,25 +232,25 @@ object Dataset { partitionColumns: Seq[String], dataColumns: Seq[String], keyColumns: Seq[String]): Dataset = - apply(name, partitionColumns, dataColumns, keyColumns, Nil) + apply(name, partitionColumns, dataColumns, keyColumns, Nil, DatasetOptions.DefaultOptions) def apply(name: String, partitionColumns: Seq[String], dataColumns: Seq[String], keyColumns: Seq[String], - downsamplers: Seq[String]): Dataset = - make(name, partitionColumns, dataColumns, keyColumns, downsamplers).badMap(BadSchemaError).toTry.get + downsamplers: Seq[String], options : DatasetOptions): Dataset = + make(name, partitionColumns, dataColumns, keyColumns, downsamplers, options).badMap(BadSchemaError).toTry.get def apply(name: String, partitionColumns: Seq[String], dataColumns: Seq[String], - keyColumn: String): Dataset = - apply(name, partitionColumns, dataColumns, Seq(keyColumn), Nil) + keyColumn: String, options: DatasetOptions): Dataset = + apply(name, partitionColumns, dataColumns, Seq(keyColumn), Nil, options) def apply(name: String, partitionColumns: Seq[String], dataColumns: Seq[String]): Dataset = - apply(name, partitionColumns, dataColumns, "timestamp") + apply(name, partitionColumns, dataColumns, "timestamp", DatasetOptions.DefaultOptions) sealed trait BadSchema case class BadDownsampler(msg: String) extends BadSchema diff --git a/core/src/main/scala/filodb.core/store/ChunkSink.scala b/core/src/main/scala/filodb.core/store/ChunkSink.scala index aba2b581ce..19db32546b 100644 --- a/core/src/main/scala/filodb.core/store/ChunkSink.scala +++ b/core/src/main/scala/filodb.core/store/ChunkSink.scala @@ -133,7 +133,7 @@ class NullColumnStore(implicit sched: Scheduler) extends ColumnStore with Strict def readRawPartitions(dataset: Dataset, columnIDs: Seq[Types.ColumnId], partMethod: PartitionScanMethod, - chunkMethod: ChunkScanMethod = AllChunkScan): Observable[RawPartData] = Observable.empty + chunkMethod: ChunkScanMethod = AllChunkScan): Observable[RawPartData] = Observable.empty override def getScanSplits(dataset: DatasetRef, splitsPerNode: Int): Seq[ScanSplit] = Seq.empty diff --git a/core/src/test/resources/application_test.conf b/core/src/test/resources/application_test.conf index 650256c6ca..e287e60892 100644 --- a/core/src/test/resources/application_test.conf +++ b/core/src/test/resources/application_test.conf @@ -56,6 +56,15 @@ filodb { sample-limit = 1000000 min-step = 1 ms } + + spread-default = 1 + + spread-assignment = [ + { + _ns = App-0, + _spread_ = 2 + } + ] } query-actor-mailbox { diff --git a/core/src/test/scala/filodb.core/TestData.scala b/core/src/test/scala/filodb.core/TestData.scala index c68c766a4e..1f1d42cc40 100644 --- a/core/src/test/scala/filodb.core/TestData.scala +++ b/core/src/test/scala/filodb.core/TestData.scala @@ -58,7 +58,7 @@ object NamesTestData { def mapper(rows: Seq[Product]): Seq[RowReader] = 
rows.map(TupleRowReader) val dataColSpecs = Seq("first:string", "last:string", "age:long:interval=10") - val dataset = Dataset("dataset", Seq("seg:int"), dataColSpecs, "age") + val dataset = Dataset("dataset", Seq("seg:int"), dataColSpecs, "age", DatasetOptions.DefaultOptions) // NOTE: first 3 columns are the data columns, thus names could be used for either complete record // or the data column rowReader @@ -91,7 +91,7 @@ object NamesTestData { val sortedFirstNames = Seq("Khalil", "Rodney", "Ndamukong", "Terrance", "Peyton", "Jerry") val sortedUtf8Firsts = sortedFirstNames.map(_.utf8) - val largeDataset = Dataset("dataset", Seq("league:string"), dataColSpecs, "age") + val largeDataset = Dataset("dataset", Seq("league:string"), dataColSpecs, "age", DatasetOptions.DefaultOptions) val lotLotNames = { for { league <- Seq("nfc", "afc") @@ -170,7 +170,7 @@ object GdeltTestData { val seqReaders = records.map { record => SeqRowReader(record.productIterator.toList) } // Dataset1: Partition keys (Actor2Code, Year) / Row key GLOBALEVENTID - val dataset1 = Dataset("gdelt", Seq(schema(4), schema(3)), schema.patch(3, Nil, 2), "GLOBALEVENTID") + val dataset1 = Dataset("gdelt", Seq(schema(4), schema(3)), schema.patch(3, Nil, 2), "GLOBALEVENTID", DatasetOptions.DefaultOptions) // Dataset2: Partition key (MonthYear) / Row keys (GLOBALEVENTID, Actor2Code) val dataset2 = Dataset("gdelt", Seq(schema(2)), schema.patch(2, Nil, 1), Seq("GLOBALEVENTID", "Actor2Code")) @@ -185,7 +185,9 @@ object GdeltTestData { // val partBuilder4 = new RecordBuilder(TestData.nativeMem, dataset4.partKeySchema, 10240) // Proj 6: partition Actor2Code,Actor2Name to test partition key bitmap indexing - val dataset6 = Dataset("gdelt", schema.slice(4, 6), schema.patch(4, Nil, 2), "GLOBALEVENTID") + val datasetOptions = DatasetOptions.DefaultOptions.copy( + shardKeyColumns = Seq( "__name__","_ns")) + val dataset6 = Dataset("gdelt", schema.slice(4, 6), schema.patch(4, Nil, 2), "GLOBALEVENTID", datasetOptions) } // A simulation of machine metrics data diff --git a/core/src/test/scala/filodb.core/metadata/DatasetSpec.scala b/core/src/test/scala/filodb.core/metadata/DatasetSpec.scala index 09853207a1..91c505cf5f 100644 --- a/core/src/test/scala/filodb.core/metadata/DatasetSpec.scala +++ b/core/src/test/scala/filodb.core/metadata/DatasetSpec.scala @@ -17,7 +17,7 @@ class DatasetSpec extends FunSpec with Matchers { resp1.swap.get shouldEqual ColumnErrors(Seq(NotNameColonType("column2"))) intercept[BadSchemaError] { - Dataset("dataset", Seq("part:string"), dataColSpecs :+ "column2:a:b", "age") + Dataset("dataset", Seq("part:string"), dataColSpecs :+ "column2:a:b", "age", DatasetOptions.DefaultOptions) } } @@ -94,11 +94,11 @@ class DatasetSpec extends FunSpec with Matchers { val mapCol = "tags:map" // OK: only partition column is map - val ds1 = Dataset("dataset", Seq(mapCol), dataColSpecs, "age") + val ds1 = Dataset("dataset", Seq(mapCol), dataColSpecs, "age", DatasetOptions.DefaultOptions) ds1.partitionColumns.map(_.name) should equal (Seq("tags")) // OK: last partition column is map - val ds2 = Dataset("dataset", Seq("first:string", mapCol), dataColSpecs drop 1, "age") + val ds2 = Dataset("dataset", Seq("first:string", mapCol), dataColSpecs drop 1, "age", DatasetOptions.DefaultOptions) ds2.partitionColumns.map(_.name) should equal (Seq("first", "tags")) // Not OK: first partition column is map @@ -118,7 +118,7 @@ class DatasetSpec extends FunSpec with Matchers { } it("should return a valid Dataset when a good specification passed") { - val ds 
= Dataset("dataset", Seq("part:string"), dataColSpecs, "age") + val ds = Dataset("dataset", Seq("part:string"), dataColSpecs, "age", DatasetOptions.DefaultOptions) ds.rowKeyIDs shouldEqual Seq(2) ds.dataColumns should have length (3) ds.dataColumns.map(_.id) shouldEqual Seq(0, 1, 2) @@ -144,7 +144,7 @@ class DatasetSpec extends FunSpec with Matchers { } it("should return IDs for column names or seq of missing names") { - val ds = Dataset("dataset", Seq("part:string"), dataColSpecs, "age") + val ds = Dataset("dataset", Seq("part:string"), dataColSpecs, "age", DatasetOptions.DefaultOptions) ds.colIDs("first", "age").get shouldEqual Seq(0, 2) ds.colIDs("part").get shouldEqual Seq(Dataset.PartColStartIndex) @@ -155,7 +155,7 @@ class DatasetSpec extends FunSpec with Matchers { } it("should return ColumnInfos for colIDs") { - val ds = Dataset("dataset", Seq("part:string"), dataColSpecs, "age") + val ds = Dataset("dataset", Seq("part:string"), dataColSpecs, "age", DatasetOptions.DefaultOptions) val infos = ds.infosFromIDs(Seq(0, 2)) infos shouldEqual Seq(ColumnInfo("first", StringColumn), ColumnInfo("age", LongColumn)) @@ -172,7 +172,8 @@ class DatasetSpec extends FunSpec with Matchers { describe("Dataset serialization") { it("should serialize and deserialize") { - val ds = Dataset("dataset", Seq("part:string"), dataColSpecs, Seq("age"), Seq("dMin(1)")) + val ds = Dataset("dataset", Seq("part:string"), dataColSpecs, Seq("age"), Seq("dMin(1)"), + DatasetOptions.DefaultOptions) .copy(options = DatasetOptions.DefaultOptions.copy( copyTags = Map("exporter" -> "_ns"))) Dataset.fromCompactString(ds.asCompactString) shouldEqual ds diff --git a/core/src/test/scala/filodb.core/store/MetaStoreSpec.scala b/core/src/test/scala/filodb.core/store/MetaStoreSpec.scala index a6b59a993b..869331ad5c 100644 --- a/core/src/test/scala/filodb.core/store/MetaStoreSpec.scala +++ b/core/src/test/scala/filodb.core/store/MetaStoreSpec.scala @@ -19,7 +19,8 @@ with BeforeAndAfter with BeforeAndAfterAll with ScalaFutures { metaStore.initialize().futureValue } - val dataset = Dataset("foo", Seq("part:string"), Seq("timestamp:long", "value:double"), "timestamp") + val dataset = Dataset("foo", Seq("part:string"), Seq("timestamp:long", "value:double"), "timestamp", + DatasetOptions.DefaultOptions) before { metaStore.clearAllData().futureValue } diff --git a/doc/http_api.md b/doc/http_api.md index da4b6c1b31..9fb4de6718 100644 --- a/doc/http_api.md +++ b/doc/http_api.md @@ -179,6 +179,7 @@ For more details, see Prometheus HTTP API Documentation ``` params: • `explainOnly` -- returns an ExecPlan instead of the query results +• `spread` -- override default spread ``` #### GET /promql/{dataset}/api/v1/query?query={promQLString}&time={timestamp} @@ -190,6 +191,7 @@ range expression. For more details, see Prometheus HTTP API Documentation ``` params: • `explainOnly` -- returns an ExecPlan instead of the query results +• `spread` -- override default spread ``` #### POST /promql/{dataset}/api/v1/read diff --git a/doc/sharding.md b/doc/sharding.md index 8751580d66..83d82ddf89 100644 --- a/doc/sharding.md +++ b/doc/sharding.md @@ -39,8 +39,19 @@ What the above means is that ## Spread, or How to Avoid Hotspotting -The **spread** determines how many shards a given shard key is mapped to. The number of shards is equal to 2 to the power of the spread. It is used to manage how widely specific shard keys (such as applications, the job, or metrics) are distributed. 
For example, if one job or metric has a huge number of series, one can assign a higher spread to it to avoid hotspotting. (The management of spreads for individual shard keys is not currently included in the open source offering). - +The **spread** determines how many shards a given shard key is mapped to. The number of shards is equal to 2 to the power of the spread. It is used to manage how widely specific shard keys (such as applications, the job, or metrics) are distributed. For example, if one job or metric has a huge number of series, one can assign a higher spread to it to avoid hotspotting. The default spread can be overridden by appending a `spread` parameter to the query. Spread can also be overridden per application by specifying all non-metric shard keys in the config: +``` +spread-assignment = [ + { + _ns = App-0, + _spread_ = 2 + }, + { + _ns = App-5, + _spread_ = 0 + } + ] +``` ## Shard Coordination FiloDB Clients enable users to set up new datasets as needed. Internally clients send a `SetupDataset` command to the [NodeClusterActor](../coordinator/src/main/scala/filodb.coordinator/NodeClusterActor.scala). diff --git a/gateway/src/main/scala/filodb/gateway/GatewayServer.scala b/gateway/src/main/scala/filodb/gateway/GatewayServer.scala index a61cd91a66..76657905af 100644 --- a/gateway/src/main/scala/filodb/gateway/GatewayServer.scala +++ b/gateway/src/main/scala/filodb/gateway/GatewayServer.scala @@ -72,11 +72,11 @@ object GatewayServer extends StrictLogging { // Most options are for generating test data class GatewayOptions(args: Seq[String]) extends ScallopConf(args) { val samplesPerSeries = opt[Int](short = 'n', default = Some(100), - descr="# of samples per time series") - val numSeries = opt[Int](short='p', default = Some(20), descr="# of total time series") - val sourceConfigPath = trailArg[String](descr="Path to source config, eg conf/timeseries-dev-source.conf") - val genHistData = toggle(noshort=true, descrYes="Generate histogram-schema test data and exit") - val genPromData = toggle(noshort=true, descrYes="Generate Prometheus-schema test data and exit") + descr = "# of samples per time series") + val numSeries = opt[Int](short = 'p', default = Some(20), descr = "# of total time series") + val sourceConfigPath = trailArg[String](descr = "Path to source config, eg conf/timeseries-dev-source.conf") + val genHistData = toggle(noshort = true, descrYes = "Generate histogram-schema test data and exit") + val genPromData = toggle(noshort = true, descrYes = "Generate Prometheus-schema test data and exit") verify() } @@ -95,7 +95,7 @@ object GatewayServer extends StrictLogging { // NOTE: the spread MUST match the default spread used in the HTTP module for consistency between querying // and ingestion sharding - val spread = config.getInt("filodb.default-spread") + val spread = config.getInt("filodb.spread-default") val shardMapper = new ShardMapper(numShards) val queueFullWait = config.as[FiniteDuration]("gateway.queue-full-wait").toMillis diff --git a/http/src/main/scala/filodb/http/HttpSettings.scala b/http/src/main/scala/filodb/http/HttpSettings.scala index 1339958279..3b63ac63d2 100644 --- a/http/src/main/scala/filodb/http/HttpSettings.scala +++ b/http/src/main/scala/filodb/http/HttpSettings.scala @@ -7,6 +7,6 @@ class HttpSettings(config: Config) { lazy val httpServerBindPort = config.getInt("filodb.http.bind-port") lazy val httpServerStartTimeout = config.getDuration("filodb.http.start-timeout") - lazy val queryDefaultSpread = config.getInt("filodb.default-spread") + 
lazy val queryDefaultSpread = config.getInt("filodb.spread-default") lazy val querySampleLimit = config.getInt("filodb.query.sample-limit") } diff --git a/http/src/main/scala/filodb/http/PrometheusApiRoute.scala b/http/src/main/scala/filodb/http/PrometheusApiRoute.scala index 5aae05d281..f7af8b2adc 100644 --- a/http/src/main/scala/filodb/http/PrometheusApiRoute.scala +++ b/http/src/main/scala/filodb/http/PrometheusApiRoute.scala @@ -22,7 +22,6 @@ import filodb.prometheus.query.PrometheusModel.Sampl import filodb.query.{LogicalPlan, QueryError, QueryResult} import filodb.query.exec.ExecPlan - class PrometheusApiRoute(nodeCoord: ActorRef, settings: HttpSettings)(implicit am: ActorMaterializer) extends FiloRoute with StrictLogging { @@ -34,10 +33,6 @@ class PrometheusApiRoute(nodeCoord: ActorRef, settings: HttpSettings)(implicit a import filodb.coordinator.client.Client._ import filodb.prometheus.query.PrometheusModel._ - val spreadProvider = new StaticSpreadProvider(SpreadChange(0, settings.queryDefaultSpread)) - - val queryOptions = QueryOptions(spreadProvider, settings.querySampleLimit) - val route = pathPrefix( "promql" / Segment) { dataset => // Path: /promql//api/v1/query_range // Used to issue a promQL query for a time range with a `start` and `end` timestamp and at regular `step` intervals. @@ -46,10 +41,11 @@ class PrometheusApiRoute(nodeCoord: ActorRef, settings: HttpSettings)(implicit a path( "api" / "v1" / "query_range") { get { parameter('query.as[String], 'start.as[Double], 'end.as[Double], - 'step.as[Int], 'explainOnly.as[Boolean].?, 'verbose.as[Boolean].?) - { (query, start, end, step, explainOnly, verbose) => + 'step.as[Int], 'explainOnly.as[Boolean].?, 'verbose.as[Boolean].?, 'spread.as[Int].?) + { (query, start, end, step, explainOnly, verbose, spread) => val logicalPlan = Parser.queryRangeToLogicalPlan(query, TimeStepParams(start.toLong, step, end.toLong)) - askQueryAndRespond(dataset, logicalPlan, explainOnly.getOrElse(false),verbose.getOrElse(false)) + askQueryAndRespond(dataset, logicalPlan, explainOnly.getOrElse(false), verbose.getOrElse(false), + spread) } } } ~ @@ -59,10 +55,12 @@ class PrometheusApiRoute(nodeCoord: ActorRef, settings: HttpSettings)(implicit a // [Instant Queries](https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries) path( "api" / "v1" / "query") { get { - parameter('query.as[String], 'time.as[Double], 'explainOnly.as[Boolean].?, 'verbose.as[Boolean].?) - { (query, time, explainOnly, verbose) => + parameter('query.as[String], 'time.as[Double], 'explainOnly.as[Boolean].?, 'verbose.as[Boolean].?, + 'spread.as[Int].?) + { (query, time, explainOnly, verbose, spread) => val logicalPlan = Parser.queryToLogicalPlan(query, time.toLong) - askQueryAndRespond(dataset, logicalPlan, explainOnly.getOrElse(false), verbose.getOrElse(false)) + askQueryAndRespond(dataset, logicalPlan, explainOnly.getOrElse(false), + verbose.getOrElse(false), spread) } } } ~ @@ -84,7 +82,7 @@ class PrometheusApiRoute(nodeCoord: ActorRef, settings: HttpSettings)(implicit a // but Akka doesnt support snappy out of the box. Elegant solution is a TODO for later. 
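For reference, the new parameter is exercised by the route spec below: a request such as `GET /promql/prometheus/api/v1/query_range?query=heap_usage{_ns="App-1"}&start=1555427432&end=1555447432&step=15&explainOnly=true&spread=3` overrides the spread for that query alone, and the explain output fans out to 2^3 = 8 `SelectRawPartitionsExec` leaves, versus 2 when the default spread of 1 applies.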
val readReq = ReadRequest.parseFrom(Snappy.uncompress(bytes.toArray)) val asks = toFiloDBLogicalPlans(readReq).map { logicalPlan => - asyncAsk(nodeCoord, LogicalPlan2Query(DatasetRef.fromDotString(dataset), logicalPlan, queryOptions)) + asyncAsk(nodeCoord, LogicalPlan2Query(DatasetRef.fromDotString(dataset), logicalPlan)) } Future.sequence(asks) } @@ -108,12 +106,14 @@ class PrometheusApiRoute(nodeCoord: ActorRef, settings: HttpSettings)(implicit a } } - private def askQueryAndRespond(dataset: String, logicalPlan: LogicalPlan, explainOnly: Boolean, verbose: Boolean) = { + private def askQueryAndRespond(dataset: String, logicalPlan: LogicalPlan, explainOnly: Boolean, verbose: Boolean, + spread: Option[Int]) = { + val spreadProvider: Option[SpreadProvider] = spread.map(s => StaticSpreadProvider(SpreadChange(0, s))) val command = if (explainOnly) { - ExplainPlan2Query(DatasetRef.fromDotString(dataset), logicalPlan, queryOptions) + ExplainPlan2Query(DatasetRef.fromDotString(dataset), logicalPlan, QueryOptions(spreadProvider)) } else { - LogicalPlan2Query(DatasetRef.fromDotString(dataset), logicalPlan, queryOptions) + LogicalPlan2Query(DatasetRef.fromDotString(dataset), logicalPlan, QueryOptions(spreadProvider)) } onSuccess(asyncAsk(nodeCoord, command)) { case qr: QueryResult => complete(toPromSuccessResponse(qr, verbose)) diff --git a/http/src/test/scala/filodb/http/PrometheusApiRouteSpec.scala b/http/src/test/scala/filodb/http/PrometheusApiRouteSpec.scala index 0f4fbfd51e..c8d5eb2f2a 100644 --- a/http/src/test/scala/filodb/http/PrometheusApiRouteSpec.scala +++ b/http/src/test/scala/filodb/http/PrometheusApiRouteSpec.scala @@ -27,7 +27,7 @@ class PrometheusApiRouteSpec extends FunSpec with ScalatestRouteTest with AsyncT val cluster = FilodbCluster(system) val probe = TestProbe() - implicit val timeout = RouteTestTimeout(10.minute) + implicit val timeout = RouteTestTimeout(20.minute) cluster.coordinatorActor cluster.join() val clusterProxy = cluster.clusterSingleton(ClusterRole.Server, None) @@ -38,9 +38,11 @@ class PrometheusApiRouteSpec extends FunSpec with ScalatestRouteTest with AsyncT val prometheusAPIRoute = (new PrometheusApiRoute(cluster.coordinatorActor, settings)).route private def setupDataset(): Unit = { - val command = SetupDataset(FormatConversion.dataset.ref, DatasetResourceSpec(2, 1), noOpSource, TestData.storeConf) + val command = SetupDataset(FormatConversion.dataset.ref, DatasetResourceSpec(8, 1), noOpSource, TestData.storeConf) probe.send(clusterProxy, command) probe.expectMsg(DatasetVerified) + // Give the coordinator nodes some time to get started + Thread sleep 5000 } before { @@ -55,8 +57,6 @@ class PrometheusApiRouteSpec extends FunSpec with ScalatestRouteTest with AsyncT it("should get explainPlan for query") { setupDataset() - // Give the coordinator nodes some time to get started - Thread sleep 2000 val query = "heap_usage{_ns=\"App-0\"}" Get(s"/promql/prometheus/api/v1/query_range?query=${query}&" + @@ -73,9 +73,55 @@ class PrometheusApiRouteSpec extends FunSpec with ScalatestRouteTest with AsyncT resp.debugInfo(2) should startWith("--E~SelectRawPartitionsExec") resp.debugInfo(3) should startWith("-T~PeriodicSamplesMapper") resp.debugInfo(4) should startWith("--E~SelectRawPartitionsExec") + } + } + + it("should take spread override value from config for app") { + setupDataset() + val query = "heap_usage{_ns=\"App-0\"}" + + Get(s"/promql/prometheus/api/v1/query_range?query=${query}&" + + s"start=1555427432&end=1555447432&step=15&explainOnly=true") ~> 
prometheusAPIRoute ~> check { + + handled shouldBe true + status shouldEqual StatusCodes.OK + contentType shouldEqual ContentTypes.`application/json` + val resp = responseAs[ExplainPlanResponse] + resp.status shouldEqual "success" + resp.debugInfo.filter(_.startsWith("--E~SelectRawPartitionsExec")).length shouldEqual 4 + } + } + + it("should get explainPlan for query based on spread as query parameter") { + setupDataset() + val query = "heap_usage{_ns=\"App-1\"}" + Get(s"/promql/prometheus/api/v1/query_range?query=${query}&" + + s"start=1555427432&end=1555447432&step=15&explainOnly=true&spread=3") ~> prometheusAPIRoute ~> check { + handled shouldBe true + status shouldEqual StatusCodes.OK + contentType shouldEqual ContentTypes.`application/json` + val resp = responseAs[ExplainPlanResponse] + resp.status shouldEqual "success" + resp.debugInfo.filter(_.startsWith("--E~SelectRawPartitionsExec")).length shouldEqual 8 } } + + it("should take default spread value if there is no override") { + setupDataset() + val query = "heap_usage{_ns=\"App-1\"}" + + Get(s"/promql/prometheus/api/v1/query_range?query=${query}&" + + s"start=1555427432&end=1555447432&step=15&explainOnly=true") ~> prometheusAPIRoute ~> check { + + handled shouldBe true + status shouldEqual StatusCodes.OK + contentType shouldEqual ContentTypes.`application/json` + val resp = responseAs[ExplainPlanResponse] + resp.status shouldEqual "success" + resp.debugInfo.filter(_.startsWith("--E~SelectRawPartitionsExec")).length shouldEqual 2 + } + } } diff --git a/jmh/src/main/scala/filodb.jmh/EncodingBenchmark.scala b/jmh/src/main/scala/filodb.jmh/EncodingBenchmark.scala index eb5a06b6ca..9ea310d729 100644 --- a/jmh/src/main/scala/filodb.jmh/EncodingBenchmark.scala +++ b/jmh/src/main/scala/filodb.jmh/EncodingBenchmark.scala @@ -119,7 +119,7 @@ class EncodingBenchmark { @BenchmarkMode(Array(Mode.Throughput)) @OutputTimeUnit(TimeUnit.SECONDS) def newDictUtf8VectorEncoding(): Unit = { - val hint = Encodings.AutoDictString(samplingRate=0.5) + val hint = Encodings.AutoDictString(samplingRate = 0.5) UTF8Vector(memFactory, utf8strings).optimize(memFactory, hint) } } \ No newline at end of file diff --git a/jmh/src/main/scala/filodb.jmh/GatewayBenchmark.scala b/jmh/src/main/scala/filodb.jmh/GatewayBenchmark.scala index 14232331b7..d4a805b8be 100644 --- a/jmh/src/main/scala/filodb.jmh/GatewayBenchmark.scala +++ b/jmh/src/main/scala/filodb.jmh/GatewayBenchmark.scala @@ -44,7 +44,7 @@ class GatewayBenchmark extends StrictLogging { val singlePromTSBytes = timeseries(tagMap).toByteArray - val singleInfluxRec = s"${tagMap("__name__")},${influxTags.map{case (k,v) => s"$k=$v"}.mkString(",")} " + + val singleInfluxRec = s"${tagMap("__name__")}, ${influxTags.map{case (k, v) => s"$k=$v"}.mkString(",")} " + s"counter=$value ${initTimestamp}000000" val singleInfluxBuf = ChannelBuffers.buffer(1024) singleInfluxBuf.writeBytes(singleInfluxRec.getBytes) @@ -52,7 +52,7 @@ class GatewayBenchmark extends StrictLogging { // Histogram containing 8 buckets + sum and count val histBuckets = Map("0.025" -> 0, "0.05" -> 0, "0.1" -> 2, "0.25" -> 2, - "0.5" -> 5, "1.0" -> 9, "2.5" -> 11, "+Inf" -> 11) + "0.5" -> 5, "1.0" -> 9, "2.5" -> 11, "+Inf" -> 11) val histSum = histBuckets.values.sum val histPromSeries = @@ -62,8 +62,8 @@ class GatewayBenchmark extends StrictLogging { timeseries(tagMap ++ Map("__name__" -> "heap_usage_count"), histBuckets.size)) val histPromBytes = histPromSeries.map(_.toByteArray) - val histInfluxRec = s"${tagMap("__name__")},${influxTags.map{case (k,v) 
=> s"$k=$v"}.mkString(",")} " + - s"${histBuckets.map { case (k,v) => s"$k=$v"}.mkString(",") },sum=$histSum,count=8 " + + val histInfluxRec = s"${tagMap("__name__")}, ${influxTags.map{case (k, v) => s"$k=$v"}.mkString(",")} " + + s"${histBuckets.map { case (k, v) => s"$k=$v"}.mkString(",") }, sum=$histSum,count=8 " + s"${initTimestamp}000000" val histInfluxBuf = ChannelBuffers.buffer(1024) histInfluxBuf.writeBytes(histInfluxRec.getBytes) diff --git a/jmh/src/main/scala/filodb.jmh/HistogramIngestBenchmark.scala b/jmh/src/main/scala/filodb.jmh/HistogramIngestBenchmark.scala index f8df8dc4e4..ca73a59144 100644 --- a/jmh/src/main/scala/filodb.jmh/HistogramIngestBenchmark.scala +++ b/jmh/src/main/scala/filodb.jmh/HistogramIngestBenchmark.scala @@ -95,8 +95,8 @@ class HistogramIngestBenchmark { @Benchmark @BenchmarkMode(Array(Mode.SingleShotTime)) @OutputTimeUnit(TimeUnit.SECONDS) - @Warmup(batchSize=50) - @Measurement(batchSize=100) + @Warmup(batchSize = 50) + @Measurement(batchSize = 100) def ingestHistColumn1(): Unit = { hShard.ingest(histContainers(containerNo), 0) containerNo += 1 @@ -113,8 +113,8 @@ class HistogramIngestBenchmark { @Benchmark @BenchmarkMode(Array(Mode.SingleShotTime)) @OutputTimeUnit(TimeUnit.SECONDS) - @Warmup(batchSize=50) - @Measurement(batchSize=100) + @Warmup(batchSize = 50) + @Measurement(batchSize = 100) def ingestPromHistograms(): Unit = { pShard.ingest(promContainers(containerNo), 0) containerNo += 1 diff --git a/jmh/src/main/scala/filodb.jmh/HistogramQueryBenchmark.scala b/jmh/src/main/scala/filodb.jmh/HistogramQueryBenchmark.scala index 72a252b20e..0bbc5b4bbd 100644 --- a/jmh/src/main/scala/filodb.jmh/HistogramQueryBenchmark.scala +++ b/jmh/src/main/scala/filodb.jmh/HistogramQueryBenchmark.scala @@ -85,7 +85,8 @@ class HistogramQueryBenchmark { // Single-threaded query test val numQueries = 500 - val qOptions = QueryOptions(1, 100).copy(shardOverrides = Some(Seq(0))) + val qOptions = QueryOptions(Some(new StaticSpreadProvider(SpreadChange(0, 1))), 100). 
+ copy(shardOverrides = Some(Seq(0))) val hLogicalPlan = Parser.queryToLogicalPlan(histQuery, startTime/1000) val hExecPlan = hEngine.materialize(hLogicalPlan, qOptions) val querySched = Scheduler.singleThread(s"benchmark-query") diff --git a/jmh/src/main/scala/filodb.jmh/IngestionBenchmark.scala b/jmh/src/main/scala/filodb.jmh/IngestionBenchmark.scala index a78dd553ec..baabb4ddc5 100644 --- a/jmh/src/main/scala/filodb.jmh/IngestionBenchmark.scala +++ b/jmh/src/main/scala/filodb.jmh/IngestionBenchmark.scala @@ -30,7 +30,7 @@ class IngestionBenchmark { org.slf4j.LoggerFactory.getLogger("filodb").asInstanceOf[Logger].setLevel(Level.ERROR) // # of records in a container to test ingestion speed - val dataStream = withMap(linearMultiSeries(), extraTags=extraTags) + val dataStream = withMap(linearMultiSeries(), extraTags = extraTags) val schemaWithPredefKeys = RecordSchema.ingestion(dataset2, Seq("job", "instance")) diff --git a/jmh/src/main/scala/filodb.jmh/IntSumReadBenchmark.scala b/jmh/src/main/scala/filodb.jmh/IntSumReadBenchmark.scala index 382ea8c12e..377ea0699d 100644 --- a/jmh/src/main/scala/filodb.jmh/IntSumReadBenchmark.scala +++ b/jmh/src/main/scala/filodb.jmh/IntSumReadBenchmark.scala @@ -9,12 +9,13 @@ import org.openjdk.jmh.annotations._ import scalaxy.loops._ import filodb.core.{NamesTestData, TestData} -import filodb.core.metadata.Dataset +import filodb.core.metadata.{Dataset, DatasetOptions} import filodb.core.store.ChunkSet import filodb.memory.format.{vectors => bv, TupleRowReader, UnsafeUtils} object IntSumReadBenchmark { - val dataset = Dataset("dataset", Seq("part:int"), Seq("int:int", "rownum:long"), "rownum") + val dataset = Dataset("dataset", Seq("part:int"), Seq("int:int", "rownum:long"), "rownum", + DatasetOptions.DefaultOptions) val rowIt = Iterator.from(0).map { row => (Some(scala.util.Random.nextInt), Some(row.toLong), Some(0)) } val partKey = NamesTestData.defaultPartKey val rowColumns = Seq("int", "rownum", "part") diff --git a/jmh/src/main/scala/filodb.jmh/QueryAndIngestBenchmark.scala b/jmh/src/main/scala/filodb.jmh/QueryAndIngestBenchmark.scala index 8a0df92511..9124fb08d9 100644 --- a/jmh/src/main/scala/filodb.jmh/QueryAndIngestBenchmark.scala +++ b/jmh/src/main/scala/filodb.jmh/QueryAndIngestBenchmark.scala @@ -128,7 +128,7 @@ class QueryAndIngestBenchmark extends StrictLogging { val qParams = TimeStepParams(queryTime/1000, queryStep, (queryTime/1000) + queryIntervalMin*60) val logicalPlans = queries.map { q => Parser.queryRangeToLogicalPlan(q, qParams) } val queryCommands = logicalPlans.map { plan => - LogicalPlan2Query(dataset.ref, plan, QueryOptions(1, 1000000)) + LogicalPlan2Query(dataset.ref, plan, QueryOptions(Some(new StaticSpreadProvider(SpreadChange(0, 1))), 1000000)) } private var testProducingFut: Option[Future[Unit]] = None diff --git a/jmh/src/main/scala/filodb.jmh/QueryHiCardInMemoryBenchmark.scala b/jmh/src/main/scala/filodb.jmh/QueryHiCardInMemoryBenchmark.scala index 0836281ecd..7d6e7288da 100644 --- a/jmh/src/main/scala/filodb.jmh/QueryHiCardInMemoryBenchmark.scala +++ b/jmh/src/main/scala/filodb.jmh/QueryHiCardInMemoryBenchmark.scala @@ -110,7 +110,8 @@ class QueryHiCardInMemoryBenchmark extends StrictLogging { private def toExecPlan(query: String): ExecPlan = { val queryStartTime = ingestionStartTime + 5.minutes.toMillis // 5 minutes from start until 60 minutes from start val qParams = TimeStepParams(queryStartTime/1000, queryStep, queryStartTime/1000 + queryIntervalSec) - val execPlan = 
engine.materialize(Parser.queryRangeToLogicalPlan(query, qParams), QueryOptions(0, 20000)) + val execPlan = engine.materialize(Parser.queryRangeToLogicalPlan(query, qParams), + QueryOptions(Some(new StaticSpreadProvider(SpreadChange(0, 0))), 20000)) var child = execPlan while (child.children.size > 0) child = child.children(0) child diff --git a/jmh/src/main/scala/filodb.jmh/QueryInMemoryBenchmark.scala b/jmh/src/main/scala/filodb.jmh/QueryInMemoryBenchmark.scala index 66b46d3b49..c40f76de83 100644 --- a/jmh/src/main/scala/filodb.jmh/QueryInMemoryBenchmark.scala +++ b/jmh/src/main/scala/filodb.jmh/QueryInMemoryBenchmark.scala @@ -119,7 +119,7 @@ class QueryInMemoryBenchmark extends StrictLogging { val qParams = TimeStepParams(queryTime/1000, queryStep, (queryTime/1000) + queryIntervalMin*60) val logicalPlans = queries.map { q => Parser.queryRangeToLogicalPlan(q, qParams) } val queryCommands = logicalPlans.map { plan => - LogicalPlan2Query(dataset.ref, plan, QueryOptions(1, 20000)) + LogicalPlan2Query(dataset.ref, plan, QueryOptions(Some(new StaticSpreadProvider(SpreadChange(0, 1))), 20000)) } @TearDown @@ -148,7 +148,7 @@ class QueryInMemoryBenchmark extends StrictLogging { val qParams2 = TimeStepParams(queryTime/1000, noOverlapStep, (queryTime/1000) + queryIntervalMin*60) val logicalPlans2 = queries.map { q => Parser.queryRangeToLogicalPlan(q, qParams2) } val queryCommands2 = logicalPlans2.map { plan => - LogicalPlan2Query(dataset.ref, plan, QueryOptions(1, 10000)) + LogicalPlan2Query(dataset.ref, plan, QueryOptions(Some(new StaticSpreadProvider(SpreadChange(0, 1))), 10000)) } @Benchmark @@ -168,7 +168,7 @@ class QueryInMemoryBenchmark extends StrictLogging { } // Single-threaded query test - val qOptions = QueryOptions(1, 10000) + val qOptions = QueryOptions(Some(new StaticSpreadProvider(SpreadChange(0, 1))), 10000) val logicalPlan = Parser.queryRangeToLogicalPlan(rawQuery, qParams) // Pick the children nodes, not the DistConcatExec. Thus we can run in a single thread this way val execPlan = engine.materialize(logicalPlan, qOptions).children.head diff --git a/jmh/src/main/scala/filodb.jmh/QueryOnDemandBenchmark.scala b/jmh/src/main/scala/filodb.jmh/QueryOnDemandBenchmark.scala index 7f919bfb3c..dbc3945dbd 100644 --- a/jmh/src/main/scala/filodb.jmh/QueryOnDemandBenchmark.scala +++ b/jmh/src/main/scala/filodb.jmh/QueryOnDemandBenchmark.scala @@ -135,7 +135,7 @@ class QueryOnDemandBenchmark extends StrictLogging { val qParams = TimeStepParams(queryTime/1000, queryStep, (queryTime/1000) + queryIntervalMin*60) val logicalPlans = queries.map { q => Parser.queryRangeToLogicalPlan(q, qParams) } val queryCommands = logicalPlans.map { plan => - LogicalPlan2Query(dataset.ref, plan, QueryOptions(1, 20000)) + LogicalPlan2Query(dataset.ref, plan, QueryOptions(Some(new StaticSpreadProvider(SpreadChange(0, 1))), 20000)) } @TearDown diff --git a/kafka/src/main/scala/filodb/kafka/KafkaDownsamplePublisher.scala b/kafka/src/main/scala/filodb/kafka/KafkaDownsamplePublisher.scala index 3b3981fbb0..730e026ff1 100644 --- a/kafka/src/main/scala/filodb/kafka/KafkaDownsamplePublisher.scala +++ b/kafka/src/main/scala/filodb/kafka/KafkaDownsamplePublisher.scala @@ -51,7 +51,7 @@ class KafkaDownsamplePublisher(downsampleConfig: Config) extends DownsamplePubli override def start(): Unit = { logger.info(s"Starting Kafka Downsampling Publisher. 
Will be publishing to $topics with config: $kafkaConfig") - producer = new KafkaProducer(kafkaConfig.asJava) + producer = new KafkaProducer(kafkaConfig.asJava) } override def stop(): Unit = { diff --git a/kafka/src/main/scala/filodb/kafka/PartitionedConsumerObservable.scala b/kafka/src/main/scala/filodb/kafka/PartitionedConsumerObservable.scala index b425dea058..5dddd56bc5 100644 --- a/kafka/src/main/scala/filodb/kafka/PartitionedConsumerObservable.scala +++ b/kafka/src/main/scala/filodb/kafka/PartitionedConsumerObservable.scala @@ -51,7 +51,7 @@ private[filodb] class PartitionedConsumerObservable[K, V] private consumer: Task[KafkaConsumer[K, V]]) extends Observable[ConsumerRecord[K, V]] { - override def unsafeSubscribeFn(out: Subscriber[ConsumerRecord[K,V]]): Cancelable = { + override def unsafeSubscribeFn(out: Subscriber[ConsumerRecord[K, V]]): Cancelable = { import out.scheduler feedTask(out).runAsync(new Callback[Unit] { diff --git a/memory/src/main/scala/filodb.memory/BlockManager.scala b/memory/src/main/scala/filodb.memory/BlockManager.scala index 7096858a8b..8cc17026b2 100644 --- a/memory/src/main/scala/filodb.memory/BlockManager.scala +++ b/memory/src/main/scala/filodb.memory/BlockManager.scala @@ -192,7 +192,7 @@ class PageAlignedBlockManager(val totalMemorySizeInBytes: Long, // if we do not get required blocks even after reclaim call if (reclaimed < num) { logger.warn(s"$num blocks to reclaim but only reclaimed $reclaimed. usedblocks=${usedBlocks.size} " + - s"usedBlocksTimeOrdered=${usedBlocksTimeOrdered.asScala.toList.map{case(n,l) => (n, l.size)}}") + s"usedBlocksTimeOrdered=${usedBlocksTimeOrdered.asScala.toList.map{case(n, l) => (n, l.size)}}") } def reclaimFrom(list: util.LinkedList[Block], reclaimedCounter: Counter): Unit = { diff --git a/memory/src/main/scala/filodb.memory/data/ChunkMap.scala b/memory/src/main/scala/filodb.memory/data/ChunkMap.scala index 4aa72a0be1..b7ba4ef8f7 100644 --- a/memory/src/main/scala/filodb.memory/data/ChunkMap.scala +++ b/memory/src/main/scala/filodb.memory/data/ChunkMap.scala @@ -145,7 +145,7 @@ class ChunkMap(val memFactory: MemFactory, var capacity: Int) extends StrictLogg private var lockState: Int = 0 private var size: Int = 0 private var first: Int = 0 - private var arrayPtr = memFactory.allocateOffheap(capacity << 3, zero=true) + private var arrayPtr = memFactory.allocateOffheap(capacity << 3, zero = true) import ChunkMap._ diff --git a/memory/src/main/scala/filodb.memory/format/BinaryVector.scala b/memory/src/main/scala/filodb.memory/format/BinaryVector.scala index 69f5ef4cb8..eb3b66439d 100644 --- a/memory/src/main/scala/filodb.memory/format/BinaryVector.scala +++ b/memory/src/main/scala/filodb.memory/format/BinaryVector.scala @@ -542,7 +542,7 @@ extends BinaryAppendableVector[A] { UnsafeUtils.unsafe.setMemory(UnsafeUtils.ZeroPointer, bitmapOffset, bitmapMaskBufferSize, 0) - UnsafeUtils.setInt(addr, 8 + bitmapMaskBufferSize) + UnsafeUtils.setInt(addr, 8 + bitmapMaskBufferSize) BinaryVector.writeMajorAndSubType(addr, vectMajorType, vectSubType) val subVectOffset = 12 + bitmapMaskBufferSize UnsafeUtils.setInt(addr + 8, subVectOffset) diff --git a/memory/src/main/scala/filodb.memory/format/RowToVectorBuilder.scala b/memory/src/main/scala/filodb.memory/format/RowToVectorBuilder.scala index 6637eeb9be..ca123ee1b5 100644 --- a/memory/src/main/scala/filodb.memory/format/RowToVectorBuilder.scala +++ b/memory/src/main/scala/filodb.memory/format/RowToVectorBuilder.scala @@ -58,7 +58,7 @@ object RowToVectorBuilder { class 
RowToVectorBuilder(schema: Seq[VectorInfo], memFactory: MemFactory) { import RowToVectorBuilder._ val builders = schema.zipWithIndex.map { - case (VectorInfo(_, dataType),index)=> dataType match { + case (VectorInfo(_, dataType), index)=> dataType match { case Classes.Int => IntBinaryVector.appendingVector(memFactory, MaxElements) case Classes.Long => LongBinaryVector.appendingVector(memFactory, MaxElements) case Classes.Double => DoubleVector.appendingVector(memFactory, MaxElements) diff --git a/memory/src/main/scala/filodb.memory/format/vectors/ConstVector.scala b/memory/src/main/scala/filodb.memory/format/vectors/ConstVector.scala index 73fc8987a0..703bb90c8c 100644 --- a/memory/src/main/scala/filodb.memory/format/vectors/ConstVector.scala +++ b/memory/src/main/scala/filodb.memory/format/vectors/ConstVector.scala @@ -26,7 +26,7 @@ object ConstVector { */ def make(memFactory: MemFactory, len: Int, neededBytes: Int)(fillBytes: NativePointer => Unit): BinaryVectorPtr = { val (_, addr, _) = memFactory.allocate(12 + neededBytes) - UnsafeUtils.setInt(ZeroPointer, addr, 8 + neededBytes) + UnsafeUtils.setInt(ZeroPointer, addr, 8 + neededBytes) UnsafeUtils.setInt(ZeroPointer, addr + 4, WireFormat(WireFormat.VECTORTYPE_BINSIMPLE, WireFormat.SUBTYPE_REPEATED)) UnsafeUtils.setInt(ZeroPointer, addr + 8, len) fillBytes(addr + DataOffset) diff --git a/memory/src/main/scala/filodb.memory/format/vectors/DeltaDeltaVector.scala b/memory/src/main/scala/filodb.memory/format/vectors/DeltaDeltaVector.scala index e1983861e6..5bbfe734d6 100644 --- a/memory/src/main/scala/filodb.memory/format/vectors/DeltaDeltaVector.scala +++ b/memory/src/main/scala/filodb.memory/format/vectors/DeltaDeltaVector.scala @@ -96,7 +96,7 @@ object DeltaDeltaVector { */ def const(memFactory: MemFactory, numElements: Int, initValue: Long, slope: Int): BinaryVectorPtr = { val addr = memFactory.allocateOffheap(24) - UnsafeUtils.setInt(addr, 20) + UnsafeUtils.setInt(addr, 20) UnsafeUtils.setInt(addr + 4, WireFormat(VECTORTYPE_DELTA2, SUBTYPE_REPEATED)) UnsafeUtils.setInt(addr + 8, numElements) UnsafeUtils.setLong(addr + 12, initValue) diff --git a/memory/src/main/scala/filodb.memory/format/vectors/DictUTF8Vector.scala b/memory/src/main/scala/filodb.memory/format/vectors/DictUTF8Vector.scala index 3e05c66c78..2869ae0ce6 100644 --- a/memory/src/main/scala/filodb.memory/format/vectors/DictUTF8Vector.scala +++ b/memory/src/main/scala/filodb.memory/format/vectors/DictUTF8Vector.scala @@ -87,7 +87,7 @@ object DictUTF8Vector { codeVect.addVector(info.codes) // Write 12 bytes of metadata at beginning - UnsafeUtils.setInt(addr, bytesRequired - 4) + UnsafeUtils.setInt(addr, bytesRequired - 4) UnsafeUtils.setInt(addr + 4, WireFormat(VECTORTYPE_BINDICT, SUBTYPE_UTF8)) UnsafeUtils.setInt(addr + 8, 12 + dictVectSize) addr diff --git a/memory/src/main/scala/filodb.memory/format/vectors/DoubleVector.scala b/memory/src/main/scala/filodb.memory/format/vectors/DoubleVector.scala index aea1ec93d5..bc741b30df 100644 --- a/memory/src/main/scala/filodb.memory/format/vectors/DoubleVector.scala +++ b/memory/src/main/scala/filodb.memory/format/vectors/DoubleVector.scala @@ -20,8 +20,8 @@ object DoubleVector { def appendingVector(memFactory: MemFactory, maxElements: Int): BinaryAppendableVector[Double] = { val bytesRequired = 12 + BitmapMask.numBytesRequired(maxElements) + 8 + 8 * maxElements val addr = memFactory.allocateOffheap(bytesRequired) - val dispose = () => memFactory.freeMemory(addr) - GrowableVector(memFactory,new MaskedDoubleAppendingVector(addr, 
bytesRequired, maxElements, dispose)) + val dispose = () => memFactory.freeMemory(addr) + GrowableVector(memFactory, new MaskedDoubleAppendingVector(addr, bytesRequired, maxElements, dispose)) } /** @@ -32,7 +32,7 @@ object DoubleVector { def appendingVectorNoNA(memFactory: MemFactory, maxElements: Int): BinaryAppendableVector[Double] = { val bytesRequired = 8 + 8 * maxElements val addr = memFactory.allocateOffheap(bytesRequired) - val dispose = () => memFactory.freeMemory(addr) + val dispose = () => memFactory.freeMemory(addr) new DoubleAppendingVector(addr, bytesRequired, dispose) } @@ -54,8 +54,8 @@ object DoubleVector { * DoubleVectorDataReader object for parsing it */ def apply(vector: BinaryVectorPtr): DoubleVectorDataReader = BinaryVector.vectorType(vector) match { - case x if x == WireFormat(VECTORTYPE_DELTA2, SUBTYPE_INT_NOMASK) => DoubleLongWrapDataReader - case x if x == WireFormat(VECTORTYPE_DELTA2, SUBTYPE_REPEATED) => DoubleLongWrapDataReader + case x if x == WireFormat(VECTORTYPE_DELTA2, SUBTYPE_INT_NOMASK) => DoubleLongWrapDataReader + case x if x == WireFormat(VECTORTYPE_DELTA2, SUBTYPE_REPEATED) => DoubleLongWrapDataReader case x if x == WireFormat(VECTORTYPE_BINSIMPLE, SUBTYPE_PRIMITIVE) => MaskedDoubleDataReader case x if x == WireFormat(VECTORTYPE_BINSIMPLE, SUBTYPE_PRIMITIVE_NOMASK) => DoubleVectorDataReader64 } diff --git a/memory/src/main/scala/filodb.memory/format/vectors/LongBinaryVector.scala b/memory/src/main/scala/filodb.memory/format/vectors/LongBinaryVector.scala index 3240ef3ab7..a150b7b276 100644 --- a/memory/src/main/scala/filodb.memory/format/vectors/LongBinaryVector.scala +++ b/memory/src/main/scala/filodb.memory/format/vectors/LongBinaryVector.scala @@ -19,7 +19,7 @@ object LongBinaryVector { def appendingVector(memFactory: MemFactory, maxElements: Int): BinaryAppendableVector[Long] = { val bytesRequired = 12 + BitmapMask.numBytesRequired(maxElements) + 8 + 8 * maxElements val addr = memFactory.allocateOffheap(bytesRequired) - val dispose = () => memFactory.freeMemory(addr) + val dispose = () => memFactory.freeMemory(addr) GrowableVector(memFactory, new MaskedLongAppendingVector(addr, bytesRequired, maxElements, dispose)) } @@ -30,7 +30,7 @@ object LongBinaryVector { def appendingVectorNoNA(memFactory: MemFactory, maxElements: Int): BinaryAppendableVector[Long] = { val bytesRequired = 8 + 8 * maxElements val addr = memFactory.allocateOffheap(bytesRequired) - val dispose = () => memFactory.freeMemory(addr) + val dispose = () => memFactory.freeMemory(addr) new LongAppendingVector(addr, bytesRequired, dispose) } @@ -41,7 +41,7 @@ object LongBinaryVector { def timestampVector(memFactory: MemFactory, maxElements: Int): BinaryAppendableVector[Long] = { val bytesRequired = 8 + 8 * maxElements val addr = memFactory.allocateOffheap(bytesRequired) - val dispose = () => memFactory.freeMemory(addr) + val dispose = () => memFactory.freeMemory(addr) new TimestampAppendingVector(addr, bytesRequired, dispose) } @@ -54,8 +54,8 @@ object LongBinaryVector { * LongVectorDataReader object for parsing it */ def apply(vector: BinaryVectorPtr): LongVectorDataReader = BinaryVector.vectorType(vector) match { - case x if x == WireFormat(VECTORTYPE_DELTA2, SUBTYPE_INT_NOMASK) => DeltaDeltaDataReader - case x if x == WireFormat(VECTORTYPE_DELTA2, SUBTYPE_REPEATED) => DeltaDeltaConstDataReader + case x if x == WireFormat(VECTORTYPE_DELTA2, SUBTYPE_INT_NOMASK) => DeltaDeltaDataReader + case x if x == WireFormat(VECTORTYPE_DELTA2, SUBTYPE_REPEATED) => DeltaDeltaConstDataReader case x 
if x == WireFormat(VECTORTYPE_BINSIMPLE, SUBTYPE_PRIMITIVE) => MaskedLongDataReader case x if x == WireFormat(VECTORTYPE_BINSIMPLE, SUBTYPE_PRIMITIVE_NOMASK) => LongVectorDataReader64 } diff --git a/query/src/main/scala/filodb/query/exec/TransientRow.scala b/query/src/main/scala/filodb/query/exec/TransientRow.scala index 0a8aeb9b7b..f5af0b4536 100644 --- a/query/src/main/scala/filodb/query/exec/TransientRow.scala +++ b/query/src/main/scala/filodb/query/exec/TransientRow.scala @@ -148,10 +148,10 @@ final class QuantileAggTransientRow() extends MutableRowReader { def getFloat(columnNo: Int): Float = ??? def getString(columnNo: Int): String = ??? def getAny(columnNo: Int): Any = ??? - def getBlobBase(columnNo: Int): Any = if (columnNo == 1) blobBase - else throw new IllegalArgumentException() - def getBlobOffset(columnNo: Int): Long = if (columnNo == 1) blobOffset - else throw new IllegalArgumentException() + def getBlobBase(columnNo: Int): Any = if (columnNo == 1) blobBase + else throw new IllegalArgumentException() + def getBlobOffset(columnNo: Int): Long = if (columnNo == 1) blobOffset + else throw new IllegalArgumentException() def getBlobNumBytes(columnNo: Int): Int = if (columnNo == 1) blobLength else throw new IllegalArgumentException() diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 845bbe3cb2..b3edf81229 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -129,7 +129,7 @@ - COLON, IF + COLON, IF, EQUALS, COMMA diff --git a/spark/src/main/scala/filodb.spark/FiloRelation.scala b/spark/src/main/scala/filodb.spark/FiloRelation.scala index 16a1e9f953..197369b26e 100644 --- a/spark/src/main/scala/filodb.spark/FiloRelation.scala +++ b/spark/src/main/scala/filodb.spark/FiloRelation.scala @@ -102,9 +102,9 @@ object FiloRelation extends StrictLogging { } val betweenRangePF: PartialFunction[(Column, Seq[Filter]), Option[(Any, Any)]] = { - case (_, Seq(GreaterThan(_, lVal), LessThan(_, rVal))) => Some((lVal, rVal)) + case (_, Seq(GreaterThan(_, lVal), LessThan(_, rVal))) => Some((lVal, rVal)) case (_, Seq(GreaterThanOrEqual(_, lVal), LessThan(_, rVal))) => Some((lVal, rVal)) - case (_, Seq(GreaterThan(_, lVal), LessThanOrEqual(_, rVal))) => Some((lVal, rVal)) + case (_, Seq(GreaterThan(_, lVal), LessThanOrEqual(_, rVal))) => Some((lVal, rVal)) case (_, Seq(GreaterThanOrEqual(_, lVal), LessThanOrEqual(_, rVal))) => Some((lVal, rVal)) }
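One theme running through the test and benchmark updates above is that a spread of `s` fans a shard key out to `2^s` shards, which is why the explain-plan tests expect 4 `SelectRawPartitionsExec` leaves for App-0 (spread 2 from the `spread-assignment` config), 8 for an explicit `spread=3` query parameter, and 2 when only the default applies (implying a default spread of 1 in the test configuration). The snippet below is an illustrative sketch of that fan-out, with hypothetical helper names; the relative priority of a query parameter over a config entry is an assumption here, since the tests exercise each source separately.
```scala
// Illustrative fan-out calculation; helper names and the precedence ordering are
// assumptions for illustration, not FiloDB APIs. Values mirror the route tests above.
object SpreadFanout {
  val defaultSpread = 1                                   // e.g. filodb.spread-default
  val spreadAssignment = Map("App-0" -> 2, "App-5" -> 0)  // from the spread-assignment block

  // Assumed precedence: per-query parameter, then config assignment, then the default.
  def effectiveSpread(ns: String, querySpread: Option[Int]): Int =
    querySpread.getOrElse(spreadAssignment.getOrElse(ns, defaultSpread))

  def shardsQueried(ns: String, querySpread: Option[Int] = None): Int =
    1 << effectiveSpread(ns, querySpread)                 // 2^spread shards

  def main(args: Array[String]): Unit = {
    println(shardsQueried("App-0"))          // 4 -> four SelectRawPartitionsExec leaves
    println(shardsQueried("App-1", Some(3))) // 8 -> spread=3 query parameter
    println(shardsQueried("App-1"))          // 2 -> default spread of 1
  }
}
```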