From 776ce91100123dea3e460cb83c75c1ee8dbb7e93 Mon Sep 17 00:00:00 2001
From: Vish Ramachandran
Date: Wed, 6 Nov 2024 21:19:37 -0800
Subject: [PATCH] fix(core): Auto-adjust lookback for downsampled data when
 querying rate

---
 .../memstore/TimeSeriesPartition.scala      |  4 +-
 .../query/exec/PeriodicSamplesMapper.scala  | 41 +++++++++++++-------
 .../rangefn/PeriodicRateFunctionsSpec.scala | 37 ++++++++++++++---
 3 files changed, 60 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/filodb.core/memstore/TimeSeriesPartition.scala b/core/src/main/scala/filodb.core/memstore/TimeSeriesPartition.scala
index 819ea0e253..d2b4cf7e99 100644
--- a/core/src/main/scala/filodb.core/memstore/TimeSeriesPartition.scala
+++ b/core/src/main/scala/filodb.core/memstore/TimeSeriesPartition.scala
@@ -74,9 +74,7 @@ extends ChunkMap(initMapSize) with ReadablePartition {
   def partKeyBase: Array[Byte] = UnsafeUtils.ZeroPointer.asInstanceOf[Array[Byte]]
   def partKeyOffset: Long = partitionKey
 
-  def publishInterval: Option[Long] = {
-    publishIntervalFinder.findPublishIntervalMs(schema.partition.hash, UnsafeUtils.ZeroArray, partitionKey)
-  }
+  def publishInterval: Option[Long] = None
   /**
    * Incoming, unencoded data gets appended to these BinaryAppendableVectors.
    * There is one element for each column of the schema. All of them have the same chunkId.
diff --git a/query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala b/query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala
index 7ae37bfd56..7ec7355273 100644
--- a/query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala
+++ b/query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala
@@ -6,6 +6,7 @@ import org.jctools.queues.SpscUnboundedArrayQueue
 
 import filodb.core.GlobalConfig.systemConfig
 import filodb.core.metadata.Column.ColumnType
+import filodb.core.metadata.Schemas
 import filodb.core.query._
 import filodb.core.store.WindowedChunkIterator
 import filodb.memory.format._
@@ -165,23 +166,31 @@
 //scalastyle:on method.length
 
   /**
-   * If a counter function is used (increase or rate) along with a step multiple notation,
-   * the idea is to extend lookback by one publish interval so that the increase between
-   * adjacent lookback windows is also accounted for. This error would be especially
-   * pronounced if [1i] notation was used and step == lookback.
+   * If a rate function is used on the downsample dataset, where the publish interval is known,
+   * and the requested window length is less than 2 * publishInterval, extend the lookback
+   * automatically so we don't return an empty result for rate.
+   * Normally we do not modify query parameters, but this is a special case for the downsample
+   * dataset only. We do this only for rate since it is already per-second normalized; doing the
+   * same for increase would lead to incorrect results.
    */
-  private def extendLookback(rv: RangeVector, window: Long): Long = {
-    window
-    // TODO There is a code path is used by Histogram bucket extraction path where
-    // underlying vector need not be a RawDataRangeVector. For those cases, we may
-    // not able to reliably extend lookback.
-    // Much more thought and work needed - so punting the bug
-// val pubInt = rv match {
-// case rvrd: RawDataRangeVector if (functionId.exists(_.onCumulCounter) && stepMultipleNotationUsed)
-// => rvrd.publishInterval.getOrElse(0L)
-// case _ => 0L
-// }
-// window + pubInt
+  private[exec] def extendLookback(rv: RangeVector, window: Long): Long = {
+    if (functionId.contains(InternalRangeFunction.Rate)) { // only extend for the rate function
+      val pubInt = rv match {
+        case rvrd: RawDataRangeVector =>
+          if (rvrd.partition.schema == Schemas.promCounter ||
+            rvrd.partition.schema == Schemas.promHistogram ||
+            rvrd.partition.schema == Schemas.otelCumulativeHistogram) // only extend for cumulative schemas
+            rvrd.partition.publishInterval.getOrElse(0L)
+          else 0L
+        case _ => 0L
+      }
+      if (window < 2 * pubInt) 2 * pubInt // 2 * publish interval since we want at least 2 samples for rate
+      else window
+    } else {
+      window
+    }
+    // TODO consider returning an error when the window length is less than 2 * publishInterval for the
+    // increase function, instead of silently returning an empty result (and implicitly zero for sum functions)
   }
 
   // Transform source double or long to double schema
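The rule above is compact but easy to misread, so here is a minimal, standalone Scala sketch of the same decision. All names here (effectiveWindow, isCumulative) are hypothetical and for illustration only; just the arithmetic mirrors extendLookback above.

object LookbackRuleSketch extends App {
  // Effective lookback window: extended to 2 * publishIntervalMs only when the function
  // is rate, the schema is cumulative, and the requested window is too short to be
  // guaranteed to contain two downsampled samples.
  def effectiveWindow(windowMs: Long, publishIntervalMs: Option[Long],
                      isRate: Boolean, isCumulative: Boolean): Long = {
    val pubInt = if (isRate && isCumulative) publishIntervalMs.getOrElse(0L) else 0L
    if (windowMs < 2 * pubInt) 2 * pubInt else windowMs
  }

  val pub = Some(300000L) // for example, a 5-minute downsample resolution
  println(effectiveWindow(120000L, pub, isRate = true, isCumulative = true))  // 600000: extended
  println(effectiveWindow(900000L, pub, isRate = true, isCumulative = true))  // 900000: long enough, unchanged
  println(effectiveWindow(120000L, pub, isRate = false, isCumulative = true)) // 120000: increase is never extended
}

Extending only rate is deliberate: rate is per-second normalized, so a wider window still estimates the same quantity, while widening the window for increase would inflate its magnitude.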
diff --git a/query/src/test/scala/filodb/query/exec/rangefn/PeriodicRateFunctionsSpec.scala b/query/src/test/scala/filodb/query/exec/rangefn/PeriodicRateFunctionsSpec.scala
index bae64fba1b..20109e4b2e 100644
--- a/query/src/test/scala/filodb/query/exec/rangefn/PeriodicRateFunctionsSpec.scala
+++ b/query/src/test/scala/filodb/query/exec/rangefn/PeriodicRateFunctionsSpec.scala
@@ -2,19 +2,22 @@ package filodb.query.exec.rangefn
 
 import filodb.core.{MachineMetricsData, TestData}
 import filodb.core.binaryrecord2.RecordBuilder
-import filodb.core.memstore.WriteBufferPool
+import filodb.core.memstore.{PagedReadablePartition, WriteBufferPool}
 import filodb.core.metadata.{Dataset, Schemas}
-import filodb.core.query.{QueryContext, ResultSchema, TransientRow}
+import filodb.core.query.{CustomRangeVectorKey, QueryContext, RawDataRangeVector, ResultSchema, TransientRow}
+import filodb.core.store.{InMemoryChunkScan, RawPartData}
 import filodb.memory.format.vectors.{LongHistogram, MutableHistogram}
 import filodb.memory.format.ZeroCopyUTF8String
 import filodb.query.exec.{ChunkedWindowIteratorD, ChunkedWindowIteratorH, QueueBasedWindow}
 import filodb.query.exec
-import filodb.query.exec.InternalRangeFunction.Increase
+import filodb.query.exec.InternalRangeFunction.{Increase, Rate}
 import filodb.query.util.IndexedArrayQueue
 import monix.execution.Scheduler.Implicits.global
 import monix.reactive.Observable
 import org.scalatest.concurrent.ScalaFutures
+import java.util.concurrent.atomic.AtomicLong
+import scala.concurrent.duration._
 
 import scala.util.Random
 
 class PeriodicRateFunctionsSpec extends RawDataWindowingSpec with ScalaFutures {
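For intuition on the numbers asserted in the new test below: rate needs at least two samples inside the window, and downsampled samples arrive one publish interval apart. A tiny standalone sketch with made-up sample values (not FiloDB code):

object WhyTwoSamplesSketch extends App {
  // Prometheus-style rate over a window needs two or more points:
  // rate = (lastValue - firstValue) / (lastTs - firstTs), normalized per second.
  val samples = Seq(60000L -> 10.0, 120000L -> 70.0) // two samples, 60 s apart
  val (t1, v1) = samples.head
  val (t2, v2) = samples.last
  val ratePerSec = (v2 - v1) / ((t2 - t1) / 1000.0)
  println(f"rate = $ratePerSec%.2f per second") // prints: rate = 1.00 per second
  // A lookback shorter than 2 * 60000 ms is not guaranteed to contain two samples,
  // in which case no rate is computable and the query silently returns an empty result.
}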
@@ -187,6 +190,34 @@
     it.next.getDouble(1) shouldEqual expectedDelta
   }
 
+  it("should extend lookback window for rate function when lookback < 2 * publishInterval if it exists") {
+    val publishInterval = 1.minute.toMillis.toInt
+    val rawData = RawPartData(Array.empty, Seq.empty)
+    // simulate a downsampled partition with resolution set to publishInterval
+    val p1 = new PagedReadablePartition(Schemas.promCounter, 0, -1, rawData, publishInterval)
+    val rv = RawDataRangeVector(CustomRangeVectorKey(Map.empty), p1, InMemoryChunkScan, Array(), new AtomicLong(0), 0, "")
+
+    // lookback is extended to 2 * publishInterval when it is less than 2 * publishInterval
+    val periodicSamplesVectorFnMapper = exec.PeriodicSamplesMapper(500000L, 400000L, 1300000L,
+      Some(publishInterval), Some(Rate), QueryContext(), stepMultipleNotationUsed = false, Nil, None)
+    periodicSamplesVectorFnMapper.extendLookback(rv, 3) shouldEqual 2 * publishInterval
+
+    // lookback is not extended when lookback >= 2 * publishInterval
+    val periodicSamplesVectorFnMapper2 = exec.PeriodicSamplesMapper(500000L, 400000L, 1300000L,
+      Some(publishInterval), Some(Rate), QueryContext(), stepMultipleNotationUsed = false, Nil, None)
+    periodicSamplesVectorFnMapper2.extendLookback(rv, publishInterval * 2 + 1) shouldEqual (2 * publishInterval + 1)
+
+    // lookback is not extended for the increase function
+    val periodicSamplesVectorFnMapper3 = exec.PeriodicSamplesMapper(500000L, 400000L, 1300000L,
+      Some(publishInterval), Some(Increase), QueryContext(), stepMultipleNotationUsed = false, Nil, None)
+    periodicSamplesVectorFnMapper3.extendLookback(rv, 3) shouldEqual 3
+
+    // lookback is not extended for delta schemas
+    val p2 = new PagedReadablePartition(Schemas.otelDeltaHistogram, 0, -1, rawData, publishInterval)
+    val rv2 = RawDataRangeVector(CustomRangeVectorKey(Map.empty), p2, InMemoryChunkScan, Array(), new AtomicLong(0), 0, "")
+    periodicSamplesVectorFnMapper.extendLookback(rv2, 3) shouldEqual 3
+  }
+
   it("appropriate increase function is used for prom-counter type") {
     val samples = Seq(
       100000L -> 100d,