Merge integration to Main #1676

Merged — 47 commits, merged Sep 27, 2023
Changes from 1 commit of 47
7679af3
Revert "maint(core): upgrade lucene to 9.7.0 (#1617)" (#1622)
alextheimer Jul 11, 2023
80245ce
feat(core): Simplify configuration to scale Filodb horizontally (#1610)
vishramachandran Jul 12, 2023
90303aa
filodb(core) add debugging info for empty histogram. (#1613)
yu-shipit Jul 13, 2023
5b05779
fix(core) make the error message more frendly to users. (#1593)
yu-shipit Jul 13, 2023
a93666d
Revert "filodb(core) add debugging info for empty histogram. (#1613)"…
yu-shipit Jul 13, 2023
a37bf5f
Adding logging statement when warning is produced. (#1625)
kvpetrov Jul 14, 2023
8ecf630
perf(query): Remove boxed Double allocations from NaN checks during d…
vishramachandran Jul 17, 2023
f14a13c
fix(core) make the error message more frendly to users. (#1593) (#1630)
yu-shipit Jul 17, 2023
6ac0255
fix nullpointer happened in cardinality busting job. (#1631)
yu-shipit Jul 18, 2023
b9ea680
filodb(core) add debugging info for empty histogram. (#1624)
yu-shipit Jul 18, 2023
eebd5f4
fix(query): prevent list.head on empty list (#1632)
alextheimer Jul 20, 2023
6c1693a
maint(kafka): update consumer client id (#1633)
alextheimer Jul 24, 2023
5dadfb9
fix(query): prevent list.head on empty list (#1632)
alextheimer Jul 20, 2023
8929fb2
Merge pull request #1634 from alextheimer/cherry-pick
amolnayak311 Jul 24, 2023
59cae2a
fix(core): Consolidate num-nodes duplicate config (#1635)
vishramachandran Jul 24, 2023
ea1644b
Fix memory alloc config (#1638)
vishramachandran Jul 24, 2023
955814e
fix(query) Regex equals .* must ignore the label and match series eve…
amolnayak311 Jul 28, 2023
f5018ae
fix(core) fix the binary join aggregation across different partitions…
yu-shipit Jul 31, 2023
84a185f
feat(query): Cardinality V2 API Query Plan changes (#1637)
sandeep6189 Aug 2, 2023
c7e26a9
fix(query) Fix regression with regex match (#1640)
amolnayak311 Aug 4, 2023
dd59325
fix(query) support unary operators(+/-) (#1642)
yu-shipit Aug 4, 2023
d84c6c8
fix(core): Bug in calculating size of SerializedRangeVector (#1643)
vishramachandran Aug 9, 2023
38c682c
perf(core): ~Two times throughput improvement for Lucene queries with…
vishramachandran Aug 9, 2023
f3352e2
cherry-pick histogram debugging message and cardinality job nullptr f…
yu-shipit Aug 15, 2023
bfbeb36
filodb(core) add debugging info for empty histogram. (#1613) (#1649)
yu-shipit Aug 15, 2023
523999c
feat(query): Cardinality V2 API Query Plan changes (#1637)
sandeep6189 Aug 2, 2023
9ed8685
Merge pull request #1650 from amolnayak311/integration
amolnayak311 Aug 16, 2023
594ffce
fix(query): Adding user datasets for Cardinality V2 RemoteMetadataExe…
sandeep6189 Aug 17, 2023
89bd678
Fix MultiPartition Card Queries (#1652)
sandeep6189 Aug 18, 2023
bdcfde7
fix(query): Adding user datasets for Cardinality V2 RemoteMetadataExe…
sandeep6189 Aug 17, 2023
17fb077
Fix MultiPartition Card Queries (#1652)
sandeep6189 Aug 18, 2023
225617c
Merge pull request #1654 from sandeep6189/integration
amolnayak311 Aug 18, 2023
89095e2
feat(core): Add Query CPU Time for Index Lookups (#1655)
vishramachandran Aug 21, 2023
1e56ef9
fix(metering): Overriding the cluster name .passed to SingleClusterPl…
sandeep6189 Aug 23, 2023
c77a95a
fix(metering): Overriding the cluster name .passed to SingleClusterPl…
sandeep6189 Aug 23, 2023
710c3d2
misc(core): add downsample support for aggregated data (#1661)
sherali42 Aug 25, 2023
df3922d
misc(core): add downsample support for aggregated data (#1661)
sherali42 Aug 25, 2023
14115cf
cherry-pic misc(core): add downsample support for aggregated data (#1…
sherali42 Aug 30, 2023
6cb5433
maint(core): upgrade to Lucene 9.7.0 (#1662)
alextheimer Aug 30, 2023
7adc382
bug(query): Streaming query execution allocated too much mem via RB (…
vishramachandran Sep 1, 2023
bf8ead0
perf(card): Adding config support for DS card flushCount and perf log…
sandeep6189 Sep 11, 2023
50d28ea
fix(core) fix the binary join aggregation across different partitions…
yu-shipit Sep 13, 2023
f7d60ac
Merge branch 'develop' into integration
Sep 19, 2023
ba0023f
Merge pull request #1673 from yu-shipit/integration
yu-shipit Sep 19, 2023
78a33f6
Bump filodb version to 0.9.23.
Sep 19, 2023
b5a2e40
Merge pull request #1674 from yu-shipit/integration
yu-shipit Sep 19, 2023
4e97f53
Merge branch 'integration'
Sep 27, 2023
fix(core): Bug in calculating size of SerializedRangeVector (#1643)
There was a bug in calculating the size of an SRV.
Earlier, for efficiency, we calculated the size from the containers associated with the SRV.
But a container can hold multiple SRVs, so when several SRVs shared a container, each one was charged the cumulative container size and the totals came out wrong.

The fix for now is to calculate the size by walking the records themselves. This introduces a small inefficiency, but other ways of calculating it were more invasive and risked regression; we can optimize later if really needed. I have also reduced the number of calls to this method from two to one.

The unit tests didn't catch this earlier because they exercised only a single SRV.
I have added a unit test that builds multiple SRVs; it failed with the earlier code.
vishramachandran authored Aug 9, 2023
commit d84c6c8aecb7e7da4978c42e8933a36a9e588dc8
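As an editor's illustration of the bug described above — a minimal sketch with made-up names and sizes, not FiloDB's actual classes — this shows why charging each SRV the full container size overcounts when one container backs several SRVs:

```scala
object SrvSizeSketch extends App {
  // One container holds six 20-byte records; two SRVs each own three of them.
  val recordSizes = Vector(20, 20, 20, 20, 20, 20)
  val containerBytes = recordSizes.sum // 120 bytes in the whole container

  val srvA = recordSizes.slice(0, 3) // SRV A owns records 0..2
  val srvB = recordSizes.slice(3, 6) // SRV B owns records 3..5

  // Buggy estimate: each SRV reports the size of the whole container it points at.
  val buggyTotal = containerBytes + containerBytes // 240, double-counted
  // Fixed estimate: each SRV sums only its own records.
  val correctTotal = srvA.sum + srvB.sum // 120

  println(s"buggy=$buggyTotal correct=$correctTotal")
}
```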
@@ -7,7 +7,7 @@ import scala.reflect.ClassTag
import debox.Buffer

import filodb.memory.{BinaryRegionConsumer, BinaryRegionLarge}
-import filodb.memory.format.{RowReader, UnsafeUtils}
+import filodb.memory.format.UnsafeUtils

/**
* A RecordContainer is a binary, wire-compatible container for BinaryRecords V2.
@@ -72,13 +72,13 @@ final class RecordContainer(val base: Any, val offset: Long, maxLength: Int,
* Iterates through each BinaryRecord as a RowReader. Results in two allocations: the Iterator
* as well as a BinaryRecordRowReader.
*/
-final def iterate(schema: RecordSchema): Iterator[RowReader] = new Iterator[RowReader] {
+final def iterate(schema: RecordSchema): Iterator[BinaryRecordRowReader] = new Iterator[BinaryRecordRowReader] {
val reader = new BinaryRecordRowReader(schema, base)
val endOffset = offset + 4 + numBytes
var curOffset = offset + ContainerHeaderLen

final def hasNext: Boolean = curOffset < endOffset
-final def next: RowReader = {
+final def next: BinaryRecordRowReader = {
val recordLen = BinaryRegionLarge.numBytes(base, curOffset)
reader.recordOffset = curOffset
curOffset += (recordLen + 7) & ~3 // +4, then aligned/rounded up to next 4 bytes
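A worked example of the alignment arithmetic in the comment above: each record carries a 4-byte length prefix and the next record starts at the next 4-byte boundary, so the step is `(len + 4 + 3) & ~3`, i.e. `(len + 7) & ~3`. The helper name below is invented for illustration:

```scala
// Sketch of the record-step arithmetic used above (illustrative only):
// 4-byte length prefix + payload, rounded up to the next multiple of 4.
def alignedRecordStep(recordLen: Int): Int = (recordLen + 7) & ~3

assert(alignedRecordStep(20) == 24) // 20 + 4 = 24, already aligned
assert(alignedRecordStep(21) == 28) // 21 + 4 = 25, rounds up to 28
assert(alignedRecordStep(24) == 28) // 24 + 4 = 28, already a multiple of 4
```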
@@ -624,7 +624,12 @@ trait BinaryRecordRowReaderBase extends RowReader {

final class BinaryRecordRowReader(val schema: RecordSchema,
var recordBase: Any = UnsafeUtils.ZeroPointer,
-var recordOffset: Long = 0L) extends BinaryRecordRowReaderBase
+var recordOffset: Long = 0L) extends BinaryRecordRowReaderBase {
+def recordLength: Int = {
+val len = BinaryRegionLarge.numBytes(recordBase, recordOffset)
+(len + 7) & ~3 // +4, then aligned/rounded up to next 4 bytes
+}
+}

final class MultiSchemaBRRowReader(var recordBase: Any = UnsafeUtils.ZeroPointer,
var recordOffset: Long = 0L) extends BinaryRecordRowReaderBase {
10 changes: 2 additions & 8 deletions core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala
@@ -934,14 +934,8 @@ class TimeSeriesShard(val ref: DatasetRef,
markPartAsNotIngesting(p, odp = false)
if (storeConfig.meteringEnabled) {
val shardKey = p.schema.partKeySchema.colValues(p.partKeyBase, p.partKeyOffset,
-p.schema.options.shardKeyColumns)
-val newCard = cardTracker.modifyCount(shardKey, 0, -1)
-// TODO remove temporary debugging since we are seeing some negative counts
-if (newCard.exists(_.value.activeTsCount < 0) && p.partID % 100 < 5)
-// log for 5% of the cases to reduce log volume
-logger.error(s"For some reason, activeTs count negative when updating card for " +
-s"partKey: ${p.stringPartition} newCard: $newCard oldActivelyIngestingSize=$oldActivelyIngestingSize " +
-s"newActivelyIngestingSize=${activelyIngesting.size}")
+p.schema.options.shardKeyColumns)
+cardTracker.modifyCount(shardKey, 0, -1)
}
}
}
15 changes: 7 additions & 8 deletions core/src/main/scala/filodb.core/query/RangeVector.scala
@@ -213,8 +213,8 @@ sealed trait ScalarSingleValue extends ScalarRangeVector {
else it
}

-// Negligible bytes sent over-the-wire.
-override def estimateSerializedRowBytes: Long = 0
+// Negligible bytes sent over-the-wire. Don't bother calculating accurately.
+override def estimateSerializedRowBytes: Long = SerializableRangeVector.SizeOfDouble
}

/**
@@ -399,7 +399,10 @@ final class SerializedRangeVector(val key: RangeVectorKey,
} else it
}

-override def estimateSerializedRowBytes: Long = containers.map(_.numBytes).sum
+override def estimateSerializedRowBytes: Long =
+containers.toIterator.flatMap(_.iterate(schema))
+.slice(startRecordNo, startRecordNo + numRowsSerialized)
+.foldLeft(0)(_ + _.recordLength)

def containersIterator : Iterator[RecordContainer] = containers.toIterator

@@ -487,11 +490,7 @@ object SerializedRangeVector extends StrictLogging {
case None => builder.allContainers.toList
case Some(firstContainer) => builder.allContainers.dropWhile(_ != firstContainer)
}
-val srv = new SerializedRangeVector(rv.key, numRows, containers, schema, startRecordNo, rv.outputRange)
-val resultSize = srv.estimatedSerializedBytes
-SerializedRangeVector.queryResultBytes.record(resultSize)
-queryStats.getResultBytesCounter(Nil).addAndGet(resultSize)
-srv
+new SerializedRangeVector(rv.key, numRows, containers, schema, startRecordNo, rv.outputRange)
} finally {
queryStats.getCpuNanosCounter(Nil).addAndGet(Utils.currentThreadCpuTimeNanos - startNs)
}
@@ -51,9 +51,9 @@ class SerializedRangeVectorSpec extends AnyFunSpec with Matchers {
val queryStats = QueryStats()
val srv = SerializedRangeVector.apply(rv, builder, recSchema, "someExecPlan", queryStats)
queryStats.getCpuNanosCounter(Nil).get() > 0 shouldEqual true
-queryStats.getResultBytesCounter(Nil).get() shouldEqual 108
srv.numRows shouldEqual Some(11)
srv.numRowsSerialized shouldEqual 4
+srv.estimateSerializedRowBytes shouldEqual 80 // 4 non nan records each of 20 bytes
val res = srv.rows.map(r => (r.getLong(0), r.getDouble(1))).toList
res.length shouldEqual 11
res.map(_._1) shouldEqual (0 to 1000 by 100)
@@ -77,9 +77,9 @@ class SerializedRangeVectorSpec extends AnyFunSpec with Matchers {
val queryStats = QueryStats()
val srv = SerializedRangeVector.apply(rv, builder, recSchema, "someExecPlan", queryStats)
queryStats.getCpuNanosCounter(Nil).get() > 0 shouldEqual true
-queryStats.getResultBytesCounter(Nil).get() shouldEqual 248
srv.numRows shouldEqual Some(11)
srv.numRowsSerialized shouldEqual 11
+srv.estimateSerializedRowBytes shouldEqual 220
val res = srv.rows.map(r => (r.getLong(0), r.getDouble(1))).toList
res.length shouldEqual 11
res.map(_._1) shouldEqual (0 to 1000 by 100)
@@ -105,7 +105,6 @@ class SerializedRangeVectorSpec extends AnyFunSpec with Matchers {
val queryStats = QueryStats()
val srv = SerializedRangeVector.apply(rv, builder, recSchema, "someExecPlan", queryStats)
queryStats.getCpuNanosCounter(Nil).get() > 0 shouldEqual true
-queryStats.getResultBytesCounter(Nil).get() shouldEqual 188
srv.numRows shouldEqual Some(11)
srv.numRowsSerialized shouldEqual 4
val res = srv.rows.map(r => (r.getLong(0), r.getHistogram(1))).toList
@@ -114,4 +113,26 @@ class SerializedRangeVectorSpec extends AnyFunSpec with Matchers {
res.map(_._2).filterNot(_.isEmpty) shouldEqual Seq(h1, h1, h1, h1)
}

+it("should calculate estimateSerializedRowBytes correctly when builder is used for several SRVs") {
+val builder = SerializedRangeVector.newBuilder()
+val recSchema = new RecordSchema(Seq(ColumnInfo("time", ColumnType.TimestampColumn),
+ColumnInfo("value", ColumnType.DoubleColumn)))
+val keysMap = Map(UTF8Str("key1") -> UTF8Str("val1"),
+UTF8Str("key2") -> UTF8Str("val2"))
+val key = CustomRangeVectorKey(keysMap)
+
+(0 to 200).foreach { i =>
+val rv = toRv(Seq((0, Double.NaN), (100, 1.0), (200, Double.NaN),
+(300, 3.0), (400, Double.NaN),
+(500, 5.0), (600, 6.0),
+(700, Double.NaN), (800, Double.NaN),
+(900, Double.NaN), (1000, Double.NaN)), key,
+RvRange(1000, 100, 1000))
+val queryStats = QueryStats()
+val srv = SerializedRangeVector.apply(rv, builder, recSchema, "someExecPlan", queryStats)
+srv.numRowsSerialized shouldEqual 11
+srv.estimateSerializedRowBytes shouldEqual 220
+}
+}

}
10 changes: 8 additions & 2 deletions query/src/main/scala/filodb/query/exec/ExecPlan.scala
@@ -251,7 +251,9 @@ trait ExecPlan extends QueryCommand {
// fail the query instead of limiting range vectors and returning incomplete/inaccurate results
numResultSamples += srv.numRowsSerialized
checkSamplesLimit(numResultSamples, querySession.warnings)
-resultSize += srv.estimatedSerializedBytes
+val srvBytes = srv.estimatedSerializedBytes
+resultSize += srvBytes
+querySession.queryStats.getResultBytesCounter(Nil).addAndGet(srvBytes)
checkResultBytes(resultSize, querySession.queryConfig, querySession.warnings)
srv
}
@@ -263,6 +265,7 @@ trait ExecPlan extends QueryCommand {
MeasurementUnit.time.milliseconds)
.withTag("plan", getClass.getSimpleName)
.record(Math.max(0, System.currentTimeMillis - startExecute))
+SerializedRangeVector.queryResultBytes.record(resultSize)
// recording and adding step1 to queryStats at the end of execution since the grouping
// for stats is not formed yet at the beginning
querySession.queryStats.getCpuNanosCounter(Nil).getAndAdd(step1CpuTime)
@@ -461,14 +464,17 @@ trait ExecPlan extends QueryCommand {
// fail the query instead of limiting range vectors and returning incomplete/inaccurate results
numResultSamples += srv.numRowsSerialized
checkSamplesLimit(numResultSamples, querySession.warnings)
-resultSize += srv.estimatedSerializedBytes
+val srvBytes = srv.estimatedSerializedBytes
+resultSize += srvBytes
+querySession.queryStats.getResultBytesCounter(Nil).addAndGet(srvBytes)
checkResultBytes(resultSize, querySession.queryConfig, querySession.warnings)
srv
}
.filter(_.numRowsSerialized > 0)
.guarantee(Task.eval(span.mark("after-last-materialized-result-rv")))
.toListL
.map { r =>
+SerializedRangeVector.queryResultBytes.record(resultSize)
Kamon.histogram("query-execute-time-elapsed-step2-result-materialized",
MeasurementUnit.time.milliseconds)
.withTag("plan", getClass.getSimpleName)
@@ -158,12 +158,10 @@ class TopBottomKRowAggregator(k: Int, bottomK: Boolean) extends RowAggregator wi
resRvs.map { case (key, builder) =>
val numRows = builder.allContainers.map(_.countRecords()).sum
logger.debug(s"TopkPresent before creating SRV key = ${key.labelValues.mkString(",")}")
-val srv = new SerializedRangeVector(key, numRows, builder.allContainers, recSchema, 0,
+new SerializedRangeVector(key, numRows, builder.allContainers, recSchema, 0,
Some(RvRange(rangeParams.startSecs * 1000,
rangeParams.stepSecs * 1000,
rangeParams.endSecs * 1000)))
-queryStats.getResultBytesCounter(Nil).getAndAdd(srv.estimatedSerializedBytes)
-srv
}.toSeq
} finally {
queryStats.getCpuNanosCounter(Nil).getAndAdd(Utils.currentThreadCpuTimeNanos - startNs)
@@ -138,13 +138,18 @@ class MultiSchemaPartitionsExecSpec extends AnyFunSpec with Matchers with ScalaF
val execPlan = MultiSchemaPartitionsExec(QueryContext(), dummyDispatcher,
dsRef, 0, filters, TimeRangeChunkScan(startTime, endTime), "_metric_")

+querySession.queryStats.clear() // so this can be run as a standalone test
val resp = execPlan.execute(memStore, querySession).runToFuture.futureValue
val result = resp.asInstanceOf[QueryResult]
result.result.size shouldEqual 1
val dataRead = result.result(0).rows.map(r=>(r.getLong(0), r.getDouble(1))).toList
dataRead shouldEqual tuples.take(11)
val partKeyRead = result.result(0).key.labelValues.map(lv => (lv._1.asNewString, lv._2.asNewString))
partKeyRead shouldEqual partKeyKVWithMetric
+querySession.queryStats.getResultBytesCounter().get() shouldEqual 297
+querySession.queryStats.getCpuNanosCounter().get() > 0 shouldEqual true
+querySession.queryStats.getDataBytesScannedCounter().get() shouldEqual 48
+querySession.queryStats.getTimeSeriesScannedCounter().get() shouldEqual 1
}

it("should get empty schema if query returns no results") {
@@ -157,7 +157,7 @@ class PromQLGrpcRemoteExecSpec extends AnyFunSpec with Matchers with ScalaFuture
deserializedSrv.numRowsSerialized shouldEqual 4
val res = deserializedSrv.rows.map(r => (r.getLong(0), r.getDouble(1))).toList
deserializedSrv.key shouldEqual rvKey
-qr.queryStats.getResultBytesCounter(List()).get() shouldEqual 108
+// queryStats ResultBytes counter increment is not done as part of SRV constructor, so skipping that assertion
(qr.queryStats.getCpuNanosCounter(List()).get() > 0) shouldEqual true
res.length shouldEqual 11
res.map(_._1) shouldEqual (0 to 1000 by 100)