Merge branch 'develop' into integration #1673

Merged
29 commits merged from develop into integration on Sep 19, 2023

Commits (29, changes from all commits shown below)
7679af3
Revert "maint(core): upgrade lucene to 9.7.0 (#1617)" (#1622)
alextheimer Jul 11, 2023
80245ce
feat(core): Simplify configuration to scale Filodb horizontally (#1610)
vishramachandran Jul 12, 2023
90303aa
filodb(core) add debugging info for empty histogram. (#1613)
yu-shipit Jul 13, 2023
5b05779
fix(core) make the error message more frendly to users. (#1593)
yu-shipit Jul 13, 2023
a93666d
Revert "filodb(core) add debugging info for empty histogram. (#1613)"…
yu-shipit Jul 13, 2023
a37bf5f
Adding logging statement when warning is produced. (#1625)
kvpetrov Jul 14, 2023
8ecf630
perf(query): Remove boxed Double allocations from NaN checks during d…
vishramachandran Jul 17, 2023
6ac0255
fix nullpointer happened in cardinality busting job. (#1631)
yu-shipit Jul 18, 2023
b9ea680
filodb(core) add debugging info for empty histogram. (#1624)
yu-shipit Jul 18, 2023
eebd5f4
fix(query): prevent list.head on empty list (#1632)
alextheimer Jul 20, 2023
6c1693a
maint(kafka): update consumer client id (#1633)
alextheimer Jul 24, 2023
59cae2a
fix(core): Consolidate num-nodes duplicate config (#1635)
vishramachandran Jul 24, 2023
ea1644b
Fix memory alloc config (#1638)
vishramachandran Jul 24, 2023
955814e
fix(query) Regex equals .* must ignore the label and match series eve…
amolnayak311 Jul 28, 2023
f5018ae
fix(core) fix the binary join aggregation across different partitions…
yu-shipit Jul 31, 2023
84a185f
feat(query): Cardinality V2 API Query Plan changes (#1637)
sandeep6189 Aug 2, 2023
c7e26a9
fix(query) Fix regression with regex match (#1640)
amolnayak311 Aug 4, 2023
dd59325
fix(query) support unary operators(+/-) (#1642)
yu-shipit Aug 4, 2023
d84c6c8
fix(core): Bug in calculating size of SerializedRangeVector (#1643)
vishramachandran Aug 9, 2023
38c682c
perf(core): ~Two times throughput improvement for Lucene queries with…
vishramachandran Aug 9, 2023
594ffce
fix(query): Adding user datasets for Cardinality V2 RemoteMetadataExe…
sandeep6189 Aug 17, 2023
89bd678
Fix MultiPartition Card Queries (#1652)
sandeep6189 Aug 18, 2023
89095e2
feat(core): Add Query CPU Time for Index Lookups (#1655)
vishramachandran Aug 21, 2023
1e56ef9
fix(metering): Overriding the cluster name .passed to SingleClusterPl…
sandeep6189 Aug 23, 2023
710c3d2
misc(core): add downsample support for aggregated data (#1661)
sherali42 Aug 25, 2023
6cb5433
maint(core): upgrade to Lucene 9.7.0 (#1662)
alextheimer Aug 30, 2023
7adc382
bug(query): Streaming query execution allocated too much mem via RB (…
vishramachandran Sep 1, 2023
bf8ead0
perf(card): Adding config support for DS card flushCount and perf log…
sandeep6189 Sep 11, 2023
f7d60ac
Merge branch 'develop' into integration
Sep 19, 2023
@@ -34,7 +34,7 @@ class MemstoreCassandraSinkSpec extends AllTablesTest {
}

it("should flush MemStore data to C*, and be able to read back data from C* directly") {
memStore.setup(dataset1.ref, Schemas(dataset1.schema), 0, TestData.storeConf)
memStore.setup(dataset1.ref, Schemas(dataset1.schema), 0, TestData.storeConf, 1)
memStore.store.sinkStats.chunksetsWritten.get shouldEqual 0

// Flush every ~50 records
@@ -96,7 +96,7 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala
val policy = new FixedMaxPartitionsEvictionPolicy(20)
val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy))
try {
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf)
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf, 1)
memStore.recoverIndex(dataset.ref, 0).futureValue
memStore.refreshIndexForTesting(dataset.ref)

@@ -112,7 +112,7 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala
val policy = new FixedMaxPartitionsEvictionPolicy(20)
val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy))
try {
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf)
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf, 1)
memStore.recoverIndex(dataset.ref, 0).futureValue
memStore.refreshIndexForTesting(dataset.ref)

@@ -134,7 +134,7 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala
val policy = new FixedMaxPartitionsEvictionPolicy(20)
val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy))
try {
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf)
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf, 1)
memStore.recoverIndex(dataset.ref, 0).futureValue
memStore.refreshIndexForTesting(dataset.ref)

@@ -157,7 +157,7 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala
val policy = new FixedMaxPartitionsEvictionPolicy(20)
val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy))
try {
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf)
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf, 1)
memStore.recoverIndex(dataset.ref, 0).futureValue
memStore.refreshIndexForTesting(dataset.ref)

2 changes: 1 addition & 1 deletion conf/promperf-filodb-server.conf
@@ -5,8 +5,8 @@ filodb {
"conf/promperf-source.conf"
]

min-num-nodes-in-cluster = 1
cluster-discovery {
num-nodes = 1
failure-detection-interval = 20s
host-list = [
"127.0.0.1:2552"
3 changes: 3 additions & 0 deletions conf/timeseries-dev-source.conf
@@ -6,7 +6,10 @@
# Should not change once dataset has been set up on the server and data has been persisted to cassandra
num-shards = 4

# deprecated in favor of min-num-nodes-in-cluster config in filodb server config
# To be removed eventually. There is no reason to set a value for this for each dataset
min-num-nodes = 2

# Length of chunks to be written, roughly
sourcefactory = "filodb.kafka.KafkaIngestionStreamFactory"

2 changes: 1 addition & 1 deletion conf/timeseries-filodb-server.conf
@@ -1,9 +1,9 @@
dataset-prometheus = { include required("timeseries-dev-source.conf") }

filodb {
min-num-nodes-in-cluster = 2
v2-cluster-enabled = false
cluster-discovery {
num-nodes = 2
failure-detection-interval = 20s
host-list = [
"127.0.0.1:2552",
@@ -48,13 +48,13 @@ final class FilodbSettings(val conf: Config) {

lazy val datasetConfPaths = config.as[Seq[String]]("dataset-configs")

lazy val numNodes = config.getInt("cluster-discovery.num-nodes")
lazy val k8sHostFormat = config.as[Option[String]]("cluster-discovery.k8s-stateful-sets-hostname-format")

// used for development mode only
lazy val hostList = config.as[Option[Seq[String]]]("cluster-discovery.host-list")
lazy val localhostOrdinal = config.as[Option[Int]]("cluster-discovery.localhost-ordinal")

lazy val minNumNodes = config.as[Option[Int]]("min-num-nodes-in-cluster")

/**
* Returns IngestionConfig/dataset configuration from parsing dataset-configs file paths.
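With `cluster-discovery.num-nodes` removed, the node count is now read once at the server level as an `Option[Int]`. A minimal sketch of reading the new key, mirroring how the FiloDbClusterDiscoverySpec changes later in this diff construct their settings (the config string is illustrative; assumes the FilodbSettings class above is on the classpath):

```scala
import com.typesafe.config.ConfigFactory

// Illustrative only: min-num-nodes-in-cluster now lives directly under `filodb`,
// replacing the old cluster-discovery.num-nodes key.
val conf = ConfigFactory.parseString(
  """
    |filodb.min-num-nodes-in-cluster = 2
    |""".stripMargin)

val settings = new FilodbSettings(conf)

// minNumNodes is an Option[Int]; consumers such as FiloDbClusterDiscovery
// require(...) that it is defined before using it.
val minNodes = settings.minNumNodes.getOrElse(
  throw new IllegalArgumentException("min-num-nodes-in-cluster not configured"))
```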
@@ -37,8 +37,9 @@ object IngestionActor {
source: NodeClusterActor.IngestionSource,
downsample: DownsampleConfig,
storeConfig: StoreConfig,
numShards: Int,
statusActor: ActorRef): Props =
Props(new IngestionActor(ref, schemas, memStore, source, downsample, storeConfig, statusActor))
Props(new IngestionActor(ref, schemas, memStore, source, downsample, storeConfig, numShards, statusActor))
}

/**
@@ -62,6 +63,7 @@ private[filodb] final class IngestionActor(ref: DatasetRef,
source: NodeClusterActor.IngestionSource,
downsample: DownsampleConfig,
storeConfig: StoreConfig,
numShards: Int,
statusActor: ActorRef) extends BaseActor {

import IngestionActor._
@@ -170,7 +172,7 @@ private[filodb] final class IngestionActor(ref: DatasetRef,

// scalastyle:off method.length
private def startIngestion(shard: Int): Unit = {
try tsStore.setup(ref, schemas, shard, storeConfig, downsample) catch {
try tsStore.setup(ref, schemas, shard, storeConfig, numShards, downsample) catch {
case ShardAlreadySetup(ds, s) =>
logger.warn(s"dataset=$ds shard=$s already setup, skipping....")
return
@@ -115,6 +115,7 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore,

setupDataset( dataset,
ingestConfig.storeConfig,
ingestConfig.numShards,
IngestionSource(ingestConfig.streamFactoryClass, ingestConfig.sourceConfig),
ingestConfig.downsampleConfig)
}
@@ -141,6 +142,7 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore,
*/
private def setupDataset(dataset: Dataset,
storeConf: StoreConfig,
numShards: Int,
source: IngestionSource,
downsample: DownsampleConfig,
schemaOverride: Boolean = false): Unit = {
@@ -154,7 +156,7 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore,
val schemas = if (schemaOverride) Schemas(dataset.schema) else settings.schemas
if (schemaOverride) logger.info(s"Overriding schemas from settings: this better be a test!")
val props = IngestionActor.props(dataset.ref, schemas, memStore,
source, downsample, storeConf, statusActor.get)
source, downsample, storeConf, numShards, statusActor.get)
val ingester = context.actorOf(props, s"$Ingestion-${dataset.name}")
context.watch(ingester)
ingesters(ref) = ingester
@@ -187,7 +189,9 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore,
def ingestHandlers: Receive = LoggingReceive {
case SetupDataset(dataset, resources, source, storeConf, downsample) =>
// used only in unit tests
if (!(ingesters contains dataset.ref)) { setupDataset(dataset, storeConf, source, downsample, true) }
if (!(ingesters contains dataset.ref)) {
setupDataset(dataset, storeConf, resources.numShards, source, downsample, true)
}

case IngestRows(dataset, shard, rows) =>
withIngester(sender(), dataset) { _ ! IngestionActor.IngestRows(sender(), shard, rows) }
@@ -402,7 +402,8 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
ackTo.foreach(_ ! DatasetExists(dataset.ref))
Map.empty
case None =>
val resources = DatasetResourceSpec(ingestConfig.numShards, ingestConfig.minNumNodes)
val minNumNodes = settings.minNumNodes.getOrElse(ingestConfig.minNumNodes)
val resources = DatasetResourceSpec(ingestConfig.numShards, minNumNodes)
val mapper = new ShardMapper(resources.numShards)
_shardMappers(dataset.ref) = mapper
// Access the shardmapper through the HashMap so even if it gets replaced it will update the shard stats
@@ -40,11 +40,13 @@ class FiloDbClusterDiscovery(settings: FilodbSettings,
}

def shardsForOrdinal(ordinal: Int, numShards: Int): Seq[Int] = {
require(ordinal < settings.numNodes, s"Ordinal $ordinal was not expected. Number of nodes is ${settings.numNodes}")
val numShardsPerHost = numShards / settings.numNodes
require(settings.minNumNodes.isDefined, "Minimum Number of Nodes config not provided")
require(ordinal < settings.minNumNodes.get, s"Ordinal $ordinal was not expected. " +
s"Number of nodes is ${settings.minNumNodes.get}")
val numShardsPerHost = numShards / settings.minNumNodes.get
// Suppose we have a total of 8 shards and 2 hosts, assuming the hostnames are host-0 and host-1, we will map
// host-0 to shard [0,1,2,3] and host-1 to shard [4,5,6,7]
val numExtraShardsToAssign = numShards % settings.numNodes
val numExtraShardsToAssign = numShards % settings.minNumNodes.get
val (firstShardThisNode, numShardsThisHost) = if (numExtraShardsToAssign != 0) {
logger.warn("For stateful shard assignment, numShards should be a multiple of nodes per shard, " +
"using default strategy")
@@ -69,8 +71,9 @@
def shardsForLocalhost(numShards: Int): Seq[Int] = shardsForOrdinal(ordinalOfLocalhost, numShards)

lazy private val hostNames = {
require(settings.minNumNodes.isDefined, "Minimum Number of Nodes config not provided")
if (settings.k8sHostFormat.isDefined) {
(0 until settings.numNodes).map(i => String.format(settings.k8sHostFormat.get, i.toString))
(0 until settings.minNumNodes.get).map(i => String.format(settings.k8sHostFormat.get, i.toString))
} else if (settings.hostList.isDefined) {
settings.hostList.get.sorted // sort to make order consistent on all nodes of cluster
} else throw new IllegalArgumentException("Cluster Discovery mechanism not defined")
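The shard assignment comments above describe the intended split: shards are divided evenly across `min-num-nodes-in-cluster` hosts, with any remainder going to the first hosts (see the FiloDbClusterDiscoverySpec changes below). A standalone sketch of that math, using a hypothetical helper rather than FiloDB's actual method:

```scala
// Hypothetical illustration of the shard-to-ordinal split described above.
// With 8 shards and 2 nodes: ordinal 0 -> shards 0-3, ordinal 1 -> shards 4-7.
def shardsForOrdinalSketch(ordinal: Int, numShards: Int, minNumNodes: Int): Seq[Int] = {
  require(ordinal < minNumNodes, s"Ordinal $ordinal was not expected")
  val base  = numShards / minNumNodes           // even share per node
  val extra = numShards % minNumNodes           // remainder, spread over the first nodes
  val mine  = base + (if (ordinal < extra) 1 else 0)
  val first = ordinal * base + math.min(ordinal, extra)
  first until (first + mine)
}

// shardsForOrdinalSketch(0, 8, 2)  == Seq(0, 1, 2, 3)
// shardsForOrdinalSketch(1, 8, 2)  == Seq(4, 5, 6, 7)
// shardsForOrdinalSketch(0, 10, 4) == Seq(0, 1, 2)   // first two ordinals get an extra shard
```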
@@ -84,7 +84,7 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
.foreach { downsampleDataset => memStore.store.initialize(downsampleDataset, ingestConfig.numShards) }

setupDataset( dataset,
ingestConfig.storeConfig,
ingestConfig.storeConfig, ingestConfig.numShards,
IngestionSource(ingestConfig.streamFactoryClass, ingestConfig.sourceConfig),
ingestConfig.downsampleConfig)
initShards(dataset, ingestConfig)
@@ -121,6 +121,7 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
*/
private def setupDataset(dataset: Dataset,
storeConf: StoreConfig,
numShards: Int,
source: IngestionSource,
downsample: DownsampleConfig,
schemaOverride: Boolean = false): Unit = {
@@ -132,7 +133,7 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
val schemas = if (schemaOverride) Schemas(dataset.schema) else settings.schemas
if (schemaOverride) logger.info(s"Overriding schemas from settings: this better be a test!")
val props = IngestionActor.props(dataset.ref, schemas, memStore,
source, downsample, storeConf, self)
source, downsample, storeConf, numShards, self)
val ingester = context.actorOf(props, s"$Ingestion-${dataset.name}")
context.watch(ingester)
ingestionActors(ref) = ingester
@@ -11,7 +11,7 @@ class FiloDbClusterDiscoverySpec extends AkkaSpec {

val config = ConfigFactory.parseString(
"""
|filodb.cluster-discovery.num-nodes = 4
|filodb.min-num-nodes-in-cluster = 4
|""".stripMargin)

val settings = new FilodbSettings(config)
@@ -29,7 +29,7 @@ class FiloDbClusterDiscoverySpec extends AkkaSpec {
"Should allocate the extra n shards to first n nodes" in {
val config = ConfigFactory.parseString(
"""
|filodb.cluster-discovery.num-nodes = 5
|filodb.min-num-nodes-in-cluster = 5
|""".stripMargin)

val settings = new FilodbSettings(config)
42 changes: 41 additions & 1 deletion core/src/main/resources/filodb-defaults.conf
@@ -1,11 +1,16 @@
filodb {
v2-cluster-enabled = false

# Number of nodes in cluster; used to calculate per-shard resources based
# on how many shards assigned to node.
# Required config if v2-clustering or automatic memory-alloc is enabled
# min-num-nodes-in-cluster = 2

cluster-discovery {
// set this to a smaller value (like 30s) at thee query entry points
// if FiloDB HTTP API is indeed the query entry point, this should be overridden to small value
// so that failure detection is quick.
failure-detection-interval = 15 minutes
num-nodes = 2

# one of the two below properties should be enabled
# enable this to use the k8s stateful sets mode and its hostname to extract ordinal
@@ -691,6 +696,41 @@ filodb {
# Note: this memory is shared across all configued datasets on a node.
ingestion-buffer-mem-size = 200MB

memory-alloc {
# automatic memory allocation is enabled if true
automatic-alloc-enabled = false

# if not provided this is calculated as ContainerOrNodeMemory - os-memory-needs - CurrentJVMHeapMemory
# available-memory-bytes = 5GB

# memory dedicated for proper functioning of OS
os-memory-needs = 500MB

# NOTE: In the three configs below,
# lucene-memory-percent + native-memory-manager-percent + block-memory-manager-percent
# should equal 100
##############################################################
# # # #
# LuceneMemPercent # NativeMemPercent # BlockMemPercent #
# # # #
##############################################################

# Memory percent of available-memory reserved for Lucene memory maps.
# Note we do not use this config to explicitly allocate space for lucene.
# But reserving this space ensures that more of the lucene memory maps are stored in memory
lucene-memory-percent = 5

# memory percent of available-memory reserved for native memory manager
# (used for partKeys, chunkMaps, chunkInfos, writeBuffers)
native-memory-manager-percent = 24

# Memory percent of available-memory reserved for block memory manager
# (used for storing chunks)
# This is divvied amongst datasets on the node per configuration for dataset
# The shards of the dataset on the node get even amount of memory from this fraction
block-memory-manager-percent = 71
}

# At the cost of some extra heap memory, we can track queries holding shared lock for a long time
# and starving the exclusive access of lock for eviction
track-queries-holding-eviction-lock = true
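The three percentages above must sum to 100. A minimal sketch of how a given available-memory budget would be carved up under the defaults (illustrative arithmetic only, not FiloDB's actual allocation code):

```scala
// Illustration of the memory-alloc split described above, using the default
// percentages (5 / 24 / 71) and an assumed 5 GB available-memory budget.
val availableMemoryBytes = 5L * 1024 * 1024 * 1024

val luceneMemoryPercent        = 5   // reserved for Lucene memory maps
val nativeMemoryManagerPercent = 24  // partKeys, chunkMaps, chunkInfos, writeBuffers
val blockMemoryManagerPercent  = 71  // chunk storage, divided among datasets/shards

require(luceneMemoryPercent + nativeMemoryManagerPercent + blockMemoryManagerPercent == 100)

val luceneBytes = availableMemoryBytes * luceneMemoryPercent / 100
val nativeBytes = availableMemoryBytes * nativeMemoryManagerPercent / 100
val blockBytes  = availableMemoryBytes * blockMemoryManagerPercent / 100

println(s"lucene=$luceneBytes native=$nativeBytes block=$blockBytes")
```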
25 changes: 25 additions & 0 deletions core/src/main/scala/filodb.core/Utils.scala
@@ -2,6 +2,7 @@ package filodb.core

import java.lang.management.ManagementFactory

import com.typesafe.config.{Config, ConfigRenderOptions}
import com.typesafe.scalalogging.StrictLogging

object Utils extends StrictLogging {
@@ -13,4 +14,28 @@ object Utils extends StrictLogging {
if (cpuTimeEnabled) threadMbean.getCurrentThreadCpuTime
else System.nanoTime()
}

def calculateAvailableOffHeapMemory(filodbConfig: Config): Long = {
val containerMemory = ManagementFactory.getOperatingSystemMXBean()
.asInstanceOf[com.sun.management.OperatingSystemMXBean].getTotalPhysicalMemorySize()
val currentJavaHeapMemory = Runtime.getRuntime().maxMemory()
val osMemoryNeeds = filodbConfig.getMemorySize("memstore.memory-alloc.os-memory-needs").toBytes
logger.info(s"Detected available memory containerMemory=$containerMemory" +
s" currentJavaHeapMemory=$currentJavaHeapMemory osMemoryNeeds=$osMemoryNeeds")

logger.info(s"Memory Alloc Options: " +
s"${filodbConfig.getConfig("memstore.memory-alloc").root().render(ConfigRenderOptions.concise())}")

val availableMem = if (filodbConfig.hasPath("memstore.memory-alloc.available-memory-bytes")) {
val avail = filodbConfig.getMemorySize("memstore.memory-alloc.available-memory-bytes").toBytes
logger.info(s"Using automatic-memory-config using overridden memory-alloc.available-memory $avail")
avail
} else {
logger.info(s"Using automatic-memory-config using without available memory override")
containerMemory - currentJavaHeapMemory - osMemoryNeeds
}
logger.info(s"Available memory calculated or configured as $availableMem")
availableMem
}

}
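A usage sketch for the new helper, assuming it is called with the resolved `filodb` config section so the `memstore.memory-alloc.*` paths resolve (the config values here are illustrative):

```scala
import com.typesafe.config.ConfigFactory

// With available-memory-bytes set explicitly, calculateAvailableOffHeapMemory
// returns the override instead of deriving container memory - JVM heap - OS needs.
val cfg = ConfigFactory.parseString(
  """
    |memstore.memory-alloc {
    |  automatic-alloc-enabled = true
    |  available-memory-bytes = 5GB
    |  os-memory-needs = 500MB
    |}
    |""".stripMargin)

val availableBytes = filodb.core.Utils.calculateAvailableOffHeapMemory(cfg)
// availableBytes is the 5 GB override expressed in bytes; the helper logs each input it considered.
```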
@@ -7,7 +7,7 @@ import scala.reflect.ClassTag
import debox.Buffer

import filodb.memory.{BinaryRegionConsumer, BinaryRegionLarge}
import filodb.memory.format.{RowReader, UnsafeUtils}
import filodb.memory.format.UnsafeUtils

/**
* A RecordContainer is a binary, wire-compatible container for BinaryRecords V2.
@@ -72,13 +72,13 @@ final class RecordContainer(val base: Any, val offset: Long, maxLength: Int,
* Iterates through each BinaryRecord as a RowReader. Results in two allocations: the Iterator
* as well as a BinaryRecordRowReader.
*/
final def iterate(schema: RecordSchema): Iterator[RowReader] = new Iterator[RowReader] {
final def iterate(schema: RecordSchema): Iterator[BinaryRecordRowReader] = new Iterator[BinaryRecordRowReader] {
val reader = new BinaryRecordRowReader(schema, base)
val endOffset = offset + 4 + numBytes
var curOffset = offset + ContainerHeaderLen

final def hasNext: Boolean = curOffset < endOffset
final def next: RowReader = {
final def next: BinaryRecordRowReader = {
val recordLen = BinaryRegionLarge.numBytes(base, curOffset)
reader.recordOffset = curOffset
curOffset += (recordLen + 7) & ~3 // +4, then aligned/rounded up to next 4 bytes
@@ -624,7 +624,12 @@ trait BinaryRecordRowReaderBase extends RowReader {

final class BinaryRecordRowReader(val schema: RecordSchema,
var recordBase: Any = UnsafeUtils.ZeroPointer,
var recordOffset: Long = 0L) extends BinaryRecordRowReaderBase
var recordOffset: Long = 0L) extends BinaryRecordRowReaderBase {
def recordLength: Int = {
val len = BinaryRegionLarge.numBytes(recordBase, recordOffset)
(len + 7) & ~3 // +4, then aligned/rounded up to next 4 bytes
}
}

final class MultiSchemaBRRowReader(var recordBase: Any = UnsafeUtils.ZeroPointer,
var recordOffset: Long = 0L) extends BinaryRecordRowReaderBase {
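For reference, the new `recordLength` uses the same arithmetic as `iterate` in RecordContainer above: add the 4-byte length prefix, then round up to the next 4-byte boundary. A tiny sketch of that calculation with example values:

```scala
// Illustration of the (len + 7) & ~3 alignment used by recordLength and iterate:
// add the 4-byte length prefix, then round up to a multiple of 4 bytes.
def alignedRecordLength(numBytes: Int): Int = (numBytes + 7) & ~3

// alignedRecordLength(1)  == 8    (1 + 4 = 5,   rounded up to 8)
// alignedRecordLength(4)  == 8    (4 + 4 = 8,   already aligned)
// alignedRecordLength(5)  == 12   (5 + 4 = 9,   rounded up to 12)
// alignedRecordLength(10) == 16   (10 + 4 = 14, rounded up to 16)
```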