Skip to content

Commit

Permalink
Approximately equal check only enabled in remote raw data export path
Browse files Browse the repository at this point in the history
  • Loading branch information
amolnayak311 committed Jan 4, 2024
1 parent 63ea481 commit 89c0008
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
val newPromQlParams = params.copy(promQl = LogicalPlanParser.convertToQuery(lp))
StitchRvsExec(qContext.copy(origQueryParams = newPromQlParams)
, inProcessPlanDispatcher, None,
execPlans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec]))
execPlans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec]),
enableApproximatelyEqualCheck = true)
}
)
)
Expand Down
29 changes: 19 additions & 10 deletions query/src/main/scala/filodb/query/exec/StitchRvsExec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ object StitchRvsExec {
private class RVCursorImpl(
vectors: Iterable[RangeVectorCursor],
outputRange: Option[RvRange],
enableApproximatelyEqualCheck: Boolean = true,
toleranceNumDecimal: Int = 10)
enableApproximatelyEqualCheck: Boolean,
toleranceNumDecimal: Int)
extends RangeVectorCursor {
private val weight = math.pow(10, toleranceNumDecimal)
private val bVectors = vectors.map(_.buffered)
Expand Down Expand Up @@ -81,10 +81,14 @@ object StitchRvsExec {
// The second condition checks if these values are equal within the tolerable limits and if yes, do not
// emit NaN.
// TODO: Make the second check and tolerance configurable?
if (minsWithoutNan.size == 1 ||
!enableApproximatelyEqualCheck || // Approximately equal check is disabled
minsWithoutNan.map(x => (x.getDouble(1) * weight).toLong / weight).toSet.size == 1)
minsWithoutNan.head else nanResult
if (minsWithoutNan.tail.isEmpty) {
minsWithoutNan.head
} else if (enableApproximatelyEqualCheck &&
minsWithoutNan.map(x => (x.getDouble(1) * weight).toLong / weight).toSet.size == 1) {
minsWithoutNan.head
} else {
nanResult
}
}
}

Expand All @@ -98,8 +102,10 @@ object StitchRvsExec {
IteratorBackedRangeVector(v1.key, rows, outputRvRange)
}

def merge(vectors: Iterable[RangeVectorCursor], outputRange: Option[RvRange]): RangeVectorCursor = {
new RVCursorImpl(vectors, outputRange)
def merge(vectors: Iterable[RangeVectorCursor], outputRange: Option[RvRange],
enableApproximatelyEqualCheck: Boolean = false,
toleranceNumDecimal: Int = 10): RangeVectorCursor = {
new RVCursorImpl(vectors, outputRange, enableApproximatelyEqualCheck, toleranceNumDecimal)
}
}

Expand All @@ -110,7 +116,9 @@ object StitchRvsExec {
final case class StitchRvsExec(queryContext: QueryContext,
dispatcher: PlanDispatcher,
outputRvRange: Option[RvRange],
children: Seq[ExecPlan]) extends NonLeafExecPlan {
children: Seq[ExecPlan],
enableApproximatelyEqualCheck: Boolean = false,
toleranceNumDecimal: Int = 10) extends NonLeafExecPlan {
require(children.nonEmpty)

outputRvRange match {
Expand All @@ -131,7 +139,8 @@ final case class StitchRvsExec(queryContext: QueryContext,
val stitched = childResponses.map(_._1.result).toListL.map(_.flatten).map { srvs =>
val groups = srvs.groupBy(_.key.labelValues)
groups.mapValues { toMerge =>
val rows = StitchRvsExec.merge(toMerge.map(_.rows()), outputRvRange)
val rows = StitchRvsExec.merge(toMerge.map(_.rows()), outputRvRange,
enableApproximatelyEqualCheck, toleranceNumDecimal)
val key = toMerge.head.key
IteratorBackedRangeVector(key, rows, outputRvRange)
}.values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,8 +687,8 @@ class BinaryJoinExecSpec extends AnyFunSpec with Matchers with ScalaFutures {
"namespace".utf8 -> "aci-telemetry-prod1".utf8,
"res".utf8 -> "res-val".utf8))
result.head.rows().map(r => (r.getLong(0), r.getDouble(1).toString)).toList shouldEqual
List((4800,"4.0"), (4900,"4.0"), (5000,"4.0"), (5100,"4.0"), (5200,"4.0"), (5300,"4.0"), (5400,"4.0"),
(5500,"4.0"), (5600,"4.0"), (5700,"4.0"), (5800,"4.0"), (5900,"4.0"), (6000,"4.0"))
List((4800,"4.0"), (4900,"4.0"), (5000,"4.0"), (5100,"4.0"), (5200,"NaN"), (5300,"NaN"),
(5400,"NaN"), (5500,"NaN"), (5600,"NaN"), (5700,"4.0"), (5800,"4.0"), (5900,"4.0"), (6000,"4.0"))
}
}
// scalastyle:on number.of.methods

0 comments on commit 89c0008

Please sign in to comment.