Skip to content

Commit

Permalink
Merge branch 'integration'
Browse files Browse the repository at this point in the history
  • Loading branch information
Yu Zhang committed Sep 27, 2023
2 parents 6a4ed51 + b5a2e40 commit 4e97f53
Show file tree
Hide file tree
Showing 60 changed files with 824 additions and 265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class MemstoreCassandraSinkSpec extends AllTablesTest {
}

it("should flush MemStore data to C*, and be able to read back data from C* directly") {
memStore.setup(dataset1.ref, Schemas(dataset1.schema), 0, TestData.storeConf)
memStore.setup(dataset1.ref, Schemas(dataset1.schema), 0, TestData.storeConf, 1)
memStore.store.sinkStats.chunksetsWritten.get shouldEqual 0

// Flush every ~50 records
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala
val policy = new FixedMaxPartitionsEvictionPolicy(20)
val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy))
try {
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf)
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf, 1)
memStore.recoverIndex(dataset.ref, 0).futureValue
memStore.refreshIndexForTesting(dataset.ref)

Expand All @@ -112,7 +112,7 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala
val policy = new FixedMaxPartitionsEvictionPolicy(20)
val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy))
try {
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf)
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf, 1)
memStore.recoverIndex(dataset.ref, 0).futureValue
memStore.refreshIndexForTesting(dataset.ref)

Expand All @@ -134,7 +134,7 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala
val policy = new FixedMaxPartitionsEvictionPolicy(20)
val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy))
try {
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf)
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf, 1)
memStore.recoverIndex(dataset.ref, 0).futureValue
memStore.refreshIndexForTesting(dataset.ref)

Expand All @@ -157,7 +157,7 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala
val policy = new FixedMaxPartitionsEvictionPolicy(20)
val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy))
try {
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf)
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf, 1)
memStore.recoverIndex(dataset.ref, 0).futureValue
memStore.refreshIndexForTesting(dataset.ref)

Expand Down
2 changes: 1 addition & 1 deletion conf/promperf-filodb-server.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ filodb {
"conf/promperf-source.conf"
]

