fix(cluster): Adding metric to track when actor ref is not getting resolved #1722

Merged
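Summary of the change: the cluster-v2 discovery path gains three Kamon instruments (a counter for failed actor resolutions, a counter for discovery runs, and a gauge for unassigned shards), and the ClusterV2 log lines get a common prefix for easier filtering. As orientation before the diff, here is a minimal standalone sketch of the Kamon pattern the change relies on; the metric names mirror the diff, while the wrapper object and helper methods are illustrative only.

```scala
import kamon.Kamon

// Illustrative wrapper only; the instrument names mirror those added in FiloDbClusterDiscovery.
object DiscoveryMetricsSketch {
  // Counts every failed ActorSelection.resolveOne attempt
  val actorResolveFailed = Kamon.counter("actor-resolve-failed")
  // Counts each discovery pass per dataset
  val discoveryRuns = Kamon.counter("filodb-cluster-discovery")
  // Tracks how many shards currently have no node assigned
  val unassignedShards = Kamon.gauge("v2-unassigned-shards")

  // Hypothetical helpers showing how the instruments are tagged and updated.
  def recordResolveFailure(dataset: String, actorPath: String): Unit =
    actorResolveFailed.withTag("dataset", dataset).withTag("actor", actorPath).increment()

  def recordDiscoveryPass(dataset: String, unassignedCount: Int): Unit = {
    discoveryRuns.withTag("dataset", dataset).increment()
    unassignedShards.withTag("dataset", dataset).update(unassignedCount.toDouble)
  }
}
```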
@@ -40,7 +40,7 @@ case class ActorPlanDispatcher(target: ActorRef, clusterName: String) extends Pl
// Query Planner sets target as null when shard is down
if (target == ActorRef.noSender) {
Task.eval({
qLogger.warn(s"Creating partial result as shard is not available")
qLogger.warn(s"Target Actor is ActorRef.noSender ! Creating partial result as shard is not available")
emptyPartialResult
})
} else {
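Context for the ActorPlanDispatcher hunk above: when a shard is down, the query planner hands the dispatcher ActorRef.noSender as its target, and the dispatcher short-circuits into an empty partial result rather than asking a dead actor; the change only makes the warning call that condition out explicitly. A minimal sketch of that guard, assuming a simplified stand-in for FiloDB's result type and emptyPartialResult:

```scala
import akka.actor.ActorRef
import com.typesafe.scalalogging.StrictLogging
import monix.eval.Task

// Hypothetical, simplified stand-in for FiloDB's query response type.
final case class PartialResult(rows: Seq[String], isPartial: Boolean)

object DispatchSketch extends StrictLogging {
  private val emptyPartialResult = PartialResult(Seq.empty, isPartial = true)

  // Mirrors the guard in ActorPlanDispatcher: a noSender target means the shard is down.
  def dispatch(target: ActorRef)(realDispatch: ActorRef => Task[PartialResult]): Task[PartialResult] =
    if (target == ActorRef.noSender) {
      Task.eval {
        logger.warn("Target Actor is ActorRef.noSender! Creating partial result as shard is not available")
        emptyPartialResult
      }
    } else {
      realDispatch(target)
    }
}
```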
@@ -3,16 +3,16 @@ package filodb.coordinator.v2
import java.net.InetAddress
import java.util.concurrent.ConcurrentHashMap

+ import scala.collection.mutable
+ import scala.concurrent.Future
+ import scala.concurrent.duration.{DurationInt, FiniteDuration}

import akka.actor.{ActorRef, ActorSystem}
import akka.pattern.ask
import akka.util.Timeout
import com.typesafe.scalalogging.StrictLogging
+ import kamon.Kamon
import monix.execution.{CancelableFuture, Scheduler}
import monix.reactive.Observable
- import scala.collection.mutable
- import scala.concurrent.Future
- import scala.concurrent.duration.{DurationInt, FiniteDuration}

import filodb.coordinator.{CurrentShardSnapshot, FilodbSettings, ShardMapper}
import filodb.core.DatasetRef
@@ -24,6 +24,14 @@ class FiloDbClusterDiscovery(settings: FilodbSettings,

private val discoveryJobs = mutable.Map[DatasetRef, CancelableFuture[Unit]]()


+ // Metric to track actor resolve failures
+ val actorResolvedFailedCounter = Kamon.counter("actor-resolve-failed")
+ // Metric to track cluster discovery runs
+ val clusterDiscoveryCounter = Kamon.counter("filodb-cluster-discovery")
+ // Metric to track if we have unassigned shards on a given pod
+ val unassignedShardsGauge = Kamon.gauge("v2-unassigned-shards")

lazy val ordinalOfLocalhost: Int = {
if (settings.localhostOrdinal.isDefined) settings.localhostOrdinal.get
else {
@@ -99,7 +107,7 @@ class FiloDbClusterDiscovery(settings: FilodbSettings,
val empty = CurrentShardSnapshot(ref, new ShardMapper(numShards))
def fut = (nca ? GetShardMapScatter(ref)) (t).asInstanceOf[Future[CurrentShardSnapshot]]
Observable.fromFuture(fut).onErrorHandle { e =>
logger.error(s"Saw exception on askShardSnapshot: $e")
logger.error(s"[ClusterV2] Saw exception on askShardSnapshot!", e)
empty
}
}
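The hunk above only rewords the error log, but the surrounding shape is worth noting: each peer coordinator is asked for its shard snapshot with a timeout, and any failure degrades to an empty snapshot so one unreachable node cannot stall the whole discovery pass. A rough sketch of that ask-with-fallback pattern, assuming simplified stand-ins for GetShardMapScatter and CurrentShardSnapshot:

```scala
import scala.concurrent.Future

import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import com.typesafe.scalalogging.StrictLogging
import monix.reactive.Observable

// Hypothetical message/reply pair standing in for GetShardMapScatter / CurrentShardSnapshot.
final case class GetSnapshot(dataset: String)
final case class Snapshot(dataset: String, assignedShards: Set[Int])

object AskSnapshotSketch extends StrictLogging {
  // Ask one coordinator for its snapshot; fall back to an empty snapshot on any error.
  def askSnapshot(nca: ActorRef, dataset: String)(implicit t: Timeout): Observable[Snapshot] = {
    val empty = Snapshot(dataset, Set.empty)
    def fut = (nca ? GetSnapshot(dataset)).asInstanceOf[Future[Snapshot]]
    Observable.fromFuture(fut).onErrorHandle { e =>
      logger.error("[ClusterV2] Saw exception on askShardSnapshot!", e)
      empty
    }
  }
}
```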
@@ -110,7 +118,18 @@ class FiloDbClusterDiscovery(settings: FilodbSettings,
val acc = new ShardMapper(numShards)
val snapshots = for {
nca <- Observable.fromIteratorUnsafe(nodeCoordActorSelections.iterator)
- ncaRef <- Observable.fromFuture(nca.resolveOne(settings.ResolveActorTimeout).recover{case _=> ActorRef.noSender})
+ ncaRef <- Observable.fromFuture(nca.resolveOne(settings.ResolveActorTimeout)
+ .recover {
+ // recovering with ActorRef.noSender
+ case e =>
+ // log the exception we got while trying to resolve and emit metric
+ logger.error(s"[ClusterV2] Actor Resolve Failed ! actor: ${nca.toString()}", e)
+ actorResolvedFailedCounter
+ .withTag("dataset", ref.dataset)
+ .withTag("actor", nca.toString())
+ .increment()
+ ActorRef.noSender
+ })
if ncaRef != ActorRef.noSender
snapshot <- askShardSnapshot(ncaRef, ref, numShards, timeout)
} yield {
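This hunk is the core of the PR: the ActorSelection for each peer coordinator is resolved to a concrete ActorRef, and a resolution failure is now logged and counted per dataset and actor before falling back to ActorRef.noSender, which the guard above then filters out. A standalone sketch of the same recover shape; the counter name and tags mirror the diff, the wrapper object is illustrative:

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

import akka.actor.{ActorRef, ActorSelection}
import com.typesafe.scalalogging.StrictLogging
import kamon.Kamon

object ResolveSketch extends StrictLogging {
  private val actorResolveFailed = Kamon.counter("actor-resolve-failed")

  // Resolve a coordinator selection; on failure, count and log it, then degrade to
  // ActorRef.noSender so callers can skip the node instead of failing the discovery pass.
  def resolveOrNoSender(nca: ActorSelection, dataset: String, timeout: FiniteDuration): Future[ActorRef] =
    nca.resolveOne(timeout).recover {
      case e =>
        logger.error(s"[ClusterV2] Actor Resolve Failed! actor: ${nca.toString()}", e)
        actorResolveFailed
          .withTag("dataset", dataset)
          .withTag("actor", nca.toString())
          .increment()
        ActorRef.noSender
    }
}
```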
@@ -122,13 +141,20 @@ class FiloDbClusterDiscovery(settings: FilodbSettings,
private val datasetToMapper = new ConcurrentHashMap[DatasetRef, ShardMapper]()
def registerDatasetForDiscovery(dataset: DatasetRef, numShards: Int): Unit = {
require(failureDetectionInterval.toMillis > 5000, "failure detection interval should be > 5s")
logger.info(s"Starting discovery pipeline for $dataset")
logger.info(s"[ClusterV2] Starting discovery pipeline for $dataset")
datasetToMapper.put(dataset, new ShardMapper(numShards))
val fut = for {
_ <- Observable.intervalWithFixedDelay(failureDetectionInterval)
mapper <- reduceMappersFromAllNodes(dataset, numShards, failureDetectionInterval - 5.seconds)
} {
datasetToMapper.put(dataset, mapper)
+ clusterDiscoveryCounter.withTag("dataset", dataset.dataset).increment()
+ val unassignedShardsCount = mapper.unassignedShards.length.toDouble
+ if (unassignedShardsCount > 0.0) {
+ logger.error(s"[ClusterV2] Unassigned Shards > 0 !! Dataset: ${dataset.dataset} " +
+ s"Shards Mapping is: ${mapper.prettyPrint} ")
+ }
+ unassignedShardsGauge.withTag("dataset", dataset.dataset).update(unassignedShardsCount)
}
discoveryJobs += (dataset -> fut)
}
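With this hunk, every discovery tick also reports health: the per-dataset counter confirms the pipeline is still running, and the unassigned-shards gauge (plus an error log with the full mapping) makes a stuck or partially assigned dataset visible. A compressed sketch of that loop, assuming a placeholder in place of FiloDB's ShardMapper and reduce step:

```scala
import scala.concurrent.duration._

import com.typesafe.scalalogging.StrictLogging
import kamon.Kamon
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

// Placeholder for FiloDB's ShardMapper, reduced to what the sketch needs.
final case class MapperView(dataset: String, unassignedShards: Seq[Int], prettyPrint: String)

object DiscoveryLoopSketch extends StrictLogging {
  private val discoveryRuns = Kamon.counter("filodb-cluster-discovery")
  private val unassignedGauge = Kamon.gauge("v2-unassigned-shards")

  // Periodically recompute the mapper and publish health metrics for one dataset.
  def run(interval: FiniteDuration)(recompute: () => MapperView) =
    Observable.intervalWithFixedDelay(interval).foreach { _ =>
      val mapper = recompute()
      discoveryRuns.withTag("dataset", mapper.dataset).increment()
      val unassigned = mapper.unassignedShards.length.toDouble
      if (unassigned > 0.0) {
        logger.error(s"[ClusterV2] Unassigned Shards > 0 !! Dataset: ${mapper.dataset} " +
          s"Shards Mapping is: ${mapper.prettyPrint}")
      }
      unassignedGauge.withTag("dataset", mapper.dataset).update(unassigned)
    }
}
```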
@@ -45,10 +45,10 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
private val ingestionConfigs = new mutable.HashMap[DatasetRef, IngestionConfig]()
private val shardStats = new mutable.HashMap[DatasetRef, ShardHealthStats]()

logger.info(s"Initializing NodeCoordActor at ${self.path}")
logger.info(s"[ClusterV2] Initializing NodeCoordActor at ${self.path}")

private def initialize(): Unit = {
logger.debug(s"Initializing stream configs: ${settings.streamConfigs}")
logger.debug(s"[ClusterV2] Initializing stream configs: ${settings.streamConfigs}")
settings.streamConfigs.foreach { config =>
val dataset = settings.datasetFromStream(config)
val ingestion = IngestionConfig(config, NodeClusterActor.noOpSource.streamFactoryClass).get
@@ -69,7 +69,7 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
queryActors.get(dataset).map(func).getOrElse(originator ! UnknownDataset)

private def initializeDataset(dataset: Dataset, ingestConfig: IngestionConfig): Unit = {
logger.info(s"Initializing dataset ${dataset.ref}")
logger.info(s"[ClusterV2] Initializing dataset ${dataset.ref}")
ingestionConfigs.put(dataset.ref, ingestConfig)
localShardMaps.put(dataset.ref, new ShardMapper(ingestConfig.numShards))
shardStats.put(dataset.ref, new ShardHealthStats(dataset.ref))
@@ -100,9 +100,11 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
localShardMaps.get(event.ref).foreach { mapper =>
mapper.updateFromEvent(event) match {
case Failure(l) =>
logger.error(s"updateFromShardEvent error for dataset=${event.ref} event $event. Mapper now: $mapper", l)
logger.error(s"[ClusterV2] updateFromShardEvent error for dataset=${event.ref} " +
s"event $event. Mapper now: $mapper", l)
case Success(_) =>
logger.debug(s"updateFromShardEvent success for dataset=${event.ref} event $event. Mapper now: $mapper")
logger.debug(s"[ClusterV2] updateFromShardEvent success for dataset=${event.ref} " +
s"event $event. Mapper now: $mapper")
}
// update metrics
shardStats(event.ref).update(mapper, skipUnassigned = true)
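For the shard-event handler above, the flow being relogged is: apply the event to the local mapper (a Try), log success or failure, then refresh the per-dataset shard health stats either way. A tiny sketch of that Try-handling shape, with placeholder types:

```scala
import scala.util.{Failure, Success, Try}

import com.typesafe.scalalogging.StrictLogging

object ShardEventSketch extends StrictLogging {
  // `update` stands in for ShardMapper.updateFromEvent; `refreshStats` for ShardHealthStats.update.
  def applyEvent[E](event: E)(update: E => Try[Unit])(refreshStats: () => Unit): Unit = {
    update(event) match {
      case Failure(l) =>
        logger.error(s"[ClusterV2] updateFromShardEvent error for event $event", l)
      case Success(_) =>
        logger.debug(s"[ClusterV2] updateFromShardEvent success for event $event")
    }
    // Health stats/metrics are refreshed regardless of outcome, mirroring the handler above.
    refreshStats()
  }
}
```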
@@ -140,17 +142,17 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
else storeConf.diskTTLSeconds * 1000
def earliestTimestampFn = System.currentTimeMillis() - ttl
def clusterShardMapperFn = clusterDiscovery.shardMapper(dataset.ref)
logger.info(s"Creating QueryActor for dataset $ref with dataset ttlMs=$ttl")
logger.info(s"[ClusterV2] Creating QueryActor for dataset $ref with dataset ttlMs=$ttl")
val queryRef = context.actorOf(QueryActor.props(memStore, dataset, schemas,
clusterShardMapperFn, earliestTimestampFn))
queryActors(ref) = queryRef

logger.info(s"Coordinator set up for ingestion and querying for $ref.")
logger.info(s"[ClusterV2] Coordinator set up for ingestion and querying for $ref.")
}

private def startTenantIngestionMetering(): Unit = {
if (settings.config.getBoolean("shard-key-level-ingestion-metrics-enabled")) {
logger.info(s"Starting tenant level ingestion cardinality metering...")
logger.info(s"[ClusterV2] Starting tenant level ingestion cardinality metering...")
val inst = TenantIngestionMetering(
settings,
dsIterProducer = () => { localShardMaps.keysIterator },
@@ -174,30 +176,30 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
case ev: ShardEvent => try {
updateFromShardEvent(ev)
} catch { case e: Exception =>
logger.error(s"Error occurred when processing message $ev", e)
logger.error(s"[ClusterV2] Error occurred when processing message $ev", e)
}

// requested from CLI and HTTP API
case g: GetShardMap =>
try {
sender() ! CurrentShardSnapshot(g.ref, clusterDiscovery.shardMapper(g.ref))
} catch { case e: Exception =>
logger.error(s"Error occurred when processing message $g", e)
logger.error(s"[ClusterV2] Error occurred when processing message $g", e)
}

// requested from peer NewNodeCoordActors upon them receiving GetShardMap call
case g: GetShardMapScatter =>
try {
sender() ! CurrentShardSnapshot(g.ref, localShardMaps(g.ref))
} catch { case e: Exception =>
logger.error(s"Error occurred when processing message $g", e)
logger.error(s"[ClusterV2] Error occurred when processing message $g", e)
}

case ListRegisteredDatasets =>
try {
sender() ! localShardMaps.keys.toSeq
} catch { case e: Exception =>
logger.error(s"Error occurred when processing message ListRegisteredDatasets", e)
logger.error(s"[ClusterV2] Error occurred when processing message ListRegisteredDatasets", e)
}

case LocalShardsHealthRequest =>
@@ -209,7 +211,7 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
}.toSeq
sender() ! resp
} catch { case e: Exception =>
logger.error(s"Error occurred when processing message LocalShardsHealthRequest", e)
logger.error(s"[ClusterV2] Error occurred when processing message LocalShardsHealthRequest", e)
}

}
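The NewNodeCoordinatorActor hunks above only add the [ClusterV2] prefix, but the handler shape they touch is a plain Akka receive: CLI and HTTP callers get the cluster-wide mapper via GetShardMap, peers get the local one via GetShardMapScatter, and each reply is wrapped in a try/catch that logs with the prefix. A trimmed sketch of that pattern, assuming stand-ins for the message and snapshot types:

```scala
import akka.actor.Actor
import com.typesafe.scalalogging.StrictLogging

// Stand-ins for FiloDB's request message and ShardMapper-backed snapshot.
final case class GetShardMapSketch(dataset: String)
final case class SnapshotSketch(dataset: String, assignments: Map[Int, String])

class CoordinatorSketch(lookup: String => Map[Int, String]) extends Actor with StrictLogging {
  def receive: Receive = {
    // Requested from CLI and HTTP API; peers would use a scatter variant against the local map.
    case g: GetShardMapSketch =>
      try {
        sender() ! SnapshotSketch(g.dataset, lookup(g.dataset))
      } catch { case e: Exception =>
        logger.error(s"[ClusterV2] Error occurred when processing message $g", e)
      }
  }
}
```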