
Commit

Merge develop to 0.8.5-Integration - Spread override and app based spread support
tjackpaul authored Jun 5, 2019
2 parents 288da07 + 32c4078 commit 75b5c44
Showing 53 changed files with 327 additions and 184 deletions.
@@ -40,7 +40,7 @@ sealed class PartitionIndexTable(val dataset: DatasetRef,
|) WITH compression = {
'sstable_compression': '$sstableCompression'}""".stripMargin

lazy val readCql =s"SELECT segmentid, segment " +
lazy val readCql = s"SELECT segmentid, segment " +
s"FROM $tableString WHERE shard = ? AND timebucket = ? order by segmentid asc"

lazy val writePartitionCql = session.prepare(
@@ -51,7 +51,7 @@ sealed class PartitionIndexTable(val dataset: DatasetRef,
// fetch size should be low since each row is about an MB. Default fetchSize can result in ReadTimeouts at server
val it = session.execute(new SimpleStatement(readCql, shard: JInt, timeBucket: JInt).setFetchSize(15))
.asScala.toIterator.map(row => {
PartKeyTimeBucketSegment(row.getInt("segmentid"), row.getBytes("segment"))
PartKeyTimeBucketSegment(row.getInt("segmentid"), row.getBytes("segment"))
})
Observable.fromIterator(it)
}
@@ -59,12 +59,12 @@ class CassandraMetaStore(config: Config, filoSessionProvider: Option[FiloSession
checkpointTable.writeCheckpoint(dataset, shardNum, groupNum, offset)
}

def readCheckpoints(dataset: DatasetRef, shardNum: Int): Future[Map[Int,Long]] = {
def readCheckpoints(dataset: DatasetRef, shardNum: Int): Future[Map[Int, Long]] = {
checkpointTable.readCheckpoints(dataset, shardNum)
}

def readEarliestCheckpoint(dataset: DatasetRef, shardNum: Int): Future[Long] = {
readCheckpoints(dataset,shardNum) map { m =>
readCheckpoints(dataset, shardNum) map { m =>
if (m.values.isEmpty) Long.MinValue else m.values.min
}
}
@@ -76,7 +76,7 @@ sealed class CheckpointTable(val config: Config,
dataset.dataset, shardNum: JInt, groupNum: JInt, offset: JLong))
}

def readCheckpoints(dataset: DatasetRef, shardNum: Int): Future[Map[Int,Long]] = {
def readCheckpoints(dataset: DatasetRef, shardNum: Int): Future[Map[Int, Long]] = {
session.executeAsync(readCheckpointCql.bind(dataset.database.getOrElse(""),
dataset.dataset, shardNum: JInt))
.toIterator // future of Iterator
18 changes: 10 additions & 8 deletions cli/src/main/scala/filodb.cli/CliMain.scala
@@ -13,6 +13,7 @@ import monix.reactive.Observable

import filodb.coordinator._
import filodb.coordinator.client._
import filodb.coordinator.client.QueryCommands.{SpreadChange, SpreadProvider, StaticSpreadProvider}
import filodb.core._
import filodb.core.metadata.{Column, Dataset, DatasetOptions}
import filodb.core.store._
@@ -60,6 +61,7 @@ class Arguments extends FieldArgs {
var ignoreTagsOnPartitionKeyHash: Seq[String] = Nil
var everyNSeconds: Option[String] = None
var shards: Option[Seq[String]] = None
var spread: Option[Integer] = None
}

object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbClusterNode {
@@ -116,7 +118,6 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
}

def main(args: Arguments): Unit = {
val spread = config.getInt("default-spread")
try {
val timeout = args.timeoutSeconds.seconds
args.command match {
@@ -198,15 +199,15 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
require(args.host.nonEmpty && args.dataset.nonEmpty && args.matcher.nonEmpty, "--host, --dataset and --matcher must be defined")
val remote = Client.standaloneClient(system, args.host.get, args.port)
val options = QOptions(args.limit, args.sampleLimit, args.everyNSeconds.map(_.toInt),
timeout, args.shards.map(_.map(_.toInt)), spread)
timeout, args.shards.map(_.map(_.toInt)), args.spread)
parseTimeSeriesMetadataQuery(remote, args.matcher.get, args.dataset.get,
getQueryRange(args), options)

case Some("labelValues") =>
require(args.host.nonEmpty && args.dataset.nonEmpty && args.labelNames.nonEmpty, "--host, --dataset and --labelName must be defined")
val remote = Client.standaloneClient(system, args.host.get, args.port)
val options = QOptions(args.limit, args.sampleLimit, args.everyNSeconds.map(_.toInt),
timeout, args.shards.map(_.map(_.toInt)), spread)
timeout, args.shards.map(_.map(_.toInt)), args.spread)
parseLabelValuesQuery(remote, args.labelNames, args.labelFilter, args.dataset.get,
getQueryRange(args), options)

@@ -216,7 +217,7 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
require(args.host.nonEmpty && args.dataset.nonEmpty, "--host and --dataset must be defined")
val remote = Client.standaloneClient(system, args.host.get, args.port)
val options = QOptions(args.limit, args.sampleLimit, args.everyNSeconds.map(_.toInt),
timeout, args.shards.map(_.map(_.toInt)), spread)
timeout, args.shards.map(_.map(_.toInt)), args.spread)
parsePromQuery2(remote, query, args.dataset.get, getQueryRange(args), options)
}.orElse {
args.select.map { selectCols =>
@@ -267,13 +268,13 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
ignoreTagsOnPartitionKeyHash: Seq[String],
timeout: FiniteDuration): Unit = {
try {
val datasetObj = Dataset(dataset.dataset, partitionColumns, dataColumns, rowKeys, downsamplers)
val options = DatasetOptions.DefaultOptions.copy(metricColumn = metricColumn,
shardKeyColumns = shardKeyColumns,
ignoreShardKeyColumnSuffixes = ignoreShardKeyColumnSuffixes,
ignoreTagsOnPartitionKeyHash = ignoreTagsOnPartitionKeyHash)
val datasetObj = Dataset(dataset.dataset, partitionColumns, dataColumns, rowKeys, downsamplers, options)
println(s"Creating dataset $dataset with options $options...")
client.createNewDataset(datasetObj.copy(options = options), dataset.database)
client.createNewDataset(datasetObj, dataset.database)
exitCode = 0
} catch {
case b: Dataset.BadSchemaError =>
@@ -339,7 +340,7 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
final case class QOptions(limit: Int, sampleLimit: Int,
everyN: Option[Int], timeout: FiniteDuration,
shardOverrides: Option[Seq[Int]],
spread: Int)
spread: Option[Integer])

def parseTimeSeriesMetadataQuery(client: LocalClient, query: String, dataset: String,
timeParams: TimeRangeParams,
@@ -364,7 +365,8 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste

def executeQuery2(client: LocalClient, dataset: String, plan: LogicalPlan, options: QOptions): Unit = {
val ref = DatasetRef(dataset)
val qOpts = QueryCommands.QueryOptions(options.spread, options.sampleLimit)
val spreadProvider: Option[SpreadProvider] = options.spread.map(s => StaticSpreadProvider(SpreadChange(0, s)))
val qOpts = QueryCommands.QueryOptions(spreadProvider, options.sampleLimit)
.copy(queryTimeoutSecs = options.timeout.toSeconds.toInt,
shardOverrides = options.shardOverrides)
println(s"Sending query command to server for $ref with options $qOpts...")
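The CLI changes above replace the config-driven default-spread with an optional --spread argument: when it is supplied, the query is pinned to that spread via a StaticSpreadProvider; when it is omitted, QOptions carries None and the server-side provider decides. A minimal sketch of that mapping, using only types visible in this diff (the helper name toSpreadProvider and the surrounding CliMain wiring are illustrative):

import filodb.coordinator.client.QueryCommands.{SpreadChange, SpreadProvider, StaticSpreadProvider}

// args.spread is Option[Integer]; only an explicit --spread pins the spread for this query.
def toSpreadProvider(spread: Option[Integer]): Option[SpreadProvider] =
  spread.map(s => StaticSpreadProvider(SpreadChange(0, s)))

// --spread 2  => Some(StaticSpreadProvider(SpreadChange(0, 2)))
// (no flag)   => None, so the QueryActor falls back to its config-driven provider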
13 changes: 13 additions & 0 deletions conf/timeseries-filodb-server.conf
@@ -5,6 +5,19 @@ filodb {
port = 9042
partition-list-num-groups = 1
}
spread-default = 1

# Per-application spread overrides: each block names the non-metric shard key(s) and the spread to use for that application.
spread-assignment = [
{
_ns = App-0,
_spread_ = 2
},
{
_ns = App-5,
_spread_ = 0
}
]
}

kamon {
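Two settings are introduced here: filodb.spread-default, the baseline spread, and filodb.spread-assignment, a list of per-application overrides keyed by the non-metric shard key (_ns in this config). A larger spread fans a shard key out across more shards (in FiloDB's shard mapping this is on the order of 2^spread shards), so App-0 gets a wider fan-out than the default and App-5 a narrower one. A standalone sketch of reading these settings with Typesafe Config; the server itself parses them with a Ficus ValueReader in QueryActor further down in this diff, and the value names here are illustrative:

import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._

val conf = ConfigFactory.parseFile(new java.io.File("conf/timeseries-filodb-server.conf"))
val defaultSpread = conf.getInt("filodb.spread-default")              // 1
val spreadOverrides = conf.getConfigList("filodb.spread-assignment").asScala.map { c =>
  c.getString("_ns") -> c.getInt("_spread_")                          // assumes _ns is the only shard key
}.toMap                                                               // Map(App-0 -> 2, App-5 -> 0)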
@@ -46,7 +46,7 @@ class KamonMetricsLogReporter extends MetricReporter with StrictLogging {
}
}

private def formatTags(tags: Map[String, String]) = tags.view.map { case (k,v) => s"$k=$v" }.mkString(" ")
private def formatTags(tags: Map[String, String]) = tags.view.map { case (k, v) => s"$k=$v" }.mkString(" ")

private def normalizeLabelName(label: String): String =
label.map(charOrUnderscore)
30 changes: 26 additions & 4 deletions coordinator/src/main/scala/filodb.coordinator/QueryActor.scala
@@ -2,13 +2,14 @@ package filodb.coordinator

import java.util.concurrent.atomic.AtomicLong

import scala.util.control.NonFatal

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.dispatch.{Envelope, UnboundedStablePriorityMailbox}
import com.typesafe.config.Config
import kamon.Kamon
import monix.execution.Scheduler
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ValueReader
import scala.util.control.NonFatal

import filodb.coordinator.queryengine2.QueryEngine
import filodb.core._
@@ -55,6 +56,23 @@ final class QueryActor(memStore: MemStore,

val config = context.system.settings.config

var filodbSpreadMap = new collection.mutable.HashMap[collection.Map[String, String], Int]
val applicationShardKeyName = dataset.options.nonMetricShardColumns(0)
val defaultSpread = config.getInt("filodb.spread-default")

implicit val spreadOverrideReader: ValueReader[SpreadAssignment] = ValueReader.relative { spreadAssignmentConfig =>
SpreadAssignment(
shardKeysMap = dataset.options.nonMetricShardColumns.map(x =>
(x, spreadAssignmentConfig.getString(x))).toMap[String, String],
spread = spreadAssignmentConfig.getInt("_spread_")
)
}
val spreadAssignment: List[SpreadAssignment] = config.as[List[SpreadAssignment]]("filodb.spread-assignment")
spreadAssignment.foreach { x => filodbSpreadMap.put(x.shardKeysMap, x.spread) }

val spreadFunc = QueryOptions.simpleMapSpreadFunc(applicationShardKeyName, filodbSpreadMap, defaultSpread)
val functionalSpreadProvider = FunctionalSpreadProvider(spreadFunc)

val queryEngine2 = new QueryEngine(dataset, shardMapFunc)
val queryConfig = new QueryConfig(config.getConfig("filodb.query"))
val numSchedThreads = Math.ceil(config.getDouble("filodb.query.threads-factor") * sys.runtime.availableProcessors)
@@ -91,11 +109,15 @@ final class QueryActor(memStore: MemStore,
}
}

private def getSpreadProvider(queryOptions: QueryOptions): SpreadProvider = {
queryOptions.spreadProvider.getOrElse(functionalSpreadProvider)
}

private def processLogicalPlan2Query(q: LogicalPlan2Query, replyTo: ActorRef) = {
// This is for CLI use only. Always prefer clients to materialize logical plan
lpRequests.increment
try {
val execPlan = queryEngine2.materialize(q.logicalPlan, q.queryOptions)
val execPlan = queryEngine2.materialize(q.logicalPlan, q.queryOptions, getSpreadProvider(q.queryOptions))
self forward execPlan
} catch {
case NonFatal(ex) =>
@@ -106,7 +128,7 @@

private def processExplainPlanQuery(q: ExplainPlan2Query, replyTo: ActorRef) = {
try {
val execPlan = queryEngine2.materialize(q.logicalPlan, q.queryOptions)
val execPlan = queryEngine2.materialize(q.logicalPlan, q.queryOptions, getSpreadProvider(q.queryOptions))
replyTo ! execPlan
} catch {
case NonFatal(ex) =>
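In QueryActor, the filodb.spread-assignment entries are folded into filodbSpreadMap, keyed by the full shard-key map (e.g. Map("_ns" -> "App-0")), and wrapped into a FunctionalSpreadProvider via QueryOptions.simpleMapSpreadFunc. getSpreadProvider then prefers a provider attached to the query itself and only falls back to this config-driven one. A pure-Scala sketch of that resolution order, assuming a single non-metric shard key "_ns" (names are illustrative, not FiloDB API):

val defaultSpread = 1
val spreadOverrides = collection.mutable.HashMap[collection.Map[String, String], Int](
  Map("_ns" -> "App-0") -> 2,
  Map("_ns" -> "App-5") -> 0)

def configSpread(ns: String): Int =
  spreadOverrides.getOrElse(Map("_ns" -> ns), defaultSpread)

// Per-query override (e.g. CLI --spread) wins, then spread-assignment, then spread-default.
def effectiveSpread(perQuery: Option[Int], ns: String): Int =
  perQuery.getOrElse(configSpread(ns))

effectiveSpread(None, "App-0")    // 2
effectiveSpread(None, "App-3")    // 1
effectiveSpread(Some(3), "App-0") // 3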
@@ -44,6 +44,7 @@ object QueryCommands {
Seq(spreadChange)
}
}
case class SpreadAssignment(shardKeysMap: collection.Map[String, String], spread: Int)

/**
* Serialize with care! would be based on the provided function.
@@ -60,37 +61,42 @@
* This class provides general query processing parameters
* @param spreadFunc a function that returns chronologically ordered spread changes for the filter
*/
final case class QueryOptions(spreadProvider: SpreadProvider = StaticSpreadProvider(),
final case class QueryOptions(spreadProvider: Option[SpreadProvider] = None,
parallelism: Int = 16,
queryTimeoutSecs: Int = 30,
sampleLimit: Int = 1000000,
shardOverrides: Option[Seq[Int]] = None)

object QueryOptions {
def apply(constSpread: Int, sampleLimit: Int): QueryOptions =
QueryOptions(spreadProvider = StaticSpreadProvider(SpreadChange(0, constSpread)), sampleLimit = sampleLimit)
def apply(constSpread: Option[SpreadProvider], sampleLimit: Int): QueryOptions =
QueryOptions(spreadProvider = constSpread, sampleLimit = sampleLimit)

/**
* Creates a spreadFunc that looks for a particular filter with keyName Equals a value, and then maps values
* present in the spreadMap to specific spread values, with a default if the filter/value not present in the map
*/
def simpleMapSpreadFunc(keyName: String,
spreadMap: collection.Map[String, Int],
spreadMap: collection.mutable.Map[collection.Map[String, String], Int],
defaultSpread: Int): Seq[ColumnFilter] => Seq[SpreadChange] = {
filters: Seq[ColumnFilter] =>
filters.collectFirst {
case ColumnFilter(key, Filter.Equals(filtVal: String)) if key == keyName => filtVal
}.map { tagValue =>
Seq(SpreadChange(spread = spreadMap.getOrElse(tagValue, defaultSpread)))
Seq(SpreadChange(spread = spreadMap.getOrElse(collection.mutable.Map(keyName->tagValue), defaultSpread)))
}.getOrElse(Seq(SpreadChange(defaultSpread)))
}

import collection.JavaConverters._

def simpleMapSpreadFunc(keyName: String,
spreadMap: java.util.Map[String, Int],
defaultSpread: Int): Seq[ColumnFilter] => Seq[SpreadChange] =
simpleMapSpreadFunc(keyName, spreadMap.asScala, defaultSpread)
spreadMap: java.util.Map[java.util.Map[String, String], Integer],
defaultSpread: Int): Seq[ColumnFilter] => Seq[SpreadChange] = {
val spreadAssignment: collection.mutable.Map[collection.Map[String, String], Int] = spreadMap.asScala.map {
case (d, v) => d.asScala -> v.toInt
}

simpleMapSpreadFunc(keyName, spreadAssignment, defaultSpread)
}
}

/**
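With spreadProvider now an Option, a QueryOptions carrying None no longer implies a built-in StaticSpreadProvider; it means the receiving QueryActor picks the provider. A hedged usage sketch against the signatures shown above (the values are illustrative):

import filodb.coordinator.client.QueryCommands._

// Pin the spread for one query:
val pinned = QueryOptions(spreadProvider = Some(StaticSpreadProvider(SpreadChange(0, 2))), sampleLimit = 10000)

// Defer to the server: the QueryActor substitutes its config-driven FunctionalSpreadProvider.
val serverDecides = QueryOptions()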
@@ -28,6 +28,7 @@ final case class ChildErrorResponse(source: ActorRef, resp: ErrorResponse) exten
object Utils extends StrictLogging {
import filodb.coordinator.client.QueryCommands._
import TrySugar._
import filodb.coordinator.client.QueryCommands._

/**
* Convert column name strings into columnIDs. NOTE: column names should not include row key columns
@@ -49,7 +50,8 @@ object Utils extends StrictLogging {
*/
def validatePartQuery(dataset: Dataset, shardMap: ShardMapper,
partQuery: PartitionQuery,
options: QueryOptions): Seq[PartitionScanMethod] Or ErrorResponse =
options: QueryOptions, spreadProvider: SpreadProvider):
Seq[PartitionScanMethod] Or ErrorResponse =
Try(partQuery match {
case SinglePartitionQuery(keyParts) =>
val partKey = dataset.partKey(keyParts: _*)
@@ -75,7 +77,7 @@
if (shardCols.length > 0) {
shardHashFromFilters(filters, shardCols, dataset) match {
case Some(shardHash) => shardMap.queryShards(shardHash,
options.spreadProvider.spreadFunc(filters).last.spread)
spreadProvider.spreadFunc(filters).last.spread)
case None => throw new IllegalArgumentException(s"Must specify filters for $shardCols")
}
} else {
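validatePartQuery now takes the resolved SpreadProvider as an explicit argument rather than pulling it out of QueryOptions, and the shard fan-out uses the latest SpreadChange the provider returns for the query's filters. A small sketch of that selection (the change times are made up for illustration):

import filodb.coordinator.client.QueryCommands.SpreadChange

// spreadFunc returns chronologically ordered changes; the most recent one drives queryShards.
val changes = Seq(SpreadChange(0, 1), SpreadChange(1, 2))
val spreadForQuery = changes.last.spread   // 2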