min-num-nodes-in-cluster = 1
cluster-discovery {
num-nodes = 1
failure-detection-interval = 20s
host-list = [
"127.0.0.1:2552"
Expand Down
3 changes: 3 additions & 0 deletions conf/timeseries-dev-source.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
# Should not change once dataset has been set up on the server and data has been persisted to cassandra
num-shards = 4

# Deprecated in favor of the min-num-nodes-in-cluster setting in the FiloDB server config.
# To be removed eventually; there is no reason to set this value separately for each dataset.
min-num-nodes = 2

# Length of chunks to be written, roughly
sourcefactory = "filodb.kafka.KafkaIngestionStreamFactory"

Expand Down
2 changes: 1 addition & 1 deletion conf/timeseries-filodb-server.conf
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
dataset-prometheus = { include required("timeseries-dev-source.conf") }

filodb {
min-num-nodes-in-cluster = 2
v2-cluster-enabled = false
cluster-discovery {
num-nodes = 2
failure-detection-interval = 20s
host-list = [
"127.0.0.1:2552",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ final class FilodbSettings(val conf: Config) {

lazy val datasetConfPaths = config.as[Seq[String]]("dataset-configs")

lazy val numNodes = config.getInt("cluster-discovery.num-nodes")
lazy val k8sHostFormat = config.as[Option[String]]("cluster-discovery.k8s-stateful-sets-hostname-format")

// used for development mode only
lazy val hostList = config.as[Option[Seq[String]]]("cluster-discovery.host-list")
lazy val localhostOrdinal = config.as[Option[Int]]("cluster-discovery.localhost-ordinal")

lazy val minNumNodes = config.as[Option[Int]]("min-num-nodes-in-cluster")

/**
* Returns IngestionConfig/dataset configuration from parsing dataset-configs file paths.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ object IngestionActor {
source: NodeClusterActor.IngestionSource,
downsample: DownsampleConfig,
storeConfig: StoreConfig,
numShards: Int,
statusActor: ActorRef): Props =
Props(new IngestionActor(ref, schemas, memStore, source, downsample, storeConfig, statusActor))
Props(new IngestionActor(ref, schemas, memStore, source, downsample, storeConfig, numShards, statusActor))
}

/**
Expand All @@ -62,6 +63,7 @@ private[filodb] final class IngestionActor(ref: DatasetRef,
source: NodeClusterActor.IngestionSource,
downsample: DownsampleConfig,
storeConfig: StoreConfig,
numShards: Int,
statusActor: ActorRef) extends BaseActor {

import IngestionActor._
Expand Down Expand Up @@ -170,7 +172,7 @@ private[filodb] final class IngestionActor(ref: DatasetRef,

// scalastyle:off method.length
private def startIngestion(shard: Int): Unit = {
try tsStore.setup(ref, schemas, shard, storeConfig, downsample) catch {
try tsStore.setup(ref, schemas, shard, storeConfig, numShards, downsample) catch {
case ShardAlreadySetup(ds, s) =>
logger.warn(s"dataset=$ds shard=$s already setup, skipping....")
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore,

setupDataset( dataset,
ingestConfig.storeConfig,
ingestConfig.numShards,
IngestionSource(ingestConfig.streamFactoryClass, ingestConfig.sourceConfig),
ingestConfig.downsampleConfig)
}
Expand All @@ -141,6 +142,7 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore,
*/
private def setupDataset(dataset: Dataset,
storeConf: StoreConfig,
numShards: Int,
source: IngestionSource,
downsample: DownsampleConfig,
schemaOverride: Boolean = false): Unit = {
Expand All @@ -154,7 +156,7 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore,
val schemas = if (schemaOverride) Schemas(dataset.schema) else settings.schemas
if (schemaOverride) logger.info(s"Overriding schemas from settings: this better be a test!")
val props = IngestionActor.props(dataset.ref, schemas, memStore,
source, downsample, storeConf, statusActor.get)
source, downsample, storeConf, numShards, statusActor.get)
val ingester = context.actorOf(props, s"$Ingestion-${dataset.name}")
context.watch(ingester)
ingesters(ref) = ingester
Expand Down Expand Up @@ -187,7 +189,9 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore,
def ingestHandlers: Receive = LoggingReceive {
case SetupDataset(dataset, resources, source, storeConf, downsample) =>
// used only in unit tests
if (!(ingesters contains dataset.ref)) { setupDataset(dataset, storeConf, source, downsample, true) }
if (!(ingesters contains dataset.ref)) {
setupDataset(dataset, storeConf, resources.numShards, source, downsample, true)
}

case IngestRows(dataset, shard, rows) =>
withIngester(sender(), dataset) { _ ! IngestionActor.IngestRows(sender(), shard, rows) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,8 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
ackTo.foreach(_ ! DatasetExists(dataset.ref))
Map.empty
case None =>
val resources = DatasetResourceSpec(ingestConfig.numShards, ingestConfig.minNumNodes)
val minNumNodes = settings.minNumNodes.getOrElse(ingestConfig.minNumNodes)
val resources = DatasetResourceSpec(ingestConfig.numShards, minNumNodes)
val mapper = new ShardMapper(resources.numShards)
_shardMappers(dataset.ref) = mapper
// Access the shardmapper through the HashMap so even if it gets replaced it will update the shard stats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ object LogicalPlanUtils extends StrictLogging {
*/
def hasDescendantAggregate(lp: LogicalPlan): Boolean = lp match {
  // Despite the method name, a BinaryJoin is reported the same way as an Aggregate.
  // Example: foo + on(h) bar
  //   partition1 holds foo{h=1}, bar{h=2}; partition2 holds foo{h=2}, bar{h=1}.
  // The series that match on `h` live on different partitions, so the join cannot be
  // evaluated locally on one partition. InProcessPlanDispatcher is required.
  case _: Aggregate | _: BinaryJoin => true
  // Any other non-leaf plan: true as soon as one child (recursively) qualifies.
  case parent: NonLeafLogicalPlan =>
    parent.children.exists(child => hasDescendantAggregate(child))
  // Everything else has no aggregate/join beneath it.
  case _ => false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ class FiloDbClusterDiscovery(settings: FilodbSettings,
}

def shardsForOrdinal(ordinal: Int, numShards: Int): Seq[Int] = {
require(ordinal < settings.numNodes, s"Ordinal $ordinal was not expected. Number of nodes is ${settings.numNodes}")
val numShardsPerHost = numShards / settings.numNodes
require(settings.minNumNodes.isDefined, "Minimum Number of Nodes config not provided")
require(ordinal < settings.minNumNodes.get, s"Ordinal $ordinal was not expected. " +
s"Number of nodes is ${settings.minNumNodes.get}")
val numShardsPerHost = numShards / settings.minNumNodes.get
// Suppose we have a total of 8 shards and 2 hosts, assuming the hostnames are host-0 and host-1, we will map
// host-0 to shard [0,1,2,3] and host-1 to shard [4,5,6,7]
val numExtraShardsToAssign = numShards % settings.numNodes
val numExtraShardsToAssign = numShards % settings.minNumNodes.get
val (firstShardThisNode, numShardsThisHost) = if (numExtraShardsToAssign != 0) {
logger.warn("For stateful shard assignment, numShards should be a multiple of nodes per shard, " +
"using default strategy")
Expand All @@ -69,8 +71,9 @@ class FiloDbClusterDiscovery(settings: FilodbSettings,
// Shard numbers for this node: passes ordinalOfLocalhost on to shardsForOrdinal.
def shardsForLocalhost(numShards: Int): Seq[Int] = {
  val localOrdinal = ordinalOfLocalhost
  shardsForOrdinal(localOrdinal, numShards)
}

lazy private val hostNames = {
require(settings.minNumNodes.isDefined, "Minimum Number of Nodes config not provided")
if (settings.k8sHostFormat.isDefined) {
(0 until settings.numNodes).map(i => String.format(settings.k8sHostFormat.get, i.toString))
(0 until settings.minNumNodes.get).map(i => String.format(settings.k8sHostFormat.get, i.toString))
} else if (settings.hostList.isDefined) {
settings.hostList.get.sorted // sort to make order consistent on all nodes of cluster
} else throw new IllegalArgumentException("Cluster Discovery mechanism not defined")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
.foreach { downsampleDataset => memStore.store.initialize(downsampleDataset, ingestConfig.numShards) }

setupDataset( dataset,
ingestConfig.storeConfig,
ingestConfig.storeConfig, ingestConfig.numShards,
IngestionSource(ingestConfig.streamFactoryClass, ingestConfig.sourceConfig),
ingestConfig.downsampleConfig)
initShards(dataset, ingestConfig)
Expand Down Expand Up @@ -121,6 +121,7 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
*/
private def setupDataset(dataset: Dataset,
storeConf: StoreConfig,
numShards: Int,
source: IngestionSource,
downsample: DownsampleConfig,
schemaOverride: Boolean = false): Unit = {
Expand All @@ -132,7 +133,7 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
val schemas = if (schemaOverride) Schemas(dataset.schema) else settings.schemas
if (schemaOverride) logger.info(s"Overriding schemas from settings: this better be a test!")
val props = IngestionActor.props(dataset.ref, schemas, memStore,
source, downsample, storeConf, self)
source, downsample, storeConf, numShards, self)
val ingester = context.actorOf(props, s"$Ingestion-${dataset.name}")
context.watch(ingester)
ingestionActors(ref) = ingester
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class FiloDbClusterDiscoverySpec extends AkkaSpec {

val config = ConfigFactory.parseString(
"""
|filodb.cluster-discovery.num-nodes = 4
|filodb.min-num-nodes-in-cluster = 4
|""".stripMargin)

val settings = new FilodbSettings(config)
Expand All @@ -29,7 +29,7 @@ class FiloDbClusterDiscoverySpec extends AkkaSpec {
"Should allocate the extra n shards to first n nodes" in {
val config = ConfigFactory.parseString(
"""
|filodb.cluster-discovery.num-nodes = 5
|filodb.min-num-nodes-in-cluster = 5
|""".stripMargin)

val settings = new FilodbSettings(config)
Expand Down
Loading

0 comments on commit 4e97f53

Please sign in to comment.