diff --git a/coordinator/src/main/scala/filodb.coordinator/ActorPlanDispatcher.scala b/coordinator/src/main/scala/filodb.coordinator/ActorPlanDispatcher.scala
index 203d3781c1..3c19211382 100644
--- a/coordinator/src/main/scala/filodb.coordinator/ActorPlanDispatcher.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/ActorPlanDispatcher.scala
@@ -40,7 +40,7 @@ case class ActorPlanDispatcher(target: ActorRef, clusterName: String) extends Pl
     // Query Planner sets target as null when shard is down
     if (target == ActorRef.noSender) {
       Task.eval({
-        qLogger.warn(s"Creating partial result as shard is not available")
+        qLogger.warn(s"Target Actor is ActorRef.noSender ! Creating partial result as shard is not available")
         emptyPartialResult
       })
     } else {
diff --git a/coordinator/src/main/scala/filodb/coordinator/v2/FiloDbClusterDiscovery.scala b/coordinator/src/main/scala/filodb/coordinator/v2/FiloDbClusterDiscovery.scala
index 7dea746868..247057e103 100644
--- a/coordinator/src/main/scala/filodb/coordinator/v2/FiloDbClusterDiscovery.scala
+++ b/coordinator/src/main/scala/filodb/coordinator/v2/FiloDbClusterDiscovery.scala
@@ -3,16 +3,16 @@ package filodb.coordinator.v2
 import java.net.InetAddress
 import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.mutable
-import scala.concurrent.Future
-import scala.concurrent.duration.{DurationInt, FiniteDuration}
-
 import akka.actor.{ActorRef, ActorSystem}
 import akka.pattern.ask
 import akka.util.Timeout
 import com.typesafe.scalalogging.StrictLogging
+import kamon.Kamon
 import monix.execution.{CancelableFuture, Scheduler}
 import monix.reactive.Observable
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
 
 import filodb.coordinator.{CurrentShardSnapshot, FilodbSettings, ShardMapper}
 import filodb.core.DatasetRef
@@ -24,6 +24,14 @@ class FiloDbClusterDiscovery(settings: FilodbSettings,
 
   private val discoveryJobs = mutable.Map[DatasetRef, CancelableFuture[Unit]]()
 
+
+  // Metric to track actor resolve failures
+  val actorResolvedFailedCounter = Kamon.counter("actor-resolve-failed")
+  // Metric to track cluster discovery runs
+  val clusterDiscoveryCounter = Kamon.counter("filodb-cluster-discovery")
+  // Metric to track if we have unassigned shards on a given pod
+  val unassignedShardsGauge = Kamon.gauge("v2-unassigned-shards")
+
   lazy val ordinalOfLocalhost: Int = {
     if (settings.localhostOrdinal.isDefined) settings.localhostOrdinal.get
     else {
@@ -99,7 +107,7 @@ class FiloDbClusterDiscovery(settings: FilodbSettings,
     val empty = CurrentShardSnapshot(ref, new ShardMapper(numShards))
     def fut = (nca ? GetShardMapScatter(ref)) (t).asInstanceOf[Future[CurrentShardSnapshot]]
     Observable.fromFuture(fut).onErrorHandle { e =>
-      logger.error(s"Saw exception on askShardSnapshot: $e")
+      logger.error(s"[ClusterV2] Saw exception on askShardSnapshot!", e)
       empty
     }
   }
@@ -110,7 +118,18 @@ class FiloDbClusterDiscovery(settings: FilodbSettings,
     val acc = new ShardMapper(numShards)
     val snapshots = for {
       nca <- Observable.fromIteratorUnsafe(nodeCoordActorSelections.iterator)
-      ncaRef <- Observable.fromFuture(nca.resolveOne(settings.ResolveActorTimeout).recover{case _=> ActorRef.noSender})
+      ncaRef <- Observable.fromFuture(nca.resolveOne(settings.ResolveActorTimeout)
+        .recover {
+          // recovering with ActorRef.noSender
+          case e =>
+            // log the exception we got while trying to resolve and emit metric
+            logger.error(s"[ClusterV2] Actor Resolve Failed ! actor: ${nca.toString()}", e)
+            actorResolvedFailedCounter
+              .withTag("dataset", ref.dataset)
+              .withTag("actor", nca.toString())
+              .increment()
+            ActorRef.noSender
+        })
       if ncaRef != ActorRef.noSender
       snapshot <- askShardSnapshot(ncaRef, ref, numShards, timeout)
     } yield {
@@ -122,13 +141,20 @@ class FiloDbClusterDiscovery(settings: FilodbSettings,
   private val datasetToMapper = new ConcurrentHashMap[DatasetRef, ShardMapper]()
   def registerDatasetForDiscovery(dataset: DatasetRef, numShards: Int): Unit = {
     require(failureDetectionInterval.toMillis > 5000, "failure detection interval should be > 5s")
-    logger.info(s"Starting discovery pipeline for $dataset")
+    logger.info(s"[ClusterV2] Starting discovery pipeline for $dataset")
     datasetToMapper.put(dataset, new ShardMapper(numShards))
     val fut = for {
       _ <- Observable.intervalWithFixedDelay(failureDetectionInterval)
       mapper <- reduceMappersFromAllNodes(dataset, numShards, failureDetectionInterval - 5.seconds)
     } {
       datasetToMapper.put(dataset, mapper)
+      clusterDiscoveryCounter.withTag("dataset", dataset.dataset).increment()
+      val unassignedShardsCount = mapper.unassignedShards.length.toDouble
+      if (unassignedShardsCount > 0.0) {
+        logger.error(s"[ClusterV2] Unassigned Shards > 0 !! Dataset: ${dataset.dataset} " +
+          s"Shards Mapping is: ${mapper.prettyPrint} ")
+      }
+      unassignedShardsGauge.withTag("dataset", dataset.dataset).update(unassignedShardsCount)
     }
     discoveryJobs += (dataset -> fut)
   }
diff --git a/coordinator/src/main/scala/filodb/coordinator/v2/NewNodeCoordinatorActor.scala b/coordinator/src/main/scala/filodb/coordinator/v2/NewNodeCoordinatorActor.scala
index 170411f855..aff88548e9 100644
--- a/coordinator/src/main/scala/filodb/coordinator/v2/NewNodeCoordinatorActor.scala
+++ b/coordinator/src/main/scala/filodb/coordinator/v2/NewNodeCoordinatorActor.scala
@@ -45,10 +45,10 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
   private val ingestionConfigs = new mutable.HashMap[DatasetRef, IngestionConfig]()
   private val shardStats = new mutable.HashMap[DatasetRef, ShardHealthStats]()
 
-  logger.info(s"Initializing NodeCoordActor at ${self.path}")
+  logger.info(s"[ClusterV2] Initializing NodeCoordActor at ${self.path}")
 
   private def initialize(): Unit = {
-    logger.debug(s"Initializing stream configs: ${settings.streamConfigs}")
+    logger.debug(s"[ClusterV2] Initializing stream configs: ${settings.streamConfigs}")
     settings.streamConfigs.foreach { config =>
       val dataset = settings.datasetFromStream(config)
       val ingestion = IngestionConfig(config, NodeClusterActor.noOpSource.streamFactoryClass).get
@@ -69,7 +69,7 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
     queryActors.get(dataset).map(func).getOrElse(originator ! UnknownDataset)
 
   private def initializeDataset(dataset: Dataset, ingestConfig: IngestionConfig): Unit = {
-    logger.info(s"Initializing dataset ${dataset.ref}")
+    logger.info(s"[ClusterV2] Initializing dataset ${dataset.ref}")
     ingestionConfigs.put(dataset.ref, ingestConfig)
     localShardMaps.put(dataset.ref, new ShardMapper(ingestConfig.numShards))
     shardStats.put(dataset.ref, new ShardHealthStats(dataset.ref))
@@ -100,9 +100,11 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
     localShardMaps.get(event.ref).foreach { mapper =>
       mapper.updateFromEvent(event) match {
         case Failure(l) =>
-          logger.error(s"updateFromShardEvent error for dataset=${event.ref} event $event. Mapper now: $mapper", l)
+          logger.error(s"[ClusterV2] updateFromShardEvent error for dataset=${event.ref} " +
+            s"event $event. Mapper now: $mapper", l)
         case Success(_) =>
-          logger.debug(s"updateFromShardEvent success for dataset=${event.ref} event $event. Mapper now: $mapper")
+          logger.debug(s"[ClusterV2] updateFromShardEvent success for dataset=${event.ref} " +
+            s"event $event. Mapper now: $mapper")
       }
       // update metrics
       shardStats(event.ref).update(mapper, skipUnassigned = true)
@@ -140,17 +142,17 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
       else storeConf.diskTTLSeconds * 1000
     def earliestTimestampFn = System.currentTimeMillis() - ttl
     def clusterShardMapperFn = clusterDiscovery.shardMapper(dataset.ref)
-    logger.info(s"Creating QueryActor for dataset $ref with dataset ttlMs=$ttl")
+    logger.info(s"[ClusterV2] Creating QueryActor for dataset $ref with dataset ttlMs=$ttl")
     val queryRef = context.actorOf(QueryActor.props(memStore, dataset, schemas,
       clusterShardMapperFn, earliestTimestampFn))
     queryActors(ref) = queryRef
 
-    logger.info(s"Coordinator set up for ingestion and querying for $ref.")
+    logger.info(s"[ClusterV2] Coordinator set up for ingestion and querying for $ref.")
   }
 
   private def startTenantIngestionMetering(): Unit = {
     if (settings.config.getBoolean("shard-key-level-ingestion-metrics-enabled")) {
-      logger.info(s"Starting tenant level ingestion cardinality metering...")
+      logger.info(s"[ClusterV2] Starting tenant level ingestion cardinality metering...")
       val inst = TenantIngestionMetering(
         settings,
         dsIterProducer = () => { localShardMaps.keysIterator },
@@ -174,7 +176,7 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
     case ev: ShardEvent => try {
       updateFromShardEvent(ev)
     } catch { case e: Exception =>
-      logger.error(s"Error occurred when processing message $ev", e)
+      logger.error(s"[ClusterV2] Error occurred when processing message $ev", e)
     }
 
     // requested from CLI and HTTP API
@@ -182,7 +184,7 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
      try {
        sender() ! CurrentShardSnapshot(g.ref, clusterDiscovery.shardMapper(g.ref))
      } catch { case e: Exception =>
-       logger.error(s"Error occurred when processing message $g", e)
+       logger.error(s"[ClusterV2] Error occurred when processing message $g", e)
      }
 
    // requested from peer NewNodeCoordActors upon them receiving GetShardMap call
@@ -190,14 +192,14 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
      try {
        sender() ! CurrentShardSnapshot(g.ref, localShardMaps(g.ref))
      } catch { case e: Exception =>
-       logger.error(s"Error occurred when processing message $g", e)
+       logger.error(s"[ClusterV2] Error occurred when processing message $g", e)
      }
 
    case ListRegisteredDatasets =>
      try {
        sender() ! localShardMaps.keys.toSeq
      } catch { case e: Exception =>
-       logger.error(s"Error occurred when processing message ListRegisteredDatasets", e)
+       logger.error(s"[ClusterV2] Error occurred when processing message ListRegisteredDatasets", e)
      }
 
    case LocalShardsHealthRequest =>
@@ -209,7 +211,7 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
        }.toSeq
        sender() ! resp
      } catch { case e: Exception =>
-       logger.error(s"Error occurred when processing message LocalShardsHealthRequest", e)
+       logger.error(s"[ClusterV2] Error occurred when processing message LocalShardsHealthRequest", e)
      }
 
  }
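Reviewer note (not part of the patch): below is a minimal, self-contained sketch of the resolve-and-filter pattern the `reduceMappersFromAllNodes` change introduces, for anyone reading this diff without the surrounding file. A failed `resolveOne` is recovered to `ActorRef.noSender` after logging and bumping the counter, and the sentinel is then filtered out, so one unreachable coordinator no longer drops the whole discovery pass. Names such as `ResolveSketch`, `resolveTimeout`, and `resolvedCoordinators` are illustrative stand-ins for the patch's `settings.ResolveActorTimeout` and `actorResolvedFailedCounter`, not identifiers from the codebase.

```scala
import akka.actor.{ActorRef, ActorSelection}
import kamon.Kamon
import monix.reactive.Observable

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object ResolveSketch {
  // Stand-ins for settings.ResolveActorTimeout and actorResolvedFailedCounter in the patch
  private val resolveTimeout: FiniteDuration = 2.seconds
  private val resolveFailedCounter = Kamon.counter("actor-resolve-failed")

  /** Resolves each ActorSelection; failures are counted and skipped instead of failing the stream. */
  def resolvedCoordinators(selections: Iterator[ActorSelection], dataset: String)
                          (implicit ec: ExecutionContext): Observable[ActorRef] =
    Observable.fromIteratorUnsafe(selections)
      .flatMap { sel =>
        Observable.fromFuture(
          sel.resolveOne(resolveTimeout).recover { case _ =>
            // mirror the patch: tag the failure by dataset and actor, then emit the sentinel
            resolveFailedCounter
              .withTag("dataset", dataset)
              .withTag("actor", sel.toString())
              .increment()
            ActorRef.noSender
          })
      }
      .filter(_ != ActorRef.noSender) // unreachable nodes are dropped, not fatal
}
```

The design choice worth noting is that resolution failures are converted into a sentinel value and observed via the new counter and error log, rather than being swallowed silently as in the pre-patch one-liner `recover{case _=> ActorRef.noSender}`.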