From 40386df256454e93d6e9674c5433ff526f119d0e Mon Sep 17 00:00:00 2001 From: Sandeep Agarwalla Date: Mon, 9 Oct 2023 12:12:29 +0530 Subject: [PATCH] Fixes for ClusterV2 --- .../filodb.coordinator/FilodbSettings.scala | 2 ++ .../filodb.coordinator/ShardHealthStats.scala | 4 ++-- .../coordinator/v2/FiloDbClusterDiscovery.scala | 16 +++++++++++++--- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/coordinator/src/main/scala/filodb.coordinator/FilodbSettings.scala b/coordinator/src/main/scala/filodb.coordinator/FilodbSettings.scala index 4b411b61ac..b7595880e0 100755 --- a/coordinator/src/main/scala/filodb.coordinator/FilodbSettings.scala +++ b/coordinator/src/main/scala/filodb.coordinator/FilodbSettings.scala @@ -56,6 +56,8 @@ final class FilodbSettings(val conf: Config) { lazy val minNumNodes = config.as[Option[Int]]("min-num-nodes-in-cluster") + lazy val akkaPort = allConfig.as[Option[Int]]("akka.remote.netty.tcp.port") + /** * Returns IngestionConfig/dataset configuration from parsing dataset-configs file paths. * If those are empty, then parse the "streams" config key for inline configs. diff --git a/coordinator/src/main/scala/filodb.coordinator/ShardHealthStats.scala b/coordinator/src/main/scala/filodb.coordinator/ShardHealthStats.scala index 9f10697cbc..3d55f39925 100644 --- a/coordinator/src/main/scala/filodb.coordinator/ShardHealthStats.scala +++ b/coordinator/src/main/scala/filodb.coordinator/ShardHealthStats.scala @@ -33,8 +33,8 @@ class ShardHealthStats(ref: DatasetRef, def update(mapper: ShardMapper, skipUnassigned: Boolean = false): Unit = { numActive.update(mapper.statuses.count(_ == ShardStatusActive)) numRecovering.update(mapper.statuses.count(_.isInstanceOf[ShardStatusRecovery])) - numUnassigned.update(mapper.statuses.count(_ == ShardStatusUnassigned)) - if (!skipUnassigned) numAssigned.update(mapper.statuses.count(_ == ShardStatusAssigned)) + numAssigned.update(mapper.statuses.count(_ == ShardStatusAssigned)) + if (!skipUnassigned) numUnassigned.update(mapper.statuses.count(_ == ShardStatusUnassigned)) numError.update(mapper.statuses.count(_ == ShardStatusError)) numStopped.update(mapper.statuses.count(_ == ShardStatusStopped)) numDown.update(mapper.statuses.count(_ == ShardStatusDown)) diff --git a/coordinator/src/main/scala/filodb/coordinator/v2/FiloDbClusterDiscovery.scala b/coordinator/src/main/scala/filodb/coordinator/v2/FiloDbClusterDiscovery.scala index 9db3f19776..30b0cc088d 100644 --- a/coordinator/src/main/scala/filodb/coordinator/v2/FiloDbClusterDiscovery.scala +++ b/coordinator/src/main/scala/filodb/coordinator/v2/FiloDbClusterDiscovery.scala @@ -71,12 +71,22 @@ class FiloDbClusterDiscovery(settings: FilodbSettings, def shardsForLocalhost(numShards: Int): Seq[Int] = shardsForOrdinal(ordinalOfLocalhost, numShards) lazy private val hostNames = { - require(settings.minNumNodes.isDefined, "Minimum Number of Nodes config not provided") + require(settings.minNumNodes.isDefined, "[ClusterV2] Minimum Number of Nodes config not provided") if (settings.k8sHostFormat.isDefined) { - (0 until settings.minNumNodes.get).map(i => String.format(settings.k8sHostFormat.get, i.toString)) + // This is used in kubernetes setup. We read the host format config and resolve the IP address from each host + val hosts = (0 until settings.minNumNodes.get) + .map(i => { + val resolvedIp = InetAddress.getByName(String.format(settings.k8sHostFormat.get, i.toString)) + // return resolvedIpAddress and port combination for akka communication + s"${resolvedIp.getHostAddress()}:${settings.akkaPort}" + }) + logger.info(s"[ClusterV2] hosts to communicate: " + hosts) + hosts.sorted } else if (settings.hostList.isDefined) { + // All the required hosts are provided manually in the config. usually used for local runs/setup + logger.info(s"[ClusterV2] hosts to communicate: ${settings.hostList.get}") settings.hostList.get.sorted // sort to make order consistent on all nodes of cluster - } else throw new IllegalArgumentException("Cluster Discovery mechanism not defined") + } else throw new IllegalArgumentException("[ClusterV2] Cluster Discovery mechanism not defined") } lazy private val nodeCoordActorSelections = {