perf(query): Eliminate memory allocation for RepeatValueVector
amolnayak311 committed Nov 5, 2024
1 parent 7f008e7 commit 1666ba1
Showing 3 changed files with 27 additions and 27 deletions.
49 changes: 26 additions & 23 deletions core/src/main/scala/filodb.core/query/RangeVector.scala
@@ -2,13 +2,11 @@ package filodb.core.query

import java.time.{LocalDateTime, YearMonth, ZoneOffset}
import java.util.concurrent.atomic.AtomicLong

import com.typesafe.scalalogging.StrictLogging
import debox.Buffer
import kamon.Kamon
import kamon.metric.MeasurementUnit
import org.joda.time.DateTime

import filodb.core.binaryrecord2.{MapItemConsumer, RecordBuilder, RecordContainer, RecordSchema}
import filodb.core.metadata.Column
import filodb.core.metadata.Column.ColumnType._
@@ -18,6 +16,8 @@ import filodb.memory.data.ChunkMap
import filodb.memory.format.{RowReader, ZeroCopyUTF8String => UTF8Str}
import filodb.memory.format.vectors.Histogram

import scala.util.Using

/**
* Identifier for a single RangeVector.
* Sub-classes must be a case class or override equals/hashcode since this class is used in a
@@ -213,12 +213,6 @@ final class RepeatValueVector(rangeVectorKey: RangeVectorKey,
override def outputRange: Option[RvRange] = Some(RvRange(startMs, stepMs, endMs))
override val numRows: Option[Int] = Some((endMs - startMs) / math.max(1, stepMs) + 1).map(_.toInt)

lazy val containers: Seq[RecordContainer] = {
val builder = new RecordBuilder(MemFactory.onHeapFactory, RecordBuilder.MinContainerSize)
rowReader.map(builder.addFromReader(_, schema, 0))
builder.allContainers.toList
}

val recordSchema: RecordSchema = schema

// There is potential for optimization.
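For context on the unchanged lines above: numRows treats both endpoints as inclusive, hence the trailing + 1. A small arithmetic sketch with illustrative values (not FiloDB code):

// Worked example of the numRows formula: (endMs - startMs) / max(1, stepMs) + 1
val startMs = 1000L
val endMs = 2000L
val stepMs = 250L
val numRows = ((endMs - startMs) / math.max(1, stepMs) + 1).toInt
// (2000 - 1000) / 250 + 1 = 5 samples, at 1000, 1250, 1500, 1750 and 2000
assert(numRows == 5)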
@@ -260,7 +254,23 @@ final class RepeatValueVector(rangeVectorKey: RangeVectorKey,
/**
* Estimates the total size (in bytes) of all rows after serialization.
*/
override def estimateSerializedRowBytes: Long = containers.size
override def estimateSerializedRowBytes: Long =
this.schema.columns.zipWithIndex.map { case (col, idx) =>
col.colType match {
case DoubleColumn => SerializableRangeVector.SizeOfDouble
case LongColumn => SerializableRangeVector.SizeOfLong
case IntColumn => SerializableRangeVector.SizeOfInt
case StringColumn => this.rowReader.map(rr => rr.getString(idx).length).getOrElse(0)
case TimestampColumn => SerializableRangeVector.SizeOfLong
case BinaryRecordColumn => this.rowReader.map(rr => rr.getBlobNumBytes(idx)).getOrElse(0)
// Take the worst case: each histogram bucket contributes 2 doubles, one for the bucket
// boundary and one for the bin count, plus 4 more doubles for sum, total, min and max.
case HistogramColumn => this.rowReader.map(
rr => rr.getHistogram(idx).numBuckets * SerializableRangeVector.SizeOfDouble * 2
+ SerializableRangeVector.SizeOfDouble * 4).getOrElse(0)
case MapColumn => 0 // Not supported yet
}
}.sum
}
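To make the new estimate concrete, here is a standalone sketch (not FiloDB code: the column ADT and the 8-byte size constants are assumptions mirroring JVM primitive widths) that reproduces the per-column arithmetic:

object EstimateSketch extends App {
  sealed trait ColType
  case object TimestampCol extends ColType
  case object DoubleCol extends ColType
  final case class HistogramCol(numBuckets: Int) extends ColType

  val SizeOfDouble = 8L // assumed primitive widths, as on the JVM
  val SizeOfLong = 8L

  def estimateRowBytes(schema: Seq[ColType]): Long = schema.map {
    case TimestampCol => SizeOfLong
    case DoubleCol => SizeOfDouble
    // Worst case, as in the comment above: 2 doubles per bucket plus 4 summary doubles
    case HistogramCol(n) => n * SizeOfDouble * 2 + SizeOfDouble * 4
  }.sum

  println(estimateRowBytes(Seq(TimestampCol, DoubleCol)))        // 16
  println(estimateRowBytes(Seq(TimestampCol, HistogramCol(64)))) // 1064
}

A plain (timestamp, double) sample thus estimates to 16 bytes, and a 64-bucket histogram row to 8 + 64*16 + 32 = 1064 bytes, with no record container materialized along the way.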

object RepeatValueVector extends StrictLogging {
@@ -272,21 +282,14 @@ object RepeatValueVector extends StrictLogging {
queryStats: QueryStats): RepeatValueVector = {
val startNs = Utils.currentThreadCpuTimeNanos
try {
var nextRow: Option[RowReader] = None
try {
ChunkMap.validateNoSharedLocks(execPlan)
val rows = rv.rows()
if (rows.hasNext) {
nextRow = Some(rows.next())
}
} finally {
rv.rows().close()
// clear exec plan
// When the query is done, clean up lingering shared locks caused by iterator limit.
ChunkMap.releaseAllSharedLocks()
}
new RepeatValueVector(rv.key, startMs, stepMs, endMs, nextRow, schema)
ChunkMap.validateNoSharedLocks(execPlan)
Using(rv.rows()) { rows =>
val nextRow = if (rows.hasNext) Some(rows.next()) else None
new RepeatValueVector(rv.key, startMs, stepMs, endMs, nextRow, schema)
}.get
} finally {
ChunkMap.releaseAllSharedLocks()
queryStats.getCpuNanosCounter(Nil).addAndGet(Utils.currentThreadCpuTimeNanos - startNs)
}
}
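The rewrite above relies on scala.util.Using (Scala 2.13+): the resource is closed whether the body returns or throws, the outcome is wrapped in a Try, and .get rethrows any failure after cleanup, matching the hand-written try/finally it replaces. A minimal self-contained sketch of those semantics (hypothetical Rows class, not FiloDB's actual cursor):

import scala.util.Using

object UsingSketch extends App {
  final class Rows extends AutoCloseable {
    private val it = Iterator(1, 2, 3)
    def hasNext: Boolean = it.hasNext
    def next(): Int = it.next()
    override def close(): Unit = println("rows closed")
  }

  // close() runs before .get returns, even if the body throws
  val first: Option[Int] = Using(new Rows) { rows =>
    if (rows.hasNext) Some(rows.next()) else None
  }.get
  println(first) // "rows closed" is printed first, then Some(1)
}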
1 change: 1 addition & 0 deletions grpc/src/main/protobuf/range_vector.proto
@@ -17,6 +17,7 @@ message SerializedRangeVector {

message RepeatValueVector {
RangeVectorKey key = 1;
// Record containers are kept for backward compatibility and won't be supported in subsequent versions
repeated bytes recordContainers = 2;
RecordSchema recordSchema = 3;
optional RvRange rvRange = 4;
4 changes: 0 additions & 4 deletions query/src/main/scala/filodb/query/ProtoConverters.scala
@@ -927,10 +927,6 @@ object ProtoConverters {
rv.outputRange.map(_.toProto).map(builder.setRvRange)
builder.setRecordSchema(rv.recordSchema.toProto)
builder.setKey(rv.key.toProto)
builder.addAllRecordContainers(rv.containers.map(
container => ByteString.copyFrom(
if (container.hasArray) container.array else container.trimmedArray)
).asJava)
builder.build()
}
}
