Skip to content

Commit

Permalink
more merge resolution stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
alextheimer committed Jan 25, 2024
1 parent 294beaf commit 9703fea
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ object LogicalPlanUtils extends StrictLogging {
val resolvedFilters: Seq[Seq[ColumnFilter]] = filters.map { filter =>
filter.filter match {
// Take care of pipe-joined values here -- create one Equals filter per value.
case EqualsRegex(values: String) if QueryUtils.isPipeOnlyRegex(values) =>
case EqualsRegex(values: String) if QueryUtils.containsPipeOnlyRegex(values) =>
QueryUtils.splitAtUnescapedPipes(values).map(value => ColumnFilter(filter.column, Equals(value)))
case _ => Seq(filter)
}}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
package filodb.coordinator.queryplanner

import java.util.concurrent.ConcurrentHashMap

import scala.collection.concurrent.{Map => ConcurrentMap}
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._

import com.typesafe.scalalogging.StrictLogging
import io.grpc.ManagedChannel

import filodb.coordinator.queryplanner.LogicalPlanUtils._
import filodb.coordinator.queryplanner.PlannerUtil.rewritePlanWithRemoteRawExport
import filodb.core.metadata.{Dataset, DatasetOptions, Schemas}
import filodb.core.query.{ColumnFilter, PromQlQueryParams, QueryConfig, QueryContext, RvRange}
import filodb.core.query.{ColumnFilter, PromQlQueryParams, QueryConfig, QueryContext, RangeParams, RvRange}
import filodb.grpc.GrpcCommonUtils
import filodb.query._
import filodb.query.LogicalPlan._
Expand Down Expand Up @@ -261,13 +258,12 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv

private def getRoutingKeys(logicalPlan: LogicalPlan) = {
LogicalPlan.getNonMetricShardKeyFilters(logicalPlan, dataset.options.nonMetricShardColumns)
.flatMap(LogicalPlanUtils.resolveShardKeyFilters(_, shardKeyMatcher))
}

private def generateRemoteExecParams(queryContext: QueryContext, startMs: Long, endMs: Long) = {
val queryParams = queryContext.origQueryParams.asInstanceOf[PromQlQueryParams]
queryContext.copy(
origQueryParams = queryParams.copy(promQl = promQl, startSecs = startMs / 1000, endSecs = endMs / 1000),
origQueryParams = queryParams.copy(startSecs = startMs / 1000, endSecs = endMs / 1000),
plannerParams = queryContext.plannerParams.copy(processMultiPartition = false))
}

Expand Down Expand Up @@ -435,11 +431,11 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
// TODO: Move to config later
val tailRanges = filteredAssignments.tail.takeWhile(_.timeRange.startMs < queryRange.endMs).map { assign =>
val startMs = if (stepMsOpt.nonEmpty) {
snapToStep(timestamp = part.timeRange.startMs + lookbackMs + offsetMs,
snapToStep(timestamp = assign.timeRange.startMs + lookbackMs + offsetMs,
step = stepMsOpt.get,
origin = range.startMs)
origin = queryRange.startMs)
} else {
part.timeRange.startMs + lookbackMs + offsetMs
assign.timeRange.startMs + lookbackMs + offsetMs
}
val endMs = math.min(queryRange.endMs, assign.timeRange.endMs + offsetMs)
val periodOfUncertaintyMs = if (queryConfig.routingConfig.supportRemoteRawExport)
Expand Down Expand Up @@ -848,7 +844,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
private def createMetadataRemoteExec(qContext: QueryContext, partitionAssignment: PartitionAssignment,
urlParams: Map[String, String]) = {
val finalQueryContext = generateRemoteExecParams(
qContext, "<metadata>", partitionAssignment.timeRange.startMs, partitionAssignment.timeRange.endMs)
qContext, partitionAssignment.timeRange.startMs, partitionAssignment.timeRange.endMs)
val httpEndpoint = partitionAssignment.httpEndPoint +
finalQueryContext.origQueryParams.asInstanceOf[PromQlQueryParams].remoteQueryPath.getOrElse("")
MetadataRemoteExec(httpEndpoint, remoteHttpTimeoutMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package filodb.coordinator.queryplanner

import filodb.coordinator.queryplanner.LogicalPlanUtils.getLookBackMillis
import filodb.core.metadata.Dataset
import filodb.core.query.{ColumnFilter, PromQlQueryParams, QueryUtils}
import filodb.core.query.{PromQlQueryParams, QueryUtils}
import filodb.core.query.Filter.{Equals, EqualsRegex}
import filodb.query.LogicalPlan
import filodb.query.exec.{ExecPlan, PromQlRemoteExec}
Expand Down Expand Up @@ -41,7 +41,7 @@ abstract class PartitionLocationPlanner(dataset: Dataset,
val values = filter.filter match {
case Equals(value) => Seq(value.toString)
// Split '|'-joined values if pipes are the only regex chars used.
case EqualsRegex(value: String) if QueryUtils.isPipeOnlyRegex(value) =>
case EqualsRegex(value: String) if QueryUtils.containsPipeOnlyRegex(value) =>
QueryUtils.splitAtUnescapedPipes(value)
case _ => throw new IllegalArgumentException(
s"""shard keys must be filtered by equality or "|"-only regex. filter=${filter}""")
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ class SingleClusterPlanner(val dataset: Dataset,
case Some(ColumnFilter(_, Filter.Equals(filtVal: String))) =>
Seq(filtVal)
case Some(ColumnFilter(_, Filter.EqualsRegex(filtVal: String)))
if QueryUtils.isPipeOnlyRegex(filtVal) => QueryUtils.splitAtUnescapedPipes(filtVal)
if QueryUtils.containsPipeOnlyRegex(filtVal) => QueryUtils.splitAtUnescapedPipes(filtVal)
case Some(ColumnFilter(_, filter)) =>
throw new BadQueryException(s"Found filter for shard column $shardCol but " +
s"$filter cannot be used for shard key routing")
Expand Down Expand Up @@ -858,7 +858,7 @@ class SingleClusterPlanner(val dataset: Dataset,
case Some(ColumnFilter(_, Filter.Equals(_: String))) => true
case Some(ColumnFilter(_, Filter.EqualsRegex(value: String))) =>
// Make sure no regex chars except the pipe, which can be used to concatenate values.
QueryUtils.isPipeOnlyRegex(value)
QueryUtils.containsPipeOnlyRegex(value)
case _ => false
}
}
Expand Down

0 comments on commit 9703fea

Please sign in to comment.