diff --git a/cassandra/src/test/scala/filodb.cassandra/MemstoreCassandraSinkSpec.scala b/cassandra/src/test/scala/filodb.cassandra/MemstoreCassandraSinkSpec.scala index b23bb1623a..f11dffa5d9 100644 --- a/cassandra/src/test/scala/filodb.cassandra/MemstoreCassandraSinkSpec.scala +++ b/cassandra/src/test/scala/filodb.cassandra/MemstoreCassandraSinkSpec.scala @@ -34,7 +34,7 @@ class MemstoreCassandraSinkSpec extends AllTablesTest { } it("should flush MemStore data to C*, and be able to read back data from C* directly") { - memStore.setup(dataset1.ref, Schemas(dataset1.schema), 0, TestData.storeConf) + memStore.setup(dataset1.ref, Schemas(dataset1.schema), 0, TestData.storeConf, 1) memStore.store.sinkStats.chunksetsWritten.get shouldEqual 0 // Flush every ~50 records diff --git a/cassandra/src/test/scala/filodb.cassandra/columnstore/OdpSpec.scala b/cassandra/src/test/scala/filodb.cassandra/columnstore/OdpSpec.scala index 4216d83486..9891688269 100644 --- a/cassandra/src/test/scala/filodb.cassandra/columnstore/OdpSpec.scala +++ b/cassandra/src/test/scala/filodb.cassandra/columnstore/OdpSpec.scala @@ -96,7 +96,7 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala val policy = new FixedMaxPartitionsEvictionPolicy(20) val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy)) try { - memStore.setup(dataset.ref, schemas, 0, TestData.storeConf) + memStore.setup(dataset.ref, schemas, 0, TestData.storeConf, 1) memStore.recoverIndex(dataset.ref, 0).futureValue memStore.refreshIndexForTesting(dataset.ref) @@ -112,7 +112,7 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala val policy = new FixedMaxPartitionsEvictionPolicy(20) val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy)) try { - memStore.setup(dataset.ref, schemas, 0, TestData.storeConf) + memStore.setup(dataset.ref, schemas, 0, TestData.storeConf, 1) memStore.recoverIndex(dataset.ref, 0).futureValue memStore.refreshIndexForTesting(dataset.ref) @@ -134,7 +134,7 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala val policy = new FixedMaxPartitionsEvictionPolicy(20) val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy)) try { - memStore.setup(dataset.ref, schemas, 0, TestData.storeConf) + memStore.setup(dataset.ref, schemas, 0, TestData.storeConf, 1) memStore.recoverIndex(dataset.ref, 0).futureValue memStore.refreshIndexForTesting(dataset.ref) @@ -157,7 +157,7 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala val policy = new FixedMaxPartitionsEvictionPolicy(20) val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy)) try { - memStore.setup(dataset.ref, schemas, 0, TestData.storeConf) + memStore.setup(dataset.ref, schemas, 0, TestData.storeConf, 1) memStore.recoverIndex(dataset.ref, 0).futureValue memStore.refreshIndexForTesting(dataset.ref) diff --git a/conf/promperf-filodb-server.conf b/conf/promperf-filodb-server.conf index 9359b8d366..15de43e558 100644 --- a/conf/promperf-filodb-server.conf +++ b/conf/promperf-filodb-server.conf @@ -5,8 +5,8 @@ filodb { "conf/promperf-source.conf" ] + min-num-nodes-in-cluster = 1 cluster-discovery { - num-nodes = 1 failure-detection-interval = 20s host-list = [ "127.0.0.1:2552" diff --git a/conf/timeseries-dev-source.conf b/conf/timeseries-dev-source.conf index ea1a09d11d..da732b47ed 100644 --- a/conf/timeseries-dev-source.conf 
+++ b/conf/timeseries-dev-source.conf @@ -6,7 +6,10 @@ # Should not change once dataset has been set up on the server and data has been persisted to cassandra num-shards = 4 + # deprecated in favor of min-num-nodes-in-cluster config in filodb server config + # To be removed eventually. There is no reason to set a value for this for each dataset min-num-nodes = 2 + # Length of chunks to be written, roughly sourcefactory = "filodb.kafka.KafkaIngestionStreamFactory" diff --git a/conf/timeseries-filodb-server.conf b/conf/timeseries-filodb-server.conf index c36293bd30..c2ce99a7fa 100644 --- a/conf/timeseries-filodb-server.conf +++ b/conf/timeseries-filodb-server.conf @@ -1,9 +1,9 @@ dataset-prometheus = { include required("timeseries-dev-source.conf") } filodb { + min-num-nodes-in-cluster = 2 v2-cluster-enabled = false cluster-discovery { - num-nodes = 2 failure-detection-interval = 20s host-list = [ "127.0.0.1:2552", diff --git a/coordinator/src/main/scala/filodb.coordinator/FilodbSettings.scala b/coordinator/src/main/scala/filodb.coordinator/FilodbSettings.scala index e543ccf8b8..4b411b61ac 100755 --- a/coordinator/src/main/scala/filodb.coordinator/FilodbSettings.scala +++ b/coordinator/src/main/scala/filodb.coordinator/FilodbSettings.scala @@ -48,13 +48,13 @@ final class FilodbSettings(val conf: Config) { lazy val datasetConfPaths = config.as[Seq[String]]("dataset-configs") - lazy val numNodes = config.getInt("cluster-discovery.num-nodes") lazy val k8sHostFormat = config.as[Option[String]]("cluster-discovery.k8s-stateful-sets-hostname-format") // used for development mode only lazy val hostList = config.as[Option[Seq[String]]]("cluster-discovery.host-list") lazy val localhostOrdinal = config.as[Option[Int]]("cluster-discovery.localhost-ordinal") + lazy val minNumNodes = config.as[Option[Int]]("min-num-nodes-in-cluster") /** * Returns IngestionConfig/dataset configuration from parsing dataset-configs file paths. 
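The min-num-nodes-in-cluster setting introduced above is what drives shard-to-node assignment (see FiloDbClusterDiscovery.shardsForOrdinal further down in this diff). The following is a standalone sketch of that spread, assuming the even-split behavior the FiloDbClusterDiscoverySpec tests expect, with any remainder going to the first nodes; it is an illustration only and all names are local to this example:

    // Sketch only (not the patch's implementation): spread numShards across minNumNodes,
    // giving any remainder to the first nodes, as the spec
    // "Should allocate the extra n shards to first n nodes" expects.
    def shardsForOrdinalSketch(ordinal: Int, numShards: Int, minNumNodes: Int): Seq[Int] = {
      require(ordinal < minNumNodes, s"Ordinal $ordinal was not expected. Number of nodes is $minNumNodes")
      val base  = numShards / minNumNodes
      val extra = numShards % minNumNodes
      val count = base + (if (ordinal < extra) 1 else 0)
      val first = ordinal * base + math.min(ordinal, extra)
      first until (first + count)
    }
    // e.g. 8 shards over 2 nodes: ordinal 0 -> shards 0-3, ordinal 1 -> shards 4-7,
    // matching the host-0/host-1 comment in shardsForOrdinal below.
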
diff --git a/coordinator/src/main/scala/filodb.coordinator/IngestionActor.scala b/coordinator/src/main/scala/filodb.coordinator/IngestionActor.scala index 0c67cedab5..183ca08ec7 100644 --- a/coordinator/src/main/scala/filodb.coordinator/IngestionActor.scala +++ b/coordinator/src/main/scala/filodb.coordinator/IngestionActor.scala @@ -37,8 +37,9 @@ object IngestionActor { source: NodeClusterActor.IngestionSource, downsample: DownsampleConfig, storeConfig: StoreConfig, + numShards: Int, statusActor: ActorRef): Props = - Props(new IngestionActor(ref, schemas, memStore, source, downsample, storeConfig, statusActor)) + Props(new IngestionActor(ref, schemas, memStore, source, downsample, storeConfig, numShards, statusActor)) } /** @@ -62,6 +63,7 @@ private[filodb] final class IngestionActor(ref: DatasetRef, source: NodeClusterActor.IngestionSource, downsample: DownsampleConfig, storeConfig: StoreConfig, + numShards: Int, statusActor: ActorRef) extends BaseActor { import IngestionActor._ @@ -170,7 +172,7 @@ private[filodb] final class IngestionActor(ref: DatasetRef, // scalastyle:off method.length private def startIngestion(shard: Int): Unit = { - try tsStore.setup(ref, schemas, shard, storeConfig, downsample) catch { + try tsStore.setup(ref, schemas, shard, storeConfig, numShards, downsample) catch { case ShardAlreadySetup(ds, s) => logger.warn(s"dataset=$ds shard=$s already setup, skipping....") return diff --git a/coordinator/src/main/scala/filodb.coordinator/NodeCoordinatorActor.scala b/coordinator/src/main/scala/filodb.coordinator/NodeCoordinatorActor.scala index f3abf68bbc..6896d73ae3 100644 --- a/coordinator/src/main/scala/filodb.coordinator/NodeCoordinatorActor.scala +++ b/coordinator/src/main/scala/filodb.coordinator/NodeCoordinatorActor.scala @@ -115,6 +115,7 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore, setupDataset( dataset, ingestConfig.storeConfig, + ingestConfig.numShards, IngestionSource(ingestConfig.streamFactoryClass, ingestConfig.sourceConfig), ingestConfig.downsampleConfig) } @@ -141,6 +142,7 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore, */ private def setupDataset(dataset: Dataset, storeConf: StoreConfig, + numShards: Int, source: IngestionSource, downsample: DownsampleConfig, schemaOverride: Boolean = false): Unit = { @@ -154,7 +156,7 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore, val schemas = if (schemaOverride) Schemas(dataset.schema) else settings.schemas if (schemaOverride) logger.info(s"Overriding schemas from settings: this better be a test!") val props = IngestionActor.props(dataset.ref, schemas, memStore, - source, downsample, storeConf, statusActor.get) + source, downsample, storeConf, numShards, statusActor.get) val ingester = context.actorOf(props, s"$Ingestion-${dataset.name}") context.watch(ingester) ingesters(ref) = ingester @@ -187,7 +189,9 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore, def ingestHandlers: Receive = LoggingReceive { case SetupDataset(dataset, resources, source, storeConf, downsample) => // used only in unit tests - if (!(ingesters contains dataset.ref)) { setupDataset(dataset, storeConf, source, downsample, true) } + if (!(ingesters contains dataset.ref)) { + setupDataset(dataset, storeConf, resources.numShards, source, downsample, true) + } case IngestRows(dataset, shard, rows) => withIngester(sender(), dataset) { _ ! 
IngestionActor.IngestRows(sender(), shard, rows) } diff --git a/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala b/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala index 12a68e24b7..dd63e13ab8 100644 --- a/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala +++ b/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala @@ -402,7 +402,8 @@ private[coordinator] final class ShardManager(settings: FilodbSettings, ackTo.foreach(_ ! DatasetExists(dataset.ref)) Map.empty case None => - val resources = DatasetResourceSpec(ingestConfig.numShards, ingestConfig.minNumNodes) + val minNumNodes = settings.minNumNodes.getOrElse(ingestConfig.minNumNodes) + val resources = DatasetResourceSpec(ingestConfig.numShards, minNumNodes) val mapper = new ShardMapper(resources.numShards) _shardMappers(dataset.ref) = mapper // Access the shardmapper through the HashMap so even if it gets replaced it will update the shard stats diff --git a/coordinator/src/main/scala/filodb/coordinator/v2/FiloDbClusterDiscovery.scala b/coordinator/src/main/scala/filodb/coordinator/v2/FiloDbClusterDiscovery.scala index f91968bf45..9db3f19776 100644 --- a/coordinator/src/main/scala/filodb/coordinator/v2/FiloDbClusterDiscovery.scala +++ b/coordinator/src/main/scala/filodb/coordinator/v2/FiloDbClusterDiscovery.scala @@ -40,11 +40,13 @@ class FiloDbClusterDiscovery(settings: FilodbSettings, } def shardsForOrdinal(ordinal: Int, numShards: Int): Seq[Int] = { - require(ordinal < settings.numNodes, s"Ordinal $ordinal was not expected. Number of nodes is ${settings.numNodes}") - val numShardsPerHost = numShards / settings.numNodes + require(settings.minNumNodes.isDefined, "Minimum Number of Nodes config not provided") + require(ordinal < settings.minNumNodes.get, s"Ordinal $ordinal was not expected. 
" + + s"Number of nodes is ${settings.minNumNodes.get}") + val numShardsPerHost = numShards / settings.minNumNodes.get // Suppose we have a total of 8 shards and 2 hosts, assuming the hostnames are host-0 and host-1, we will map // host-0 to shard [0,1,2,3] and host-1 to shard [4,5,6,7] - val numExtraShardsToAssign = numShards % settings.numNodes + val numExtraShardsToAssign = numShards % settings.minNumNodes.get val (firstShardThisNode, numShardsThisHost) = if (numExtraShardsToAssign != 0) { logger.warn("For stateful shard assignment, numShards should be a multiple of nodes per shard, " + "using default strategy") @@ -69,8 +71,9 @@ class FiloDbClusterDiscovery(settings: FilodbSettings, def shardsForLocalhost(numShards: Int): Seq[Int] = shardsForOrdinal(ordinalOfLocalhost, numShards) lazy private val hostNames = { + require(settings.minNumNodes.isDefined, "Minimum Number of Nodes config not provided") if (settings.k8sHostFormat.isDefined) { - (0 until settings.numNodes).map(i => String.format(settings.k8sHostFormat.get, i.toString)) + (0 until settings.minNumNodes.get).map(i => String.format(settings.k8sHostFormat.get, i.toString)) } else if (settings.hostList.isDefined) { settings.hostList.get.sorted // sort to make order consistent on all nodes of cluster } else throw new IllegalArgumentException("Cluster Discovery mechanism not defined") diff --git a/coordinator/src/main/scala/filodb/coordinator/v2/NewNodeCoordinatorActor.scala b/coordinator/src/main/scala/filodb/coordinator/v2/NewNodeCoordinatorActor.scala index 6462b356e4..e96b054160 100644 --- a/coordinator/src/main/scala/filodb/coordinator/v2/NewNodeCoordinatorActor.scala +++ b/coordinator/src/main/scala/filodb/coordinator/v2/NewNodeCoordinatorActor.scala @@ -84,7 +84,7 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore, .foreach { downsampleDataset => memStore.store.initialize(downsampleDataset, ingestConfig.numShards) } setupDataset( dataset, - ingestConfig.storeConfig, + ingestConfig.storeConfig, ingestConfig.numShards, IngestionSource(ingestConfig.streamFactoryClass, ingestConfig.sourceConfig), ingestConfig.downsampleConfig) initShards(dataset, ingestConfig) @@ -121,6 +121,7 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore, */ private def setupDataset(dataset: Dataset, storeConf: StoreConfig, + numShards: Int, source: IngestionSource, downsample: DownsampleConfig, schemaOverride: Boolean = false): Unit = { @@ -132,7 +133,7 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore, val schemas = if (schemaOverride) Schemas(dataset.schema) else settings.schemas if (schemaOverride) logger.info(s"Overriding schemas from settings: this better be a test!") val props = IngestionActor.props(dataset.ref, schemas, memStore, - source, downsample, storeConf, self) + source, downsample, storeConf, numShards, self) val ingester = context.actorOf(props, s"$Ingestion-${dataset.name}") context.watch(ingester) ingestionActors(ref) = ingester diff --git a/coordinator/src/test/scala/filodb.coordinator/FiloDbClusterDiscoverySpec.scala b/coordinator/src/test/scala/filodb.coordinator/FiloDbClusterDiscoverySpec.scala index cade043c61..2c2c729e22 100644 --- a/coordinator/src/test/scala/filodb.coordinator/FiloDbClusterDiscoverySpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/FiloDbClusterDiscoverySpec.scala @@ -11,7 +11,7 @@ class FiloDbClusterDiscoverySpec extends AkkaSpec { val config = ConfigFactory.parseString( """ - |filodb.cluster-discovery.num-nodes 
= 4 + |filodb.min-num-nodes-in-cluster = 4 |""".stripMargin) val settings = new FilodbSettings(config) @@ -29,7 +29,7 @@ class FiloDbClusterDiscoverySpec extends AkkaSpec { "Should allocate the extra n shards to first n nodes" in { val config = ConfigFactory.parseString( """ - |filodb.cluster-discovery.num-nodes = 5 + |filodb.min-num-nodes-in-cluster = 5 |""".stripMargin) val settings = new FilodbSettings(config) diff --git a/core/src/main/resources/filodb-defaults.conf b/core/src/main/resources/filodb-defaults.conf index e2cac29f02..1d92e10964 100644 --- a/core/src/main/resources/filodb-defaults.conf +++ b/core/src/main/resources/filodb-defaults.conf @@ -1,11 +1,16 @@ filodb { v2-cluster-enabled = false + + # Number of nodes in cluster; used to calculate per-shard resources based + # on how many shards assigned to node. + # Required config if v2-clustering or automatic memory-alloc is enabled + # min-num-nodes-in-cluster = 2 + cluster-discovery { // set this to a smaller value (like 30s) at thee query entry points // if FiloDB HTTP API is indeed the query entry point, this should be overridden to small value // so that failure detection is quick. failure-detection-interval = 15 minutes - num-nodes = 2 # one of the two below properties should be enabled # enable this to use the k8s stateful sets mode and its hostname to extract ordinal @@ -691,6 +696,41 @@ filodb { # Note: this memory is shared across all configued datasets on a node. ingestion-buffer-mem-size = 200MB + memory-alloc { + # automatic memory allocation is enabled if true + automatic-alloc-enabled = false + + # if not provided this is calculated as ContainerOrNodeMemory - os-memory-needs - CurrentJVMHeapMemory + # available-memory-bytes = 5GB + + # memory dedicated for proper functioning of OS + os-memory-needs = 500MB + + # NOTE: In the three configs below, + # lucene-memory-percent + native-memory-manager-percent + block-memory-manager-percent + # should equal 100 + ############################################################## + # # # # + # LuceneMemPercent # NativeMemPercent # BlockMemPercent # + # # # # + ############################################################## + + # Memory percent of available-memory reserved for Lucene memory maps. + # Note we do not use this config to explicitly allocate space for lucene. 
+ # But reserving this space ensures that more of the lucene memory maps are stored in memory + lucene-memory-percent = 5 + + # memory percent of available-memory reserved for native memory manager + # (used for partKeys, chunkMaps, chunkInfos, writeBuffers) + native-memory-manager-percent = 24 + + # Memory percent of available-memory reserved for block memory manager + # (used for storing chunks) + # This is divided amongst the datasets on the node per each dataset's shard-mem-percent configuration + # The shards of the dataset on the node each get an even share of memory from this fraction + block-memory-manager-percent = 71 + } + # At the cost of some extra heap memory, we can track queries holding shared lock for a long time # and starving the exclusive access of lock for eviction track-queries-holding-eviction-lock = true diff --git a/core/src/main/scala/filodb.core/Utils.scala b/core/src/main/scala/filodb.core/Utils.scala index 724c4556fb..7968fcd257 100644 --- a/core/src/main/scala/filodb.core/Utils.scala +++ b/core/src/main/scala/filodb.core/Utils.scala @@ -2,6 +2,7 @@ package filodb.core import java.lang.management.ManagementFactory +import com.typesafe.config.{Config, ConfigRenderOptions} import com.typesafe.scalalogging.StrictLogging object Utils extends StrictLogging { @@ -13,4 +14,28 @@ object Utils extends StrictLogging { if (cpuTimeEnabled) threadMbean.getCurrentThreadCpuTime else System.nanoTime() } + + def calculateAvailableOffHeapMemory(filodbConfig: Config): Long = { + val containerMemory = ManagementFactory.getOperatingSystemMXBean() + .asInstanceOf[com.sun.management.OperatingSystemMXBean].getTotalPhysicalMemorySize() + val currentJavaHeapMemory = Runtime.getRuntime().maxMemory() + val osMemoryNeeds = filodbConfig.getMemorySize("memstore.memory-alloc.os-memory-needs").toBytes + logger.info(s"Detected available memory containerMemory=$containerMemory" + + s" currentJavaHeapMemory=$currentJavaHeapMemory osMemoryNeeds=$osMemoryNeeds") + + logger.info(s"Memory Alloc Options: " + + s"${filodbConfig.getConfig("memstore.memory-alloc").root().render(ConfigRenderOptions.concise())}") + + val availableMem = if (filodbConfig.hasPath("memstore.memory-alloc.available-memory-bytes")) { + val avail = filodbConfig.getMemorySize("memstore.memory-alloc.available-memory-bytes").toBytes + logger.info(s"Using automatic-memory-config with overridden memory-alloc.available-memory $avail") + avail + } else { + logger.info(s"Using automatic-memory-config without available memory override") + containerMemory - currentJavaHeapMemory - osMemoryNeeds + } + logger.info(s"Available memory calculated or configured as $availableMem") + availableMem + } + } diff --git a/core/src/main/scala/filodb.core/binaryrecord2/RecordContainer.scala b/core/src/main/scala/filodb.core/binaryrecord2/RecordContainer.scala index 86f96146fd..c2da9e672f 100644 --- a/core/src/main/scala/filodb.core/binaryrecord2/RecordContainer.scala +++ b/core/src/main/scala/filodb.core/binaryrecord2/RecordContainer.scala @@ -7,7 +7,7 @@ import scala.reflect.ClassTag import debox.Buffer import filodb.memory.{BinaryRegionConsumer, BinaryRegionLarge} -import filodb.memory.format.{RowReader, UnsafeUtils} +import filodb.memory.format.UnsafeUtils /** * A RecordContainer is a binary, wire-compatible container for BinaryRecords V2. @@ -72,13 +72,13 @@ final class RecordContainer(val base: Any, val offset: Long, maxLength: Int, * Iterates through each BinaryRecord as a RowReader. Results in two allocations: the Iterator * as well as a BinaryRecordRowReader.
*/ - final def iterate(schema: RecordSchema): Iterator[RowReader] = new Iterator[RowReader] { + final def iterate(schema: RecordSchema): Iterator[BinaryRecordRowReader] = new Iterator[BinaryRecordRowReader] { val reader = new BinaryRecordRowReader(schema, base) val endOffset = offset + 4 + numBytes var curOffset = offset + ContainerHeaderLen final def hasNext: Boolean = curOffset < endOffset - final def next: RowReader = { + final def next: BinaryRecordRowReader = { val recordLen = BinaryRegionLarge.numBytes(base, curOffset) reader.recordOffset = curOffset curOffset += (recordLen + 7) & ~3 // +4, then aligned/rounded up to next 4 bytes diff --git a/core/src/main/scala/filodb.core/binaryrecord2/RecordSchema.scala b/core/src/main/scala/filodb.core/binaryrecord2/RecordSchema.scala index 42dddfd2ca..01a1252f7c 100644 --- a/core/src/main/scala/filodb.core/binaryrecord2/RecordSchema.scala +++ b/core/src/main/scala/filodb.core/binaryrecord2/RecordSchema.scala @@ -624,7 +624,12 @@ trait BinaryRecordRowReaderBase extends RowReader { final class BinaryRecordRowReader(val schema: RecordSchema, var recordBase: Any = UnsafeUtils.ZeroPointer, - var recordOffset: Long = 0L) extends BinaryRecordRowReaderBase + var recordOffset: Long = 0L) extends BinaryRecordRowReaderBase { + def recordLength: Int = { + val len = BinaryRegionLarge.numBytes(recordBase, recordOffset) + (len + 7) & ~3 // +4, then aligned/rounded up to next 4 bytes + } +} final class MultiSchemaBRRowReader(var recordBase: Any = UnsafeUtils.ZeroPointer, var recordOffset: Long = 0L) extends BinaryRecordRowReaderBase { diff --git a/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala b/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala index 7f3aca62e0..66174a3582 100644 --- a/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala +++ b/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala @@ -1,13 +1,14 @@ package filodb.core.downsample +import java.util +import java.util.concurrent.atomic.AtomicLong + import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import com.typesafe.config.Config import com.typesafe.scalalogging.StrictLogging -import java.util -import java.util.concurrent.atomic.AtomicLong import kamon.Kamon import kamon.metric.MeasurementUnit import kamon.tag.TagSet @@ -16,12 +17,12 @@ import monix.execution.{CancelableFuture, Scheduler, UncaughtExceptionReporter} import monix.reactive.Observable import org.apache.lucene.search.CollectionTerminatedException -import filodb.core.{DatasetRef, Types} +import filodb.core.{DatasetRef, Types, Utils} import filodb.core.binaryrecord2.RecordSchema import filodb.core.memstore._ import filodb.core.memstore.ratelimit.{CardinalityManager, CardinalityRecord, QuotaSource} import filodb.core.metadata.Schemas -import filodb.core.query.{ColumnFilter, Filter, QueryContext, QueryLimitException, QuerySession, QueryWarnings} +import filodb.core.query._ import filodb.core.store._ import filodb.memory.format.{UnsafeUtils, ZeroCopyUTF8String} import filodb.memory.format.ZeroCopyUTF8String._ @@ -341,6 +342,7 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef, // Second iteration is for query result evaluation. Loading everything to heap // is expensive, but we do it to handle data sizing for metrics that have // continuous churn. See capDataScannedPerShardCheck method. 
+ val startNs = Utils.currentThreadCpuTimeNanos val recs = partKeyIndex.partKeyRecordsFromFilters(filters, chunkMethod.startTime, chunkMethod.endTime) val _schema = recs.headOption.map { pkRec => RecordSchema.schemaID(pkRec.partKey, UnsafeUtils.arayOffset) @@ -353,9 +355,10 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef, val metricGroupBy = deploymentPartitionName +: clusterType +: metricShardKeys.map { col => filters.collectFirst { case ColumnFilter(c, Filter.Equals(filtVal: String)) if c == col => filtVal - }.getOrElse("unknown") + }.getOrElse("multiple") }.toList querySession.queryStats.getTimeSeriesScannedCounter(metricGroupBy).addAndGet(recs.length) + querySession.queryStats.getCpuNanosCounter(metricGroupBy).addAndGet(Utils.currentThreadCpuTimeNanos - startNs) val chunksReadCounter = querySession.queryStats.getDataBytesScannedCounter(metricGroupBy) PartLookupResult(shardNum, chunkMethod, debox.Buffer.empty, diff --git a/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesStore.scala b/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesStore.scala index 9702442bec..5f83c06c9c 100644 --- a/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesStore.scala +++ b/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesStore.scala @@ -59,7 +59,7 @@ extends TimeSeriesStore with StrictLogging { override def metastore: MetaStore = ??? // Not needed // TODO: Change the API to return Unit Or ShardAlreadySetup, instead of throwing. Make idempotent. - def setup(ref: DatasetRef, schemas: Schemas, shard: Int, storeConf: StoreConfig, + def setup(ref: DatasetRef, schemas: Schemas, shard: Int, storeConf: StoreConfig, numShards: Int, downsampleConfig: DownsampleConfig = DownsampleConfig.disabled): Unit = synchronized { val shards = datasets.getOrElseUpdate(ref, new NonBlockingHashMapLong[DownsampledTimeSeriesShard](32, false)) val quotaSource = quotaSources.getOrElseUpdate(ref, diff --git a/core/src/main/scala/filodb.core/memstore/OnDemandPagingShard.scala b/core/src/main/scala/filodb.core/memstore/OnDemandPagingShard.scala index 91ee5c0e70..760fd9e823 100644 --- a/core/src/main/scala/filodb.core/memstore/OnDemandPagingShard.scala +++ b/core/src/main/scala/filodb.core/memstore/OnDemandPagingShard.scala @@ -26,6 +26,7 @@ import filodb.memory.NativeMemoryManager class OnDemandPagingShard(ref: DatasetRef, schemas: Schemas, storeConfig: StoreConfig, + numShards: Int, quotaSource: QuotaSource, shardNum: Int, bufferMemoryManager: NativeMemoryManager, @@ -34,7 +35,7 @@ class OnDemandPagingShard(ref: DatasetRef, evictionPolicy: PartitionEvictionPolicy, filodbConfig: Config) (implicit ec: ExecutionContext) extends -TimeSeriesShard(ref, schemas, storeConfig, quotaSource, shardNum, bufferMemoryManager, rawStore, +TimeSeriesShard(ref, schemas, storeConfig, numShards, quotaSource, shardNum, bufferMemoryManager, rawStore, metastore, evictionPolicy, filodbConfig)(ec) { import TimeSeriesShard._ import FiloSchedulers._ diff --git a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala index 89b5210ed2..d67f640c8d 100644 --- a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala +++ b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala @@ -46,7 +46,11 @@ import filodb.memory.{BinaryRegionLarge, UTF8StringMedium, UTF8StringShort} import filodb.memory.format.{UnsafeUtils, ZeroCopyUTF8String => UTF8Str} object PartKeyLuceneIndex { - final val PART_ID = 
"__partId__" + // NOTE: these partId fields need to be separate because Lucene 9.7.0 enforces consistent types for document + // field values (i.e. a field cannot have both numeric and string values). Additional details can be found + // here: https://github.com/apache/lucene/pull/11 + final val PART_ID_DV = "__partIdDv__" + final val PART_ID_FIELD = "__partIdField__" final val START_TIME = "__startTime__" final val END_TIME = "__endTime__" final val PART_KEY = "__partKey__" @@ -54,7 +58,7 @@ object PartKeyLuceneIndex { final val FACET_FIELD_PREFIX = "$facet_" final val LABEL_LIST_FACET = FACET_FIELD_PREFIX + LABEL_LIST - final val ignoreIndexNames = HashSet(START_TIME, PART_KEY, END_TIME, PART_ID) + final val ignoreIndexNames = HashSet(START_TIME, PART_KEY, END_TIME, PART_ID_FIELD, PART_ID_DV) val MAX_STR_INTERN_ENTRIES = 10000 val MAX_TERMS_TO_ITERATE = 10000 @@ -279,8 +283,8 @@ class PartKeyLuceneIndex(ref: DatasetRef, var facetsConfig: FacetsConfig = _ val document = new Document() - private[memstore] val partIdField = new StringField(PART_ID, "0", Store.NO) - private val partIdDv = new NumericDocValuesField(PART_ID, 0) + private[memstore] val partIdField = new StringField(PART_ID_FIELD, "0", Store.NO) + private val partIdDv = new NumericDocValuesField(PART_ID_DV, 0) private val partKeyDv = new BinaryDocValuesField(PART_KEY, new BytesRef()) private val startTimeField = new LongPoint(START_TIME, 0L) private val startTimeDv = new NumericDocValuesField(START_TIME, 0L) @@ -328,6 +332,13 @@ class PartKeyLuceneIndex(ref: DatasetRef, partIdDv.setLongValue(partId) document.add(partIdDv) } + + /* + * As of this writing, this documentId will be set as one of two values: + * - In TimeSeriesShard: the string representation of a partId (e.g. "42") + * - In DownsampledTimeSeriesShard: the base64-encoded sha256 of the document ID. This is used to support + * persistence of the downsample index; ephemeral partIds cannot be used. 
+ */ partIdField.setStringValue(documentId) startTimeField.setLongValue(startTime) @@ -468,7 +479,7 @@ class PartKeyLuceneIndex(ref: DatasetRef, cforRange { 0 until partIds.length } { i => terms.add(new BytesRef(partIds(i).toString.getBytes(StandardCharsets.UTF_8))) } - indexWriter.deleteDocuments(new TermInSetQuery(PART_ID, terms)) + indexWriter.deleteDocuments(new TermInSetQuery(PART_ID_FIELD, terms)) } } @@ -646,7 +657,7 @@ class PartKeyLuceneIndex(ref: DatasetRef, s"with startTime=$startTime endTime=$endTime into dataset=$ref shard=$shardNum") val doc = luceneDocument.get() val docToAdd = doc.facetsConfig.build(doc.document) - val term = new Term(PART_ID, documentId) + val term = new Term(PART_ID_FIELD, documentId) indexWriter.updateDocument(term, docToAdd) } @@ -711,7 +722,7 @@ class PartKeyLuceneIndex(ref: DatasetRef, */ def partKeyFromPartId(partId: Int): Option[BytesRef] = { val collector = new SinglePartKeyCollector() - withNewSearcher(s => s.search(new TermQuery(new Term(PART_ID, partId.toString)), collector) ) + withNewSearcher(s => s.search(new TermQuery(new Term(PART_ID_FIELD, partId.toString)), collector) ) Option(collector.singleResult) } @@ -720,7 +731,7 @@ class PartKeyLuceneIndex(ref: DatasetRef, */ def startTimeFromPartId(partId: Int): Long = { val collector = new NumericDocValueCollector(PartKeyLuceneIndex.START_TIME) - withNewSearcher(s => s.search(new TermQuery(new Term(PART_ID, partId.toString)), collector)) + withNewSearcher(s => s.search(new TermQuery(new Term(PART_ID_FIELD, partId.toString)), collector)) collector.singleResult } @@ -738,7 +749,7 @@ class PartKeyLuceneIndex(ref: DatasetRef, } // dont use BooleanQuery which will hit the 1024 term limit. Instead use TermInSetQuery which is // more efficient within Lucene - withNewSearcher(s => s.search(new TermInSetQuery(PART_ID, terms), collector)) + withNewSearcher(s => s.search(new TermInSetQuery(PART_ID_FIELD, terms), collector)) span.tag(s"num-partitions-to-page", terms.size()) val latency = System.nanoTime - startExecute span.mark(s"index-startTimes-for-odp-lookup-latency=${latency}ns") @@ -753,7 +764,7 @@ class PartKeyLuceneIndex(ref: DatasetRef, */ def endTimeFromPartId(partId: Int): Long = { val collector = new NumericDocValueCollector(PartKeyLuceneIndex.END_TIME) - withNewSearcher(s => s.search(new TermQuery(new Term(PART_ID, partId.toString)), collector)) + withNewSearcher(s => s.search(new TermQuery(new Term(PART_ID_FIELD, partId.toString)), collector)) collector.singleResult } @@ -777,7 +788,6 @@ class PartKeyLuceneIndex(ref: DatasetRef, coll.numHits } - def foreachPartKeyMatchingFilter(columnFilters: Seq[ColumnFilter], startTime: Long, endTime: Long, func: (BytesRef) => Unit): Int = { @@ -805,7 +815,7 @@ class PartKeyLuceneIndex(ref: DatasetRef, logger.debug(s"Updating document ${partKeyString(documentId, partKeyOnHeapBytes, partKeyBytesRefOffset)} " + s"with startTime=$startTime endTime=$endTime into dataset=$ref shard=$shardNum") val docToAdd = doc.facetsConfig.build(doc.document) - indexWriter.updateDocument(new Term(PART_ID, partId.toString), docToAdd) + indexWriter.updateDocument(new Term(PART_ID_FIELD, partId.toString), docToAdd) } /** @@ -817,12 +827,41 @@ class PartKeyLuceneIndex(ref: DatasetRef, logger.info(s"Refreshed index searchers to make reads consistent for dataset=$ref shard=$shardNum") } + val regexChars = Array('.', '?', '+', '*', '|', '{', '}', '[', ']', '(', ')', '"', '\\') + val regexCharsMinusPipe = (regexChars.toSet - '|').toArray + + // scalastyle:off method.length private def 
leafFilter(column: String, filter: Filter): Query = { + + def equalsQuery(value: String): Query = { + if (value.nonEmpty) new TermQuery(new Term(column, value)) + else leafFilter(column, NotEqualsRegex(".+")) // value="" means the label is absent or has an empty value. + } + filter match { case EqualsRegex(value) => val regex = removeRegexAnchors(value.toString) - if (regex.nonEmpty) new RegexpQuery(new Term(column, regex), RegExp.NONE) - else leafFilter(column, NotEqualsRegex(".+")) // value="" means the label is absent or has an empty value. + if (regex == "") { + // if label=~"" then match empty string or label not present condition too + leafFilter(column, NotEqualsRegex(".+")) + } else if (regex.replaceAll("\\.\\*", "") == "") { + // if label=~".*" then match all docs since promQL matches .* with absent label too + new MatchAllDocsQuery + } else if (regex.forall(c => !regexChars.contains(c))) { + // if all regex special chars absent, then treat like Equals + equalsQuery(regex) + } else if (regex.forall(c => !regexCharsMinusPipe.contains(c))) { + // if pipe is only regex special char present, then convert to IN query + new TermInSetQuery(column, regex.split('|').map(t => new BytesRef(t)): _*) + } else if (regex.endsWith(".*") && regex.length > 2 && + regex.dropRight(2).forall(c => !regexChars.contains(c))) { + // if suffix is .* and no regex special chars present in non-empty prefix, then use prefix query + new PrefixQuery(new Term(column, regex.dropRight(2))) + } else { + // regular non-empty regex query + new RegexpQuery(new Term(column, regex), RegExp.NONE) + } + case NotEqualsRegex(value) => val term = new Term(column, removeRegexAnchors(value.toString)) val allDocs = new MatchAllDocsQuery @@ -830,9 +869,10 @@ class PartKeyLuceneIndex(ref: DatasetRef, booleanQuery.add(allDocs, Occur.FILTER) booleanQuery.add(new RegexpQuery(term, RegExp.NONE), Occur.MUST_NOT) booleanQuery.build() + case Equals(value) => - if (value.toString.nonEmpty) new TermQuery(new Term(column, value.toString)) - else leafFilter(column, NotEqualsRegex(".+")) // value="" means the label is absent or has an empty value. 
+ equalsQuery(value.toString) + case NotEquals(value) => val str = value.toString val term = new Term(column, str) @@ -849,22 +889,18 @@ class PartKeyLuceneIndex(ref: DatasetRef, booleanQuery.build() case In(values) => - if (values.size < 2) - throw new IllegalArgumentException("In filter should have atleast 2 values") - val booleanQuery = new BooleanQuery.Builder - values.foreach { value => - booleanQuery.add(new TermQuery(new Term(column, value.toString)), Occur.SHOULD) - } - booleanQuery.build() + new TermInSetQuery(column, values.toArray.map(t => new BytesRef(t.toString)): _*) + case And(lhs, rhs) => val andQuery = new BooleanQuery.Builder andQuery.add(leafFilter(column, lhs), Occur.FILTER) andQuery.add(leafFilter(column, rhs), Occur.FILTER) andQuery.build() + case _ => throw new UnsupportedOperationException } } - + //scalastyle:on method.length def partIdsFromFilters(columnFilters: Seq[ColumnFilter], startTime: Long, endTime: Long): debox.Buffer[Int] = { @@ -910,7 +946,7 @@ class PartKeyLuceneIndex(ref: DatasetRef, val startExecute = System.nanoTime() val span = Kamon.currentSpan() - val query: BooleanQuery = colFiltersToQuery(columnFilters, startTime, endTime) + val query = colFiltersToQuery(columnFilters, startTime, endTime) logger.debug(s"Querying dataset=$ref shard=$shardNum partKeyIndex with: $query") withNewSearcher(s => s.search(query, collector)) val latency = System.nanoTime - startExecute @@ -927,7 +963,7 @@ class PartKeyLuceneIndex(ref: DatasetRef, booleanQuery.add(LongPoint.newRangeQuery(START_TIME, 0, endTime), Occur.FILTER) booleanQuery.add(LongPoint.newRangeQuery(END_TIME, startTime, Long.MaxValue), Occur.FILTER) val query = booleanQuery.build() - query + new ConstantScoreQuery(query) // disable scoring } def partIdFromPartKeySlow(partKeyBase: Any, @@ -1063,7 +1099,7 @@ class SinglePartIdCollector extends SimpleCollector { // gets called for each segment override def doSetNextReader(context: LeafReaderContext): Unit = { - partIdDv = context.reader().getNumericDocValues(PartKeyLuceneIndex.PART_ID) + partIdDv = context.reader().getNumericDocValues(PartKeyLuceneIndex.PART_ID_DV) } // gets called for each matching document in current segment @@ -1101,7 +1137,7 @@ class TopKPartIdsCollector(limit: Int) extends Collector with StrictLogging { def getLeafCollector(context: LeafReaderContext): LeafCollector = { logger.trace("New segment inspected:" + context.id) endTimeDv = DocValues.getNumeric(context.reader, END_TIME) - partIdDv = DocValues.getNumeric(context.reader, PART_ID) + partIdDv = DocValues.getNumeric(context.reader, PART_ID_DV) new LeafCollector() { override def setScorer(scorer: Scorable): Unit = {} @@ -1150,7 +1186,7 @@ class PartIdCollector extends SimpleCollector { override def doSetNextReader(context: LeafReaderContext): Unit = { //set the subarray of the numeric values for all documents in the context - partIdDv = context.reader().getNumericDocValues(PartKeyLuceneIndex.PART_ID) + partIdDv = context.reader().getNumericDocValues(PartKeyLuceneIndex.PART_ID_DV) } override def collect(doc: Int): Unit = { @@ -1171,7 +1207,7 @@ class PartIdStartTimeCollector extends SimpleCollector { override def doSetNextReader(context: LeafReaderContext): Unit = { //set the subarray of the numeric values for all documents in the context - partIdDv = context.reader().getNumericDocValues(PartKeyLuceneIndex.PART_ID) + partIdDv = context.reader().getNumericDocValues(PartKeyLuceneIndex.PART_ID_DV) startTimeDv = context.reader().getNumericDocValues(PartKeyLuceneIndex.START_TIME) } @@ 
-1220,7 +1256,7 @@ class ActionCollector(action: (Int, BytesRef) => Unit) extends SimpleCollector { override def scoreMode(): ScoreMode = ScoreMode.COMPLETE_NO_SCORES override def doSetNextReader(context: LeafReaderContext): Unit = { - partIdDv = context.reader().getNumericDocValues(PartKeyLuceneIndex.PART_ID) + partIdDv = context.reader().getNumericDocValues(PartKeyLuceneIndex.PART_ID_DV) partKeyDv = context.reader().getBinaryDocValues(PartKeyLuceneIndex.PART_KEY) } diff --git a/core/src/main/scala/filodb.core/memstore/TimeSeriesMemStore.scala b/core/src/main/scala/filodb.core/memstore/TimeSeriesMemStore.scala index fd3f018b91..3a178dce69 100644 --- a/core/src/main/scala/filodb.core/memstore/TimeSeriesMemStore.scala +++ b/core/src/main/scala/filodb.core/memstore/TimeSeriesMemStore.scala @@ -11,7 +11,7 @@ import monix.execution.{CancelableFuture, Scheduler} import monix.reactive.Observable import org.jctools.maps.NonBlockingHashMapLong -import filodb.core.{DatasetRef, QueryTimeoutException, Response, Types} +import filodb.core.{DatasetRef, QueryTimeoutException, Response, Types, Utils} import filodb.core.downsample.DownsampleConfig import filodb.core.memstore.ratelimit.{CardinalityRecord, ConfigQuotaSource} import filodb.core.metadata.Schemas @@ -33,7 +33,6 @@ extends TimeSeriesStore with StrictLogging { type Shards = NonBlockingHashMapLong[TimeSeriesShard] private val datasets = new HashMap[DatasetRef, Shards] - private val datasetMemFactories = new HashMap[DatasetRef, NativeMemoryManager] private val quotaSources = new HashMap[DatasetRef, ConfigQuotaSource] val stats = new ChunkSourceStats @@ -44,7 +43,18 @@ extends TimeSeriesStore with StrictLogging { private val partEvictionPolicy = evictionPolicy.getOrElse( new CompositeEvictionPolicy(ensureTspHeadroomPercent, ensureNmmHeadroomPercent)) - private lazy val ingestionMemory = filodbConfig.getMemorySize("memstore.ingestion-buffer-mem-size").toBytes + private[memstore] lazy val ingestionMemory = { + if (filodbConfig.getBoolean("memstore.memory-alloc.automatic-alloc-enabled")) { + val availableMemoryBytes: Long = Utils.calculateAvailableOffHeapMemory(filodbConfig) + val nativeMemoryManagerPercent = filodbConfig.getDouble("memstore.memory-alloc.native-memory-manager-percent") + val blockMemoryManagerPercent = filodbConfig.getDouble("memstore.memory-alloc.block-memory-manager-percent") + val lucenePercent = filodbConfig.getDouble("memstore.memory-alloc.lucene-memory-percent") + require(Math.abs(nativeMemoryManagerPercent + blockMemoryManagerPercent + lucenePercent - 100) < 0.001, + s"Configured Native($nativeMemoryManagerPercent), Block($blockMemoryManagerPercent) " + + s"and Lucene($lucenePercent) memory percents don't sum to 100.0") + (availableMemoryBytes * nativeMemoryManagerPercent / 100).toLong + } else filodbConfig.getMemorySize("memstore.ingestion-buffer-mem-size").toBytes + } private[this] lazy val ingestionMemFactory: NativeMemoryManager = { logger.info(s"Allocating $ingestionMemory bytes for WriteBufferPool/PartitionKeys") @@ -67,7 +77,7 @@ extends TimeSeriesStore with StrictLogging { } // TODO: Change the API to return Unit Or ShardAlreadySetup, instead of throwing. Make idempotent.
- def setup(ref: DatasetRef, schemas: Schemas, shard: Int, storeConf: StoreConfig, + def setup(ref: DatasetRef, schemas: Schemas, shard: Int, storeConf: StoreConfig, numShards: Int, downsample: DownsampleConfig = DownsampleConfig.disabled): Unit = synchronized { val shards = datasets.getOrElseUpdate(ref, new NonBlockingHashMapLong[TimeSeriesShard](32, false)) val quotaSource = quotaSources.getOrElseUpdate(ref, @@ -75,8 +85,8 @@ extends TimeSeriesStore with StrictLogging { if (shards.containsKey(shard)) { throw ShardAlreadySetup(ref, shard) } else { - val tsdb = new OnDemandPagingShard(ref, schemas, storeConf, quotaSource, shard, ingestionMemFactory, store, - metastore, partEvictionPolicy, filodbConfig) + val tsdb = new OnDemandPagingShard(ref, schemas, storeConf, numShards, quotaSource, shard, + ingestionMemFactory, store, metastore, partEvictionPolicy, filodbConfig) shards.put(shard, tsdb) } } diff --git a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala index 2c5dc84bad..4293fef734 100644 --- a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala +++ b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala @@ -258,6 +258,7 @@ case class TimeSeriesShardInfo(shardNum: Int, class TimeSeriesShard(val ref: DatasetRef, val schemas: Schemas, val storeConfig: StoreConfig, + numShards: Int, quotaSource: QuotaSource, val shardNum: Int, val bufferMemoryManager: NativeMemoryManager, @@ -365,7 +366,25 @@ class TimeSeriesShard(val ref: DatasetRef, val ingestSched = Scheduler.singleThread(s"$IngestSchedName-$ref-$shardNum", reporter = UncaughtExceptionReporter(logger.error("Uncaught Exception in TimeSeriesShard.ingestSched", _))) - private val blockMemorySize = storeConfig.shardMemSize + private[memstore] val blockMemorySize = { + val size = if (filodbConfig.getBoolean("memstore.memory-alloc.automatic-alloc-enabled")) { + val numNodes = filodbConfig.getInt("min-num-nodes-in-cluster") + val availableMemoryBytes: Long = Utils.calculateAvailableOffHeapMemory(filodbConfig) + val blockMemoryManagerPercent = filodbConfig.getDouble("memstore.memory-alloc.block-memory-manager-percent") + val blockMemForDatasetPercent = storeConfig.shardMemPercent // fraction of block memory for this dataset + val numShardsPerNode = Math.ceil(numShards / numNodes.toDouble) + logger.info(s"Calculating Block memory size with automatic allocation strategy. 
" + + s"Dataset dataset=$ref has blockMemForDatasetPercent=$blockMemForDatasetPercent " + + s"numShardsPerNode=$numShardsPerNode") + (availableMemoryBytes * blockMemoryManagerPercent * + blockMemForDatasetPercent / 100 / 100 / numShardsPerNode).toLong + } else { + storeConfig.shardMemSize + } + logger.info(s"Block Memory for dataset=$ref shard=$shardNum bytesAllocated=$size") + size + } + protected val numGroups = storeConfig.groupsPerShard private val chunkRetentionHours = (storeConfig.diskTTLSeconds / 3600).toInt private[memstore] val pagingEnabled = storeConfig.demandPagingEnabled @@ -915,14 +934,8 @@ class TimeSeriesShard(val ref: DatasetRef, markPartAsNotIngesting(p, odp = false) if (storeConfig.meteringEnabled) { val shardKey = p.schema.partKeySchema.colValues(p.partKeyBase, p.partKeyOffset, - p.schema.options.shardKeyColumns) - val newCard = cardTracker.modifyCount(shardKey, 0, -1) - // TODO remove temporary debugging since we are seeing some negative counts - if (newCard.exists(_.value.activeTsCount < 0) && p.partID % 100 < 5) - // log for 5% of the cases to reduce log volume - logger.error(s"For some reason, activeTs count negative when updating card for " + - s"partKey: ${p.stringPartition} newCard: $newCard oldActivelyIngestingSize=$oldActivelyIngestingSize " + - s"newActivelyIngestingSize=${activelyIngesting.size}") + p.schema.options.shardKeyColumns) + cardTracker.modifyCount(shardKey, 0, -1) } } } @@ -1756,15 +1769,18 @@ class TimeSeriesShard(val ref: DatasetRef, startTime: Long, querySession: QuerySession, limit: Int): Iterator[ZeroCopyUTF8String] = { - if (indexFacetingEnabledAllLabels || + val metricShardKeys = schemas.part.options.shardKeyColumns + val metricGroupBy = deploymentPartitionName +: clusterType +: shardKeyValuesFromFilter(metricShardKeys, filters) + val startNs = Utils.currentThreadCpuTimeNanos + val res = if (indexFacetingEnabledAllLabels || (indexFacetingEnabledShardKeyLabels && schemas.part.options.shardKeyColumns.contains(label))) { partKeyIndex.labelValuesEfficient(filters, startTime, endTime, label, limit).iterator.map(_.utf8) } else { - val metricShardKeys = schemas.part.options.shardKeyColumns - val metricGroupBy = deploymentPartitionName +: clusterType +: shardKeyValuesFromFilter(metricShardKeys, filters) SingleLabelValuesResultIterator(partKeyIndex.partIdsFromFilters(filters, startTime, endTime), label, querySession, metricGroupBy, limit) } + querySession.queryStats.getCpuNanosCounter(metricGroupBy).addAndGet(startNs - Utils.currentThreadCpuTimeNanos) + res } /** @@ -1921,9 +1937,11 @@ class TimeSeriesShard(val ref: DatasetRef, val chunksReadCounter = querySession.queryStats.getDataBytesScannedCounter(metricGroupBy) // No matter if there are filters or not, need to run things through Lucene so we can discover potential // TSPartitions to read back from disk + val startNs = Utils.currentThreadCpuTimeNanos val matches = partKeyIndex.partIdsFromFilters(filters, chunkMethod.startTime, chunkMethod.endTime) shardStats.queryTimeRangeMins.record((chunkMethod.endTime - chunkMethod.startTime) / 60000 ) querySession.queryStats.getTimeSeriesScannedCounter(metricGroupBy).addAndGet(matches.length) + querySession.queryStats.getCpuNanosCounter(metricGroupBy).addAndGet(Utils.currentThreadCpuTimeNanos - startNs) Kamon.currentSpan().tag(s"num-partitions-from-index-$shardNum", matches.length) // first find out which partitions are being queried for data not in memory diff --git a/core/src/main/scala/filodb.core/memstore/TimeSeriesStore.scala 
b/core/src/main/scala/filodb.core/memstore/TimeSeriesStore.scala index dfa0f1f0cb..c84a7298c9 100644 --- a/core/src/main/scala/filodb.core/memstore/TimeSeriesStore.scala +++ b/core/src/main/scala/filodb.core/memstore/TimeSeriesStore.scala @@ -75,7 +75,7 @@ trait TimeSeriesStore extends ChunkSource { * @param downsampleConfig configuration for downsampling operation. By default it is disabled. */ def setup(ref: DatasetRef, schemas: Schemas, shard: Int, - storeConf: StoreConfig, + storeConf: StoreConfig, numShards: Int, downsampleConfig: DownsampleConfig = DownsampleConfig.disabled): Unit /** diff --git a/core/src/main/scala/filodb.core/memstore/ratelimit/CardinalityManager.scala b/core/src/main/scala/filodb.core/memstore/ratelimit/CardinalityManager.scala index 92ea92bb19..dfe1d6897c 100644 --- a/core/src/main/scala/filodb.core/memstore/ratelimit/CardinalityManager.scala +++ b/core/src/main/scala/filodb.core/memstore/ratelimit/CardinalityManager.scala @@ -32,6 +32,15 @@ class CardinalityManager(datasetRef: DatasetRef, // physical resources for duplicate calculation private var isCardTriggered: Boolean = false + // number of partKeys to aggregate in memory at any given time, before we write the cardinality information to + // RocksDB. This is done to reduce the time taken to calculate cardinality of partitions with a high number of + // TS by reducing the number of writes issued to RocksDB (from our profiling, a high number of RocksDB writes leads to + // performance bottlenecks) + // For cardFlushCount = 500,000, we conservatively expect to consume 128 bytes * 500,000 = ~64 MB + private val cardFlushCount: Option[Int] = + if (filodbConfig.hasPath("card-flush-count")) { + Some(filodbConfig.getInt("card-flush-count")) + } else None /** * `dataset-configs` is an string array where each string is a file path for a dataset config. This function reads @@ -132,9 +141,6 @@ class CardinalityManager(datasetRef: DatasetRef, /** * Triggers cardinalityCount if metering is enabled and the required criteria matches. - * It creates a new instance of CardinalityTracker and uses the PartKeyLuceneIndex to calculate cardinality count - * and store data in a local CardinalityStore. We then close the previous instance of CardinalityTracker and switch - * it with the new one we created in this call. * @param indexRefreshCount The number of time the indexRefresh has already happened. This is used in the logic of * shouldTriggerCardinalityCount */ @@ -143,34 +149,7 @@ class CardinalityManager(datasetRef: DatasetRef, try { if (shouldTriggerCardinalityCount(shardNum, numShardsPerNode, indexRefreshCount)) { isCardTriggered = true - val newCardTracker = getNewCardTracker() - var cardCalculationComplete = false - try { - partKeyIndex.calculateCardinality(partSchema, newCardTracker) - cardCalculationComplete = true - } catch { - case ex: Exception => - logger.error(s"[CardinalityManager]Error while calculating cardinality using" + - s" PartKeyLuceneIndex!
shardNum=$shardNum indexRefreshCount=$indexRefreshCount", ex) - // cleanup resources used by the newCardTracker tracker to avoid leaking of resources - newCardTracker.close() - } - if (cardCalculationComplete) { - try { - // close the cardinality store and release the physical resources of the current cardinality store - close() - cardTracker = Some(newCardTracker) - logger.info(s"[CardinalityManager] Triggered cardinality count successfully for" + - s" shardNum=$shardNum indexRefreshCount=$indexRefreshCount") - } catch { - case ex: Exception => - // Very unlikely scenario, but can happen if the disk call fails. - logger.error(s"[CardinalityManager]Error closing card tracker! shardNum=$shardNum", ex) - // setting cardTracker to None in this case, since the exception happened on the close. We - // can't rely on the card store. The next trigger should re-build the card store and card tracker - cardTracker = None - } - } + createNewCardinalityTrackerAndCalculate(indexRefreshCount) isCardTriggered = false } else { @@ -189,6 +168,50 @@ class CardinalityManager(datasetRef: DatasetRef, } } + /** + * Creates a new instance of CardinalityTracker and uses the PartKeyLuceneIndex to calculate cardinality count + * and store data in a local CardinalityStore. We then close the previous instance of CardinalityTracker and switch + * it with the new one we created in this call. + * + * @param indexRefreshCount The number of time the indexRefresh has already happened. This is used in the logic of + * shouldTriggerCardinalityCount + */ + private def createNewCardinalityTrackerAndCalculate(indexRefreshCount: Int): Unit = { + val newCardTracker = getNewCardTracker() + var cardCalculationComplete = false + val startTimeMs = System.currentTimeMillis() + try { + logger.info(s"[CardinalityManager]Triggering cardinality count for shardNum=$shardNum " + + s"indexRefreshCount=$indexRefreshCount") + partKeyIndex.calculateCardinality(partSchema, newCardTracker) + cardCalculationComplete = true + } catch { + case ex: Exception => + logger.error(s"[CardinalityManager]Error while calculating cardinality using" + + s" PartKeyLuceneIndex! shardNum=$shardNum indexRefreshCount=$indexRefreshCount", ex) + // cleanup resources used by the newCardTracker tracker to avoid leaking of resources + newCardTracker.close() + } + if (cardCalculationComplete) { + try { + // close and release the physical resources of the outdated/previous cardinality store + close() + // reassign the cardTracker with the newly created CardinalityTracker object + cardTracker = Some(newCardTracker) + val timeTakenInSeconds = ((System.currentTimeMillis() - startTimeMs)/1000.0) + logger.info(s"[CardinalityManager] Triggered cardinality count successfully for" + + s" shardNum=$shardNum indexRefreshCount=$indexRefreshCount timeTakenInSeconds=$timeTakenInSeconds") + } catch { + case ex: Exception => + // Very unlikely scenario, but can happen if the disk call fails. + logger.error(s"[CardinalityManager]Error closing card tracker! shardNum=$shardNum", ex) + // setting cardTracker to None in this case, since the exception happened on the close. We + // can't rely on the card store. 
The next trigger should re-build the card store and card tracker + cardTracker = None + } + } + } + /** * Helper method to create a CardinalityTracker instance * @@ -197,7 +220,9 @@ class CardinalityManager(datasetRef: DatasetRef, private def getNewCardTracker(): CardinalityTracker = { val cardStore = new RocksDbCardinalityStore(datasetRef, shardNum) val defaultQuota = quotaSource.getDefaults(datasetRef) - val tracker = new CardinalityTracker(datasetRef, shardNum, shardKeyLen, defaultQuota, cardStore) + logger.info(s"[CardinalityManager] Creating new CardinalityTracker with flushCount=$cardFlushCount") + val tracker = new CardinalityTracker(datasetRef, shardNum, shardKeyLen, defaultQuota, cardStore, + flushCount = cardFlushCount) quotaSource.getQuotas(datasetRef).foreach { q => tracker.setQuota(q.shardKeyPrefix, q.quota) } diff --git a/core/src/main/scala/filodb.core/query/PartitionTimeRangeReader.scala b/core/src/main/scala/filodb.core/query/PartitionTimeRangeReader.scala index a86ad2a56a..e48711b368 100644 --- a/core/src/main/scala/filodb.core/query/PartitionTimeRangeReader.scala +++ b/core/src/main/scala/filodb.core/query/PartitionTimeRangeReader.scala @@ -47,8 +47,8 @@ final class PartitionTimeRangeReader(part: ReadablePartition, s"""${info.debugString} """ } message += "]" - logger.error(s"message ${message}") - throw new IllegalArgumentException(message) + logger.error(s"message $message") + throw new IllegalArgumentException("requirement length > 0 failed.") } } } diff --git a/core/src/main/scala/filodb.core/query/RangeVector.scala b/core/src/main/scala/filodb.core/query/RangeVector.scala index bba7fdd240..b338accf1c 100644 --- a/core/src/main/scala/filodb.core/query/RangeVector.scala +++ b/core/src/main/scala/filodb.core/query/RangeVector.scala @@ -213,8 +213,8 @@ sealed trait ScalarSingleValue extends ScalarRangeVector { else it } - // Negligible bytes sent over-the-wire. - override def estimateSerializedRowBytes: Long = 0 + // Negligible bytes sent over-the-wire. Don't bother calculating accurately. 
+ override def estimateSerializedRowBytes: Long = SerializableRangeVector.SizeOfDouble } /** @@ -399,7 +399,10 @@ final class SerializedRangeVector(val key: RangeVectorKey, } else it } - override def estimateSerializedRowBytes: Long = containers.map(_.numBytes).sum + override def estimateSerializedRowBytes: Long = + containers.toIterator.flatMap(_.iterate(schema)) + .slice(startRecordNo, startRecordNo + numRowsSerialized) + .foldLeft(0)(_ + _.recordLength) def containersIterator : Iterator[RecordContainer] = containers.toIterator @@ -462,7 +465,7 @@ object SerializedRangeVector extends StrictLogging { val nextRow = rows.next() // Don't encode empty / NaN data over the wire if (!canRemoveEmptyRows(rv.outputRange, schema) || - schema.columns(1).colType == DoubleColumn && !nextRow.getDouble(1).isNaN || + schema.columns(1).colType == DoubleColumn && !java.lang.Double.isNaN(nextRow.getDouble(1)) || schema.columns(1).colType == HistogramColumn && !nextRow.getHistogram(1).isEmpty) { numRows += 1 builder.addFromReader(nextRow, schema, 0) @@ -487,11 +490,7 @@ object SerializedRangeVector extends StrictLogging { case None => builder.allContainers.toList case Some(firstContainer) => builder.allContainers.dropWhile(_ != firstContainer) } - val srv = new SerializedRangeVector(rv.key, numRows, containers, schema, startRecordNo, rv.outputRange) - val resultSize = srv.estimatedSerializedBytes - SerializedRangeVector.queryResultBytes.record(resultSize) - queryStats.getResultBytesCounter(Nil).addAndGet(resultSize) - srv + new SerializedRangeVector(rv.key, numRows, containers, schema, startRecordNo, rv.outputRange) } finally { queryStats.getCpuNanosCounter(Nil).addAndGet(Utils.currentThreadCpuTimeNanos - startNs) } diff --git a/core/src/main/scala/filodb.core/store/IngestionConfig.scala b/core/src/main/scala/filodb.core/store/IngestionConfig.scala index 1d530c133c..06802c329e 100644 --- a/core/src/main/scala/filodb.core/store/IngestionConfig.scala +++ b/core/src/main/scala/filodb.core/store/IngestionConfig.scala @@ -17,6 +17,7 @@ final case class StoreConfig(flushInterval: FiniteDuration, maxBlobBufferSize: Int, // Number of bytes to allocate to chunk storage in each shard shardMemSize: Long, + shardMemPercent: Double, maxBufferPoolSize: Int, groupsPerShard: Int, numPagesPerBlock: Int, @@ -45,6 +46,7 @@ final case class StoreConfig(flushInterval: FiniteDuration, "max-chunks-size" -> maxChunksSize, "max-blob-buffer-size" -> maxBlobBufferSize, "shard-mem-size" -> shardMemSize, + "shard-mem-percent" -> shardMemPercent, "max-buffer-pool-size" -> maxBufferPoolSize, "groups-per-shard" -> groupsPerShard, "max-chunk-time" -> (maxChunkTime.toSeconds + "s"), @@ -81,6 +83,7 @@ object StoreConfig { |max-blob-buffer-size = 15000 |max-buffer-pool-size = 10000 |groups-per-shard = 60 + |shard-mem-percent = 100 # assume only one dataset per node by default |num-block-pages = 100 |failure-retries = 3 |retry-delay = 15 seconds @@ -119,6 +122,7 @@ object StoreConfig { config.getInt("max-chunks-size"), config.getInt("max-blob-buffer-size"), config.getMemorySize("shard-mem-size").toBytes, + config.getDouble("shard-mem-percent"), config.getInt("max-buffer-pool-size"), config.getInt("groups-per-shard"), config.getInt("num-block-pages"), diff --git a/core/src/test/resources/application_test.conf b/core/src/test/resources/application_test.conf index 52e14fbebf..dae84e3bdf 100644 --- a/core/src/test/resources/application_test.conf +++ b/core/src/test/resources/application_test.conf @@ -88,6 +88,15 @@ filodb { spark.dataset-ops-timeout 
= 15s memstore { + memory-alloc { + automatic-alloc-enabled = false + available-memory-bytes = 1GB + os-memory-needs = 500MB + lucene-memory-percent = 20 + native-memory-manager-percent = 20 + block-memory-manager-percent = 60 + } + flush-task-parallelism = 1 ensure-block-memory-headroom-percent = 5 ensure-tsp-count-headroom-percent = 5 @@ -97,6 +106,7 @@ filodb { track-queries-holding-eviction-lock = false index-faceting-enabled-shard-key-labels = true index-faceting-enabled-for-all-labels = true + } tasks { diff --git a/core/src/test/scala/filodb.core/memstore/DemandPagedChunkStoreSpec.scala b/core/src/test/scala/filodb.core/memstore/DemandPagedChunkStoreSpec.scala index ad13ef95ac..defaf44269 100644 --- a/core/src/test/scala/filodb.core/memstore/DemandPagedChunkStoreSpec.scala +++ b/core/src/test/scala/filodb.core/memstore/DemandPagedChunkStoreSpec.scala @@ -26,7 +26,7 @@ class DemandPagedChunkStoreSpec extends AnyFunSpec with AsyncTest { |shard-mem-size = 200MB""".stripMargin) .withFallback(TestData.sourceConf.getConfig("store")) - memStore.setup(dataset1.ref, Schemas(schema1), 0, StoreConfig(sourceConf)) + memStore.setup(dataset1.ref, Schemas(schema1), 0, StoreConfig(sourceConf), 1) val onDemandPartMaker = memStore.getShardE(dataset1.ref, 0).partitionMaker after { diff --git a/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala b/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala index 6eb71613b2..e776700b3f 100644 --- a/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala +++ b/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala @@ -282,7 +282,6 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte } - it("should add part keys and removePartKeys (without partIds) correctly for more than 1024 keys with del count") { // Identical to test // it("should add part keys and fetch partIdsEndedBefore and removePartKeys correctly for more than 1024 keys") @@ -345,7 +344,6 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte // it is no longer required to have partIds in the index on non unit test setup } - it("should update part keys with endtime and parse filters correctly") { val start = System.currentTimeMillis() // Add the first ten keys and row numbers @@ -447,6 +445,32 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte partNums2 shouldEqual debox.Buffer.empty[Int] } + it("should be able to convert pipe regex to TermInSetQuery") { + // Add the first 99 keys and row numbers + partKeyFromRecords(dataset6, records(dataset6, readers.take(99)), Some(partBuilder)) + .zipWithIndex.foreach { case (addr, i) => + keyIndex.addPartKey(partKeyOnHeap(dataset6.partKeySchema, ZeroPointer, addr), i, System.currentTimeMillis())() + } + keyIndex.refreshReadersBlocking() + + val filters1 = Seq(ColumnFilter("Actor2Code", EqualsRegex("GOV|KHM|LAB|MED".utf8))) + val partNums1 = keyIndex.partIdsFromFilters(filters1, 0, Long.MaxValue) + partNums1 shouldEqual debox.Buffer(7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 22, 23, 24, 25, 26, 28, 29, 73, 81, 90) + } + + it("should be able to convert prefix regex to PrefixQuery") { + // Add the first 99 keys and row numbers + partKeyFromRecords(dataset6, records(dataset6, readers.take(99)), Some(partBuilder)) + .zipWithIndex.foreach { case (addr, i) => + keyIndex.addPartKey(partKeyOnHeap(dataset6.partKeySchema, ZeroPointer, addr), i, System.currentTimeMillis())() + } + keyIndex.refreshReadersBlocking() + + val
filters1 = Seq(ColumnFilter("Actor2Name", EqualsRegex("C.*".utf8))) + val partNums1 = keyIndex.partIdsFromFilters(filters1, 0, Long.MaxValue) + partNums1 shouldEqual debox.Buffer(3, 12, 22, 23, 24, 31, 59, 60, 66, 69, 72, 78, 79, 80, 88, 89) + } + it("should ignore unsupported columns and return empty filter") { val index2 = new PartKeyLuceneIndex(dataset1.ref, dataset1.schema.partition, true, true, 0, 1.hour.toMillis) partKeyFromRecords(dataset1, records(dataset1, readers.take(10))).zipWithIndex.foreach { case (addr, i) => @@ -1008,4 +1032,61 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte // close CardinalityTracker to avoid leaking of resources cardTracker.close() } + + it("should match records without label when .* is provided on a non-existent label") { + + val pkrs = partKeyFromRecords(dataset6, records(dataset6, readers.take(10)), Some(partBuilder)) + .zipWithIndex.map { case (addr, i) => + val pk = partKeyOnHeap(dataset6.schema.partKeySchema, ZeroPointer, addr) + keyIndex.addPartKey(pk, i, i, i + 10)() + PartKeyLuceneIndexRecord(pk, i, i + 10) + } + keyIndex.refreshReadersBlocking() + + + // Query with just the existing label name + val filter1 = ColumnFilter("Actor2Code", Equals("GOV".utf8)) + val result1 = keyIndex.partKeyRecordsFromFilters(Seq(filter1), 0, Long.MaxValue) + val expected1 = Seq(pkrs(7), pkrs(8), pkrs(9)) + + result1.map(_.partKey.toSeq) shouldEqual expected1.map(_.partKey.toSeq) + result1.map(p => (p.startTime, p.endTime)) shouldEqual expected1.map(p => (p.startTime, p.endTime)) + + // Query with non-existent label name with a .* regex, which should also match series missing the label + val filter2 = ColumnFilter("dummy", EqualsRegex(".*".utf8)) + val filter3 = ColumnFilter("Actor2Code", Equals("GOV".utf8)) + val result2 = keyIndex.partKeyRecordsFromFilters(Seq(filter2, filter3), 0, Long.MaxValue) + val expected2 = Seq(pkrs(7), pkrs(8), pkrs(9)) + + result2.map(_.partKey.toSeq) shouldEqual expected2.map(_.partKey.toSeq) + result2.map(p => (p.startTime, p.endTime)) shouldEqual expected2.map(p => (p.startTime, p.endTime)) + + // Query with non-existent label name with a regex matching at least 1 character + val filter4 = ColumnFilter("dummy", EqualsRegex(".+".utf8)) + val filter5 = ColumnFilter("Actor2Code", Equals("GOV".utf8)) + val result3 = keyIndex.partKeyRecordsFromFilters(Seq(filter4, filter5), 0, Long.MaxValue) + result3 shouldEqual Seq() + + // Query with non-existent label name with an empty regex + val filter6 = ColumnFilter("dummy", EqualsRegex("".utf8)) + val filter7 = ColumnFilter("Actor2Code", Equals("GOV".utf8)) + val result4 = keyIndex.partKeyRecordsFromFilters(Seq(filter6, filter7), 0, Long.MaxValue) + val expected4 = Seq(pkrs(7), pkrs(8), pkrs(9)) + result4.map(_.partKey.toSeq) shouldEqual expected4.map(_.partKey.toSeq) + result4.map(p => (p.startTime, p.endTime)) shouldEqual expected4.map(p => (p.startTime, p.endTime)) + + // Query with non-existent label name with an empty equals + val filter8 = ColumnFilter("dummy", Equals("".utf8)) + val filter9 = ColumnFilter("Actor2Code", Equals("GOV".utf8)) + val result5 = keyIndex.partKeyRecordsFromFilters(Seq(filter8, filter9), 0, Long.MaxValue) + val expected5 = Seq(pkrs(7), pkrs(8), pkrs(9)) + result5.map(_.partKey.toSeq) shouldEqual expected5.map(_.partKey.toSeq) + result5.map(p => (p.startTime, p.endTime)) shouldEqual expected5.map(p => (p.startTime, p.endTime)) + + + val filter10 = ColumnFilter("Actor2Code", EqualsRegex(".*".utf8)) + val result10 = keyIndex.partKeyRecordsFromFilters(Seq(filter10), 0,
Long.MaxValue) + result10.map(_.partKey.toSeq) shouldEqual pkrs.map(_.partKey.toSeq) + result10.map(p => (p.startTime, p.endTime)) shouldEqual pkrs.map(p => (p.startTime, p.endTime)) + } } \ No newline at end of file diff --git a/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreForMetadataSpec.scala b/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreForMetadataSpec.scala index 8a3797557b..f1500e0c1e 100644 --- a/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreForMetadataSpec.scala +++ b/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreForMetadataSpec.scala @@ -39,7 +39,7 @@ class TimeSeriesMemStoreForMetadataSpec extends AnyFunSpec with Matchers with Sc val container = createRecordContainer(0, 10) override def beforeAll(): Unit = { - memStore.setup(timeseriesDataset.ref, Schemas(timeseriesSchema), 0, TestData.storeConf) + memStore.setup(timeseriesDataset.ref, Schemas(timeseriesSchema), 0, TestData.storeConf, 1) memStore.ingest(timeseriesDataset.ref, 0, SomeData(container, 0)) memStore.refreshIndexForTesting(timeseriesDataset.ref) } diff --git a/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala b/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala index b199600fbd..1654c81c51 100644 --- a/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala +++ b/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala @@ -46,9 +46,9 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte val schemas1 = Schemas(schema1) it("should detect duplicate setup") { - memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf) + memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf, 1) try { - memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf) + memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf, 1) fail() } catch { case e: ShardAlreadySetup => { } // expected @@ -58,7 +58,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte // Look mama! Real-time time series ingestion and querying across multiple partitions! 
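// For reference, the PartKeyLuceneIndexSpec additions above pin down how filters behave when the
// filtered label is absent from a part key: Equals(""), EqualsRegex("") and EqualsRegex(".*") still
// match the series, while EqualsRegex(".+") does not. A minimal, self-contained sketch of that
// expected behaviour follows; MissingLabelFilterSemantics and matchesMissingLabel are hypothetical
// names used only to model the assertions, not FiloDB code.
object MissingLabelFilterSemantics extends App {
  // For a label the series does not have, only filters that also match the empty string select it.
  def matchesMissingLabel(filterValue: String, isRegex: Boolean): Boolean =
    if (isRegex) "".matches(filterValue) else filterValue.isEmpty

  assert(matchesMissingLabel("", isRegex = false))   // Equals("")        -> matches
  assert(matchesMissingLabel("", isRegex = true))    // EqualsRegex("")   -> matches
  assert(matchesMissingLabel(".*", isRegex = true))  // EqualsRegex(".*") -> matches
  assert(!matchesMissingLabel(".+", isRegex = true)) // EqualsRegex(".+") -> does not match
}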
it("should ingest into multiple series and be able to query across all partitions in real time") { - memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf) + memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf, 1) val rawData = multiSeriesData().take(20) val data = records(dataset1, rawData) // 2 records per series x 10 series memStore.ingest(dataset1.ref, 0, data) @@ -81,7 +81,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte } it("should ingest into multiple series and query across partitions") { - memStore.setup(dataset1.ref, schemas1, 1, TestData.storeConf) + memStore.setup(dataset1.ref, schemas1, 1, TestData.storeConf, 1) val data = records(dataset1, linearMultiSeries().take(20)) // 2 records per series x 10 series memStore.ingest(dataset1.ref, 1, data) @@ -97,7 +97,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte } it("should ingest map/tags column as partition key and aggregate") { - memStore.setup(dataset2.ref, schemas2h, 0, TestData.storeConf) + memStore.setup(dataset2.ref, schemas2h, 0, TestData.storeConf, 1) val data = records(dataset2, withMap(linearMultiSeries().take(20))) // 2 records per series x 10 series memStore.ingest(dataset2.ref, 0, data) @@ -109,7 +109,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte } it("should ingest histograms and read them back properly") { - memStore.setup(histDataset.ref, schemas2h, 0, TestData.storeConf) + memStore.setup(histDataset.ref, schemas2h, 0, TestData.storeConf, 1) val data = linearHistSeries().take(40) memStore.ingest(histDataset.ref, 0, records(histDataset, data)) memStore.refreshIndexForTesting(histDataset.ref) @@ -137,7 +137,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte it("should ingest multiple schemas simultaneously into one shard") { val ref = dataset2.ref - memStore.setup(ref, schemas2h, 0, TestData.storeConf) + memStore.setup(ref, schemas2h, 0, TestData.storeConf, 1) val data = linearHistSeries().take(40) memStore.ingest(ref, 0, records(histDataset, data)) val data2 = records(dataset2, withMap(linearMultiSeries()).take(30)) // 3 records per series x 10 series @@ -151,14 +151,14 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte } it("should be able to handle nonexistent partition keys") { - memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf) + memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf, 1) val q = memStore.scanRows(dataset1, Seq(1), SinglePartitionScan(Array[Byte]())) q.toBuffer.length should equal (0) } it("should ingest into multiple series and be able to query on one partition in real time") { - memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf) + memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf, 1) val data = multiSeriesData().take(20) // 2 records per series x 10 series memStore.ingest(dataset1.ref, 0, records(dataset1, data)) @@ -174,7 +174,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte } it("should query on multiple partitions using filters") { - memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf) + memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf, 1) val data = records(dataset1, linearMultiSeries().take(20)) // 2 records per series x 10 series memStore.ingest(dataset1.ref, 0, data) memStore.refreshIndexForTesting(dataset1.ref) @@ -186,8 +186,8 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with 
BeforeAndAfte } it("should ingest into multiple shards, getScanSplits, query, get index info from shards") { - memStore.setup(dataset2.ref, schemas2h, 0, TestData.storeConf) - memStore.setup(dataset2.ref, schemas2h, 1, TestData.storeConf) + memStore.setup(dataset2.ref, schemas2h, 0, TestData.storeConf, 2) + memStore.setup(dataset2.ref, schemas2h, 1, TestData.storeConf, 2) val data = records(dataset2, withMap(linearMultiSeries()).take(20)) // 2 records per series x 10 series memStore.ingest(dataset2.ref, 0, data) val data2 = records(dataset2, withMap(linearMultiSeries(200000L, 6), 6).take(20)) // 5 series only @@ -214,7 +214,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte } it("should handle errors from ingestStream") { - memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf) + memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf, 1) val errStream = Observable.fromIterable(groupedRecords(dataset1, linearMultiSeries())) .endWithError(new NumberFormatException) val fut = memStore.startIngestion(dataset1.ref, 0, errStream, s) @@ -224,7 +224,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte } it("should ingestStream and flush on interval") { - memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf) + memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf, 1) val initChunksWritten = chunksetsWritten val stream = Observable.fromIterable(groupedRecords(dataset1, linearMultiSeries())) @@ -245,7 +245,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte it("should flush dirty part keys during start-ingestion, end-ingestion and re-ingestion") { memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf.copy(groupsPerShard = 2, diskTTLSeconds = 1.hour.toSeconds.toInt, - flushInterval = 10.minutes)) + flushInterval = 10.minutes), 1) Thread sleep 1000 val numPartKeysWritten = partKeysWritten val tsShard = memStore.asInstanceOf[TimeSeriesMemStore].getShard(dataset1.ref, 0).get @@ -302,6 +302,54 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte 10.until(20).foreach {i => tsShard.activelyIngesting(i) shouldEqual false} } + it("should configure memory automatically when enabled") { + val colStore = new NullColumnStore() + + val config1 = ConfigFactory.parseString( + """ + |min-num-nodes-in-cluster = 32 + |memstore { + | memory-alloc { + | automatic-alloc-enabled = true + | available-memory-bytes = 1GB + | lucene-memory-percent = 10 + | native-memory-manager-percent = 30 + | block-memory-manager-percent = 60 + | } + |} + |""".stripMargin).withFallback(config) + + val memStore1 = new TimeSeriesMemStore(config1, colStore, new InMemoryMetaStore()) + memStore1.setup(dataset1.ref, schemas1, 0, TestData.storeConf.copy(groupsPerShard = 2, + diskTTLSeconds = 1.hour.toSeconds.toInt, + flushInterval = 10.minutes, shardMemPercent = 40), 256) + + memStore1.ingestionMemory shouldEqual 300000000 // 1GB * 30% + val tsShard = memStore1.getShard(dataset1.ref, 0).get + tsShard.blockMemorySize shouldEqual 30000000 // 1GB * 60% (for block memory) * 40% (for dataset memory) / ceil(256/32) + + memStore1.shutdown() + + // Expand cluster by only changing min-num-nodes-in-cluster + // now each shard should get more memory since fewer shards per node + + val config2 = ConfigFactory.parseString( + """ + |min-num-nodes-in-cluster = 45 + |""".stripMargin).withFallback(config1) + + val memStore2 = new TimeSeriesMemStore(config2, colStore, new InMemoryMetaStore()) + 
memStore2.setup(dataset1.ref, schemas1, 0, TestData.storeConf.copy(groupsPerShard = 2, + diskTTLSeconds = 1.hour.toSeconds.toInt, + flushInterval = 10.minutes, shardMemPercent = 40), 256) + + memStore2.ingestionMemory shouldEqual 300000000 // 1GB * 30% + val tsShard2 = memStore2.getShard(dataset1.ref, 0).get + tsShard2.blockMemorySize shouldEqual 40000000 // 1GB * 60% (for block memory) * 40% (for dataset memory) / ceil(256/45) + + memStore2.shutdown() + } + it("should recover index data from col store correctly") { val partBuilder = new RecordBuilder(TestData.nativeMem) @@ -323,7 +371,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore()) memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf.copy(groupsPerShard = 2, diskTTLSeconds = 1.hour.toSeconds.toInt, - flushInterval = 10.minutes)) + flushInterval = 10.minutes), 1) Thread sleep 1000 val tsShard = memStore.asInstanceOf[TimeSeriesMemStore].getShard(dataset1.ref, 0).get @@ -346,7 +394,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte } it("should lookupPartitions and return correct PartLookupResult") { - memStore.setup(dataset2.ref, schemas2h, 0, TestData.storeConf) + memStore.setup(dataset2.ref, schemas2h, 0, TestData.storeConf, 1) val data = records(dataset2, withMap(linearMultiSeries().take(20))) // 2 records per series x 10 series memStore.ingest(dataset2.ref, 0, data) @@ -372,7 +420,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte // 0 -> 2L, 1 -> 4L, 2 -> 6L, 3 -> 8L // A whole bunch of records should be skipped. However, remember that each group of 5 records gets one offset. - memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf) + memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf, 1) val initChunksWritten = chunksetsWritten val checkpoints = Map(0 -> 2L, 1 -> 21L, 2 -> 6L, 3 -> 8L) @@ -396,7 +444,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte } it("should recoverStream after timeout, returns endOffset to start normal ingestion") { - memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf) + memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf, 1) val checkpoints = Map(0 -> 2L, 1 -> 21L, 2 -> 6L, 3 -> 8L) val stream = Observable.never // "never" to mimic no data in stream source @@ -410,7 +458,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte } it("should truncate shards properly") { - memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf) + memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf, 1) val data = records(dataset1, multiSeriesData().take(20)) // 2 records per series x 10 series memStore.ingest(dataset1.ref, 0, data) @@ -437,7 +485,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte } it("should be able to evict partitions properly on ingestion and on ODP") { - memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf) + memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf, 1) val shard = memStore.getShardE(dataset1.ref, 0) // Ingest normal multi series data with 10 partitions. Should have 10 partitions. 
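// The expected block-memory sizes in the automatic-allocation test above (30000000 and 40000000
// bytes) appear to follow from splitting available-memory-bytes by block-memory-manager-percent,
// then by the dataset's shard-mem-percent, then by the number of shards expected to land on one
// node, i.e. ceil(numShards / min-num-nodes-in-cluster). The sketch below only reproduces that
// arithmetic; ShardMemoryMath and blockMemoryPerShard are illustrative names, not FiloDB APIs.
object ShardMemoryMath extends App {
  def blockMemoryPerShard(availableBytes: Long, blockMemPercent: Double,
                          shardMemPercent: Double, numShards: Int, minNumNodes: Int): Long = {
    val shardsPerNode = math.ceil(numShards.toDouble / minNumNodes).toInt
    math.round(availableBytes * (blockMemPercent / 100) * (shardMemPercent / 100) / shardsPerNode)
  }
  // 1GB available, 60% for block memory, 40% for this dataset, 256 shards over 32 or 45 nodes
  assert(blockMemoryPerShard(1000000000L, 60, 40, 256, 32) == 30000000L) // ceil(256/32) = 8 shards per node
  assert(blockMemoryPerShard(1000000000L, 60, 40, 256, 45) == 40000000L) // ceil(256/45) = 6 shards per node
}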
@@ -526,7 +574,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte it("should assign same previously assigned partId using bloom filter when evicted series starts re-ingesting") { - memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf) + memStore.setup(dataset1.ref, schemas1, 0, TestData.storeConf, 1) val shard0 = memStore.getShardE(dataset1.ref, 0) // Ingest normal multi series data with 10 partitions. Should have 10 partitions. @@ -568,7 +616,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte try { // Ingest >250 partitions. Note how much memory is left after all the allocations - store2.setup(dataset1.ref, schemas1, 0, TestData.storeConf) + store2.setup(dataset1.ref, schemas1, 0, TestData.storeConf, 1) val shard = store2.getShardE(dataset1.ref, 0) // Ingest normal multi series data with 10 partitions. Should have 10 partitions. diff --git a/core/src/test/scala/filodb.core/query/SerializedRangeVectorSpec.scala b/core/src/test/scala/filodb.core/query/SerializedRangeVectorSpec.scala index 46915cc20b..7ab7aa8e6e 100644 --- a/core/src/test/scala/filodb.core/query/SerializedRangeVectorSpec.scala +++ b/core/src/test/scala/filodb.core/query/SerializedRangeVectorSpec.scala @@ -51,9 +51,9 @@ class SerializedRangeVectorSpec extends AnyFunSpec with Matchers { val queryStats = QueryStats() val srv = SerializedRangeVector.apply(rv, builder, recSchema, "someExecPlan", queryStats) queryStats.getCpuNanosCounter(Nil).get() > 0 shouldEqual true - queryStats.getResultBytesCounter(Nil).get() shouldEqual 108 srv.numRows shouldEqual Some(11) srv.numRowsSerialized shouldEqual 4 + srv.estimateSerializedRowBytes shouldEqual 80 // 4 non nan records each of 20 bytes val res = srv.rows.map(r => (r.getLong(0), r.getDouble(1))).toList res.length shouldEqual 11 res.map(_._1) shouldEqual (0 to 1000 by 100) @@ -77,9 +77,9 @@ class SerializedRangeVectorSpec extends AnyFunSpec with Matchers { val queryStats = QueryStats() val srv = SerializedRangeVector.apply(rv, builder, recSchema, "someExecPlan", queryStats) queryStats.getCpuNanosCounter(Nil).get() > 0 shouldEqual true - queryStats.getResultBytesCounter(Nil).get() shouldEqual 248 srv.numRows shouldEqual Some(11) srv.numRowsSerialized shouldEqual 11 + srv.estimateSerializedRowBytes shouldEqual 220 val res = srv.rows.map(r => (r.getLong(0), r.getDouble(1))).toList res.length shouldEqual 11 res.map(_._1) shouldEqual (0 to 1000 by 100) @@ -105,7 +105,6 @@ class SerializedRangeVectorSpec extends AnyFunSpec with Matchers { val queryStats = QueryStats() val srv = SerializedRangeVector.apply(rv, builder, recSchema, "someExecPlan", queryStats) queryStats.getCpuNanosCounter(Nil).get() > 0 shouldEqual true - queryStats.getResultBytesCounter(Nil).get() shouldEqual 188 srv.numRows shouldEqual Some(11) srv.numRowsSerialized shouldEqual 4 val res = srv.rows.map(r => (r.getLong(0), r.getHistogram(1))).toList @@ -114,4 +113,26 @@ class SerializedRangeVectorSpec extends AnyFunSpec with Matchers { res.map(_._2).filterNot(_.isEmpty) shouldEqual Seq(h1, h1, h1, h1) } + it("should calculate estimateSerializedRowBytes correctly when builder is used for several SRVs") { + val builder = SerializedRangeVector.newBuilder() + val recSchema = new RecordSchema(Seq(ColumnInfo("time", ColumnType.TimestampColumn), + ColumnInfo("value", ColumnType.DoubleColumn))) + val keysMap = Map(UTF8Str("key1") -> UTF8Str("val1"), + UTF8Str("key2") -> UTF8Str("val2")) + val key = CustomRangeVectorKey(keysMap) + + (0 to 200).foreach { i => + 
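// Note: the builder is shared across every iteration of this loop, so each SRV created below must
// report only its own slice of the accumulated containers (11 serialized rows of 20 bytes each,
// i.e. 220 bytes), no matter how many records the shared containers already hold.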
val rv = toRv(Seq((0, Double.NaN), (100, 1.0), (200, Double.NaN), + (300, 3.0), (400, Double.NaN), + (500, 5.0), (600, 6.0), + (700, Double.NaN), (800, Double.NaN), + (900, Double.NaN), (1000, Double.NaN)), key, + RvRange(1000, 100, 1000)) + val queryStats = QueryStats() + val srv = SerializedRangeVector.apply(rv, builder, recSchema, "someExecPlan", queryStats) + srv.numRowsSerialized shouldEqual 11 + srv.estimateSerializedRowBytes shouldEqual 220 + } + } + } diff --git a/core/src/test/scala/filodb.core/store/ColumnStoreSpec.scala b/core/src/test/scala/filodb.core/store/ColumnStoreSpec.scala index 57a15afecf..8257cb8691 100644 --- a/core/src/test/scala/filodb.core/store/ColumnStoreSpec.scala +++ b/core/src/test/scala/filodb.core/store/ColumnStoreSpec.scala @@ -126,7 +126,7 @@ with BeforeAndAfter with BeforeAndAfterAll with ScalaFutures { // TODO: FilteredPartitionScan() for ColumnStores does not work without an index right now ignore should "filter rows written with single partition key" in { import GdeltTestData._ - memStore.setup(dataset2.ref, schemas, 0, TestData.storeConf) + memStore.setup(dataset2.ref, schemas, 0, TestData.storeConf, 1) val stream = Observable.now(records(dataset2)) // Force flush of all groups at end memStore.startIngestion(dataset2.ref, 0, stream, s, Task {}).futureValue @@ -143,7 +143,7 @@ with BeforeAndAfter with BeforeAndAfterAll with ScalaFutures { // "rangeVectors api" should "return Range Vectors for given filter and read all rows" in { ignore should "return Range Vectors for given filter and read all rows" in { import GdeltTestData._ - memStore.setup(dataset2.ref, schemas, 0, TestData.storeConf) + memStore.setup(dataset2.ref, schemas, 0, TestData.storeConf, 1) val stream = Observable.now(records(dataset2)) // Force flush of all groups at end memStore.startIngestion(dataset2.ref, 0, stream, s, Task {}).futureValue diff --git a/jmh/src/main/scala/filodb.jmh/HistogramIngestBenchmark.scala b/jmh/src/main/scala/filodb.jmh/HistogramIngestBenchmark.scala index e4eb963d14..1f5437952b 100644 --- a/jmh/src/main/scala/filodb.jmh/HistogramIngestBenchmark.scala +++ b/jmh/src/main/scala/filodb.jmh/HistogramIngestBenchmark.scala @@ -68,8 +68,8 @@ class HistogramIngestBenchmark { val policy = new FixedMaxPartitionsEvictionPolicy(1000) val memStore = new TimeSeriesMemStore(config, new NullColumnStore, new InMemoryMetaStore(), Some(policy)) val ingestConf = TestData.storeConf.copy(shardMemSize = 512 * 1024 * 1024, maxChunksSize = 100) - memStore.setup(histDataset.ref, Schemas(histDataset.schema), 0, ingestConf) - memStore.setup(promDataset.ref, Schemas(promDataset.schema), 0, ingestConf) + memStore.setup(histDataset.ref, Schemas(histDataset.schema), 0, ingestConf, 1) + memStore.setup(promDataset.ref, Schemas(promDataset.schema), 0, ingestConf, 1) val hShard = memStore.getShardE(histDataset.ref, 0) val pShard = memStore.getShardE(promDataset.ref, 0) diff --git a/jmh/src/main/scala/filodb.jmh/HistogramQueryBenchmark.scala b/jmh/src/main/scala/filodb.jmh/HistogramQueryBenchmark.scala index d87b2b7ca8..545027a85a 100644 --- a/jmh/src/main/scala/filodb.jmh/HistogramQueryBenchmark.scala +++ b/jmh/src/main/scala/filodb.jmh/HistogramQueryBenchmark.scala @@ -53,7 +53,7 @@ class HistogramQueryBenchmark { histSchemaData.take(10 * 180).foreach(histSchemaBuilder.addFromReader(_, histDataset.schema)) val histSchemas = Schemas(histDataset.schema) - memStore.setup(histDataset.ref, histSchemas, 0, ingestConf) + memStore.setup(histDataset.ref, histSchemas, 0, ingestConf, 1) val hShard = 
memStore.getShardE(histDataset.ref, 0) histSchemaBuilder.allContainers.foreach { c => hShard.ingest(c, 0) } memStore.refreshIndexForTesting(histDataset.ref) // commit lucene index @@ -66,7 +66,7 @@ class HistogramQueryBenchmark { val promBuilder = new RecordBuilder(MemFactory.onHeapFactory, 4200000) promData.take(10*66*180).foreach(promBuilder.addFromReader(_, promDataset.schema)) - memStore.setup(promDataset.ref, promSchemas, 0, ingestConf) + memStore.setup(promDataset.ref, promSchemas, 0, ingestConf, 1) val pShard = memStore.getShardE(promDataset.ref, 0) promBuilder.allContainers.foreach { c => pShard.ingest(c, 0) } memStore.refreshIndexForTesting(promDataset.ref) // commit lucene index diff --git a/jmh/src/main/scala/filodb.jmh/IngestionBenchmark.scala b/jmh/src/main/scala/filodb.jmh/IngestionBenchmark.scala index 286e607c5e..2d3c86b129 100644 --- a/jmh/src/main/scala/filodb.jmh/IngestionBenchmark.scala +++ b/jmh/src/main/scala/filodb.jmh/IngestionBenchmark.scala @@ -61,7 +61,7 @@ class IngestionBenchmark { val policy = new FixedMaxPartitionsEvictionPolicy(100) val memStore = new TimeSeriesMemStore(config, new NullColumnStore, new InMemoryMetaStore(), Some(policy)) val ingestConf = TestData.storeConf.copy(shardMemSize = 512 * 1024 * 1024, maxChunksSize = 200) - memStore.setup(dataset1.ref, Schemas(dataset1.schema), 0, ingestConf) + memStore.setup(dataset1.ref, Schemas(dataset1.schema), 0, ingestConf, 1) val shard = memStore.getShardE(dataset1.ref, 0) diff --git a/jmh/src/main/scala/filodb.jmh/PartKeyIndexBenchmark.scala b/jmh/src/main/scala/filodb.jmh/PartKeyIndexBenchmark.scala index 5fd16ab097..41b036bf0c 100644 --- a/jmh/src/main/scala/filodb.jmh/PartKeyIndexBenchmark.scala +++ b/jmh/src/main/scala/filodb.jmh/PartKeyIndexBenchmark.scala @@ -74,7 +74,7 @@ class PartKeyIndexBenchmark { partKeyIndex.partIdsFromFilters( Seq(ColumnFilter("_ns_", Filter.Equals(s"App-$i")), ColumnFilter("_ws_", Filter.Equals("demo")), - ColumnFilter("host", Filter.EqualsRegex("H0")), + ColumnFilter("host", Filter.Equals("H0")), ColumnFilter("_metric_", Filter.Equals("heap_usage0"))), now, now + 1000) @@ -90,7 +90,7 @@ class PartKeyIndexBenchmark { partKeyIndex.partIdsFromFilters( Seq(ColumnFilter("_ns_", Filter.Equals(s"App-${i + 200}")), ColumnFilter("_ws_", Filter.Equals("demo")), - ColumnFilter("host", Filter.EqualsRegex("H0")), + ColumnFilter("host", Filter.Equals("H0")), ColumnFilter("_metric_", Filter.Equals("heap_usage0"))), now, now + 1000) @@ -101,7 +101,7 @@ class PartKeyIndexBenchmark { @BenchmarkMode(Array(Mode.Throughput)) @OutputTimeUnit(TimeUnit.SECONDS) @OperationsPerInvocation(8) - def partIdsLookupWithSuffixRegexFilters(): Unit = { + def partIdsLookupWithPrefixRegexFilters(): Unit = { cforRange ( 0 until 8 ) { i => partKeyIndex.partIdsFromFilters( Seq(ColumnFilter("_ns_", Filter.Equals(s"App-$i")), @@ -117,7 +117,7 @@ class PartKeyIndexBenchmark { @BenchmarkMode(Array(Mode.Throughput)) @OutputTimeUnit(TimeUnit.SECONDS) @OperationsPerInvocation(8) - def partIdsLookupWithPrefixRegexFilters(): Unit = { + def partIdsLookupWithSuffixRegexFilters(): Unit = { cforRange ( 0 until 8 ) { i => partKeyIndex.partIdsFromFilters( Seq(ColumnFilter("_ns_", Filter.Equals(s"App-$i")), @@ -129,6 +129,25 @@ class PartKeyIndexBenchmark { } } + @Benchmark + @BenchmarkMode(Array(Mode.Throughput)) + @OutputTimeUnit(TimeUnit.SECONDS) + @OperationsPerInvocation(8) + def partIdsLookupWithEnumRegexFilter(): Unit = { + cforRange(0 until 8) { i => + val c = partKeyIndex.partIdsFromFilters( + Seq(ColumnFilter("_ns_", 
Filter.Equals(s"App-0")), + ColumnFilter("_ws_", Filter.Equals("demo")), + ColumnFilter("_metric_", Filter.Equals("heap_usage0")), + ColumnFilter("instance", + Filter.EqualsRegex("Instance-1|Instance-2|Instance-3|Instance-4|Instance-5|Instance-6|Instance-7|Instance-8|Instance-9|Instance-10|" + + "Instance-11|Instance-12|Instance-13|Instance-14|Instance-15|Instance-16|Instance-17|Instance-18|Instance-19|Instance-20|" + + "Instance-21|Instance-22|Instance-23|Instance-24|Instance-25|Instance-26|Instance-27|Instance-28|Instance-29|Instance-30"))), + now, + now + 1000).length + } + } + @Benchmark @BenchmarkMode(Array(Mode.Throughput)) @OutputTimeUnit(TimeUnit.SECONDS) diff --git a/kafka/src/main/scala/filodb/kafka/KafkaIngestionStream.scala b/kafka/src/main/scala/filodb/kafka/KafkaIngestionStream.scala index 66c5bb2ae9..07f9e0b409 100644 --- a/kafka/src/main/scala/filodb/kafka/KafkaIngestionStream.scala +++ b/kafka/src/main/scala/filodb/kafka/KafkaIngestionStream.scala @@ -59,6 +59,7 @@ class KafkaIngestionStream(config: Config, if (sourceConfig.LogConfig) logger.info(s"Consumer properties: $props") blocking { + props.put("client.id", s"${props.get("group.id")}.${System.getenv("INSTANCE_ID")}.$shard") val consumer = new KafkaConsumer(props) consumer.assign(List(topicPartition).asJava) offset.foreach { off => consumer.seek(topicPartition, off) } diff --git a/memory/src/main/scala/filodb.memory/format/vectors/DoubleVector.scala b/memory/src/main/scala/filodb.memory/format/vectors/DoubleVector.scala index e5db5d6e4a..f749ee236f 100644 --- a/memory/src/main/scala/filodb.memory/format/vectors/DoubleVector.scala +++ b/memory/src/main/scala/filodb.memory/format/vectors/DoubleVector.scala @@ -242,7 +242,7 @@ object DoubleVectorDataReader64 extends DoubleVectorDataReader { val nextDbl = acc.getDouble(addr) // There are many possible values of NaN. Use a function to ignore them reliably. if (!java.lang.Double.isNaN(nextDbl)) { - if (sum.isNaN) sum = 0d + if (java.lang.Double.isNaN(sum)) sum = 0d sum += nextDbl } addr += 8 @@ -323,7 +323,7 @@ extends DoubleVectorDataReader { var last = Double.MinValue cforRange { 0 until len } { pos => var nextVal = it.next - if (nextVal.isNaN) nextVal = 0 // explicit counter reset due to end of time series marker + if (java.lang.Double.isNaN(nextVal)) nextVal = 0 // explicit counter reset due to end of time series marker if (nextVal < last) { // reset! 
_correction += last _drops += pos @@ -443,9 +443,9 @@ class DoubleCounterAppender(addr: BinaryRegion.NativePointer, maxBytes: Int, dis extends DoubleAppendingVector(addr, maxBytes, dispose) { private var last = Double.MinValue override final def addData(data: Double): AddResponse = { - if (data.isNaN || data < last) + if (java.lang.Double.isNaN(data) || data < last) PrimitiveVectorReader.markDrop(MemoryAccessor.nativePtrAccessor, addr) - if (!data.isNaN) + if (!java.lang.Double.isNaN(data)) last = data super.addData(data) } @@ -544,7 +544,7 @@ object DoubleLongWrapDataReader extends DoubleVectorDataReader { final def changes(acc: MemoryReader, vector: BinaryVectorPtr, start: Int, end: Int, prev: Double, ignorePrev: Boolean = false): (Double, Double) = { - val ignorePrev = if (prev.isNaN) true + val ignorePrev = if (java.lang.Double.isNaN(prev)) true else false val changes = LongBinaryVector(acc, vector).changes(acc, vector, start, end, prev.toLong, ignorePrev) (changes._1.toDouble, changes._2.toDouble) diff --git a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala index 4ebb711d6c..3e8bd77a8b 100644 --- a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala +++ b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala @@ -228,7 +228,7 @@ final case class MutableHistogram(buckets: HistogramBuckets, values: Array[Doubl // Allow addition when type of bucket is different if (buckets.similarForMath(other.buckets)) { // If it was NaN before, reset to 0 to sum another hist - if (values(0).isNaN) java.util.Arrays.fill(values, 0.0) + if (java.lang.Double.isNaN(values(0))) java.util.Arrays.fill(values, 0.0) cforRange { 0 until numBuckets } { b => values(b) += other.bucketValue(b) } @@ -272,7 +272,7 @@ final case class MutableHistogram(buckets: HistogramBuckets, values: Array[Doubl cforRange { 0 until values.size } { b => // When bucket no longer used NaN will be seen. Non-increasing values can be seen when // newer buckets are introduced and not all instances are updated with that bucket. 
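// Carry the running max forward so the cumulative bucket values stay non-decreasing, even when a
// bucket reads NaN or dips below an earlier bucket.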
- if (values(b) < max || values(b).isNaN) values(b) = max // assign previous max + if (values(b) < max || java.lang.Double.isNaN(values(b))) values(b) = max // assign previous max else if (values(b) > max) max = values(b) // update max } } diff --git a/memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala b/memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala index 2a6f230e86..d9ca52558c 100644 --- a/memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala +++ b/memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala @@ -583,7 +583,6 @@ class SectDeltaHistogramReader(acc2: MemoryReader, histVect: Ptr.U8) throw EmptyHistogramException(s"""length = $length memory=${toHexString(acc, histVect.addr)}""") } - require(length > 0) val histPtr = locate(index) // Just return the base histogram if we are at start of section diff --git a/prometheus/src/main/scala/filodb/prometheus/ast/Expressions.scala b/prometheus/src/main/scala/filodb/prometheus/ast/Expressions.scala index 1d0e963f98..553b9162b9 100644 --- a/prometheus/src/main/scala/filodb/prometheus/ast/Expressions.scala +++ b/prometheus/src/main/scala/filodb/prometheus/ast/Expressions.scala @@ -3,8 +3,16 @@ package filodb.prometheus.ast import filodb.core.query.RangeParams import filodb.query._ -case class UnaryExpression(operator: Operator, operand: Expression) extends Expression { +case class UnaryExpression(operator: Operator, operand: Expression) extends Expression with PeriodicSeries { //TODO Need to pass an operator to a series + override def toSeriesPlan(timeParams: TimeRangeParams): PeriodicSeriesPlan = { + if (operator != Add && operator != Sub) { + throw new IllegalArgumentException(s"operator=$operator is not allowed in expression=$operand") + } + // use binary expression to implement the unary operators. + // eg. -foo is implemented through (0 - foo). 
+ BinaryExpression(Scalar(0), operator, None, operand).toSeriesPlan(timeParams) + } } case class PrecedenceExpression(expression: Expression) extends Expression diff --git a/prometheus/src/test/scala/filodb/prometheus/parse/ParserSpec.scala b/prometheus/src/test/scala/filodb/prometheus/parse/ParserSpec.scala index f4e2e94700..f68edf7b4c 100644 --- a/prometheus/src/test/scala/filodb/prometheus/parse/ParserSpec.scala +++ b/prometheus/src/test/scala/filodb/prometheus/parse/ParserSpec.scala @@ -555,6 +555,30 @@ class ParserSpec extends AnyFunSpec with Matchers { ) } + it("parse expressions with unary operators") { + parseWithAntlr("-1", """ScalarBinaryOperation(SUB,Left(0.0),Left(1.0),RangeParams(1524855988,1000,1524855988))""") + parseWithAntlr("-foo", """ScalarVectorBinaryOperation(SUB,ScalarFixedDoublePlan(0.0,RangeParams(1524855988,1000,1524855988)),PeriodicSeries(RawSeries(IntervalSelector(1524855988000,1524855988000),List(ColumnFilter(__name__,Equals(foo))),List(),Some(300000),None),1524855988000,1000000,1524855988000,None),true)""") + parseWithAntlr("foo * -1","""BinaryJoin(PeriodicSeries(RawSeries(IntervalSelector(1524855988000,1524855988000),List(ColumnFilter(__name__,Equals(foo))),List(),Some(300000),None),1524855988000,1000000,1524855988000,None),MUL,OneToOne,ScalarBinaryOperation(SUB,Left(0.0),Left(1.0),RangeParams(1524855988,1000,1524855988)),List(),List(),List())""") + parseWithAntlr("-1 * foo", """BinaryJoin(ScalarBinaryOperation(SUB,Left(0.0),Left(1.0),RangeParams(1524855988,1000,1524855988)),MUL,OneToOne,PeriodicSeries(RawSeries(IntervalSelector(1524855988000,1524855988000),List(ColumnFilter(__name__,Equals(foo))),List(),Some(300000),None),1524855988000,1000000,1524855988000,None),List(),List(),List())""") + parseWithAntlr("-1 * -foo", """BinaryJoin(ScalarBinaryOperation(SUB,Left(0.0),Left(1.0),RangeParams(1524855988,1000,1524855988)),MUL,OneToOne,ScalarVectorBinaryOperation(SUB,ScalarFixedDoublePlan(0.0,RangeParams(1524855988,1000,1524855988)),PeriodicSeries(RawSeries(IntervalSelector(1524855988000,1524855988000),List(ColumnFilter(__name__,Equals(foo))),List(),Some(300000),None),1524855988000,1000000,1524855988000,None),true),List(),List(),List())""") + parseWithAntlr("sum(foo) < -1", """BinaryJoin(Aggregate(Sum,PeriodicSeries(RawSeries(IntervalSelector(1524855988000,1524855988000),List(ColumnFilter(__name__,Equals(foo))),List(),Some(300000),None),1524855988000,1000000,1524855988000,None),List(),None),LSS,OneToOne,ScalarBinaryOperation(SUB,Left(0.0),Left(1.0),RangeParams(1524855988,1000,1524855988)),List(),List(),List())""") + parseWithAntlr("-sum(foo) < -1", "BinaryJoin(ScalarVectorBinaryOperation(SUB,ScalarFixedDoublePlan(0.0,RangeParams(1524855988,1000,1524855988)),Aggregate(Sum,PeriodicSeries(RawSeries(IntervalSelector(1524855988000,1524855988000),List(ColumnFilter(__name__,Equals(foo))),List(),Some(300000),None),1524855988000,1000000,1524855988000,None),List(),None),true),LSS,OneToOne,ScalarBinaryOperation(SUB,Left(0.0),Left(1.0),RangeParams(1524855988,1000,1524855988)),List(),List(),List())") + parseWithAntlr("sum(-foo) < -1", """BinaryJoin(Aggregate(Sum,ScalarVectorBinaryOperation(SUB,ScalarFixedDoublePlan(0.0,RangeParams(1524855988,1000,1524855988)),PeriodicSeries(RawSeries(IntervalSelector(1524855988000,1524855988000),List(ColumnFilter(__name__,Equals(foo))),List(),Some(300000),None),1524855988000,1000000,1524855988000,None),true),List(),None),LSS,OneToOne,ScalarBinaryOperation(SUB,Left(0.0),Left(1.0),RangeParams(1524855988,1000,1524855988)),List(),List(),List())""") 
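// The cases above and below exercise the new UnaryExpression support: a unary +/- is rewritten as
// a binary operation with scalar zero on the left (-foo becomes (0 - foo), +foo becomes (0 + foo)),
// which is why the expected plans contain ScalarBinaryOperation / ScalarVectorBinaryOperation
// nodes with a 0.0 scalar operand.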
+ parseWithAntlr("-sum(-foo) < -1", """BinaryJoin(ScalarVectorBinaryOperation(SUB,ScalarFixedDoublePlan(0.0,RangeParams(1524855988,1000,1524855988)),Aggregate(Sum,ScalarVectorBinaryOperation(SUB,ScalarFixedDoublePlan(0.0,RangeParams(1524855988,1000,1524855988)),PeriodicSeries(RawSeries(IntervalSelector(1524855988000,1524855988000),List(ColumnFilter(__name__,Equals(foo))),List(),Some(300000),None),1524855988000,1000000,1524855988000,None),true),List(),None),true),LSS,OneToOne,ScalarBinaryOperation(SUB,Left(0.0),Left(1.0),RangeParams(1524855988,1000,1524855988)),List(),List(),List())""") + + parseWithAntlr("+1", """ScalarBinaryOperation(ADD,Left(0.0),Left(1.0),RangeParams(1524855988,1000,1524855988))""") + parseWithAntlr("+foo", """ScalarVectorBinaryOperation(ADD,ScalarFixedDoublePlan(0.0,RangeParams(1524855988,1000,1524855988)),PeriodicSeries(RawSeries(IntervalSelector(1524855988000,1524855988000),List(ColumnFilter(__name__,Equals(foo))),List(),Some(300000),None),1524855988000,1000000,1524855988000,None),true)""") + parseWithAntlr("foo * +1", """BinaryJoin(PeriodicSeries(RawSeries(IntervalSelector(1524855988000,1524855988000),List(ColumnFilter(__name__,Equals(foo))),List(),Some(300000),None),1524855988000,1000000,1524855988000,None),MUL,OneToOne,ScalarBinaryOperation(ADD,Left(0.0),Left(1.0),RangeParams(1524855988,1000,1524855988)),List(),List(),List())""") + parseWithAntlr("+1 * foo", """BinaryJoin(ScalarBinaryOperation(ADD,Left(0.0),Left(1.0),RangeParams(1524855988,1000,1524855988)),MUL,OneToOne,PeriodicSeries(RawSeries(IntervalSelector(1524855988000,1524855988000),List(ColumnFilter(__name__,Equals(foo))),List(),Some(300000),None),1524855988000,1000000,1524855988000,None),List(),List(),List())""") + parseWithAntlr("+1 * +foo", """BinaryJoin(ScalarBinaryOperation(ADD,Left(0.0),Left(1.0),RangeParams(1524855988,1000,1524855988)),MUL,OneToOne,ScalarVectorBinaryOperation(ADD,ScalarFixedDoublePlan(0.0,RangeParams(1524855988,1000,1524855988)),PeriodicSeries(RawSeries(IntervalSelector(1524855988000,1524855988000),List(ColumnFilter(__name__,Equals(foo))),List(),Some(300000),None),1524855988000,1000000,1524855988000,None),true),List(),List(),List())""") + parseWithAntlr("sum(foo) < +1", """BinaryJoin(Aggregate(Sum,PeriodicSeries(RawSeries(IntervalSelector(1524855988000,1524855988000),List(ColumnFilter(__name__,Equals(foo))),List(),Some(300000),None),1524855988000,1000000,1524855988000,None),List(),None),LSS,OneToOne,ScalarBinaryOperation(ADD,Left(0.0),Left(1.0),RangeParams(1524855988,1000,1524855988)),List(),List(),List())""") + parseWithAntlr("+sum(foo) < +1", "BinaryJoin(ScalarVectorBinaryOperation(ADD,ScalarFixedDoublePlan(0.0,RangeParams(1524855988,1000,1524855988)),Aggregate(Sum,PeriodicSeries(RawSeries(IntervalSelector(1524855988000,1524855988000),List(ColumnFilter(__name__,Equals(foo))),List(),Some(300000),None),1524855988000,1000000,1524855988000,None),List(),None),true),LSS,OneToOne,ScalarBinaryOperation(ADD,Left(0.0),Left(1.0),RangeParams(1524855988,1000,1524855988)),List(),List(),List())") + parseWithAntlr("sum(+foo) < +1", """BinaryJoin(Aggregate(Sum,ScalarVectorBinaryOperation(ADD,ScalarFixedDoublePlan(0.0,RangeParams(1524855988,1000,1524855988)),PeriodicSeries(RawSeries(IntervalSelector(1524855988000,1524855988000),List(ColumnFilter(__name__,Equals(foo))),List(),Some(300000),None),1524855988000,1000000,1524855988000,None),true),List(),None),LSS,OneToOne,ScalarBinaryOperation(ADD,Left(0.0),Left(1.0),RangeParams(1524855988,1000,1524855988)),List(),List(),List())""") + 
parseWithAntlr("+sum(+foo) < +1", """BinaryJoin(ScalarVectorBinaryOperation(ADD,ScalarFixedDoublePlan(0.0,RangeParams(1524855988,1000,1524855988)),Aggregate(Sum,ScalarVectorBinaryOperation(ADD,ScalarFixedDoublePlan(0.0,RangeParams(1524855988,1000,1524855988)),PeriodicSeries(RawSeries(IntervalSelector(1524855988000,1524855988000),List(ColumnFilter(__name__,Equals(foo))),List(),Some(300000),None),1524855988000,1000000,1524855988000,None),true),List(),None),true),LSS,OneToOne,ScalarBinaryOperation(ADD,Left(0.0),Left(1.0),RangeParams(1524855988,1000,1524855988)),List(),List(),List())""") + + parseWithAntlr("+-1", """ScalarVectorBinaryOperation(ADD,ScalarFixedDoublePlan(0.0,RangeParams(1524855988,1000,1524855988)),ScalarBinaryOperation(SUB,Left(0.0),Left(1.0),RangeParams(1524855988,1000,1524855988)),true)""") + } + it("Should be able to make logical plans for Series Expressions") { val queryToLpString = Map( "http_requests_total + time()" -> "ScalarVectorBinaryOperation(ADD,ScalarTimeBasedPlan(Time,RangeParams(1524855988,1000,1524855988)),PeriodicSeries(RawSeries(IntervalSelector(1524855988000,1524855988000),List(ColumnFilter(__name__,Equals(http_requests_total))),List(),Some(300000),None),1524855988000,1000000,1524855988000,None),false)", diff --git a/query/src/main/scala/filodb/query/exec/ExecPlan.scala b/query/src/main/scala/filodb/query/exec/ExecPlan.scala index d1b889a6f6..5abf255e16 100644 --- a/query/src/main/scala/filodb/query/exec/ExecPlan.scala +++ b/query/src/main/scala/filodb/query/exec/ExecPlan.scala @@ -235,34 +235,34 @@ trait ExecPlan extends QueryCommand { @volatile var numResultSamples = 0 // BEWARE - do not modify concurrently!! @volatile var resultSize = 0L val queryResults = rv.doOnStart(_ => Task.eval(span.mark("before-first-materialized-result-rv"))) - .map { - case srvable: SerializableRangeVector => srvable - case rv: RangeVector => - // materialize, and limit rows per RV - val execPlanString = queryWithPlanName(queryContext) - val builder = SerializedRangeVector.newBuilder(maxRecordContainerSize(querySession.queryConfig)) - val srv = SerializedRangeVector(rv, builder, recordSchema, execPlanString, querySession.queryStats) - if (rv.outputRange.isEmpty) - qLogger.debug(s"Empty rangevector found. 
Rv class is: ${rv.getClass.getSimpleName}, " + - s"execPlan is: $execPlanString, execPlan children ${this.children}") + .bufferTumbling(querySession.queryConfig.numRvsPerResultMessage) + .map { f => + val builder = SerializedRangeVector.newBuilder(maxRecordContainerSize(querySession.queryConfig)) + val srvs = f.map { + case srvable: SerializableRangeVector => srvable + case rv: RangeVector => + val execPlanString = queryWithPlanName(queryContext) + SerializedRangeVector(rv, builder, recordSchema, execPlanString, querySession.queryStats) + } + .map { srv => + // fail the query instead of limiting range vectors and returning incomplete/inaccurate results + numResultSamples += srv.numRowsSerialized + checkSamplesLimit(numResultSamples, querySession.warnings) + val srvBytes = srv.estimatedSerializedBytes + resultSize += srvBytes + querySession.queryStats.getResultBytesCounter(Nil).addAndGet(srvBytes) + checkResultBytes(resultSize, querySession.queryConfig, querySession.warnings) srv + } + .filter(_.numRowsSerialized > 0) + StreamQueryResult(queryContext.queryId, planId, srvs) } - .map { srv => - // fail the query instead of limiting range vectors and returning incomplete/inaccurate results - numResultSamples += srv.numRowsSerialized - checkSamplesLimit(numResultSamples, querySession.warnings) - resultSize += srv.estimatedSerializedBytes - checkResultBytes(resultSize, querySession.queryConfig, querySession.warnings) - srv - } - .filter(_.numRowsSerialized > 0) - .bufferTumbling(querySession.queryConfig.numRvsPerResultMessage) - .map(StreamQueryResult(queryContext.queryId, planId, _)) .guarantee(Task.eval { Kamon.histogram("query-execute-time-elapsed-step2-result-materialized", MeasurementUnit.time.milliseconds) .withTag("plan", getClass.getSimpleName) .record(Math.max(0, System.currentTimeMillis - startExecute)) + SerializedRangeVector.queryResultBytes.record(resultSize) // recording and adding step1 to queryStats at the end of execution since the grouping // for stats is not formed yet at the beginning querySession.queryStats.getCpuNanosCounter(Nil).getAndAdd(step1CpuTime) @@ -439,7 +439,6 @@ trait ExecPlan extends QueryCommand { } } - def makeResult( rv : Observable[RangeVector], recordSchema: RecordSchema, resultSchema: ResultSchema ): Task[QueryResult] = { @@ -462,7 +461,9 @@ trait ExecPlan extends QueryCommand { // fail the query instead of limiting range vectors and returning incomplete/inaccurate results numResultSamples += srv.numRowsSerialized checkSamplesLimit(numResultSamples, querySession.warnings) - resultSize += srv.estimatedSerializedBytes + val srvBytes = srv.estimatedSerializedBytes + resultSize += srvBytes + querySession.queryStats.getResultBytesCounter(Nil).addAndGet(srvBytes) checkResultBytes(resultSize, querySession.queryConfig, querySession.warnings) srv } @@ -470,6 +471,7 @@ trait ExecPlan extends QueryCommand { .guarantee(Task.eval(span.mark("after-last-materialized-result-rv"))) .toListL .map { r => + SerializedRangeVector.queryResultBytes.record(resultSize) Kamon.histogram("query-execute-time-elapsed-step2-result-materialized", MeasurementUnit.time.milliseconds) .withTag("plan", getClass.getSimpleName) @@ -530,7 +532,9 @@ trait ExecPlan extends QueryCommand { } protected def queryWithPlanName(queryContext: QueryContext): String = { - s"${this.getClass.getSimpleName}-${queryContext.origQueryParams}" + // Disabling this since it showed up in local method profiles. 
Re-enable if needed for debugging + // s"${this.getClass.getSimpleName}-${queryContext.origQueryParams}" + s"${queryContext.queryId}:$planId" } def curNodeText(level: Int): String = diff --git a/query/src/main/scala/filodb/query/exec/PromQlRemoteExec.scala b/query/src/main/scala/filodb/query/exec/PromQlRemoteExec.scala index f6a3cb264d..9412813801 100644 --- a/query/src/main/scala/filodb/query/exec/PromQlRemoteExec.scala +++ b/query/src/main/scala/filodb/query/exec/PromQlRemoteExec.scala @@ -153,7 +153,7 @@ case class PromQlRemoteExec(queryEndpoint: String, } // dont add this size to queryStats since it was already added by callee use dummy QueryStats() SerializedRangeVector(rv, builder, recordSchema.get("default").get, - queryWithPlanName(queryContext), dummyQueryStats) + planId, dummyQueryStats) // TODO: Handle stitching with verbose flag } QueryResult( @@ -235,7 +235,7 @@ case class PromQlRemoteExec(queryEndpoint: String, } // dont add this size to queryStats since it was already added by callee use dummy QueryStats() SerializedRangeVector(rv, builder, recordSchema.get(Avg.entryName).get, - queryWithPlanName(queryContext), dummyQueryStats) + planId, dummyQueryStats) } // TODO: Handle stitching with verbose flag @@ -273,7 +273,7 @@ case class PromQlRemoteExec(queryEndpoint: String, } // dont add this size to queryStats since it was already added by callee use dummy QueryStats() SerializedRangeVector(rv, builder, recordSchema.get(QueryFunctionConstants.stdVal).get, - queryWithPlanName(queryContext), dummyQueryStats) + planId, dummyQueryStats) } // TODO: Handle stitching with verbose flag diff --git a/query/src/main/scala/filodb/query/exec/aggregator/CountValuesRowAggregator.scala b/query/src/main/scala/filodb/query/exec/aggregator/CountValuesRowAggregator.scala index eed6afdd51..22b84ffbed 100644 --- a/query/src/main/scala/filodb/query/exec/aggregator/CountValuesRowAggregator.scala +++ b/query/src/main/scala/filodb/query/exec/aggregator/CountValuesRowAggregator.scala @@ -2,6 +2,7 @@ package filodb.query.exec.aggregator import scala.collection.mutable +import filodb.core.Utils import filodb.core.binaryrecord2.RecordBuilder import filodb.core.memstore.FiloSchedulers import filodb.core.memstore.FiloSchedulers.QuerySchedName @@ -83,36 +84,42 @@ class CountValuesRowAggregator(label: String, limit: Int = 1000) extends RowAggr def present(aggRangeVector: RangeVector, limit: Int, rangeParams: RangeParams, queryStats: QueryStats): Seq[RangeVector] = { - val colSchema = Seq(ColumnInfo("timestamp", ColumnType.TimestampColumn), - ColumnInfo("value", ColumnType.DoubleColumn)) - val recSchema = SerializedRangeVector.toSchema(colSchema) - val resRvs = mutable.Map[RangeVectorKey, RecordBuilder]() + val startNs = Utils.currentThreadCpuTimeNanos try { - FiloSchedulers.assertThreadName(QuerySchedName) - // aggRangeVector.rows.take below triggers the ChunkInfoIterator which requires lock/release - ChunkMap.validateNoSharedLocks(s"CountValues-$label") - aggRangeVector.rows.take(limit).foreach { row => - val rowMap = CountValuesSerDeser.deserialize(row.getBlobBase(1), - row.getBlobNumBytes(1), row.getBlobOffset(1)) - rowMap.foreach { (k, v) => - val rvk = CustomRangeVectorKey(aggRangeVector.key.labelValues + - (label.utf8 -> k.toString.utf8)) - val builder = resRvs.getOrElseUpdate(rvk, SerializedRangeVector.newBuilder()) - builder.startNewRecord(recSchema) - builder.addLong(row.getLong(0)) - builder.addDouble(v) - builder.endRecord() + val colSchema = Seq(ColumnInfo("timestamp", ColumnType.TimestampColumn), + 
ColumnInfo("value", ColumnType.DoubleColumn)) + val recSchema = SerializedRangeVector.toSchema(colSchema) + val resRvs = mutable.Map[RangeVectorKey, RecordBuilder]() + try { + FiloSchedulers.assertThreadName(QuerySchedName) + // aggRangeVector.rows.take below triggers the ChunkInfoIterator which requires lock/release + ChunkMap.validateNoSharedLocks(s"CountValues-$label") + aggRangeVector.rows.take(limit).foreach { row => + val rowMap = CountValuesSerDeser.deserialize(row.getBlobBase(1), + row.getBlobNumBytes(1), row.getBlobOffset(1)) + rowMap.foreach { (k, v) => + val rvk = CustomRangeVectorKey(aggRangeVector.key.labelValues + + (label.utf8 -> k.toString.utf8)) + val builder = resRvs.getOrElseUpdate(rvk, SerializedRangeVector.newBuilder()) + builder.startNewRecord(recSchema) + builder.addLong(row.getLong(0)) + builder.addDouble(v) + builder.endRecord() + } } + } finally { + aggRangeVector.rows.close() + ChunkMap.releaseAllSharedLocks() } + resRvs.map { case (key, builder) => + val numRows = builder.allContainers.map(_.countRecords()).sum + val srv = new SerializedRangeVector(key, numRows, builder.allContainers, recSchema, 0, None) + queryStats.getResultBytesCounter(Nil).getAndAdd(srv.estimatedSerializedBytes) + srv + }.toSeq + } finally { + queryStats.getCpuNanosCounter(Nil).getAndAdd(Utils.currentThreadCpuTimeNanos - startNs) } - finally { - aggRangeVector.rows.close() - ChunkMap.releaseAllSharedLocks() - } - resRvs.map { case (key, builder) => - val numRows = builder.allContainers.map(_.countRecords()).sum - new SerializedRangeVector(key, numRows, builder.allContainers, recSchema, 0, None) - }.toSeq } def reductionSchema(source: ResultSchema): ResultSchema = { diff --git a/query/src/main/scala/filodb/query/exec/aggregator/SumRowAggregator.scala b/query/src/main/scala/filodb/query/exec/aggregator/SumRowAggregator.scala index 638d01e949..a482b69664 100644 --- a/query/src/main/scala/filodb/query/exec/aggregator/SumRowAggregator.scala +++ b/query/src/main/scala/filodb/query/exec/aggregator/SumRowAggregator.scala @@ -21,8 +21,8 @@ object SumRowAggregator extends RowAggregator { def map(rvk: RangeVectorKey, item: RowReader, mapInto: MutableRowReader): RowReader = item def reduceAggregate(acc: SumHolder, aggRes: RowReader): SumHolder = { acc.timestamp = aggRes.getLong(0) - if (!aggRes.getDouble(1).isNaN) { - if (acc.sum.isNaN) acc.sum = 0 + if (!java.lang.Double.isNaN(aggRes.getDouble(1))) { + if (java.lang.Double.isNaN(acc.sum)) acc.sum = 0 acc.sum += aggRes.getDouble(1) } acc diff --git a/query/src/main/scala/filodb/query/exec/aggregator/TopBottomKRowAggregator.scala b/query/src/main/scala/filodb/query/exec/aggregator/TopBottomKRowAggregator.scala index de4479e8d1..726692fafc 100644 --- a/query/src/main/scala/filodb/query/exec/aggregator/TopBottomKRowAggregator.scala +++ b/query/src/main/scala/filodb/query/exec/aggregator/TopBottomKRowAggregator.scala @@ -158,12 +158,10 @@ class TopBottomKRowAggregator(k: Int, bottomK: Boolean) extends RowAggregator wi resRvs.map { case (key, builder) => val numRows = builder.allContainers.map(_.countRecords()).sum logger.debug(s"TopkPresent before creating SRV key = ${key.labelValues.mkString(",")}") - val srv = new SerializedRangeVector(key, numRows, builder.allContainers, recSchema, 0, + new SerializedRangeVector(key, numRows, builder.allContainers, recSchema, 0, Some(RvRange(rangeParams.startSecs * 1000, rangeParams.stepSecs * 1000, rangeParams.endSecs * 1000))) - queryStats.getResultBytesCounter(Nil).getAndAdd(srv.estimatedSerializedBytes) - srv }.toSeq 
} finally { queryStats.getCpuNanosCounter(Nil).getAndAdd(Utils.currentThreadCpuTimeNanos - startNs) diff --git a/query/src/test/scala/filodb/query/exec/InProcessPlanDispatcherSpec.scala b/query/src/test/scala/filodb/query/exec/InProcessPlanDispatcherSpec.scala index ca6cceb872..607e94d8c3 100644 --- a/query/src/test/scala/filodb/query/exec/InProcessPlanDispatcherSpec.scala +++ b/query/src/test/scala/filodb/query/exec/InProcessPlanDispatcherSpec.scala @@ -36,13 +36,13 @@ class InProcessPlanDispatcherSpec extends AnyFunSpec import filodb.core.{MachineMetricsData => MMD} override def beforeAll(): Unit = { - memStore.setup(timeseriesDataset.ref, Schemas(timeseriesSchema), 0, TestData.storeConf) + memStore.setup(timeseriesDataset.ref, Schemas(timeseriesSchema), 0, TestData.storeConf, 1) memStore.ingest(timeseriesDataset.ref, 0, SomeData(container, 0)) - memStore.setup(MMD.dataset1.ref, Schemas(MMD.schema1), 0, TestData.storeConf) + memStore.setup(MMD.dataset1.ref, Schemas(MMD.schema1), 0, TestData.storeConf, 1) memStore.ingest(MMD.dataset1.ref, 0, mmdSomeData) - memStore.setup(MMD.histDataset.ref, Schemas(MMD.histDataset.schema), 0, TestData.storeConf) + memStore.setup(MMD.histDataset.ref, Schemas(MMD.histDataset.schema), 0, TestData.storeConf, 1) memStore.ingest(MMD.histDataset.ref, 0, MMD.records(MMD.histDataset, histData)) - memStore.setup(MMD.histMaxDS.ref, Schemas(MMD.histMaxDS.schema), 0, TestData.storeConf) + memStore.setup(MMD.histMaxDS.ref, Schemas(MMD.histMaxDS.schema), 0, TestData.storeConf, 1) memStore.ingest(MMD.histMaxDS.ref, 0, MMD.records(MMD.histMaxDS, histMaxData)) memStore.refreshIndexForTesting(timeseriesDataset.ref) memStore.refreshIndexForTesting(MMD.dataset1.ref) diff --git a/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala b/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala index 1f19b21fd8..84da458bb7 100644 --- a/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala +++ b/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala @@ -86,7 +86,7 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B tuples.map { t => SeqRowReader(Seq(t._1, t._2, metric, partTagsUTF8)) } .foreach(builder.addFromReader(_, Schemas.promCounter)) } - memStore.setup(timeseriesDatasetMultipleShardKeys.ref, Schemas(Schemas.promCounter), ishard, TestData.storeConf) + memStore.setup(timeseriesDatasetMultipleShardKeys.ref, Schemas(Schemas.promCounter), ishard, TestData.storeConf, 1) memStore.ingest(timeseriesDatasetMultipleShardKeys.ref, ishard, SomeData(builder.allContainers.head, 0)) builder.reset() diff --git a/query/src/test/scala/filodb/query/exec/MultiSchemaPartitionsExecSpec.scala b/query/src/test/scala/filodb/query/exec/MultiSchemaPartitionsExecSpec.scala index 7e9ba0d567..092c309583 100644 --- a/query/src/test/scala/filodb/query/exec/MultiSchemaPartitionsExecSpec.scala +++ b/query/src/test/scala/filodb/query/exec/MultiSchemaPartitionsExecSpec.scala @@ -88,16 +88,16 @@ class MultiSchemaPartitionsExecSpec extends AnyFunSpec with Matchers with ScalaF implicit val execTimeout = 5.seconds override def beforeAll(): Unit = { - memStore.setup(dsRef, schemas, 0, TestData.storeConf) + memStore.setup(dsRef, schemas, 0, TestData.storeConf, 2) memStore.ingest(dsRef, 0, SomeData(container, 0)) memStore.ingest(dsRef, 0, MMD.records(MMD.histDataset, histData)) // set up shard, but do not ingest data to simulate an empty shard - memStore.setup(dsRef, schemas, 1, TestData.storeConf) + memStore.setup(dsRef, schemas, 1, TestData.storeConf, 2) - 
diff --git a/query/src/test/scala/filodb/query/exec/MultiSchemaPartitionsExecSpec.scala b/query/src/test/scala/filodb/query/exec/MultiSchemaPartitionsExecSpec.scala
index 7e9ba0d567..092c309583 100644
--- a/query/src/test/scala/filodb/query/exec/MultiSchemaPartitionsExecSpec.scala
+++ b/query/src/test/scala/filodb/query/exec/MultiSchemaPartitionsExecSpec.scala
@@ -88,16 +88,16 @@ class MultiSchemaPartitionsExecSpec extends AnyFunSpec with Matchers with ScalaF
   implicit val execTimeout = 5.seconds
 
   override def beforeAll(): Unit = {
-    memStore.setup(dsRef, schemas, 0, TestData.storeConf)
+    memStore.setup(dsRef, schemas, 0, TestData.storeConf, 2)
     memStore.ingest(dsRef, 0, SomeData(container, 0))
     memStore.ingest(dsRef, 0, MMD.records(MMD.histDataset, histData))
 
     // set up shard, but do not ingest data to simulate an empty shard
-    memStore.setup(dsRef, schemas, 1, TestData.storeConf)
+    memStore.setup(dsRef, schemas, 1, TestData.storeConf, 2)
 
-    memStore.setup(MMD.dataset1.ref, Schemas(MMD.schema1), 0, TestData.storeConf)
+    memStore.setup(MMD.dataset1.ref, Schemas(MMD.schema1), 0, TestData.storeConf, 1)
     memStore.ingest(MMD.dataset1.ref, 0, mmdSomeData)
-    memStore.setup(MMD.histMaxDS.ref, Schemas(MMD.histMaxDS.schema), 0, TestData.storeConf)
+    memStore.setup(MMD.histMaxDS.ref, Schemas(MMD.histMaxDS.schema), 0, TestData.storeConf, 1)
     memStore.ingest(MMD.histMaxDS.ref, 0, MMD.records(MMD.histMaxDS, histMaxData))
 
     memStore.refreshIndexForTesting(dsRef)
@@ -138,6 +138,7 @@ class MultiSchemaPartitionsExecSpec extends AnyFunSpec with Matchers with ScalaF
     val execPlan = MultiSchemaPartitionsExec(QueryContext(), dummyDispatcher, dsRef, 0, filters,
       TimeRangeChunkScan(startTime, endTime), "_metric_")
 
+    querySession.queryStats.clear() // so this can be run as a standalone test
     val resp = execPlan.execute(memStore, querySession).runToFuture.futureValue
     val result = resp.asInstanceOf[QueryResult]
     result.result.size shouldEqual 1
@@ -145,6 +146,10 @@ class MultiSchemaPartitionsExecSpec extends AnyFunSpec with Matchers with ScalaF
     dataRead shouldEqual tuples.take(11)
     val partKeyRead = result.result(0).key.labelValues.map(lv => (lv._1.asNewString, lv._2.asNewString))
     partKeyRead shouldEqual partKeyKVWithMetric
+    querySession.queryStats.getResultBytesCounter().get() shouldEqual 297
+    querySession.queryStats.getCpuNanosCounter().get() > 0 shouldEqual true
+    querySession.queryStats.getDataBytesScannedCounter().get() shouldEqual 48
+    querySession.queryStats.getTimeSeriesScannedCounter().get() shouldEqual 1
   }
 
   it("should get empty schema if query returns no results") {
diff --git a/query/src/test/scala/filodb/query/exec/PromQLGrpcRemoteExecSpec.scala b/query/src/test/scala/filodb/query/exec/PromQLGrpcRemoteExecSpec.scala
index 217cae5fd0..5ecf7fea2b 100644
--- a/query/src/test/scala/filodb/query/exec/PromQLGrpcRemoteExecSpec.scala
+++ b/query/src/test/scala/filodb/query/exec/PromQLGrpcRemoteExecSpec.scala
@@ -157,7 +157,7 @@ class PromQLGrpcRemoteExecSpec extends AnyFunSpec with Matchers with ScalaFuture
       deserializedSrv.numRowsSerialized shouldEqual 4
       val res = deserializedSrv.rows.map(r => (r.getLong(0), r.getDouble(1))).toList
       deserializedSrv.key shouldEqual rvKey
-      qr.queryStats.getResultBytesCounter(List()).get()shouldEqual 108
+      // queryStats ResultBytes counter increment is not done as part of SRV constructor, so skipping that assertion
       (qr.queryStats.getCpuNanosCounter(List()).get() > 0) shouldEqual true
       res.length shouldEqual 11
       res.map(_._1) shouldEqual (0 to 1000 by 100)
diff --git a/query/src/test/scala/filodb/query/exec/RemoteMetadataExecSpec.scala b/query/src/test/scala/filodb/query/exec/RemoteMetadataExecSpec.scala
index 18d92cdda1..a13bbb5291 100644
--- a/query/src/test/scala/filodb/query/exec/RemoteMetadataExecSpec.scala
+++ b/query/src/test/scala/filodb/query/exec/RemoteMetadataExecSpec.scala
@@ -85,7 +85,7 @@ class RemoteMetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures
       tuples.map { t => SeqRowReader(Seq(t._1, t._2, metric, partTagsUTF8)) }
        .foreach(builder.addFromReader(_, Schemas.promCounter))
     }
-    memStore.setup(timeseriesDatasetMultipleShardKeys.ref, Schemas(Schemas.promCounter), ishard, TestData.storeConf)
+    memStore.setup(timeseriesDatasetMultipleShardKeys.ref, Schemas(Schemas.promCounter), ishard, TestData.storeConf, 1)
     memStore.ingest(timeseriesDatasetMultipleShardKeys.ref, ishard, SomeData(builder.allContainers.head, 0))
 
   }
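The queryStats changes in the hunks above (the clear() call, the new counter assertions, and the dropped ResultBytes check) all lean on the CPU-time accounting visible in the fragment that opens this section: CPU nanos are added to the query-level counter in a finally block. A stripped-down sketch of that pattern follows, under the assumption that QueryStats lives in filodb.core.query and that getCpuNanosCounter(Nil) returns an AtomicLong-like counter; the ThreadMXBean call stands in for the Utils.currentThreadCpuTimeNanos helper referenced above.

    // Sketch of the CPU-time accounting the queryStats assertions rely on.
    import java.lang.management.ManagementFactory
    import filodb.core.query.QueryStats   // assumed package

    def timedExecute[T](queryStats: QueryStats)(runOperator: => T): T = {
      // stand-in for Utils.currentThreadCpuTimeNanos from the patch
      def cpuNanos: Long = ManagementFactory.getThreadMXBean.getCurrentThreadCpuTime
      val startNs = cpuNanos
      try runOperator
      finally {
        // accumulate CPU nanos spent on this thread into the query-wide (Nil-keyed) counter
        queryStats.getCpuNanosCounter(Nil).getAndAdd(cpuNanos - startNs)
      }
    }

This is why the spec can assert getCpuNanosCounter().get() > 0 after a single execute, and why clear() is needed when the same QuerySession is reused across tests.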
diff --git a/query/src/test/scala/filodb/query/exec/SplitLocalPartitionDistConcatExecSpec.scala b/query/src/test/scala/filodb/query/exec/SplitLocalPartitionDistConcatExecSpec.scala
index 652bbc43ed..aab1bc1697 100644
--- a/query/src/test/scala/filodb/query/exec/SplitLocalPartitionDistConcatExecSpec.scala
+++ b/query/src/test/scala/filodb/query/exec/SplitLocalPartitionDistConcatExecSpec.scala
@@ -86,10 +86,10 @@ class SplitLocalPartitionDistConcatExecSpec extends AnyFunSpec with Matchers wit
   implicit val execTimeout = 5.seconds
 
   override def beforeAll(): Unit = {
-    memStore.setup(dsRef, schemas, 0, TestData.storeConf)
+    memStore.setup(dsRef, schemas, 0, TestData.storeConf, 1)
     memStore.ingest(dsRef, 0, SomeData(container, 0))
-    memStore.setup(MMD.dataset1.ref, Schemas(MMD.schema1), 0, TestData.storeConf)
+    memStore.setup(MMD.dataset1.ref, Schemas(MMD.schema1), 0, TestData.storeConf, 1)
     memStore.ingest(MMD.dataset1.ref, 0, mmdSomeData)
 
     memStore.refreshIndexForTesting(dsRef)
diff --git a/run_benchmarks.sh b/run_benchmarks.sh
index d368e26d59..0572870994 100755
--- a/run_benchmarks.sh
+++ b/run_benchmarks.sh
@@ -1,7 +1,6 @@
 #!/bin/bash
-sbt "jmh/jmh:run -rf json -i 15 -wi 10 -f3 -jvmArgsAppend -XX:MaxInlineLevel=20 \
+sbt "jmh/jmh:run -rf json -i 5 -wi 3 -f 1 -jvmArgsAppend -XX:MaxInlineLevel=20 \
  -jvmArgsAppend -Xmx4g -jvmArgsAppend -XX:MaxInlineSize=99 \
- -prof jfr:dir=/tmp/filo-jmh \
  filodb.jmh.QueryHiCardInMemoryBenchmark \
  filodb.jmh.QueryInMemoryBenchmark \
  filodb.jmh.QueryAndIngestBenchmark \
diff --git a/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala b/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala
index d66d0eec41..6fa2640d82 100644
--- a/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala
+++ b/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala
@@ -1412,7 +1412,7 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
       settings.filodbConfig)
 
     downsampleTSStore.setup(batchDownsampler.rawDatasetRef, settings.filodbSettings.schemas,
-      0, rawDataStoreConfig, settings.rawDatasetIngestionConfig.downsampleConfig)
+      0, rawDataStoreConfig, 1, settings.rawDatasetIngestionConfig.downsampleConfig)
 
     downsampleTSStore.recoverIndex(batchDownsampler.rawDatasetRef, 0).futureValue
 
@@ -1459,7 +1459,7 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
     )
     downsampleTSStore.setup(batchDownsampler.rawDatasetRef, settings.filodbSettings.schemas,
-      0, rawDataStoreConfig, durableIndexSettings.rawDatasetIngestionConfig.downsampleConfig)
+      0, rawDataStoreConfig, 1, durableIndexSettings.rawDatasetIngestionConfig.downsampleConfig)
 
     val recoveredRecords = downsampleTSStore.recoverIndex(batchDownsampler.rawDatasetRef, 0).futureValue
     recoveredRecords shouldBe 5
@@ -1474,7 +1474,7 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
     )
     downsampleTSStore2.setup(batchDownsampler.rawDatasetRef, settings.filodbSettings.schemas,
-      0, rawDataStoreConfig, durableIndexSettings.rawDatasetIngestionConfig.downsampleConfig)
+      0, rawDataStoreConfig, 1, durableIndexSettings.rawDatasetIngestionConfig.downsampleConfig)
 
     val recoveredRecords2 = downsampleTSStore2.recoverIndex(batchDownsampler.rawDatasetRef, 0).futureValue
     recoveredRecords2 shouldBe 0
@@ -1509,7 +1509,7 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
       settings.filodbConfig)
 
     downsampleTSStore.setup(batchDownsampler.rawDatasetRef, settings.filodbSettings.schemas,
-      0, rawDataStoreConfig, settings.rawDatasetIngestionConfig.downsampleConfig)
+      0, rawDataStoreConfig, 1, settings.rawDatasetIngestionConfig.downsampleConfig)
 
     downsampleTSStore.recoverIndex(batchDownsampler.rawDatasetRef, 0).futureValue
 
@@ -1540,7 +1540,7 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
       settings.filodbConfig)
 
     downsampleTSStore.setup(batchDownsampler.rawDatasetRef, settings.filodbSettings.schemas,
-      0, rawDataStoreConfig, settings.rawDatasetIngestionConfig.downsampleConfig)
+      0, rawDataStoreConfig, 1, settings.rawDatasetIngestionConfig.downsampleConfig)
 
     downsampleTSStore.recoverIndex(batchDownsampler.rawDatasetRef, 0).futureValue
 
@@ -1572,7 +1572,7 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
       settings.filodbConfig)
 
     downsampleTSStore.setup(batchDownsampler.rawDatasetRef, settings.filodbSettings.schemas,
-      0, rawDataStoreConfig, settings.rawDatasetIngestionConfig.downsampleConfig)
+      0, rawDataStoreConfig, 1, settings.rawDatasetIngestionConfig.downsampleConfig)
 
     downsampleTSStore.recoverIndex(batchDownsampler.rawDatasetRef, 0).futureValue
 
@@ -1602,7 +1602,7 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
     val downsampleTSStore = new DownsampledTimeSeriesStore(downsampleColStore, rawColStore,
       settings.filodbConfig)
     downsampleTSStore.setup(batchDownsampler.rawDatasetRef, settings.filodbSettings.schemas,
-      0, rawDataStoreConfig, settings.rawDatasetIngestionConfig.downsampleConfig)
+      0, rawDataStoreConfig, 1, settings.rawDatasetIngestionConfig.downsampleConfig)
     downsampleTSStore.recoverIndex(batchDownsampler.rawDatasetRef, 0).futureValue
     val colFilters = seriesTags.map { case (t, v) => ColumnFilter(t.toString, Equals(v.toString)) }.toSeq
     val queryFilters = colFilters :+ ColumnFilter("_metric_", Equals(gaugeName))
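Each DownsamplerMainSpec hunk makes the same edit: the node-count value (1 here) is inserted between rawDataStoreConfig and the downsample config in DownsampledTimeSeriesStore.setup. If the spec ever wanted to keep that call in one place, a hedged helper could look like the sketch below; the helper name is invented, and it reuses the spec's existing fixtures (batchDownsampler, settings, rawDataStoreConfig) and imports.

    // Hypothetical helper, not part of this patch; relies on the spec's existing fixtures.
    def setupDownsampleStore(store: DownsampledTimeSeriesStore): Unit =
      store.setup(batchDownsampler.rawDatasetRef, settings.filodbSettings.schemas,
        0, rawDataStoreConfig, 1, settings.rawDatasetIngestionConfig.downsampleConfig)

    // usage inside a test, mirroring the hunks above:
    // setupDownsampleStore(downsampleTSStore)
    // downsampleTSStore.recoverIndex(batchDownsampler.rawDatasetRef, 0).futureValue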