Skip to content

Commit

Permalink
Fixes for ClusterV2
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeep6189 committed Oct 9, 2023
1 parent bcc124b commit 40386df
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ final class FilodbSettings(val conf: Config) {

lazy val minNumNodes = config.as[Option[Int]]("min-num-nodes-in-cluster")

lazy val akkaPort = allConfig.as[Option[Int]]("akka.remote.netty.tcp.port")

/**
* Returns IngestionConfig/dataset configuration from parsing dataset-configs file paths.
* If those are empty, then parse the "streams" config key for inline configs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class ShardHealthStats(ref: DatasetRef,
def update(mapper: ShardMapper, skipUnassigned: Boolean = false): Unit = {
numActive.update(mapper.statuses.count(_ == ShardStatusActive))
numRecovering.update(mapper.statuses.count(_.isInstanceOf[ShardStatusRecovery]))
numUnassigned.update(mapper.statuses.count(_ == ShardStatusUnassigned))
if (!skipUnassigned) numAssigned.update(mapper.statuses.count(_ == ShardStatusAssigned))
numAssigned.update(mapper.statuses.count(_ == ShardStatusAssigned))
if (!skipUnassigned) numUnassigned.update(mapper.statuses.count(_ == ShardStatusUnassigned))
numError.update(mapper.statuses.count(_ == ShardStatusError))
numStopped.update(mapper.statuses.count(_ == ShardStatusStopped))
numDown.update(mapper.statuses.count(_ == ShardStatusDown))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,22 @@ class FiloDbClusterDiscovery(settings: FilodbSettings,
def shardsForLocalhost(numShards: Int): Seq[Int] = shardsForOrdinal(ordinalOfLocalhost, numShards)

lazy private val hostNames = {
require(settings.minNumNodes.isDefined, "Minimum Number of Nodes config not provided")
require(settings.minNumNodes.isDefined, "[ClusterV2] Minimum Number of Nodes config not provided")
if (settings.k8sHostFormat.isDefined) {
(0 until settings.minNumNodes.get).map(i => String.format(settings.k8sHostFormat.get, i.toString))
// This is used in kubernetes setup. We read the host format config and resolve the IP address from each host
val hosts = (0 until settings.minNumNodes.get)
.map(i => {
val resolvedIp = InetAddress.getByName(String.format(settings.k8sHostFormat.get, i.toString))
// return resolvedIpAddress and port combination for akka communication
s"${resolvedIp.getHostAddress()}:${settings.akkaPort}"
})
logger.info(s"[ClusterV2] hosts to communicate: " + hosts)
hosts.sorted
} else if (settings.hostList.isDefined) {
// All the required hosts are provided manually in the config. usually used for local runs/setup
logger.info(s"[ClusterV2] hosts to communicate: ${settings.hostList.get}")
settings.hostList.get.sorted // sort to make order consistent on all nodes of cluster
} else throw new IllegalArgumentException("Cluster Discovery mechanism not defined")
} else throw new IllegalArgumentException("[ClusterV2] Cluster Discovery mechanism not defined")
}

lazy private val nodeCoordActorSelections = {
Expand Down

0 comments on commit 40386df

Please sign in to comment.