From 89c00082eb955dc05a9f2bc858182896d62d7549 Mon Sep 17 00:00:00 2001 From: Amol Nayak Date: Wed, 3 Jan 2024 17:20:34 -0800 Subject: [PATCH] Approximately equal check only enabled in remote raw data export path --- .../queryplanner/MultiPartitionPlanner.scala | 3 +- .../filodb/query/exec/StitchRvsExec.scala | 29 ++++++++++++------- .../query/exec/BinaryJoinExecSpec.scala | 4 +-- 3 files changed, 23 insertions(+), 13 deletions(-) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala index 65515e0da0..b0fa1fe7ad 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala @@ -143,7 +143,8 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider val newPromQlParams = params.copy(promQl = LogicalPlanParser.convertToQuery(lp)) StitchRvsExec(qContext.copy(origQueryParams = newPromQlParams) , inProcessPlanDispatcher, None, - execPlans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec])) + execPlans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec]), + enableApproximatelyEqualCheck = true) } ) ) diff --git a/query/src/main/scala/filodb/query/exec/StitchRvsExec.scala b/query/src/main/scala/filodb/query/exec/StitchRvsExec.scala index d8e67799b5..88e960d42d 100644 --- a/query/src/main/scala/filodb/query/exec/StitchRvsExec.scala +++ b/query/src/main/scala/filodb/query/exec/StitchRvsExec.scala @@ -15,8 +15,8 @@ object StitchRvsExec { private class RVCursorImpl( vectors: Iterable[RangeVectorCursor], outputRange: Option[RvRange], - enableApproximatelyEqualCheck: Boolean = true, - toleranceNumDecimal: Int = 10) + enableApproximatelyEqualCheck: Boolean, + toleranceNumDecimal: Int) extends RangeVectorCursor { private val weight = math.pow(10, toleranceNumDecimal) private val bVectors = vectors.map(_.buffered) @@ -81,10 +81,14 @@ object StitchRvsExec { // The second condition checks if these values are equal within the tolerable limits and if yes, do not // emit NaN. // TODO: Make the second check and tolerance configurable? - if (minsWithoutNan.size == 1 || - !enableApproximatelyEqualCheck || // Approximately equal check is disabled - minsWithoutNan.map(x => (x.getDouble(1) * weight).toLong / weight).toSet.size == 1) - minsWithoutNan.head else nanResult + if (minsWithoutNan.tail.isEmpty) { + minsWithoutNan.head + } else if (enableApproximatelyEqualCheck && + minsWithoutNan.map(x => (x.getDouble(1) * weight).toLong / weight).toSet.size == 1) { + minsWithoutNan.head + } else { + nanResult + } } } @@ -98,8 +102,10 @@ object StitchRvsExec { IteratorBackedRangeVector(v1.key, rows, outputRvRange) } - def merge(vectors: Iterable[RangeVectorCursor], outputRange: Option[RvRange]): RangeVectorCursor = { - new RVCursorImpl(vectors, outputRange) + def merge(vectors: Iterable[RangeVectorCursor], outputRange: Option[RvRange], + enableApproximatelyEqualCheck: Boolean = false, + toleranceNumDecimal: Int = 10): RangeVectorCursor = { + new RVCursorImpl(vectors, outputRange, enableApproximatelyEqualCheck, toleranceNumDecimal) } } @@ -110,7 +116,9 @@ object StitchRvsExec { final case class StitchRvsExec(queryContext: QueryContext, dispatcher: PlanDispatcher, outputRvRange: Option[RvRange], - children: Seq[ExecPlan]) extends NonLeafExecPlan { + children: Seq[ExecPlan], + enableApproximatelyEqualCheck: Boolean = false, + toleranceNumDecimal: Int = 10) extends NonLeafExecPlan { require(children.nonEmpty) outputRvRange match { @@ -131,7 +139,8 @@ final case class StitchRvsExec(queryContext: QueryContext, val stitched = childResponses.map(_._1.result).toListL.map(_.flatten).map { srvs => val groups = srvs.groupBy(_.key.labelValues) groups.mapValues { toMerge => - val rows = StitchRvsExec.merge(toMerge.map(_.rows()), outputRvRange) + val rows = StitchRvsExec.merge(toMerge.map(_.rows()), outputRvRange, + enableApproximatelyEqualCheck, toleranceNumDecimal) val key = toMerge.head.key IteratorBackedRangeVector(key, rows, outputRvRange) }.values diff --git a/query/src/test/scala/filodb/query/exec/BinaryJoinExecSpec.scala b/query/src/test/scala/filodb/query/exec/BinaryJoinExecSpec.scala index 4655c12578..578ec0da8c 100644 --- a/query/src/test/scala/filodb/query/exec/BinaryJoinExecSpec.scala +++ b/query/src/test/scala/filodb/query/exec/BinaryJoinExecSpec.scala @@ -687,8 +687,8 @@ class BinaryJoinExecSpec extends AnyFunSpec with Matchers with ScalaFutures { "namespace".utf8 -> "aci-telemetry-prod1".utf8, "res".utf8 -> "res-val".utf8)) result.head.rows().map(r => (r.getLong(0), r.getDouble(1).toString)).toList shouldEqual - List((4800,"4.0"), (4900,"4.0"), (5000,"4.0"), (5100,"4.0"), (5200,"4.0"), (5300,"4.0"), (5400,"4.0"), - (5500,"4.0"), (5600,"4.0"), (5700,"4.0"), (5800,"4.0"), (5900,"4.0"), (6000,"4.0")) + List((4800,"4.0"), (4900,"4.0"), (5000,"4.0"), (5100,"4.0"), (5200,"NaN"), (5300,"NaN"), + (5400,"NaN"), (5500,"NaN"), (5600,"NaN"), (5700,"4.0"), (5800,"4.0"), (5900,"4.0"), (6000,"4.0")) } } // scalastyle:on number.of.methods \ No newline at end of file