diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/LogicalPlanUtils.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/LogicalPlanUtils.scala index 4900c35b63..6f028fb5d5 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/LogicalPlanUtils.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/LogicalPlanUtils.scala @@ -422,9 +422,9 @@ object LogicalPlanUtils extends StrictLogging { * - all plan RawSeries share the same target-schema columns. * - no target-schema definition changes during the query. */ - private def sameRawSeriesTargetSchemaColumns(plan: LogicalPlan, - targetSchemaProvider: TargetSchemaProvider, - getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]]) + def sameRawSeriesTargetSchemaColumns(plan: LogicalPlan, + targetSchemaProvider: TargetSchemaProvider, + getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]]) : Option[Seq[String]] = { // compose a stream of Options for each RawSeries-- // the options contain a target-schema iff it is defined and unchanging. diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanUtilsSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanUtilsSpec.scala new file mode 100644 index 0000000000..fa013144db --- /dev/null +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanUtilsSpec.scala @@ -0,0 +1,61 @@ +package filodb.coordinator.queryplanner + +import filodb.coordinator.client.QueryCommands.FunctionalTargetSchemaProvider +import filodb.core.TargetSchemaChange +import filodb.core.query.ColumnFilter +import filodb.core.query.Filter.Equals +import filodb.prometheus.ast.TimeStepParams +import filodb.prometheus.parse.Parser +import filodb.query.LogicalPlan +import org.scalatest.funspec.AnyFunSpec +import org.scalatest.matchers.should.Matchers + +class LogicalPlanUtilsSpec extends AnyFunSpec with Matchers { + it ("should correctly determine whether-or-not plan has same/unchanging target-schema columns") { + val timeParamsSec = TimeStepParams(1000, 10, 10000) + val timeParamsMs = TimeStepParams( + 1000 * timeParamsSec.start, + 1000 * timeParamsSec.step, + 1000 * timeParamsSec.end) + val query = """foo{operand="lhs"} + bar{operand="rhs"}""" + + val getResult = (tschemaProviderFunc: Seq[ColumnFilter] => Seq[TargetSchemaChange]) => { + val tschemaProvider = FunctionalTargetSchemaProvider(tschemaProviderFunc) + val lp = Parser.queryRangeToLogicalPlan(query, timeParamsSec) + LogicalPlanUtils.sameRawSeriesTargetSchemaColumns(lp, tschemaProvider, LogicalPlan.getRawSeriesFilters) + } + + val unchangingSingle = (colFilters: Seq[ColumnFilter]) => { + Seq(TargetSchemaChange(0, Seq("hello"))) + } + getResult(unchangingSingle) shouldEqual Some(Seq("hello")) + + val unchangingMultiple = (colFilters: Seq[ColumnFilter]) => { + Seq(TargetSchemaChange(0, Seq("hello", "goodbye"))) + } + getResult(unchangingMultiple) shouldEqual Some(Seq("hello", "goodbye")) + + val changingSingle = (colFilters: Seq[ColumnFilter]) => { + Seq(TargetSchemaChange(timeParamsMs.start + timeParamsMs.step, Seq("hello"))) + } + getResult(changingSingle) shouldEqual None + + val oneChanges = (colFilters: Seq[ColumnFilter]) => { + if (colFilters.contains(ColumnFilter("operand", Equals("lhs")))) { + Seq(TargetSchemaChange(0, Seq("hello"))) + } else { + Seq(TargetSchemaChange(timeParamsMs.start + timeParamsMs.step, Seq("hello"))) + } + } + getResult(oneChanges) shouldEqual None + + val differentCols = (colFilters: Seq[ColumnFilter]) => { + if (colFilters.contains(ColumnFilter("operand", Equals("lhs")))) { + Seq(TargetSchemaChange(0, Seq("hello"))) + } else { + Seq(TargetSchemaChange(0, Seq("goodbye"))) + } + } + getResult(differentCols) shouldEqual None + } +}