Merge pull request #1860 from kvpetrov/shard_failover_metric
feat(query): cherry picking shard level failover metrics
kvpetrov authored Sep 27, 2024
2 parents 46236ea + a685755 commit 95ee5d2
Showing 2 changed files with 35 additions and 1 deletion.
@@ -7,6 +7,7 @@ import scala.jdk.CollectionConverters._

import com.typesafe.scalalogging.StrictLogging
import io.grpc.ManagedChannel
import kamon.Kamon

import filodb.coordinator.GrpcPlanDispatcher
import filodb.coordinator.ShardMapper
@@ -17,6 +18,9 @@ import filodb.grpc.GrpcCommonUtils
import filodb.query.{LabelNames, LabelValues, LogicalPlan, SeriesKeysByFilters}
import filodb.query.exec._

object HighAvailabilityPlanner {
final val FailoverCounterName = "single-cluster-failovers-materialized"
}
/**
* HighAvailabilityPlanner responsible for using underlying local planner and FailureProvider
* to come up with a plan that orchestrates query execution between multiple
@@ -48,6 +52,24 @@ class HighAvailabilityPlanner(dsRef: DatasetRef,
import QueryFailureRoutingStrategy._
import LogicalPlan._

// legacy failover counter captures failovers when we send a PromQL query to the buddy
// cluster
val legacyFailoverCounter = Kamon.counter(HighAvailabilityPlanner.FailoverCounterName)
.withTag("cluster", clusterName)
.withTag("type", "legacy")

// full failover counter captures failovers when we materialize a plan locally and
// send the entire plan to the buddy cluster for execution
val fullFailoverCounter = Kamon.counter(HighAvailabilityPlanner.FailoverCounterName)
.withTag("cluster", clusterName)
.withTag("type", "full")

// partial failover counter captures failovers when we materialize a plan locally and
// send some parts of it to the buddy cluster for execution
val partialFailoverCounter = Kamon.counter(HighAvailabilityPlanner.FailoverCounterName)
.withTag("cluster", clusterName)
.withTag("type", "partial")

// HTTP endpoint is still mandatory as metadata queries still use it.
val remoteHttpEndpoint: String = queryConfig.remoteHttpEndpoint
.getOrElse(throw new IllegalArgumentException("remoteHttpEndpoint config needed"))
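
A minimal sketch of the Kamon pattern used above, assuming kamon-core on the classpath: all three counters share the metric name from HighAvailabilityPlanner.FailoverCounterName and differ only in their type tag, so a single metric can be summed across, or split by, failover kind. The object name and the demo-cluster value are made up for illustration; only the metric name and tag keys come from this diff.

import kamon.Kamon

// Sketch only (not part of the commit): one metric name, three tag combinations,
// so legacy/full/partial failovers show up as separate series of the same metric.
object FailoverMetricsSketch extends App {
  private val metricName = "single-cluster-failovers-materialized"

  val legacy  = Kamon.counter(metricName).withTag("cluster", "demo-cluster").withTag("type", "legacy")
  val full    = Kamon.counter(metricName).withTag("cluster", "demo-cluster").withTag("type", "full")
  val partial = Kamon.counter(metricName).withTag("cluster", "demo-cluster").withTag("type", "partial")

  legacy.increment()    // one PromQL query forwarded to the buddy cluster
  partial.increment(2)  // two plans with some shards routed to the buddy cluster
}
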
@@ -110,6 +132,7 @@ class HighAvailabilityPlanner(dsRef: DatasetRef,
qContext)
}
case route: RemoteRoute =>
legacyFailoverCounter.increment()
val timeRange = route.timeRange.get
val queryParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams]
// rootLogicalPlan can be different from queryParams.promQl
@@ -341,6 +364,7 @@ class HighAvailabilityPlanner(dsRef: DatasetRef,
localActiveShardMapper: ActiveShardMapper,
remoteActiveShardMapper: ActiveShardMapper
): ExecPlan = {
partialFailoverCounter.increment()
// It makes sense to do local planning only if at least 50% of the shards are running,
// as the query might otherwise overload the few remaining shards during second-level aggregation.
// Generally, it is better to ship the entire query to the cluster that has more shards up.
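
A hedged sketch of the heuristic described in the comment above; the method body that applies it is collapsed in this diff, and the helper name and signature below are made up for illustration rather than taken from ActiveShardMapper.

// Sketch only: plan locally (shard-level failover) when at least half of the
// local shards are up; otherwise prefer shipping the whole query to the buddy
// cluster, since second-level aggregation could overload the surviving shards.
def shouldPlanLocally(localShardsUp: Int, totalShards: Int): Boolean =
  totalShards > 0 && localShardsUp * 2 >= totalShards

shouldPlanLocally(localShardsUp = 14, totalShards = 32)  // false: fewer than half up, prefer full failover
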
@@ -360,6 +384,7 @@ class HighAvailabilityPlanner(dsRef: DatasetRef,
buddyGrpcEndpoint = Some(remoteGrpcEndpoint.get)
)
val context = qContext.copy(plannerParams = haPlannerParams)
logger.info(context.getQueryLogLine("Using shard level failover"))
val plan = localPlanner.materialize(logicalPlan, context);
plan
}
@@ -419,6 +444,7 @@ class HighAvailabilityPlanner(dsRef: DatasetRef,
qContext: QueryContext,
localActiveShardMapper: ActiveShardMapper, remoteActiveShardMapper: ActiveShardMapper
): GenericRemoteExec = {
fullFailoverCounter.increment()
val timeout: Long = queryConfig.remoteHttpTimeoutMs.getOrElse(60000)
val plannerParams = qContext.plannerParams.copy(
failoverMode = filodb.core.query.ShardLevelFailoverMode,
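
A speculative sketch of the kind of flag being set on plannerParams above. Only ShardLevelFailoverMode is visible in this diff; the sealed trait and the other case name are assumptions about filodb.core.query, not copied from it.

// Assumed shape, for illustration only.
sealed trait FailoverMode
case object LegacyFailoverMode     extends FailoverMode // whole query re-sent as PromQL to the buddy cluster
case object ShardLevelFailoverMode extends FailoverMode // individual shards routed to the buddy cluster
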
@@ -65,6 +65,12 @@ class SingleClusterPlanner(val dataset: Dataset,
private val shardColumns = dsOptions.shardKeyColumns.sorted
private val dsRef = dataset.ref

// The shard-unavailable failover counter captures failovers that are not possible because at least
// one shard is down on both the primary and DR clusters. In that case the query executes only
// when partial results are acceptable; otherwise an exception is thrown.
val shardUnavailableFailoverCounter = Kamon.counter(HighAvailabilityPlanner.FailoverCounterName)
.withTag("cluster", clusterName)
.withTag("type", "shardUnavailable")

val numPlansMaterialized = Kamon.counter("single-cluster-plans-materialized")
.withTag("cluster", clusterName)
@@ -192,10 +198,12 @@ class SingleClusterPlanner(val dataset: Dataset,
if (!shardInfo.active) {
if (queryContext.plannerParams.allowPartialResults)
logger.debug(s"Shard: $shard is not available however query is proceeding as partial results is enabled")
else
else {
shardUnavailableFailoverCounter.increment()
throw new filodb.core.query.ServiceUnavailableException(
s"Remote Buddy Shard: $shard is not available"
)
}
}
val dispatcher = RemoteActorPlanDispatcher(shardInfo.address, clusterName)
dispatcher
