perf(query) Memoize the part of the logical plan tree traversal for reduced memory allocation and faster planning (#1874)
amolnayak311 authored Nov 1, 2024
1 parent 376e7c6 commit 7f008e7
Showing 10 changed files with 245 additions and 30 deletions.
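
For context, the change memoizes pure, repeatedly-invoked planner traversals behind bounded Caffeine caches. A minimal sketch of that pattern follows; the names, key type, and stand-in traversal are illustrative assumptions, not the planner's actual code.

import java.util.concurrent.atomic.AtomicInteger
import com.github.benmanes.caffeine.cache.{Cache, Caffeine}

object MemoizationSketch extends App {
  val computations = new AtomicInteger(0)

  // Stand-in for an expensive, side-effect-free traversal of a plan subtree.
  def traverse(key: (String, Long)): Set[Int] = {
    computations.incrementAndGet()
    (0 until key._2.toInt).toSet
  }

  // Bounded cache keyed by the traversal's inputs.
  val cache: Cache[(String, Long), Set[Int]] =
    Caffeine.newBuilder().maximumSize(2048).recordStats().build()

  val key = ("sum(rate(http_requests_total[5m]))", 4L)
  cache.get(key, k => traverse(k))
  cache.get(key, k => traverse(k))   // second call is served from the cache
  println(computations.get())        // 1: the traversal ran only once
  println(cache.stats().hitCount())  // 1 hit recorded because of recordStats()
}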
2 changes: 1 addition & 1 deletion conf/logback-dev.xml
@@ -8,7 +8,7 @@
</encoder>
</appender>

<logger name="filodb.coordinator" level="DEBUG" />
<logger name="filodb.coordinator" level="INFO" />
<logger name="filodb.core" level="DEBUG" />
<logger name="filodb.memory" level="DEBUG" />
<logger name="filodb.query" level="DEBUG" />
@@ -246,7 +246,7 @@ class ShardMapper(val numShards: Int) extends Serializable {
* Registers a new node to the given shards. Modifies state in place.
* Idempotent.
*/
private[coordinator] def registerNode(shards: Seq[Int], coordinator: ActorRef): Try[Unit] = {
def registerNode(shards: Seq[Int], coordinator: ActorRef): Try[Unit] = {
shards foreach {
case shard =>
//we always override the mapping. There was code earlier which prevent from
@@ -4,6 +4,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._

import akka.actor.ActorRef
import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
import com.typesafe.scalalogging.StrictLogging
import kamon.Kamon

@@ -65,6 +66,36 @@ class SingleClusterPlanner(val dataset: Dataset,
private val shardColumns = dsOptions.shardKeyColumns.sorted
private val dsRef = dataset.ref

private val shardPushdownCache: Option[Cache[(LogicalPlan, Option[Seq[Int]]), Option[Set[Int]]]] =
if (queryConfig.cachingConfig.singleClusterPlannerCachingEnabled) {
Some(
Caffeine.newBuilder()
.maximumSize(queryConfig.cachingConfig.singleClusterPlannerCachingSize)
.recordStats()
.build()
)
} else {
None
}


private val tSchemaChangingCache: Option[Cache[(Seq[ColumnFilter], Long, Long), Some[Boolean]]] =
if (queryConfig.cachingConfig.singleClusterPlannerCachingEnabled) {
Some(
Caffeine.newBuilder()
.maximumSize(queryConfig.cachingConfig.singleClusterPlannerCachingSize)
.recordStats()
.build()
)
} else {
None
}

private[queryplanner] def invalidateCaches(): Unit = {
shardPushdownCache.foreach(_.invalidateAll())
tSchemaChangingCache.foreach(_.invalidateAll())
}

// failed failover counter captures failovers which are not possible because at least one shard
// is down both on the primary and DR clusters, the query will get executed only when the
// partial results are acceptable otherwise an exception is thrown
@@ -80,14 +111,9 @@ class SingleClusterPlanner(val dataset: Dataset,
qContext.plannerParams.targetSchemaProviderOverride.getOrElse(_targetSchemaProvider)
}

/**
* Returns true iff a target-schema:
* (1) matches any shard-key matched by the argument filters, and
* (2) changes between the argument timestamps.
*/
def isTargetSchemaChanging(shardKeyFilters: Seq[ColumnFilter],
startMs: Long, endMs: Long,
qContext: QueryContext): Boolean = {
private def isTargetSchemaChangingInner(shardKeyFilters: Seq[ColumnFilter],
startMs: Long, endMs: Long,
qContext: QueryContext): Boolean = {
val keyToValues = shardKeyFilters.map { filter =>
val values = filter match {
case ColumnFilter(col, regex: EqualsRegex) if QueryUtils.containsPipeOnlyRegex(regex.value.toString) =>
@@ -97,6 +123,7 @@ class SingleClusterPlanner(val dataset: Dataset,
}
(filter.column, values)
}.toMap

QueryUtils.makeAllKeyValueCombos(keyToValues).exists { shardKeys =>
// Replace any EqualsRegex shard-key filters with Equals.
val equalsFilters = shardKeys.map(entry => ColumnFilter(entry._1, Equals(entry._2))).toSeq
@@ -106,6 +133,25 @@ class SingleClusterPlanner(val dataset: Dataset,
}
}


/**
* Returns true iff a target-schema:
* (1) matches any shard-key matched by the argument filters, and
* (2) changes between the argument timestamps.
*/
def isTargetSchemaChanging(shardKeyFilters: Seq[ColumnFilter],
startMs: Long, endMs: Long,
qContext: QueryContext): Boolean = {
tSchemaChangingCache match {
case Some(cache) =>
cache.get((shardKeyFilters, startMs, endMs), _ => {
Some(isTargetSchemaChangingInner(shardKeyFilters, startMs, endMs, qContext))
}).getOrElse(true)
case None =>
isTargetSchemaChangingInner(shardKeyFilters, startMs, endMs, qContext)
}
}

/**
* Returns true iff a target-schema should be used to identify query shards.
* A target-schema should be used iff all of:
@@ -515,11 +561,7 @@ class SingleClusterPlanner(val dataset: Dataset,
}
// scalastyle:on method.length

/**
* Returns a set of shards iff a plan can be pushed-down to each.
* See [[LogicalPlanUtils.getPushdownKeys]] for more info.
*/
private def getPushdownShards(qContext: QueryContext,
private def getPushdownShardsInner(qContext: QueryContext,
plan: LogicalPlan): Option[Set[Int]] = {
val getRawPushdownShards = (rs: RawSeries) => {
if (qContext.plannerParams.shardOverrides.isEmpty) {
@@ -534,6 +576,22 @@ class SingleClusterPlanner(val dataset: Dataset,
rs => LogicalPlan.getRawSeriesFilters(rs))
}

/**
* Returns a set of shards iff a plan can be pushed-down to each.
* See [[LogicalPlanUtils.getPushdownKeys]] for more info.
*/
private def getPushdownShards(qContext: QueryContext,
plan: LogicalPlan): Option[Set[Int]] = {
shardPushdownCache match {
case Some(cache) =>
cache.get((plan, qContext.plannerParams.shardOverrides), _ => {
getPushdownShardsInner(qContext, plan)
})
case None =>
getPushdownShardsInner(qContext, plan)
}
}

/**
* Materialize a BinaryJoin without the pushdown optimization.
* @param forceDispatcher If occupied, forces this BinaryJoin to be materialized with the dispatcher.
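
The planner only constructs these caches when caching is enabled in config, so each memoized method keeps an uncached fallback, and tests get a hook to clear state. A trimmed-down sketch of that Option-gated wrapper, with illustrative names and a toy computation rather than the planner's API:

import com.github.benmanes.caffeine.cache.{Cache, Caffeine}

class TogglableMemo(enabled: Boolean, maxSize: Long) {
  // Built only when caching is enabled; otherwise every call takes the uncached path.
  private val cache: Option[Cache[String, Set[Int]]] =
    if (enabled) Some(Caffeine.newBuilder().maximumSize(maxSize).build())
    else None

  // Stand-in for an expensive, pure traversal of a plan tree.
  private def computeInner(key: String): Set[Int] = key.map(_.toInt).toSet

  def compute(key: String): Set[Int] = cache match {
    case Some(c) => c.get(key, k => computeInner(k))   // memoized path
    case None    => computeInner(key)                  // caching disabled
  }

  // Mirrors invalidateCaches(): tests clear memoized state between cases.
  def invalidate(): Unit = cache.foreach(_.invalidateAll())
}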
@@ -1,5 +1,6 @@
package filodb.coordinator.queryplanner

//scalastyle:off
import akka.actor.ActorSystem
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
@@ -21,10 +22,10 @@ import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import filodb.query.LogicalPlan.getRawSeriesFilters
import filodb.query.exec.aggregator.{CountRowAggregator, SumRowAggregator}
import org.scalatest.BeforeAndAfterEach
import org.scalatest.exceptions.TestFailedException
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper


import scala.concurrent.duration._

object SingleClusterPlannerSpec {
@@ -51,7 +52,8 @@ object SingleClusterPlannerSpec {
}
}

class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFutures with PlanValidationSpec {
class SingleClusterPlannerSpec extends AnyFunSpec
with Matchers with ScalaFutures with BeforeAndAfterEach with PlanValidationSpec {

implicit val system = ActorSystem()
private val node = TestProbe().ref
@@ -71,6 +73,10 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture
private val engine = new SingleClusterPlanner(dataset, schemas, mapperRef, earliestRetainedTimestampFn = 0,
queryConfig, "raw")

override def beforeEach(): Unit = {
engine.invalidateCaches()
}

/*
This is the PromQL
@@ -604,7 +610,6 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture
}

it ("should pushdown BinaryJoins/Aggregates when valid") {

def spread(filter: Seq[ColumnFilter]): Seq[SpreadChange] = {
Seq(SpreadChange(0, 1))
}
5 changes: 5 additions & 0 deletions core/src/main/resources/filodb-defaults.conf
@@ -427,6 +427,11 @@ filodb {
# config to figure out the tenant column filter to enable max min column selection
max-min-tenant-column-filter = "_ws_"

single.cluster.cache {
enabled = true
# The maximum number of entries in the cache
cache-size = 2048
}
routing {
enable-remote-raw-exports = false
max-time-range-remote-raw-export = 180 minutes
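
These are the keys that QueryConfig (next file) turns into a CachingConfig. A small sketch of reading overridden values with Typesafe Config, shown standalone for clarity rather than under FiloDB's actual enclosing config path:

import com.typesafe.config.ConfigFactory

object CacheConfigSketch extends App {
  // Stand-alone parse of the new keys; the surrounding path inside filodb-defaults.conf
  // is not reproduced here.
  val queryConfig = ConfigFactory.parseString(
    """
      |single.cluster.cache {
      |  enabled = false     # turn the planner caches off
      |  cache-size = 1024   # bound on entries per cache
      |}
      |""".stripMargin)

  println(queryConfig.getBoolean("single.cluster.cache.enabled")) // false
  println(queryConfig.getInt("single.cluster.cache.cache-size"))  // 1024
}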
14 changes: 12 additions & 2 deletions core/src/main/scala/filodb.core/query/QueryConfig.scala
@@ -43,6 +43,10 @@ object QueryConfig {
periodOfUncertaintyMs
)

val scCachingEnabled = queryConfig.as[Boolean]("single.cluster.cache.enabled")
val scCacheSize = queryConfig.as[Int]("single.cluster.cache.cache-size")
val cachingConfig = CachingConfig(scCachingEnabled, scCacheSize)

QueryConfig(askTimeout, staleSampleAfterMs, minStepMs, fastReduceMaxWindows, parser, translatePromToFilodbHistogram,
fasterRateEnabled, routingConfig.as[Option[String]]("partition_name"),
routingConfig.as[Option[Long]]("remote.http.timeout"),
@@ -52,7 +56,7 @@ object QueryConfig {
allowPartialResultsRangeQuery, allowPartialResultsMetadataQuery,
grpcDenyList.split(",").map(_.trim.toLowerCase).toSet,
None,
containerOverrides, rc)
containerOverrides, rc, cachingConfig)
}

import scala.concurrent.duration._
Expand Down Expand Up @@ -84,6 +88,11 @@ case class RoutingConfig(
periodOfUncertaintyMs: Long = (5 minutes).toMillis
)

case class CachingConfig(
singleClusterPlannerCachingEnabled: Boolean = true,
singleClusterPlannerCachingSize: Int = 2048
)

case class QueryConfig(askTimeout: FiniteDuration,
staleSampleAfterMs: Long,
minStepMs: Long,
@@ -102,4 +111,5 @@ case class QueryConfig(askTimeout: FiniteDuration,
grpcPartitionsDenyList: Set[String] = Set.empty,
plannerSelector: Option[String] = None,
recordContainerOverrides: Map[String, Int] = Map.empty,
routingConfig: RoutingConfig = RoutingConfig())
routingConfig: RoutingConfig = RoutingConfig(),
cachingConfig: CachingConfig = CachingConfig())
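
Because both CachingConfig fields default to the shipped values, existing QueryConfig call sites keep compiling. A tiny sketch of working with the new case class on its own (the benchmark framing is an assumption, not part of the change):

import filodb.core.query.CachingConfig

object CachingConfigSketch extends App {
  // Defaults mirror filodb-defaults.conf: caching on, 2048 entries per cache.
  val defaults = CachingConfig()
  // A copy with the planner caches switched off, e.g. for an A/B planning comparison.
  val disabled = defaults.copy(singleClusterPlannerCachingEnabled = false)
  println(defaults)
  println(disabled)
}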
27 changes: 19 additions & 8 deletions core/src/main/scala/filodb.core/query/QueryUtils.scala
@@ -1,16 +1,25 @@
package filodb.core.query

import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
* Storage for utility functions.
*/
object QueryUtils {
val REGEX_CHARS = Array('.', '?', '+', '*', '|', '{', '}', '[', ']', '(', ')', '"', '\\')
val REGEX_CHARS: Array[Char] = Array('.', '?', '+', '*', '|', '{', '}', '[', ']', '(', ')', '"', '\\')
private val COMBO_CACHE_SIZE = 2048

private val regexCharsMinusPipe = (REGEX_CHARS.toSet - '|').toArray

private val comboCache: Cache[Map[String, Seq[String]], Seq[Map[String, String]]] =
Caffeine.newBuilder()
.maximumSize(COMBO_CACHE_SIZE)
.recordStats()
.build()


/**
* Returns true iff the argument string contains any special regex chars.
*/
@@ -72,7 +81,7 @@ object QueryUtils {
splits.append(left)
remaining = right
// count of all characters before the remaining suffix (+1 to account for pipe)
offset = offset + left.size + 1
offset = offset + left.length + 1
}
splits.append(remaining)
splits
@@ -89,11 +98,13 @@ object QueryUtils {
def makeAllKeyValueCombos(keyToValues: Map[String, Seq[String]]): Seq[Map[String, String]] = {
// Store the entries with some order, then find all possible value combos s.t. each combo's
// ith value is a value of the ith key.
val entries = keyToValues.toSeq
val keys = entries.map(_._1)
val vals = entries.map(_._2.toSeq)
val combos = QueryUtils.combinations(vals)
// Zip the ordered keys with the ordered values.
combos.map(keys.zip(_).toMap)
comboCache.get(keyToValues, _ => {
val entries = keyToValues.toSeq
val keys = entries.map(_._1)
val vals = entries.map(_._2.toSeq)
val combos = QueryUtils.combinations(vals)
// Zip the ordered keys with the ordered values.
combos.map(keys.zip(_).toMap)
})
}
}
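
For reference, makeAllKeyValueCombos expands a map of shard-key columns to candidate values into every concrete key combination; this is the repeated, allocation-heavy work the new comboCache memoizes. A self-contained sketch of the same expansion (not FiloDB's implementation; the shard-key values are made up):

object ComboSketch extends App {
  // Cartesian product: each combo picks one candidate value per key.
  def allCombos(keyToValues: Map[String, Seq[String]]): Seq[Map[String, String]] = {
    val entries = keyToValues.toSeq
    entries.foldLeft(Seq(Map.empty[String, String])) { case (acc, (key, values)) =>
      for {
        partial <- acc
        value   <- values
      } yield partial + (key -> value)
    }
  }

  // e.g. one workspace candidate and two namespace candidates => two combos.
  val combos = allCombos(Map("_ws_" -> Seq("demo"), "_ns_" -> Seq("App-1", "App-2")))
  combos.foreach(println)
  // Map(_ws_ -> demo, _ns_ -> App-1)
  // Map(_ws_ -> demo, _ns_ -> App-2)
}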
9 changes: 8 additions & 1 deletion core/src/test/resources/application_test.conf
@@ -148,7 +148,14 @@ filodb {
}
grpc {
partitions-deny-list = ""
}
}

single.cluster.cache {
enabled = true
# The maximum number of entries in the cache
cache-size = 2048
}

routing {
enable-remote-raw-exports = false
max-time-range-remote-raw-export = 30 minutes